1 package server
2
3 import (
4 "crypto/tls"
5 "crypto/x509"
6 "errors"
7 "flag"
8 "fmt"
9 "net"
10 "net/http"
11 "os"
12 "os/signal"
13 "strings"
14 "sync"
15 "syscall"
16 "time"
17
18 pm "github.com/prometheus/client_golang/prometheus"
19 "github.com/prometheus/client_golang/prometheus/promhttp"
20 au "golang.conradwood.net/apis/auth"
21 echo "golang.conradwood.net/apis/getestservice"
22 pb "golang.conradwood.net/apis/registry"
23 "golang.conradwood.net/go-easyops/auth"
24 ar "golang.conradwood.net/go-easyops/authremote"
25 "golang.conradwood.net/go-easyops/certificates"
26 "golang.conradwood.net/go-easyops/client"
27 "golang.conradwood.net/go-easyops/cmdline"
28 "golang.conradwood.net/go-easyops/common"
29 easyhttp "golang.conradwood.net/go-easyops/http"
30 pp "golang.conradwood.net/go-easyops/profiling"
31 "golang.conradwood.net/go-easyops/prometheus"
32 "golang.conradwood.net/go-easyops/standalone"
33 "golang.conradwood.net/go-easyops/tokens"
34 "golang.conradwood.net/go-easyops/utils"
35 "golang.org/x/net/context"
36 "google.golang.org/grpc"
37 "google.golang.org/grpc/codes"
38 "google.golang.org/grpc/credentials"
39 "google.golang.org/grpc/reflection"
40 "google.golang.org/grpc/status"
41 )
42
43 const (
44 COOKIE_NAME = "Auth-Token"
45 )
46
47 var (
48 auto_kill = flag.Bool("ge_autokill_instance_on_port", false, "if true, kill an instance on that grpc port before starting service")
49 never_register_service_as_user = flag.Bool("ge_never_register_service_as_user", false, "if true, do not register service as user, even if it is run locally with a user token")
50 reg_tags = flag.String("ge_routing_tags", "", "comma seperated list of key-value pairs. For example -tags=foo=bar,foobar=true")
51 debug_internal_serve = flag.Bool("ge_debug_internal_server", false, "debug the server @ https://.../internal/... (serving metrics amongst other things)")
52 deployDescriptor = flag.String("ge_deployment_descriptor", "", "The deployment path by which other programs can refer to this deployment. expected is: a path of the format: \"V1:namespace/groupname/repository/buildid\"")
53 register_refresh = flag.Int("ge_register_refresh", 10, "registration refresh interval in `seconds`")
54 serverDefs = make(map[string]*serverDef)
55 knownServices []*serverDef
56 stopped bool
57 ticker *time.Ticker
58 promHandler http.Handler
59
60 stdMetrics = NewServerMetrics()
61 startedPreviously = false
62 starterLock sync.Mutex
63 rgclient pb.RegistryClient
64 startup_complete = false
65 )
66
67 type UserCache struct {
68 UserID string
69 created time.Time
70 }
71
72 type Register func(server *grpc.Server) error
73
74
75 type Server interface {
76 AddTag(key, value string)
77 }
78
79 func init() {
80 if cmdline.IsStandalone() {
81 return
82 }
83
84 ticker = time.NewTicker(time.Duration(*register_refresh) * time.Second)
85 go func() {
86 for _ = range ticker.C {
87 reRegister()
88 }
89 }()
90 }
91
92 func NewTCPServerDef(name string) *serverDef {
93 sd := NewServerDef().(*serverDef)
94 sd.tags = make(map[string]string)
95 sd.types = sd.types[:0]
96 sd.types = append(sd.types, pb.Apitype_tcp)
97 sd.name = name
98 return sd
99 }
100
101 func NewHTMLServerDef(name string) *serverDef {
102 sd := NewServerDef().(*serverDef)
103 sd.tags = make(map[string]string)
104 sd.types = sd.types[:0]
105 sd.types = append(sd.types, pb.Apitype_html)
106 sd.name = name
107 return sd
108 }
109
110 func NewServerDef() ServerDef {
111 res := &serverDef{}
112 res.tags = make(map[string]string)
113 res.registered_id = ""
114
119 res.deployPath = deploymentPath()
120 res.types = append(res.types, pb.Apitype_status)
121 res.types = append(res.types, pb.Apitype_grpc)
122 res.registerService = true
123 return res
124 }
125 func deploymentPath() string {
126 if *deployDescriptor != "" {
127 return (*deployDescriptor)[3:]
128 }
129 return ""
130 }
131
132 func stopping(res chan bool) {
133 starterLock.Lock()
134 if stopped {
135 starterLock.Unlock()
136 return
137 }
138 stopped = true
139 starterLock.Unlock()
140 pp.ProfilingStop()
141 fancyPrintf("Server shutdown - deregistering services\n")
142
143 c := client.GetRegistryClient()
144
154
155 for _, sd := range knownServices {
156 fancyPrintf("Deregistering Service \"%s\"\n", sd.toString())
157 ctx := context_Background()
158 ctx, _ = context.WithTimeout(ctx, time.Duration(2)*time.Second)
159
160
161 _, err := c.V2DeregisterService(ctx, &pb.DeregisterServiceRequest{ProcessID: sd.registered_id})
162 if err != nil {
163 fancyPrintf("Failed to deregister Service \"%s\": %s\n", sd.toString(), err)
164 }
165 }
166 }
167
168 func addTags(sd *serverDef) {
169 if *reg_tags == "" {
170 return
171 }
172 vals := strings.Split(*reg_tags, ",")
173 for _, v := range vals {
174 kv := strings.SplitN(v, "=", 2)
175 if len(kv) != 2 {
176 s := fmt.Sprintf("Invalid keyvalue tag: \"%s\" - it splits into %d parts instead of 2\n", v, len(kv))
177 panic(s)
178 }
179 tk := kv[0]
180 tv := kv[1]
181 fmt.Printf("Adding tag \"%s\" with value \"%s\"\n", tk, tv)
182 sd.AddTag(tk, tv)
183 }
184
185 }
186
187
188
189
190
191
192
193 func ServerStartup(sd ServerDef) error {
194 def := sd.(*serverDef)
195 var conn net.Listener
196 var err error
197 if !*auto_kill {
198
199 conn, err = net.Listen("tcp", fmt.Sprintf(":%d", def.port))
200 if err != nil {
201 return err
202 }
203 }
204
205
206 start_ipc()
207 ipc_send_startup(def)
208
209
210
211 if !def.port_set {
212 fmt.Printf("WARNING! server port variable assignment detected. This is deprecated. Instead, use SetPort(). In future your code will not compile.\n")
213 fmt.Printf("Program will continue in 3 seconds\n")
214 time.Sleep(time.Duration(3) * time.Second)
215 }
216 if *auto_kill {
217 ht := easyhttp.NewDirectClient()
218 hr := ht.Get(fmt.Sprintf("https://localhost:%d/internal/pleaseshutdown", def.port))
219 if hr.IsSuccess() {
220 for {
221 ht := easyhttp.NewDirectClient()
222 hr := ht.Get(fmt.Sprintf("https://localhost:%d/internal/pleaseshutdown", def.port))
223 if hr.IsSuccess() {
224 break
225 }
226 time.Sleep(time.Duration(300) * time.Millisecond)
227 }
228 }
229 fmt.Printf("Autokill on port %d complete\n", def.port)
230
231
232 conn, err = net.Listen("tcp", fmt.Sprintf(":%d", def.port))
233 if err != nil {
234 return err
235 }
236 }
237
238 addTags(def)
239 go client.GetSignatureFromAuth()
240 go error_handler_startup()
241 var tk string
242 started := time.Now()
243 for {
244 if client.GotSig() {
245 break
246 }
247 time.Sleep(time.Duration(100) * time.Millisecond)
248 if time.Since(started) > time.Duration(4)*time.Second {
249 fmt.Printf("[go-easyops] WARNING could not retrieve signature in time\n")
250 break
251 }
252 }
253 tokname := ""
254 tokname = "service"
255 tkservice := tokens.GetServiceTokenParameter()
256 var u *au.User
257 if !cmdline.IsStandalone() {
258 tk = tkservice
259 if !cmdline.Datacenter() {
260 tks := tokens.GetUserTokenParameter()
261 if tks != "" {
262 tokname = "user"
263 tk = tks
264 }
265 }
266 var su *au.SignedUser
267 if !def.noAuth {
268 if tk == "" {
269 fancyPrintf("*********** AUTHENTICATION CONFIGURATION ERROR ******************\n")
270 fancyPrintf("Cannot connect to a server without %s token.\n", tokname)
271
272 }
273 su = ar.SignedGetByToken(context_Background(), tk)
274 if su == nil {
275 fancyPrintf("*********** AUTHENTICATION CONFIGURATION ERROR ******************\n")
276 fancyPrintf("The authentication %s token is not valid.\n", tokname)
277 fancyPrintf("Token: \"%s\"\n", tk)
278
279 }
280 u = common.VerifySignedUser(su)
281
282 }
283 if u != nil {
284 if u.ServiceAccount {
285 def.local_service = su
286 } else {
287 if *never_register_service_as_user {
288 fancyPrintf("NOT Registering as a user-specific service (disabled by commandline)\n")
289 } else {
290 fancyPrintf("Registering as a user-specific service, because it is running as:\n")
291 auth.PrintUser(u)
292 def.asUser = su
293 }
294 }
295 }
296 }
297 startOnce()
298 c := make(chan os.Signal, 2)
299 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
300 k := make(chan bool, 10)
301 go func() {
302 <-c
303 go stopping(k)
304 select {
305 case <-k:
306
307 case <-time.After(time.Duration(5) * time.Second):
308
309 }
310 os.Exit(0)
311 }()
312 stopped = false
313 defer stopping(k)
314 listenAddr := fmt.Sprintf(":%d", def.port)
315 s := ""
316 if u != nil {
317 def.service_user_id = u.ID
318 s = fmt.Sprintf(" #%s [%s]", u.ID, auth.Description(u))
319 }
320 fancyPrintf("Starting server%s on %s\n", s, listenAddr)
321
322 if def.tags != nil && len(def.tags) > 0 {
323 fancyPrintf("Routing tags: %v\n", def.tags)
324 }
325
326 BackendCert := certificates.Certificate()
327 BackendKey := certificates.Privatekey()
328 ImCert := certificates.Ca()
329 cert, err := tls.X509KeyPair(BackendCert, BackendKey)
330 if err != nil {
331 return fmt.Errorf("failed to parse certificate: %v\n", err)
332 }
333 roots := x509.NewCertPool()
334 FrontendCert := certificates.Certificate()
335 roots.AppendCertsFromPEM(FrontendCert)
336 roots.AppendCertsFromPEM(ImCert)
337
338 creds := credentials.NewServerTLSFromCert(&cert)
339 var grpcServer *grpc.Server
340
341 grpcServer = grpc.NewServer(grpc.Creds(creds),
342 grpc.UnaryInterceptor(def.UnaryAuthInterceptor),
343 grpc.StreamInterceptor(def.StreamAuthInterceptor),
344 )
345
346 grpc.EnableTracing = true
347
348
349 if def.register != nil {
350 def.register(grpcServer)
351 }
352 if err != nil {
353 fancyPrintf("Serverstartup: failed to register service on startup: %s\n", err)
354 return fmt.Errorf("grpc register error: %s", err)
355 }
356 if len(grpcServer.GetServiceInfo()) > 1 {
357 return fmt.Errorf("cannot register multiple(%d) names", len(grpcServer.GetServiceInfo()))
358 }
359 if def.name == "" {
360 for name, _ := range grpcServer.GetServiceInfo() {
361 def.name = name
362 }
363 }
364 if def.name == "" {
365 fmt.Println("Got no server name!")
366 return errors.New("Missing servername")
367 }
368
369 serverDefs[def.name] = def
370 common.AddExportedServiceName(def.name)
371
372 if def.registerService {
373 fancyPrintf("Adding service %s to registry...\n", def.name)
374 AddRegistry(def)
375 }
376
377 if !def.public {
378 reflection.Register(grpcServer)
379 }
380 start_profiling(def)
381
382
383 err = startHttpServe(def, grpcServer, conn)
384 if err != nil {
385 return err
386 }
387
388
389 fancyPrintf("INTERNAL BUG - we should have never, ever arrived here\n")
390 return nil
391 }
392
393 func startHttpServe(sd *serverDef, grpcServer *grpc.Server, conn net.Listener) error {
394 var err error
395 mux := http.NewServeMux()
396 if !sd.public {
397 mux.HandleFunc("/internal/service-info/", func(w http.ResponseWriter, req *http.Request) {
398 serveServiceInfo(w, req, sd)
399 })
400 mux.HandleFunc("/internal/pleaseshutdown", func(w http.ResponseWriter, req *http.Request) {
401 pleaseShutdown(w, req, grpcServer)
402 })
403 mux.HandleFunc("/internal/health", func(w http.ResponseWriter, req *http.Request) {
404 healthzHandler(w, req, sd)
405 })
406 mux.HandleFunc("/internal/help", func(w http.ResponseWriter, req *http.Request) {
407 helpHandler(w, req, sd)
408 })
409 mux.HandleFunc("/internal/clearcache", func(w http.ResponseWriter, req *http.Request) {
410 clearCacheHandler(w, req)
411 })
412 mux.HandleFunc("/internal/parameters", func(w http.ResponseWriter, req *http.Request) {
413 paraHandler(w, req, sd)
414 })
415
416 nm, _ := prometheus.NonstandMetricNames(pm.DefaultRegisterer.(*pm.Registry))
417 if len(nm) > 0 {
418 for _, n := range nm {
419 fmt.Printf("Reg: \"%s\"\n", n)
420 }
421 panic("something registered outside go-easyops and will not be exposed")
422 }
423 gatherer := prometheus.GetGatherer()
424 h := promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{})
425 mux.Handle("/internal/service-info/metrics", h)
426
427 }
428
429
430 BuiltinCert = certificates.Certificate()
431 BuiltinKey = certificates.Privatekey()
432 BuiltinTLSCert, err = tls.X509KeyPair(BuiltinCert, BuiltinKey)
433 utils.Bail("failed to create tls cert", err)
434
435 BackendCert := certificates.Certificate()
436 BackendKey := certificates.Privatekey()
437 cert, err := tls.X509KeyPair(BackendCert, BackendKey)
438
439 srv := &http.Server{
440 Addr: fmt.Sprintf(":%d", sd.port),
441 Handler: grpcHandlerFunc(grpcServer, mux),
442 TLSConfig: &tls.Config{
443 GetCertificate: getcert,
444 Certificates: []tls.Certificate{cert},
445 NextProtos: []string{"h2"},
446 InsecureSkipVerify: true,
447 },
448 }
449
450 fancyPrintf("grpc on port: %d\n", sd.port)
451 go callback_attempt(sd)
452 startup_complete = true
453 err = srv.Serve(tls.NewListener(conn, srv.TLSConfig))
454 fancyPrintf("Serve failed: %v\n", err)
455 return err
456 }
457
458
459 func callback_attempt(sd *serverDef) {
460 url := fmt.Sprintf("https://localhost:%d/internal/health", sd.port)
461 for {
462
463 hr := easyhttp.NewDirectClient().Get(url)
464 if hr.Error() == nil {
465 break
466 }
467 time.Sleep(time.Duration(100) * time.Millisecond)
468 }
469 fmt.Printf("[go-easyops] Server started on port %d\n", sd.port)
470 if sd.callback != nil {
471 sd.callback()
472 }
473 }
474
475
476 func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
477 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
478 path := r.URL.Path
479 if strings.HasPrefix(path, "/internal/debug") {
480 if *debug_internal_serve {
481 fancyPrintf("Serving debug path %s\n", path)
482 }
483 debugHandler(w, r)
484 } else if strings.HasPrefix(path, "/internal/clearcache") {
485 clearCacheHandler(w, r)
486 } else if strings.HasPrefix(path, "/internal/") {
487 if *debug_internal_serve {
488 fancyPrintf("Serving path %s\n", path)
489 }
490 otherHandler.ServeHTTP(w, r)
491 } else {
492 grpcServer.ServeHTTP(w, r)
493 }
494 })
495 }
496
497
498 func UnregisterPortRegistry(port []int) error {
499 var err error
500 client := client.GetRegistryClient()
501
511 var ps []int32
512 for _, p := range port {
513 ps = append(ps, int32(p))
514 }
515 psr := pb.ProcessShutdownRequest{Port: ps}
516 _, err = client.InformProcessShutdown(context_Background(), &psr)
517 return err
518 }
519
520 func find(port int, name string) *serverDef {
521 for _, sd := range knownServices {
522 if sd.port == port && sd.name == name {
523 return sd
524 }
525 }
526 return nil
527 }
528
529 func AddRegistry(sd *serverDef) (string, error) {
530 if find(sd.port, sd.name) == nil {
531 knownServices = append(knownServices, sd)
532 }
533
534 req := pb.ServiceLocation{}
535 req.Service = &pb.ServiceDescription{}
536 req.Service.Name = sd.name
537 req.Service.Path = sd.deployPath
538 sa := &pb.ServiceAddress{Port: int32(sd.port)}
539 req.Address = []*pb.ServiceAddress{sa}
540
541 rsr := &pb.RegisterServiceRequest{
542 ProcessID: cmdline.GetInstanceID(),
543 Port: uint32(sd.port),
544 ApiType: sd.types,
545 ServiceName: sd.name,
546 Pid: cmdline.GetPid(),
547 RoutingInfo: &pb.RoutingInfo{},
548 UserID: sd.service_user_id,
549 Health: GetHealth(),
550 }
551 if sd.asUser != nil {
552 rsr.RoutingInfo.RunningAs = common.VerifySignedUser(sd.asUser)
553 }
554 if sd.tags != nil {
555 rsr.RoutingInfo.Tags = sd.tags
556 }
557 if cmdline.IsStandalone() {
558 return standalone.RegisterService(rsr)
559 }
560 if rgclient == nil {
561 rgclient = client.GetRegistryClient()
562 }
563 resp, err := rgclient.V2RegisterService(context_Background(), rsr)
564 if err != nil {
565 fancyPrintf("RegisterService(%s) failed: %s\n", req.Service.Name, err)
566 return "", err
567 }
568 if resp == nil {
569 fmt.Println("Registration failed with no error provided.")
570 }
571 sd.registered_id = rsr.ProcessID
572
573
574 return sd.registered_id, nil
575 }
576
577 func reRegister() {
578
579 for _, sd := range knownServices {
580 AddRegistry(sd)
581 }
582 }
583
584 func getServerDefByName(name string) *serverDef {
585 return serverDefs[name]
586 }
587 func MethodNameFromUnaryInfo(info *grpc.UnaryServerInfo) string {
588 full := info.FullMethod
589 if full[0] == '/' {
590 full = full[1:]
591 }
592 ns := strings.SplitN(full, "/", 2)
593 if len(ns) < 2 {
594 return ""
595 }
596 res := ns[1]
597 if res[0] == '/' {
598 res = res[1:]
599 }
600 return ns[1]
601 }
602 func ServiceNameFromUnaryInfo(info *grpc.UnaryServerInfo) string {
603 full := info.FullMethod
604 if full[0] == '/' {
605 full = full[1:]
606 }
607 ns := strings.SplitN(full, "/", 2)
608 return ns[0]
609 }
610
611 func targetName(name string) string {
612 x := strings.Split(name, ".")
613 return x[0]
614 }
615
616 func isInternalService(name string) bool {
617 if name == "grpc.reflection.v1alpha.ServerReflection" {
618 return true
619 }
620 return false
621 }
622
623 func startOnce() {
624 starterLock.Lock()
625 if startedPreviously {
626 starterLock.Unlock()
627 return
628 }
629 startedPreviously = true
630 starterLock.Unlock()
631 pp.ProfilingCheckStart()
632 }
633
634
640 func StartFakeService(name string) {
641 port, err := getFreePort()
642 if err != nil {
643 s := fmt.Sprintf("Failed to get a free port: %s", err)
644 fmt.Println(s)
645 panic(s)
646 }
647 sd := NewServerDef().(*serverDef)
648 sd.SetPort(port)
649 sd.SetRegister(
650 func(server *grpc.Server) error {
651 e := new(echoServer)
652 echo.RegisterEchoServiceServer(server, e)
653 return nil
654 })
655
656 sd.name = name
657 go ServerStartup(sd)
658 }
659
660 type echoServer struct{}
661
662 func (e *echoServer) Ping(ctx context.Context, req *echo.PingRequest) (*echo.PingResponse, error) {
663 fancyPrintf("I was pinged\n")
664 resp := &echo.PingResponse{Response: req}
665 return resp, nil
666 }
667
668
669 func getFreePort() (int, error) {
670 addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
671 if err != nil {
672 return 0, err
673 }
674
675 l, err := net.ListenTCP("tcp", addr)
676 if err != nil {
677 return 0, err
678 }
679 defer l.Close()
680 return l.Addr().(*net.TCPAddr).Port, nil
681 }
682
683 func incFailure(service string, method string, err error) {
684 status := status.Convert(err)
685 var code codes.Code
686 if status != nil {
687 code = status.Code()
688 }
689 grpc_failed_requests.With(prometheus.Labels{"method": method, "servicename": service, "grpccode": fmt.Sprintf("%d", code)}).Inc()
690 }
691
View as plain text