...

Source file src/golang.conradwood.net/go-easyops/client/client.go

Documentation: golang.conradwood.net/go-easyops/client

     1  /*
     2  This package facilitates making load-balanced, fail-over, authenticated gRPC calls to server. (it also provides shortcuts to objectstore get/put)
     3  
     4  Typically, in the yacloud, a new client is constructed via the proto package. For example
     5  
     6  	import "golang.conradwood.net/apis/getestservice"
     7  	...
     8  	getestservice.GetEchoClient()...
     9  
    10  # Load balancing
    11  
    12  The client will maintain a list of available targets for grpc calls. Each service has a unique list of targets. The list is periodically and aysnchronously updated by polling the registry.
    13  
    14  If no targets are available, a call to this service will be blocked for some time, then fail. Once failed, all subsequent calls will be failed immediately until a target becomes available.
    15  This allows for some basic recovery for circular service dependencies on boot. Whilst it is considered bad practice, it is a pattern commonly found and thus go-easyops attempts to make it work as well as can be expected. (Better to avoid circular dependencies altogether!!)
    16  
    17  RPC calls are not retried - if they fail (for any reason) they will not be sent to a different server.
    18  
    19  # Routing
    20  
    21  This client implements several and distinct features to determine where to route rpc calls too.
    22    - Round-Robin for multiple targets
    23    - By User: to user specific services
    24    - By Context-Tag: arbitrary tags in the context
    25  
    26  In the absense of both user specific services and context-tags, a simple round-robin strategy is implemented for multiple targets.
    27  
    28  # Routing - by user
    29  
    30  A service may be registered with a useraccount instead of a service account (the default if started by a user on the command line. Also see command line flag -ge_disable_user_token). The client determines the current useraccount from the context used to invoke the target. If a service running as the same user as is in the context, the client will route the rpc call to the service running as this user.
    31  
    32  This is intended for debugging and "live" development. The user object is typically created and propagated and the edge of the system, for example at the webserver proxy. Thus, while developing or debugging any rpc server it is often useful to route some (and only some) calls into the development version. Developers should always start their work-in-progress servers under their own useraccount. Thus all calls the Developer makes are routed to their laptop. Subsequent calls go back into the infrastructure, but remain restricted to the useraccount, thus can be considered safe (subject to bugs in the backends of course).
    33  
    34  Note: this is not intended for general production use. For various reasons, it is a really bad idea to fire up rpc servers specific to each user. Instead the rpc server should handle multiple users. The user's token for authentication is considered private, like a password.
    35  
    36  # Routing - by Context-Tag
    37  
    38  A context-tag is a key with a value .
    39  
    40  A service may register itself with one or more tags. A Context may carry one or more tags. On each rpc call the list of targets is iterated through. Any service that has all tags with exactly matching values will be considered for routing, all others dismissed. If after, the first iteration, one or more services remain, those will be used for routing in a round-robin fashion.
    41  
    42  If no exact match is found, the context tags are inspected for "FallbackToPlain" option (see ctx package). If set, the list of all services with exactly 0 tags will be used for round-robin. If not set the rpc will be failed with "no target available".
    43  
    44  Note: Whilst context-tags are often quite useful, their use is generally discouraged, especially for large sets of servers. It is intented to be used for a standardized, quick and reasonably efficient means to route low-volume (~20/s or less) calls to remote rpc servers. In small setups this can often be useful to send information to remote clusters for remote-processing. (using a tag, for example, cluster=lhr, cluster=fra, cluster=lgw, cluster=cdn etc...)
    45  
    46  # Routing - directly
    47  
    48  One can bypass the fail-over and connection management altogether with functions such as ConnectWithIP. This is intented for circumstances where a standardised approach (round-robin/user/context) is unsuitable. Beware of dragons: This approach requires development of load-balancers, fail-over strategies, monitoring, recovery, start-up delays and many other features the default routing strategy implements. The complexity is often underestimated, but soon does become significant. (That is the point of the routing implementations above, really)
    49  
    50  # Standalone operation
    51  
    52  Standalone operation means that no other services are required to make rpc calls between a client and server. Whilst this is very limited (e.g. no load-balancing, no fail-over and NO AUTHENTICATION, it is useful for quickly testing a binary). Both, server and client must be started in standalone mode.
    53  
    54  Also see command line parameter -ge_standalone and package server
    55  
    56  # ObjectStore
    57  
    58  Mostly because an extra package for objectstore seems overkill, this package also provides function to get/put objects into the objectstore.
    59  */
    60  package client
    61  
    62  import (
    63  	"context"
    64  	"crypto/tls"
    65  	"crypto/x509"
    66  	"errors"
    67  	"fmt"
    68  	"net"
    69  	"runtime/debug"
    70  	"strings"
    71  	"sync"
    72  	"time"
    73  
    74  	pb "golang.conradwood.net/apis/registry"
    75  	"golang.conradwood.net/go-easyops/certificates"
    76  	"google.golang.org/grpc/credentials"
    77  	"google.golang.org/grpc/metadata"
    78  )
    79  
    80  var (
    81  	got_client_creds = false
    82  	client_creds     credentials.TransportCredentials
    83  
    84  	cert      = []byte{1, 2, 3}
    85  	errorList []*errorCache
    86  	errorLock sync.Mutex
    87  	reg       pb.RegistryClient
    88  )
    89  
    90  type errorCache struct {
    91  	servicename string
    92  	lastOccured time.Time
    93  	lastPrinted time.Time
    94  }
    95  
    96  func GetRegistryClient() pb.RegistryClient {
    97  	if reg == nil {
    98  		reg = pb.NewRegistryClient(Connect("registry.Registry"))
    99  	}
   100  	return reg
   101  }
   102  
   103  // opens a tcp connection to a servicename
   104  func DialTCPWrapper(serviceName string) (net.Conn, error) {
   105  	if strings.Contains(serviceName, "/") {
   106  		s := fmt.Sprintf("Error: The parameter for DialTCPWrapper needs a servicename. not a path. You passed in %s, which looks very much like a path. The \"old-style\" picoservices required a path at this function, but go-framework does not. Did you recently upgrade and did not upgrade a config?\n", serviceName)
   107  		debug.PrintStack()
   108  		return nil, errors.New(s)
   109  	}
   110  	if reg == nil {
   111  		reg = pb.NewRegistryClient(Connect("registry.Registry"))
   112  	}
   113  	ctx := getContext()
   114  	//ctx := context.WithTimeout(context.Background(), time.Duration(10)*time.Second)
   115  	targets, err := reg.GetTarget(ctx, &pb.GetTargetRequest{Name: serviceName, ApiType: pb.Apitype_tcp})
   116  	if err != nil {
   117  		return nil, err
   118  	}
   119  	list := targets.Service
   120  	if len(list) == 0 {
   121  		return nil, fmt.Errorf("No tcp connection by service %s", serviceName)
   122  	}
   123  	if len(list[0].Location.Address) == 0 {
   124  		return nil, fmt.Errorf("No tcp location found for name %s - is it running?", serviceName)
   125  	}
   126  	hostname_s := list[0].Location.Address[0].Host
   127  	port_s := fmt.Sprintf("%d", list[0].Location.Address[0].Port)
   128  
   129  	adr := net.JoinHostPort(hostname_s, port_s)
   130  	conn, err := net.Dial("tcp", adr)
   131  	if err != nil {
   132  		return nil, err
   133  	}
   134  
   135  	return conn, err
   136  }
   137  
   138  func hasApi(ar []pb.Apitype, lf pb.Apitype) bool {
   139  	for _, a := range ar {
   140  		if a == lf {
   141  			return true
   142  		}
   143  	}
   144  	return false
   145  }
   146  
   147  // get the Client Credentials we use to connect to other RPCs
   148  func GetClientCreds() credentials.TransportCredentials {
   149  	if got_client_creds {
   150  		return client_creds
   151  	}
   152  	roots := x509.NewCertPool()
   153  
   154  	frontendCert := certificates.Certificate()
   155  
   156  	roots.AppendCertsFromPEM(frontendCert)
   157  	ImCert := certificates.Ca() //ioutil.ReadFile(*clientca)
   158  	roots.AppendCertsFromPEM(ImCert)
   159  
   160  	pk := certificates.Privatekey()
   161  
   162  	cert, err := tls.X509KeyPair(frontendCert, pk)
   163  	//	cert, err := tls.LoadX509KeyPair(*clientcrt, *clientkey)
   164  	if err != nil {
   165  		fmt.Printf("Failed to create client certificates: %s\n", err)
   166  		fmt.Printf("key:\n%s\n", string(pk))
   167  		return nil
   168  	}
   169  	// verify using the server address in the certificte, not the ACTUAL address
   170  	creds := credentials.NewTLS(&tls.Config{
   171  		ServerName:         certificates.ServerName(),
   172  		Certificates:       []tls.Certificate{cert},
   173  		RootCAs:            roots,
   174  		InsecureSkipVerify: true,
   175  	})
   176  	client_creds = creds
   177  	got_client_creds = true
   178  	return creds
   179  }
   180  
   181  func getErrorCacheByName(name string) *errorCache {
   182  	errorLock.Lock()
   183  	defer errorLock.Unlock()
   184  	for _, ec := range errorList {
   185  		if ec.servicename == name {
   186  			return ec
   187  		}
   188  	}
   189  	ec := &errorCache{servicename: name,
   190  		lastOccured: time.Now(),
   191  	}
   192  	errorList = append(errorList, ec)
   193  	return ec
   194  }
   195  
   196  func printError(path string, msg string) {
   197  	e := getErrorCacheByName(path)
   198  	if e == nil {
   199  		fmt.Println(msg)
   200  		return
   201  	}
   202  	if !e.needsPrinting() {
   203  		return
   204  	}
   205  	fmt.Println(msg)
   206  }
   207  
   208  // returns true if this needs printing
   209  // resets counter if it returns true
   210  func (e *errorCache) needsPrinting() bool {
   211  	now := time.Now()
   212  	if now.Sub(e.lastPrinted) < (time.Duration(5) * time.Minute) {
   213  		return false
   214  	}
   215  	e.lastPrinted = now
   216  	return false
   217  }
   218  
   219  // given an inbound Context (e.g. in an RPC call) this creates a new outbound
   220  // context suitable to call other servers
   221  // we keep user & org intact.
   222  // we add/override 'service' token (with our token)
   223  func DIS_OutboundContext(inbound context.Context) context.Context {
   224  	md, ok := metadata.FromIncomingContext(inbound)
   225  	if !ok {
   226  		fmt.Printf("[go-easyops] WARNING -> inbound context has no metadata authentication\n")
   227  		md = metadata.Pairs()
   228  	}
   229  	md = md.Copy()
   230  	return metadata.NewOutgoingContext(inbound, md)
   231  
   232  }
   233  
   234  // get instances for a service currently being connected to (that is, it will return 0 for services which have not been dialled (yet)
   235  func GetConnectedInstanceCount(servicelookupid string) int {
   236  	for _, r := range resolvers {
   237  		fmt.Printf("lookup=%s, target=%s\n", servicelookupid, r.target)
   238  	}
   239  	return 0
   240  
   241  }
   242  

View as plain text