...

Source file src/golang.conradwood.net/go-easyops/server/server.go

Documentation: golang.conradwood.net/go-easyops/server

     1  package server
     2  
     3  import (
     4  	"crypto/tls"
     5  	"crypto/x509"
     6  	"errors"
     7  	"flag"
     8  	"fmt"
     9  	"net"
    10  	"net/http"
    11  	"os"
    12  	"os/signal"
    13  	"strings"
    14  	"sync"
    15  	"syscall"
    16  	"time"
    17  
    18  	pm "github.com/prometheus/client_golang/prometheus"
    19  	"github.com/prometheus/client_golang/prometheus/promhttp"
    20  	au "golang.conradwood.net/apis/auth"
    21  	echo "golang.conradwood.net/apis/getestservice"
    22  	pb "golang.conradwood.net/apis/registry"
    23  	"golang.conradwood.net/go-easyops/auth"
    24  	ar "golang.conradwood.net/go-easyops/authremote"
    25  	"golang.conradwood.net/go-easyops/certificates"
    26  	"golang.conradwood.net/go-easyops/client"
    27  	"golang.conradwood.net/go-easyops/cmdline"
    28  	"golang.conradwood.net/go-easyops/common"
    29  	easyhttp "golang.conradwood.net/go-easyops/http"
    30  	pp "golang.conradwood.net/go-easyops/profiling"
    31  	"golang.conradwood.net/go-easyops/prometheus"
    32  	"golang.conradwood.net/go-easyops/standalone"
    33  	"golang.conradwood.net/go-easyops/tokens"
    34  	"golang.conradwood.net/go-easyops/utils"
    35  	"golang.org/x/net/context"
    36  	"google.golang.org/grpc"
    37  	"google.golang.org/grpc/codes"
    38  	"google.golang.org/grpc/credentials"
    39  	"google.golang.org/grpc/reflection"
    40  	"google.golang.org/grpc/status"
    41  )
    42  
    43  const (
    44  	COOKIE_NAME = "Auth-Token"
    45  )
    46  
    47  var (
    48  	auto_kill                      = flag.Bool("ge_autokill_instance_on_port", false, "if true, kill an instance on that grpc port before starting service")
    49  	never_register_service_as_user = flag.Bool("ge_never_register_service_as_user", false, "if true, do not register service as user, even if it is run locally with a user token")
    50  	reg_tags                       = flag.String("ge_routing_tags", "", "comma seperated list of key-value pairs. For example -tags=foo=bar,foobar=true")
    51  	debug_internal_serve           = flag.Bool("ge_debug_internal_server", false, "debug the server @ https://.../internal/... (serving metrics amongst other things)")
    52  	deployDescriptor               = flag.String("ge_deployment_descriptor", "", "The deployment path by which other programs can refer to this deployment. expected is: a path of the format: \"V1:namespace/groupname/repository/buildid\"")
    53  	register_refresh               = flag.Int("ge_register_refresh", 10, "registration refresh interval in `seconds`")
    54  	serverDefs                     = make(map[string]*serverDef)
    55  	knownServices                  []*serverDef // all services, even not known ones
    56  	stopped                        bool
    57  	ticker                         *time.Ticker
    58  	promHandler                    http.Handler
    59  	//promReg         = prometheus.NewRegistry()
    60  	stdMetrics        = NewServerMetrics()
    61  	startedPreviously = false
    62  	starterLock       sync.Mutex
    63  	rgclient          pb.RegistryClient
    64  	startup_complete  = false
    65  )
    66  
    67  type UserCache struct {
    68  	UserID  string
    69  	created time.Time
    70  }
    71  
    72  type Register func(server *grpc.Server) error
    73  
    74  // server interface
    75  type Server interface {
    76  	AddTag(key, value string)
    77  }
    78  
    79  func init() {
    80  	if cmdline.IsStandalone() {
    81  		return
    82  	}
    83  	// start period re-registration
    84  	ticker = time.NewTicker(time.Duration(*register_refresh) * time.Second)
    85  	go func() {
    86  		for _ = range ticker.C {
    87  			reRegister()
    88  		}
    89  	}()
    90  }
    91  
    92  func NewTCPServerDef(name string) *serverDef {
    93  	sd := NewServerDef().(*serverDef)
    94  	sd.tags = make(map[string]string)
    95  	sd.types = sd.types[:0]
    96  	sd.types = append(sd.types, pb.Apitype_tcp)
    97  	sd.name = name
    98  	return sd
    99  }
   100  
   101  func NewHTMLServerDef(name string) *serverDef {
   102  	sd := NewServerDef().(*serverDef)
   103  	sd.tags = make(map[string]string)
   104  	sd.types = sd.types[:0]
   105  	sd.types = append(sd.types, pb.Apitype_html)
   106  	sd.name = name
   107  	return sd
   108  }
   109  
   110  func NewServerDef() ServerDef {
   111  	res := &serverDef{}
   112  	res.tags = make(map[string]string)
   113  	res.registered_id = ""
   114  	/*
   115  		res.Key = Privatekey
   116  		res.Certificate = Certificate
   117  		res.CA = Ca
   118  	*/
   119  	res.deployPath = deploymentPath()
   120  	res.types = append(res.types, pb.Apitype_status)
   121  	res.types = append(res.types, pb.Apitype_grpc)
   122  	res.registerService = true
   123  	return res
   124  }
   125  func deploymentPath() string {
   126  	if *deployDescriptor != "" {
   127  		return (*deployDescriptor)[3:]
   128  	}
   129  	return ""
   130  }
   131  
   132  func stopping(res chan bool) {
   133  	starterLock.Lock()
   134  	if stopped {
   135  		starterLock.Unlock()
   136  		return
   137  	}
   138  	stopped = true
   139  	starterLock.Unlock()
   140  	pp.ProfilingStop()
   141  	fancyPrintf("Server shutdown - deregistering services\n")
   142  
   143  	c := client.GetRegistryClient()
   144  	/*
   145  		opts := []grpc.DialOption{grpc.WithInsecure()}
   146  		rconn, err := grpc.Dial(cmdline.GetRegistryAddress(), opts...)
   147  		if err != nil {
   148  			fancyPrintf("failed to dial registry server: %v", err)
   149  			return
   150  		}
   151  		defer rconn.Close()
   152  		c = pb.NewRegistryClient(rconn)
   153  	*/
   154  	// value is a serverdef
   155  	for _, sd := range knownServices {
   156  		fancyPrintf("Deregistering Service \"%s\"\n", sd.toString())
   157  		ctx := context_Background()
   158  		ctx, _ = context.WithTimeout(ctx, time.Duration(2)*time.Second) // don't hang on shutdown
   159  
   160  		//		ctx := authremote.Context()
   161  		_, err := c.V2DeregisterService(ctx, &pb.DeregisterServiceRequest{ProcessID: sd.registered_id})
   162  		if err != nil {
   163  			fancyPrintf("Failed to deregister Service \"%s\": %s\n", sd.toString(), err)
   164  		}
   165  	}
   166  }
   167  
   168  func addTags(sd *serverDef) {
   169  	if *reg_tags == "" {
   170  		return
   171  	}
   172  	vals := strings.Split(*reg_tags, ",")
   173  	for _, v := range vals {
   174  		kv := strings.SplitN(v, "=", 2)
   175  		if len(kv) != 2 {
   176  			s := fmt.Sprintf("Invalid keyvalue tag: \"%s\" - it splits into %d parts instead of 2\n", v, len(kv))
   177  			panic(s)
   178  		}
   179  		tk := kv[0]
   180  		tv := kv[1]
   181  		fmt.Printf("Adding tag \"%s\" with value \"%s\"\n", tk, tv)
   182  		sd.AddTag(tk, tv)
   183  	}
   184  
   185  }
   186  
   187  // this is our typical gRPC server startup
   188  // it sets ourselves up with our own certificates
   189  // which is set for THIS SERVER, so installed/maintained
   190  // together with the server (rather than as part of this software)
   191  // it also configures the rpc server to expect a token to identify
   192  // the user in the rpc metadata call
   193  func ServerStartup(sd ServerDef) error {
   194  	def := sd.(*serverDef)
   195  	var conn net.Listener
   196  	var err error
   197  	if !*auto_kill {
   198  		// do this first to catch the common "address in use" error early before registration of other stuff
   199  		conn, err = net.Listen("tcp", fmt.Sprintf(":%d", def.port))
   200  		if err != nil {
   201  			return err
   202  		}
   203  	}
   204  	//	fmt.Printf("[go-easyops] Starting ipc...\n")
   205  
   206  	start_ipc()
   207  	ipc_send_startup(def)
   208  
   209  	//	fmt.Printf("[go-easyops] ipc started.\n")
   210  
   211  	if !def.port_set {
   212  		fmt.Printf("WARNING! server port variable assignment detected. This is deprecated. Instead, use SetPort(). In future your code will not compile.\n")
   213  		fmt.Printf("Program will continue in 3 seconds\n")
   214  		time.Sleep(time.Duration(3) * time.Second)
   215  	}
   216  	if *auto_kill {
   217  		ht := easyhttp.NewDirectClient()
   218  		hr := ht.Get(fmt.Sprintf("https://localhost:%d/internal/pleaseshutdown", def.port))
   219  		if hr.IsSuccess() {
   220  			for {
   221  				ht := easyhttp.NewDirectClient()
   222  				hr := ht.Get(fmt.Sprintf("https://localhost:%d/internal/pleaseshutdown", def.port))
   223  				if hr.IsSuccess() {
   224  					break
   225  				}
   226  				time.Sleep(time.Duration(300) * time.Millisecond)
   227  			}
   228  		}
   229  		fmt.Printf("Autokill on port %d complete\n", def.port)
   230  
   231  		// do this first to catch the common "address in use" error early before registration of other stuff
   232  		conn, err = net.Listen("tcp", fmt.Sprintf(":%d", def.port))
   233  		if err != nil {
   234  			return err
   235  		}
   236  	}
   237  
   238  	addTags(def)
   239  	go client.GetSignatureFromAuth() // init pubkey
   240  	go error_handler_startup()
   241  	var tk string
   242  	started := time.Now()
   243  	for {
   244  		if client.GotSig() {
   245  			break
   246  		}
   247  		time.Sleep(time.Duration(100) * time.Millisecond)
   248  		if time.Since(started) > time.Duration(4)*time.Second {
   249  			fmt.Printf("[go-easyops] WARNING could not retrieve signature in time\n")
   250  			break
   251  		}
   252  	}
   253  	tokname := ""
   254  	tokname = "service"
   255  	tkservice := tokens.GetServiceTokenParameter()
   256  	var u *au.User
   257  	if !cmdline.IsStandalone() {
   258  		tk = tkservice
   259  		if !cmdline.Datacenter() {
   260  			tks := tokens.GetUserTokenParameter()
   261  			if tks != "" {
   262  				tokname = "user"
   263  				tk = tks
   264  			}
   265  		}
   266  		var su *au.SignedUser
   267  		if !def.noAuth {
   268  			if tk == "" {
   269  				fancyPrintf("*********** AUTHENTICATION CONFIGURATION ERROR ******************\n")
   270  				fancyPrintf("Cannot connect to a server without %s token.\n", tokname)
   271  				//os.Exit(10)
   272  			}
   273  			su = ar.SignedGetByToken(context_Background(), tk)
   274  			if su == nil {
   275  				fancyPrintf("*********** AUTHENTICATION CONFIGURATION ERROR ******************\n")
   276  				fancyPrintf("The authentication %s token is not valid.\n", tokname)
   277  				fancyPrintf("Token: \"%s\"\n", tk)
   278  				//os.Exit(10)
   279  			}
   280  			u = common.VerifySignedUser(su)
   281  
   282  		}
   283  		if u != nil {
   284  			if u.ServiceAccount {
   285  				def.local_service = su
   286  			} else {
   287  				if *never_register_service_as_user {
   288  					fancyPrintf("NOT Registering as a user-specific service (disabled by commandline)\n")
   289  				} else {
   290  					fancyPrintf("Registering as a user-specific service, because it is running as:\n")
   291  					auth.PrintUser(u)
   292  					def.asUser = su
   293  				}
   294  			}
   295  		}
   296  	}
   297  	startOnce()
   298  	c := make(chan os.Signal, 2)
   299  	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
   300  	k := make(chan bool, 10)
   301  	go func() {
   302  		<-c
   303  		go stopping(k)
   304  		select {
   305  		case <-k:
   306  			//
   307  		case <-time.After(time.Duration(5) * time.Second):
   308  			//
   309  		}
   310  		os.Exit(0)
   311  	}()
   312  	stopped = false
   313  	defer stopping(k)
   314  	listenAddr := fmt.Sprintf(":%d", def.port)
   315  	s := ""
   316  	if u != nil {
   317  		def.service_user_id = u.ID
   318  		s = fmt.Sprintf(" #%s [%s]", u.ID, auth.Description(u))
   319  	}
   320  	fancyPrintf("Starting server%s on %s\n", s, listenAddr)
   321  
   322  	if def.tags != nil && len(def.tags) > 0 {
   323  		fancyPrintf("Routing tags: %v\n", def.tags)
   324  	}
   325  
   326  	BackendCert := certificates.Certificate()
   327  	BackendKey := certificates.Privatekey()
   328  	ImCert := certificates.Ca()
   329  	cert, err := tls.X509KeyPair(BackendCert, BackendKey)
   330  	if err != nil {
   331  		return fmt.Errorf("failed to parse certificate: %v\n", err)
   332  	}
   333  	roots := x509.NewCertPool()
   334  	FrontendCert := certificates.Certificate()
   335  	roots.AppendCertsFromPEM(FrontendCert)
   336  	roots.AppendCertsFromPEM(ImCert)
   337  
   338  	creds := credentials.NewServerTLSFromCert(&cert)
   339  	var grpcServer *grpc.Server
   340  	// Create the gRPC server with the credentials
   341  	grpcServer = grpc.NewServer(grpc.Creds(creds),
   342  		grpc.UnaryInterceptor(def.UnaryAuthInterceptor),
   343  		grpc.StreamInterceptor(def.StreamAuthInterceptor),
   344  	)
   345  
   346  	grpc.EnableTracing = true
   347  	// callback to the callers' specific intialisation
   348  	// (set by the caller of this function)
   349  	if def.register != nil {
   350  		def.register(grpcServer)
   351  	}
   352  	if err != nil {
   353  		fancyPrintf("Serverstartup: failed to register service on startup: %s\n", err)
   354  		return fmt.Errorf("grpc register error: %s", err)
   355  	}
   356  	if len(grpcServer.GetServiceInfo()) > 1 {
   357  		return fmt.Errorf("cannot register multiple(%d) names", len(grpcServer.GetServiceInfo()))
   358  	}
   359  	if def.name == "" {
   360  		for name, _ := range grpcServer.GetServiceInfo() {
   361  			def.name = name
   362  		}
   363  	}
   364  	if def.name == "" {
   365  		fmt.Println("Got no server name!")
   366  		return errors.New("Missing servername")
   367  	}
   368  
   369  	serverDefs[def.name] = def
   370  	common.AddExportedServiceName(def.name)
   371  
   372  	if def.registerService {
   373  		fancyPrintf("Adding service %s to registry...\n", def.name)
   374  		AddRegistry(def)
   375  	}
   376  	// something odd?
   377  	if !def.public {
   378  		reflection.Register(grpcServer)
   379  	}
   380  	start_profiling(def)
   381  	// Serve and Listen
   382  	// Blocking call!
   383  	err = startHttpServe(def, grpcServer, conn)
   384  	if err != nil {
   385  		return err
   386  	}
   387  	// Create the channel to listen on
   388  	// I don't think this is ever called!
   389  	fancyPrintf("INTERNAL BUG - we should have never, ever arrived here\n")
   390  	return nil
   391  }
   392  
   393  func startHttpServe(sd *serverDef, grpcServer *grpc.Server, conn net.Listener) error {
   394  	var err error
   395  	mux := http.NewServeMux()
   396  	if !sd.public {
   397  		mux.HandleFunc("/internal/service-info/", func(w http.ResponseWriter, req *http.Request) {
   398  			serveServiceInfo(w, req, sd)
   399  		})
   400  		mux.HandleFunc("/internal/pleaseshutdown", func(w http.ResponseWriter, req *http.Request) {
   401  			pleaseShutdown(w, req, grpcServer)
   402  		})
   403  		mux.HandleFunc("/internal/health", func(w http.ResponseWriter, req *http.Request) {
   404  			healthzHandler(w, req, sd)
   405  		})
   406  		mux.HandleFunc("/internal/help", func(w http.ResponseWriter, req *http.Request) {
   407  			helpHandler(w, req, sd)
   408  		})
   409  		mux.HandleFunc("/internal/clearcache", func(w http.ResponseWriter, req *http.Request) {
   410  			clearCacheHandler(w, req)
   411  		})
   412  		mux.HandleFunc("/internal/parameters", func(w http.ResponseWriter, req *http.Request) {
   413  			paraHandler(w, req, sd)
   414  		})
   415  
   416  		nm, _ := prometheus.NonstandMetricNames(pm.DefaultRegisterer.(*pm.Registry))
   417  		if len(nm) > 0 {
   418  			for _, n := range nm {
   419  				fmt.Printf("Reg: \"%s\"\n", n)
   420  			}
   421  			panic("something registered outside go-easyops and will not be exposed")
   422  		}
   423  		gatherer := prometheus.GetGatherer()
   424  		h := promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{})
   425  		mux.Handle("/internal/service-info/metrics", h)
   426  		//	mux.Handle("/internal/service-info/metrics", promhttp.Handler())
   427  	}
   428  
   429  	// set startup for certmanager thing
   430  	BuiltinCert = certificates.Certificate()
   431  	BuiltinKey = certificates.Privatekey()
   432  	BuiltinTLSCert, err = tls.X509KeyPair(BuiltinCert, BuiltinKey)
   433  	utils.Bail("failed to create tls cert", err)
   434  
   435  	BackendCert := certificates.Certificate()
   436  	BackendKey := certificates.Privatekey()
   437  	cert, err := tls.X509KeyPair(BackendCert, BackendKey)
   438  
   439  	srv := &http.Server{
   440  		Addr:    fmt.Sprintf(":%d", sd.port),
   441  		Handler: grpcHandlerFunc(grpcServer, mux),
   442  		TLSConfig: &tls.Config{
   443  			GetCertificate:     getcert,
   444  			Certificates:       []tls.Certificate{cert},
   445  			NextProtos:         []string{"h2"},
   446  			InsecureSkipVerify: true,
   447  		},
   448  	}
   449  
   450  	fancyPrintf("grpc on port: %d\n", sd.port)
   451  	go callback_attempt(sd)
   452  	startup_complete = true
   453  	err = srv.Serve(tls.NewListener(conn, srv.TLSConfig))
   454  	fancyPrintf("Serve failed: %v\n", err)
   455  	return err
   456  }
   457  
   458  // attempt to http call into the server to trigger server_started callback
   459  func callback_attempt(sd *serverDef) {
   460  	url := fmt.Sprintf("https://localhost:%d/internal/health", sd.port)
   461  	for {
   462  		//fmt.Printf("Testing %s\n", url)
   463  		hr := easyhttp.NewDirectClient().Get(url)
   464  		if hr.Error() == nil {
   465  			break
   466  		}
   467  		time.Sleep(time.Duration(100) * time.Millisecond)
   468  	}
   469  	fmt.Printf("[go-easyops] Server started on port %d\n", sd.port)
   470  	if sd.callback != nil {
   471  		sd.callback()
   472  	}
   473  }
   474  
   475  // this function is called by http and works out wether it's a grpc or http-serve request
   476  func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
   477  	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   478  		path := r.URL.Path
   479  		if strings.HasPrefix(path, "/internal/debug") {
   480  			if *debug_internal_serve {
   481  				fancyPrintf("Serving debug path %s\n", path)
   482  			}
   483  			debugHandler(w, r)
   484  		} else if strings.HasPrefix(path, "/internal/clearcache") {
   485  			clearCacheHandler(w, r)
   486  		} else if strings.HasPrefix(path, "/internal/") {
   487  			if *debug_internal_serve {
   488  				fancyPrintf("Serving path %s\n", path)
   489  			}
   490  			otherHandler.ServeHTTP(w, r)
   491  		} else {
   492  			grpcServer.ServeHTTP(w, r)
   493  		}
   494  	})
   495  }
   496  
   497  // mostly for autodeployer
   498  func UnregisterPortRegistry(port []int) error {
   499  	var err error
   500  	client := client.GetRegistryClient()
   501  	/*
   502  		opts := []grpc.DialOption{grpc.WithInsecure()}
   503  		conn, err := grpc.Dial(cmdline.GetRegistryAddress(), opts...)
   504  		if err != nil {
   505  			fancyPrintf("failed to dial registry server: %v", err)
   506  			return err
   507  		}
   508  		defer conn.Close()
   509  		client := pb.NewRegistryClient(conn)
   510  	*/
   511  	var ps []int32
   512  	for _, p := range port {
   513  		ps = append(ps, int32(p))
   514  	}
   515  	psr := pb.ProcessShutdownRequest{Port: ps}
   516  	_, err = client.InformProcessShutdown(context_Background(), &psr)
   517  	return err
   518  }
   519  
   520  func find(port int, name string) *serverDef {
   521  	for _, sd := range knownServices {
   522  		if sd.port == port && sd.name == name {
   523  			return sd
   524  		}
   525  	}
   526  	return nil
   527  }
   528  
   529  func AddRegistry(sd *serverDef) (string, error) {
   530  	if find(sd.port, sd.name) == nil {
   531  		knownServices = append(knownServices, sd)
   532  	}
   533  
   534  	req := pb.ServiceLocation{}
   535  	req.Service = &pb.ServiceDescription{}
   536  	req.Service.Name = sd.name
   537  	req.Service.Path = sd.deployPath
   538  	sa := &pb.ServiceAddress{Port: int32(sd.port)}
   539  	req.Address = []*pb.ServiceAddress{sa}
   540  
   541  	rsr := &pb.RegisterServiceRequest{
   542  		ProcessID:   cmdline.GetInstanceID(),
   543  		Port:        uint32(sd.port),
   544  		ApiType:     sd.types,
   545  		ServiceName: sd.name,
   546  		Pid:         cmdline.GetPid(),
   547  		RoutingInfo: &pb.RoutingInfo{},
   548  		UserID:      sd.service_user_id,
   549  		Health:      GetHealth(),
   550  	}
   551  	if sd.asUser != nil {
   552  		rsr.RoutingInfo.RunningAs = common.VerifySignedUser(sd.asUser)
   553  	}
   554  	if sd.tags != nil {
   555  		rsr.RoutingInfo.Tags = sd.tags
   556  	}
   557  	if cmdline.IsStandalone() {
   558  		return standalone.RegisterService(rsr)
   559  	}
   560  	if rgclient == nil {
   561  		rgclient = client.GetRegistryClient()
   562  	}
   563  	resp, err := rgclient.V2RegisterService(context_Background(), rsr)
   564  	if err != nil {
   565  		fancyPrintf("RegisterService(%s) failed: %s\n", req.Service.Name, err)
   566  		return "", err
   567  	}
   568  	if resp == nil {
   569  		fmt.Println("Registration failed with no error provided.")
   570  	}
   571  	sd.registered_id = rsr.ProcessID
   572  	//	fancyPrintf("Response to register service: %v\n", resp)
   573  	//	fancyPrintf("Registered: %s\n", sd.registered_id)
   574  	return sd.registered_id, nil
   575  }
   576  
   577  func reRegister() {
   578  	// register any that are not yet registered
   579  	for _, sd := range knownServices {
   580  		AddRegistry(sd)
   581  	}
   582  }
   583  
   584  func getServerDefByName(name string) *serverDef {
   585  	return serverDefs[name]
   586  }
   587  func MethodNameFromUnaryInfo(info *grpc.UnaryServerInfo) string {
   588  	full := info.FullMethod
   589  	if full[0] == '/' {
   590  		full = full[1:]
   591  	}
   592  	ns := strings.SplitN(full, "/", 2)
   593  	if len(ns) < 2 {
   594  		return ""
   595  	}
   596  	res := ns[1]
   597  	if res[0] == '/' {
   598  		res = res[1:]
   599  	}
   600  	return ns[1]
   601  }
   602  func ServiceNameFromUnaryInfo(info *grpc.UnaryServerInfo) string {
   603  	full := info.FullMethod
   604  	if full[0] == '/' {
   605  		full = full[1:]
   606  	}
   607  	ns := strings.SplitN(full, "/", 2)
   608  	return ns[0]
   609  }
   610  
   611  func targetName(name string) string {
   612  	x := strings.Split(name, ".")
   613  	return x[0]
   614  }
   615  
   616  func isInternalService(name string) bool {
   617  	if name == "grpc.reflection.v1alpha.ServerReflection" {
   618  		return true
   619  	}
   620  	return false
   621  }
   622  
   623  func startOnce() {
   624  	starterLock.Lock()
   625  	if startedPreviously {
   626  		starterLock.Unlock()
   627  		return
   628  	}
   629  	startedPreviously = true
   630  	starterLock.Unlock()
   631  	pp.ProfilingCheckStart()
   632  }
   633  
   634  /***************************************************
   635  * convenience function to register stuff with the registry
   636  * useful to register long-running clients, for example
   637  * this allows for metrics to be exposed and scraped automatically
   638  * uses a default RPC
   639  ***************************************************/
   640  func StartFakeService(name string) {
   641  	port, err := getFreePort()
   642  	if err != nil {
   643  		s := fmt.Sprintf("Failed to get a free port: %s", err)
   644  		fmt.Println(s)
   645  		panic(s)
   646  	}
   647  	sd := NewServerDef().(*serverDef)
   648  	sd.SetPort(port)
   649  	sd.SetRegister(
   650  		func(server *grpc.Server) error {
   651  			e := new(echoServer)
   652  			echo.RegisterEchoServiceServer(server, e)
   653  			return nil
   654  		})
   655  
   656  	sd.name = name
   657  	go ServerStartup(sd)
   658  }
   659  
   660  type echoServer struct{}
   661  
   662  func (e *echoServer) Ping(ctx context.Context, req *echo.PingRequest) (*echo.PingResponse, error) {
   663  	fancyPrintf("I was pinged\n")
   664  	resp := &echo.PingResponse{Response: req}
   665  	return resp, nil
   666  }
   667  
   668  // ugly race-condition-hack to find a free port
   669  func getFreePort() (int, error) {
   670  	addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
   671  	if err != nil {
   672  		return 0, err
   673  	}
   674  
   675  	l, err := net.ListenTCP("tcp", addr)
   676  	if err != nil {
   677  		return 0, err
   678  	}
   679  	defer l.Close()
   680  	return l.Addr().(*net.TCPAddr).Port, nil
   681  }
   682  
   683  func incFailure(service string, method string, err error) {
   684  	status := status.Convert(err)
   685  	var code codes.Code
   686  	if status != nil {
   687  		code = status.Code()
   688  	}
   689  	grpc_failed_requests.With(prometheus.Labels{"method": method, "servicename": service, "grpccode": fmt.Sprintf("%d", code)}).Inc()
   690  }
   691  

View as plain text