1 /* 2 This package facilitates making load-balanced, fail-over, authenticated gRPC calls to server. (it also provides shortcuts to objectstore get/put) 3 4 Typically, in the yacloud, a new client is constructed via the proto package. For example 5 6 import "golang.conradwood.net/apis/getestservice" 7 ... 8 getestservice.GetEchoClient()... 9 10 # Load balancing 11 12 The client will maintain a list of available targets for grpc calls. Each service has a unique list of targets. The list is periodically and aysnchronously updated by polling the registry. 13 14 If no targets are available, a call to this service will be blocked for some time, then fail. Once failed, all subsequent calls will be failed immediately until a target becomes available. 15 This allows for some basic recovery for circular service dependencies on boot. Whilst it is considered bad practice, it is a pattern commonly found and thus go-easyops attempts to make it work as well as can be expected. (Better to avoid circular dependencies altogether!!) 16 17 RPC calls are not retried - if they fail (for any reason) they will not be sent to a different server. 18 19 # Routing 20 21 This client implements several and distinct features to determine where to route rpc calls too. 22 - Round-Robin for multiple targets 23 - By User: to user specific services 24 - By Context-Tag: arbitrary tags in the context 25 26 In the absense of both user specific services and context-tags, a simple round-robin strategy is implemented for multiple targets. 27 28 # Routing - by user 29 30 A service may be registered with a useraccount instead of a service account (the default if started by a user on the command line. Also see command line flag -ge_disable_user_token). The client determines the current useraccount from the context used to invoke the target. If a service running as the same user as is in the context, the client will route the rpc call to the service running as this user. 31 32 This is intended for debugging and "live" development. The user object is typically created and propagated and the edge of the system, for example at the webserver proxy. Thus, while developing or debugging any rpc server it is often useful to route some (and only some) calls into the development version. Developers should always start their work-in-progress servers under their own useraccount. Thus all calls the Developer makes are routed to their laptop. Subsequent calls go back into the infrastructure, but remain restricted to the useraccount, thus can be considered safe (subject to bugs in the backends of course). 33 34 Note: this is not intended for general production use. For various reasons, it is a really bad idea to fire up rpc servers specific to each user. Instead the rpc server should handle multiple users. The user's token for authentication is considered private, like a password. 35 36 # Routing - by Context-Tag 37 38 A context-tag is a key with a value . 39 40 A service may register itself with one or more tags. A Context may carry one or more tags. On each rpc call the list of targets is iterated through. Any service that has all tags with exactly matching values will be considered for routing, all others dismissed. If after, the first iteration, one or more services remain, those will be used for routing in a round-robin fashion. 41 42 If no exact match is found, the context tags are inspected for "FallbackToPlain" option (see ctx package). If set, the list of all services with exactly 0 tags will be used for round-robin. If not set the rpc will be failed with "no target available". 43 44 Note: Whilst context-tags are often quite useful, their use is generally discouraged, especially for large sets of servers. It is intented to be used for a standardized, quick and reasonably efficient means to route low-volume (~20/s or less) calls to remote rpc servers. In small setups this can often be useful to send information to remote clusters for remote-processing. (using a tag, for example, cluster=lhr, cluster=fra, cluster=lgw, cluster=cdn etc...) 45 46 # Routing - directly 47 48 One can bypass the fail-over and connection management altogether with functions such as ConnectWithIP. This is intented for circumstances where a standardised approach (round-robin/user/context) is unsuitable. Beware of dragons: This approach requires development of load-balancers, fail-over strategies, monitoring, recovery, start-up delays and many other features the default routing strategy implements. The complexity is often underestimated, but soon does become significant. (That is the point of the routing implementations above, really) 49 50 # Standalone operation 51 52 Standalone operation means that no other services are required to make rpc calls between a client and server. Whilst this is very limited (e.g. no load-balancing, no fail-over and NO AUTHENTICATION, it is useful for quickly testing a binary). Both, server and client must be started in standalone mode. 53 54 Also see command line parameter -ge_standalone and package server 55 56 # ObjectStore 57 58 Mostly because an extra package for objectstore seems overkill, this package also provides function to get/put objects into the objectstore. 59 */ 60 package client 61 62 import ( 63 "context" 64 "crypto/tls" 65 "crypto/x509" 66 "errors" 67 "fmt" 68 "net" 69 "runtime/debug" 70 "strings" 71 "sync" 72 "time" 73 74 pb "golang.conradwood.net/apis/registry" 75 "golang.conradwood.net/go-easyops/certificates" 76 "google.golang.org/grpc/credentials" 77 "google.golang.org/grpc/metadata" 78 ) 79 80 var ( 81 got_client_creds = false 82 client_creds credentials.TransportCredentials 83 84 cert = []byte{1, 2, 3} 85 errorList []*errorCache 86 errorLock sync.Mutex 87 reg pb.RegistryClient 88 ) 89 90 type errorCache struct { 91 servicename string 92 lastOccured time.Time 93 lastPrinted time.Time 94 } 95 96 func GetRegistryClient() pb.RegistryClient { 97 if reg == nil { 98 reg = pb.NewRegistryClient(Connect("registry.Registry")) 99 } 100 return reg 101 } 102 103 // opens a tcp connection to a servicename 104 func DialTCPWrapper(serviceName string) (net.Conn, error) { 105 if strings.Contains(serviceName, "/") { 106 s := fmt.Sprintf("Error: The parameter for DialTCPWrapper needs a servicename. not a path. You passed in %s, which looks very much like a path. The \"old-style\" picoservices required a path at this function, but go-framework does not. Did you recently upgrade and did not upgrade a config?\n", serviceName) 107 debug.PrintStack() 108 return nil, errors.New(s) 109 } 110 if reg == nil { 111 reg = pb.NewRegistryClient(Connect("registry.Registry")) 112 } 113 ctx := getContext() 114 //ctx := context.WithTimeout(context.Background(), time.Duration(10)*time.Second) 115 targets, err := reg.GetTarget(ctx, &pb.GetTargetRequest{Name: serviceName, ApiType: pb.Apitype_tcp}) 116 if err != nil { 117 return nil, err 118 } 119 list := targets.Service 120 if len(list) == 0 { 121 return nil, fmt.Errorf("No tcp connection by service %s", serviceName) 122 } 123 if len(list[0].Location.Address) == 0 { 124 return nil, fmt.Errorf("No tcp location found for name %s - is it running?", serviceName) 125 } 126 hostname_s := list[0].Location.Address[0].Host 127 port_s := fmt.Sprintf("%d", list[0].Location.Address[0].Port) 128 129 adr := net.JoinHostPort(hostname_s, port_s) 130 conn, err := net.Dial("tcp", adr) 131 if err != nil { 132 return nil, err 133 } 134 135 return conn, err 136 } 137 138 func hasApi(ar []pb.Apitype, lf pb.Apitype) bool { 139 for _, a := range ar { 140 if a == lf { 141 return true 142 } 143 } 144 return false 145 } 146 147 // get the Client Credentials we use to connect to other RPCs 148 func GetClientCreds() credentials.TransportCredentials { 149 if got_client_creds { 150 return client_creds 151 } 152 roots := x509.NewCertPool() 153 154 frontendCert := certificates.Certificate() 155 156 roots.AppendCertsFromPEM(frontendCert) 157 ImCert := certificates.Ca() //ioutil.ReadFile(*clientca) 158 roots.AppendCertsFromPEM(ImCert) 159 160 pk := certificates.Privatekey() 161 162 cert, err := tls.X509KeyPair(frontendCert, pk) 163 // cert, err := tls.LoadX509KeyPair(*clientcrt, *clientkey) 164 if err != nil { 165 fmt.Printf("Failed to create client certificates: %s\n", err) 166 fmt.Printf("key:\n%s\n", string(pk)) 167 return nil 168 } 169 // verify using the server address in the certificte, not the ACTUAL address 170 creds := credentials.NewTLS(&tls.Config{ 171 ServerName: certificates.ServerName(), 172 Certificates: []tls.Certificate{cert}, 173 RootCAs: roots, 174 InsecureSkipVerify: true, 175 }) 176 client_creds = creds 177 got_client_creds = true 178 return creds 179 } 180 181 func getErrorCacheByName(name string) *errorCache { 182 errorLock.Lock() 183 defer errorLock.Unlock() 184 for _, ec := range errorList { 185 if ec.servicename == name { 186 return ec 187 } 188 } 189 ec := &errorCache{servicename: name, 190 lastOccured: time.Now(), 191 } 192 errorList = append(errorList, ec) 193 return ec 194 } 195 196 func printError(path string, msg string) { 197 e := getErrorCacheByName(path) 198 if e == nil { 199 fmt.Println(msg) 200 return 201 } 202 if !e.needsPrinting() { 203 return 204 } 205 fmt.Println(msg) 206 } 207 208 // returns true if this needs printing 209 // resets counter if it returns true 210 func (e *errorCache) needsPrinting() bool { 211 now := time.Now() 212 if now.Sub(e.lastPrinted) < (time.Duration(5) * time.Minute) { 213 return false 214 } 215 e.lastPrinted = now 216 return false 217 } 218 219 // given an inbound Context (e.g. in an RPC call) this creates a new outbound 220 // context suitable to call other servers 221 // we keep user & org intact. 222 // we add/override 'service' token (with our token) 223 func DIS_OutboundContext(inbound context.Context) context.Context { 224 md, ok := metadata.FromIncomingContext(inbound) 225 if !ok { 226 fmt.Printf("[go-easyops] WARNING -> inbound context has no metadata authentication\n") 227 md = metadata.Pairs() 228 } 229 md = md.Copy() 230 return metadata.NewOutgoingContext(inbound, md) 231 232 } 233 234 // get instances for a service currently being connected to (that is, it will return 0 for services which have not been dialled (yet) 235 func GetConnectedInstanceCount(servicelookupid string) int { 236 for _, r := range resolvers { 237 fmt.Printf("lookup=%s, target=%s\n", servicelookupid, r.target) 238 } 239 return 0 240 241 } 242