...

Source file src/golang.conradwood.net/go-easyops/client/dialer.go

Documentation: golang.conradwood.net/go-easyops/client

     1  package client
     2  
     3  import (
     4  	_ "context"
     5  	"flag"
     6  	"fmt"
     7  	"os"
     8  	"strings"
     9  	"time"
    10  
    11  	"golang.conradwood.net/go-easyops/cmdline"
    12  	"golang.conradwood.net/go-easyops/common"
    13  	"golang.conradwood.net/go-easyops/prometheus"
    14  	"golang.conradwood.net/go-easyops/utils"
    15  	"google.golang.org/grpc"
    16  )
    17  
    18  const (
    19  	fancy_balancer_json = `{  "loadBalancingConfig": [ { "fancybalancer": {} } ] }`
    20  	use_fancy_balancer  = true
    21  )
    22  
    23  var (
    24  	def_client          = &easyops_client{}
    25  	known_not_auth_rpcs = []string{
    26  		"registry.Registry.V2GetTarget",
    27  		"auth.AuthenticationService.GetPublicSigningKey",
    28  		"auth.AuthenticationService.SignedGetByToken",
    29  		"registry.Registry.V2RegisterService",
    30  	}
    31  	// I think part of a refactoring, the metrics below
    32  	// should move into a metrics package, together with
    33  	// the server metrics.
    34  	// then we should have a single metric:
    35  	// "grpc_requests_total{direction="sent|received"}
    36  	// cnw
    37  	grpc_client_sent = prometheus.NewCounterVec(
    38  		prometheus.CounterOpts{
    39  			Name: "grpc_requests_sent",
    40  			Help: "V=1 unit=ops total number of grpc requests sent by this instance",
    41  		},
    42  		[]string{"servicename", "method"},
    43  	)
    44  	grpc_client_failed = prometheus.NewCounterVec(
    45  		prometheus.CounterOpts{
    46  			Name: "grpc_requests_sent_failed",
    47  			Help: "V=1 unit=ops total number of grpc requests sent by this instance and failed",
    48  		},
    49  		[]string{"servicename", "method"},
    50  	)
    51  	dialer_debug = flag.Bool("ge_debug_dialer", false, "set to true to debug the grpc dialer")
    52  )
    53  
    54  type easyops_client struct {
    55  }
    56  
    57  func init() {
    58  	prometheus.MustRegister(grpc_client_sent, grpc_client_failed)
    59  	utils.Client_connector = def_client
    60  }
    61  
    62  // opens a tcp connection to an ip (no loadbalancing obviously)
    63  func ConnectWithIPNoBlock(ip string) (*grpc.ClientConn, error) {
    64  	return connectWithIPOptions(ip, false)
    65  }
    66  
    67  // opens a tcp connection to an ip:port (ip syntax matches argument to net.Dial())
    68  func ConnectWithIP(ip string) (*grpc.ClientConn, error) {
    69  	return connectWithIPOptions(ip, true)
    70  }
    71  func connectWithIPOptions(servicename string, block bool) (*grpc.ClientConn, error) {
    72  	if *dialer_debug {
    73  		fmt.Println("[go-easyops] DialService (connectWithIPOptions): Dialling " + servicename + " and blocking until successful connection...")
    74  	}
    75  
    76  	var err error
    77  	var conn *grpc.ClientConn
    78  	if block {
    79  		conn, err = grpc.Dial(
    80  			servicename,
    81  			grpc.WithBlock(),
    82  			grpc.WithTransportCredentials(GetClientCreds()),
    83  			grpc.WithUnaryInterceptor(ClientMetricsUnaryInterceptor),
    84  			grpc.WithStreamInterceptor(unaryStreamInterceptor),
    85  		)
    86  	} else {
    87  		conn, err = grpc.Dial(
    88  			servicename,
    89  			grpc.WithTransportCredentials(GetClientCreds()),
    90  			grpc.WithUnaryInterceptor(ClientMetricsUnaryInterceptor),
    91  			grpc.WithStreamInterceptor(unaryStreamInterceptor),
    92  		)
    93  	}
    94  	if err != nil {
    95  		return nil, err
    96  	}
    97  
    98  	if *dialer_debug {
    99  		fmt.Printf("Connected to %s\n", servicename)
   100  	}
   101  
   102  	return conn, nil
   103  
   104  }
   105  func (ec *easyops_client) Connect(serviceNameOrPath string) *grpc.ClientConn {
   106  	return Connect(serviceNameOrPath)
   107  }
   108  func Connect(serviceNameOrPath string) *grpc.ClientConn {
   109  	return ConnectAt(cmdline.GetClientRegistryAddress(), serviceNameOrPath)
   110  }
   111  
   112  // this initiates a balancer for a service and returns an address list. this is not actually balanced, but the
   113  // fancyaddresslist does maintain the list of active targets.
   114  func ConnectNoBalanceAt(registryadr string, serviceNameOrPath string) (*FancyAddressList, error) {
   115  	_, err := dialService(registryadr, serviceNameOrPath)
   116  	if err != nil {
   117  		return nil, err
   118  	}
   119  
   120  	// this is, of course a bit of a hack. really it should be a channel and so on
   121  	started := time.Now()
   122  	for {
   123  		if time.Since(started) > time.Duration(8)*time.Second {
   124  			return nil, fmt.Errorf("Unable to dial service \"%s\" - timeout after %0.1fs", serviceNameOrPath, time.Since(started).Seconds())
   125  		}
   126  		for _, fal := range GetAllFancyAddressLists() {
   127  			//			fmt.Printf("Looking for \"%s\" - is it \"%s\"?\n", serviceNameOrPath, fal.ServiceName())
   128  			if fal.ServiceName() == serviceNameOrPath {
   129  				return fal, nil
   130  			}
   131  		}
   132  		time.Sleep(time.Duration(750) * time.Millisecond)
   133  	}
   134  }
   135  func ConnectNoBalance(serviceNameOrPath string) (*FancyAddressList, error) {
   136  	return ConnectNoBalanceAt(cmdline.GetClientRegistryAddress(), serviceNameOrPath)
   137  }
   138  
   139  // convenience method to get a loadbalanced connection to a service
   140  // use path or servicename (path prefered, it contains the version)
   141  // unless it successfullly connects it will NOT return
   142  // (it will either terminate the process or loop)
   143  func ConnectAt(registryadr string, serviceNameOrPath string) *grpc.ClientConn {
   144  	common.AddBlockedServiceName(serviceNameOrPath)
   145  	conn, err := dialService(registryadr, serviceNameOrPath)
   146  	// an error in this case reflects a LOCAL error, such as
   147  	// no route to host or out-of-memory.
   148  	// if a service is not available at the time of the call
   149  	// it will block until one becomes available.
   150  	// since it is a local error it is appropriate to exit.
   151  	// a system administrator has to repair the machine before
   152  	// the software can continue.
   153  	if err != nil {
   154  		fmt.Printf("Failed to dial %s: %s\n", serviceNameOrPath, err)
   155  		os.Exit(10)
   156  	}
   157  	if *dialer_debug {
   158  		fmt.Printf("[go-easyops]Connected to %s\n", serviceNameOrPath)
   159  	}
   160  	common.RemoveBlockedServiceName(serviceNameOrPath)
   161  	return conn
   162  }
   163  
   164  // connect to a service which we KNOW requires no authentication and no signature.
   165  // it is public because of implementation details, but should not be used by clients of goeasyops
   166  func ConnectAtNoAuth(registryadr string, serviceNameOrPath string) *grpc.ClientConn {
   167  	common.AddBlockedServiceName(serviceNameOrPath)
   168  	conn, err := dialService_noauth(registryadr, serviceNameOrPath)
   169  	// an error in this case reflects a LOCAL error, such as
   170  	// no route to host or out-of-memory.
   171  	// if a service is not available at the time of the call
   172  	// it will block until one becomes available.
   173  	// since it is a local error it is appropriate to exit.
   174  	// a system administrator has to repair the machine before
   175  	// the software can continue.
   176  	if err != nil {
   177  		fmt.Printf("Failed to dial %s: %s\n", serviceNameOrPath, err)
   178  		os.Exit(10)
   179  	}
   180  	if *dialer_debug {
   181  		fmt.Printf("[go-easyops]Connected to %s\n", serviceNameOrPath)
   182  	}
   183  	common.RemoveBlockedServiceName(serviceNameOrPath)
   184  	return conn
   185  }
   186  
   187  // opens a tcp connection to a path.
   188  func dialService(registry string, serviceName string) (*grpc.ClientConn, error) {
   189  	GetSignatureFromAuth() // this is triggered here, because we _must_ have a valid signature later. if it has been called earlier it is a noop
   190  	return dialService_noauth(registry, serviceName)
   191  }
   192  
   193  // dial a service that does not require authententication (no signature required)
   194  func dialService_noauth(registry string, serviceName string) (*grpc.ClientConn, error) {
   195  	if *dialer_debug {
   196  		fmt.Println("[go-easyops] DialService: Dialling with dialService() " + serviceName + " and blocking until successful connection...")
   197  	}
   198  
   199  	var err error
   200  	var conn *grpc.ClientConn
   201  	conn, err = grpc.Dial(
   202  		"go-easyops://"+serviceName+"/"+serviceName+"@"+registry, // "go-easyops://" url scheme registered in fancy_resolver.go
   203  		grpc.WithContextDialer(CustomDialer),                     // custom dialer to distinguish between direct and proxy tcp connections
   204  		grpc.WithBlock(),                                         // do not return until at least one connection is up
   205  		//grpc.WithBalancerName("fancybalancer"),                   // "fancybalancer" registered in fancy_balancer.go
   206  		grpc.WithDefaultServiceConfig(fancy_balancer_json),
   207  		grpc.WithTransportCredentials(GetClientCreds()),          // transport credentials: default hardcoded certificates
   208  		grpc.WithUnaryInterceptor(ClientMetricsUnaryInterceptor), // this is called for every unary RPC
   209  		grpc.WithStreamInterceptor(unaryStreamInterceptor),       // this is called for every stream RPC
   210  	)
   211  
   212  	if err != nil {
   213  		return nil, err
   214  	}
   215  
   216  	if *dialer_debug {
   217  		fmt.Printf("Connected to %s\n", serviceName)
   218  	}
   219  
   220  	return conn, nil
   221  }
   222  
   223  // given a fqdn like so:
   224  // "/auth.AuthenticationService/GetByToken"
   225  // it'll return service and method as strings
   226  func splitMethodAndService(fqdn string) (string, string, error) {
   227  	ms := strings.Split(fqdn, "/")
   228  	if len(ms) != 3 {
   229  		return "", "", fmt.Errorf("%s is not a valid service name (contains %d parts instead of 3)", fqdn, len(ms))
   230  	}
   231  	return ms[1], ms[2], nil
   232  }
   233  func isKnownNotAuthRPCs(s, m string) bool {
   234  	sn := fmt.Sprintf("%s.%s", s, m)
   235  	for _, k := range known_not_auth_rpcs {
   236  		if k == sn {
   237  			return true
   238  		}
   239  	}
   240  	return false
   241  }
   242  

View as plain text