...

Source file src/golang.conradwood.net/go-easyops/client/fancy_resolver.go

Documentation: golang.conradwood.net/go-easyops/client

     1  package client
     2  
     3  // this does the actual resolving with our registry
     4  
     5  import (
     6  	"context"
     7  	//	"crypto/tls"
     8  	"flag"
     9  	"fmt"
    10  	pb "golang.conradwood.net/apis/registry"
    11  	"golang.conradwood.net/go-easyops/common"
    12  	"golang.conradwood.net/go-easyops/prometheus"
    13  	"golang.conradwood.net/go-easyops/utils"
    14  	"google.golang.org/grpc"
    15  	"google.golang.org/grpc/attributes"
    16  	"google.golang.org/grpc/resolver"
    17  	"net"
    18  	"strconv"
    19  	"strings"
    20  	"sync"
    21  	"time"
    22  )
    23  
    24  const (
    25  	RESOLVER_ATTRIBUTE_SERVICE_ADDRESS = "service_address"
    26  )
    27  
// Package-level state shared by all resolvers created by FancyResolverBuilder.
var (
	// query_for_proxies is consulted by ActionResolve before it rewrites an
	// address to go through a proxy.
	query_for_proxies = flag.Bool("ge_support_proxies", true, "if true, supports routing via and to registrymultiplexer proxies")
	// reglock is held by getRegistryClient while it creates and caches a
	// registry client.
	reglock           sync.Mutex
	// proxyTargetLock is held by GetProxyTarget for the whole lookup/dial.
	proxyTargetLock   sync.Mutex
	// proxiedTargets is filled by ActionResolve (keyed by ProxyTarget.key())
	// and looked up by GetProxyTarget.
	proxiedTargets    = make(map[string]*ProxyTarget)      // serviceid -> proxytarget
	registryclients   = make(map[string]pb.RegistryClient) // map of "ip:port" -> registry
	// resolv_chan wakes resolver_thread; Build and ResolveNow send on it. The
	// value sent is ignored by the receiver.
	resolv_chan       = make(chan int, 50)                 // buffered channel
	// resolvers lists every resolver handed out by Build; resolver_thread
	// iterates over it.
	resolvers         []*FancyResolver
	// totalQueryCtr is registered with prometheus in init(). It is not
	// incremented in this file.
	totalQueryCtr     = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "grpc_loadbalancer_registry_queries",
			Help: "counter incremented each time the loadbalancer queries the registry",
		},
		[]string{"servicename"},
	)
)
    44  
    45  type ProxyTarget struct {
    46  	Target    *pb.Target
    47  	created   time.Time
    48  	lastused  time.Time
    49  	goingaway bool
    50  	tcpConn   net.Conn
    51  	tlsConn   net.Conn
    52  }
    53  
    54  func (p *ProxyTarget) key() string {
    55  	return fmt.Sprintf("%s_%s_%d_%s_%s",
    56  		p.Target.ServiceName,
    57  		p.Target.IP,
    58  		p.Target.Port,
    59  		p.Target.RoutingInfo.GatewayID,
    60  		p.Target.Partition,
    61  	)
    62  }
    63  
// init starts the background resolver goroutine, registers
// FancyResolverBuilder with gRPC's resolver registry (so that URLs using the
// scheme returned by Scheme() are routed to it) and registers the
// registry-query counter with prometheus.
func init() {
	go resolver_thread()
	resolver.Register(&FancyResolverBuilder{})
	prometheus.MustRegister(totalQueryCtr)
}
    69  
    70  type FancyResolverBuilder struct {
    71  }
    72  
    73  // this scheme matches the url
    74  func (f *FancyResolverBuilder) Scheme() string {
    75  	return "go-easyops"
    76  }
    77  func (f *FancyResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    78  	//fmt.Printf("Target: %#v\n", target)
    79  	authority := target.URL.Host
    80  	//	authority := "foo-bar"
    81  	//fmt.Printf("Authority: %s\n", authority)
    82  	// authority is the servicename, e.g. "helloworld.HelloWorld")
    83  	if authority == "" {
    84  		fmt.Printf("[go-easyops] invalid build options\n")
    85  		fmt.Printf("[go-easyops] target: %#v\n", target)
    86  		fmt.Printf("[go-easyops] con: %#v\n", cc)
    87  		panic("no target authority")
    88  	}
    89  	var registry string
    90  	if !strings.Contains(target.Endpoint(), "@") {
    91  		panic(fmt.Sprintf("Invalid url - no registry in resolver. is \"%s\", missing @host:ip", target.Endpoint()))
    92  	}
    93  	rs := strings.Split(target.Endpoint(), "@")
    94  	registry = rs[len(rs)-1]
    95  	res := &FancyResolver{cc: cc, target: authority, registry: registry}
    96  	fancyPrintf(res, "fancy_resolver(): Request to build resolver for %#v\n", target)
    97  
    98  	common.AddServiceName(res.target)
    99  	resolvers = append(resolvers, res)
   100  	resolv_chan <- 1
   101  	return res, nil
   102  }
   103  
// FancyResolver resolves one service name against one registry. Instances
// are created by FancyResolverBuilder.Build and refreshed by resolver_thread
// through ActionResolve.
type FancyResolver struct {
	// registry is the registry address, taken by Build from the part of the
	// endpoint after the last "@".
	registry                 string
	// target is the service name, taken by Build from the target URL host.
	target                   string
	// cc receives address updates and errors from ActionResolve.
	cc                       resolver.ClientConn
	// noInstanceWarningPrinted is toggled by blockedWarning so the "no
	// instances" warning is printed once per outage.
	noInstanceWarningPrinted bool
	// instances is the number of targets returned by the last successful
	// registry query in ActionResolve.
	instances                int
	// lastScanned is read by resolver_thread; it is not assigned anywhere in
	// this file.
	lastScanned              time.Time
}
   112  
   113  func (f *FancyResolver) ResolveNow(opts resolver.ResolveNowOptions) {
   114  	fancyPrintf(f, "ResolveNow() on target %s with opts: %#v\n", f.target, opts)
   115  	resolv_chan <- 1
   116  
   117  }
   118  
   119  func (f *FancyResolver) Close() {
   120  	return
   121  }
   122  
   123  // called sync by the resolver_thread
   124  func (f *FancyResolver) ActionResolve() {
   125  
   126  	fancyPrintf(f, "fancy_resolver(): Resolving %s\n", f.target)
   127  	regs, err := f.queryForInstances()
   128  	if err != nil {
   129  		fancyPrintf(f, "Error resolving: %s\n", err)
   130  		f.cc.ReportError(err)
   131  		return
   132  	}
   133  	f.instances = len(regs)
   134  	f.updateCounters(len(regs))
   135  	f.blockedWarning(len(regs))
   136  	var ra []resolver.Address
   137  	for _, a := range regs {
   138  		rad := resolver.Address{
   139  			ServerName: "go-easyops-server-name",
   140  			Addr:       fmt.Sprintf("%s%s:%d", DIRECT_PREFIX, a.IP, a.Port),
   141  			Attributes: attributes.New(RESOLVER_ATTRIBUTE_SERVICE_ADDRESS, a),
   142  		}
   143  		if a.RoutingInfo != nil && a.RoutingInfo.GatewayID != "" && *query_for_proxies {
   144  			pt := &ProxyTarget{Target: a, created: time.Now()}
   145  			proxiedTargets[pt.key()] = pt
   146  			rad.Addr = fmt.Sprintf("%s%s", PROXY_PREFIX, pt.key())
   147  		}
   148  
   149  		ra = append(ra, rad)
   150  		//	fancyPrintf(f,"fancy_resolver(): service \"%s\" on address: %s\n", r.target, a)
   151  	}
   152  	f.cc.UpdateState(resolver.State{Addresses: ra})
   153  }
   154  
   155  // update prometheus counters for got or not got instances
   156  func (f *FancyResolver) updateCounters(adrcount int) {
   157  	if adrcount == 0 {
   158  		blockCtr.With(prometheus.Labels{"servicename": f.target}).Inc()
   159  	}
   160  	// done in "queryForActiveInstances"
   161  	//	totalQueryCtr.With(prometheus.Labels{"servicename": f.target}).Inc()
   162  
   163  }
   164  
   165  // print a warning (or cancellation of warning) if no instances are found for a service
   166  func (f *FancyResolver) blockedWarning(adrcount int) {
   167  	if adrcount == 0 && !f.noInstanceWarningPrinted {
   168  		fmt.Printf("WARNING - no instances for \"%s\"\n", f.target)
   169  		f.noInstanceWarningPrinted = true
   170  	}
   171  	if adrcount != 0 && f.noInstanceWarningPrinted {
   172  		fmt.Printf("WARNING CANCEL - %d instances for \"%s\"\n", adrcount, f.target)
   173  		f.noInstanceWarningPrinted = false
   174  	}
   175  }
   176  
   177  /********************* this thread monitors the registry and provides regular updates ***********/
   178  func resolver_thread() {
   179  	interval := defaultInterval() // update sleep interval to match flag
   180  	for {
   181  		select {
   182  		case _ = <-resolv_chan:
   183  		//
   184  		case <-time.After(interval):
   185  			//
   186  		}
   187  
   188  		if len(resolvers) == 0 {
   189  			continue
   190  		}
   191  
   192  		interval = defaultInterval() // update sleep interval in case flags change
   193  		//	fancyPrintf(f,"fancy_resolver(): resolving...\n")
   194  		for _, r := range resolvers {
   195  			if r.instances != 0 && (time.Since(r.lastScanned) < defaultInterval()) {
   196  				// don't scan resolver - it has been scanned recently
   197  				continue
   198  			}
   199  			r.ActionResolve() // get potential instances from registry for this resolvers' target
   200  			if r.instances == 0 {
   201  				// if we have a resolver w/o backends, query that one more frequently
   202  				interval = time.Duration(1) * time.Second
   203  			}
   204  		}
   205  	}
   206  }
   207  
// defaultInterval returns the regular pause between two resolver passes, read
// from normal_sleep_time on every call so that changes to it take effect.
// normal_sleep_time is declared outside this file; the comments in
// resolver_thread indicate it is a flag.
func defaultInterval() time.Duration {
	return *normal_sleep_time
}
   211  
// queryForInstances gets the ip:port listings from the registry for this
// service. It delegates to queryServiceInstances (defined outside this file)
// with this resolver's registry address and service name and returns its
// result unchanged.
func (f *FancyResolver) queryForInstances() ([]*pb.Target, error) {
	return queryServiceInstances(f.registry, f.target)
}
   216  
   217  /*
   218  	func hasGRPC(r *pb.Registration) bool {
   219  		for _, a := range r.Target.ApiType {
   220  			if a == pb.Apitype_grpc {
   221  				return true
   222  			}
   223  		}
   224  		return false
   225  	}
   226  */
   227  func getRegistryClient(registryAddress string) (pb.RegistryClient, error) {
   228  	r := registryclients[registryAddress]
   229  	if r != nil {
   230  		return r, nil
   231  	}
   232  	reglock.Lock()
   233  	defer reglock.Unlock()
   234  
   235  	// connect to registry
   236  	//	fmt.Printf("[go-easyops] Connecting to registry at %s...\n", registryAddress)
   237  	var err error
   238  	// try to use tls first
   239  	conn := withTLS(registryAddress)
   240  	if conn == nil {
   241  		fmt.Printf("[go-easyops] Failed to connect to registry (%s) via TLS, falling back to non-tls\n", registryAddress)
   242  		conn, err = grpc.Dial(
   243  			registryAddress,
   244  			//grpc.WithTransportCredentials(GetClientCreds()),
   245  			grpc.WithInsecure(),
   246  			//			grpc.WithUnaryInterceptor(unaryClientInterceptor),
   247  			//			grpc.WithStreamInterceptor(unaryStreamInterceptor),
   248  			grpc.WithTimeout(time.Duration(CONST_CALL_TIMEOUT)*time.Second),
   249  		)
   250  	}
   251  	if err != nil {
   252  		fmt.Printf("Failed to connect to registry at %s: %s\n", registryAddress, utils.ErrorString(err))
   253  		return nil, err
   254  	}
   255  	registryClient := pb.NewRegistryClient(conn)
   256  	registryclients[registryAddress] = registryClient
   257  	return registryClient, nil
   258  }
   259  
   260  // this is quite a hack. Through tribal knowledge we know that the registry
   261  // exposes RPC as non tls. on "port+1" however it exposes it via TLS
   262  // so we try to connect to that first.
   263  func withTLS(address string) *grpc.ClientConn {
   264  	xs := strings.Split(address, ":")
   265  	if len(xs) < 2 {
   266  		return nil
   267  	}
   268  	xx, err := strconv.Atoi(xs[1])
   269  	if err != nil {
   270  		fmt.Printf("weird registry, not a number \"%s\": %s\n", address, err)
   271  		return nil
   272  	}
   273  	np := fmt.Sprintf("%s:%d", xs[0], xx+1)
   274  	conn, err := grpc.Dial(
   275  		np,
   276  		grpc.WithTransportCredentials(GetClientCreds()),
   277  		//			grpc.WithUnaryInterceptor(unaryClientInterceptor),
   278  		//			grpc.WithStreamInterceptor(unaryStreamInterceptor),
   279  		grpc.WithTimeout(time.Duration(CONST_CALL_TIMEOUT)*time.Second),
   280  	)
   281  	if err != nil {
   282  		fmt.Printf("unable to dial registry with TLS: %s", err)
   283  		return nil
   284  	}
   285  	return conn
   286  }
   287  
   288  func GetProxyTarget(ctx context.Context, serviceid string) (*ProxyTarget, error) {
   289  	proxyTargetLock.Lock()
   290  	defer proxyTargetLock.Unlock()
   291  	pt := proxiedTargets[serviceid]
   292  	if pt == nil {
   293  		return nil, fmt.Errorf("Proxy ID %s is not known here", serviceid)
   294  	}
   295  	if pt.tlsConn == nil {
   296  		tcs := fmt.Sprintf("%s:%d", pt.Target.IP, pt.Target.Port)
   297  		scs := fmt.Sprintf("\"%s\"@%s", pt.Target.ServiceName, tcs)
   298  		fmt.Printf("Dialing proxy-connection %s\n", scs)
   299  		conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", tcs)
   300  		if err != nil {
   301  			fmt.Printf("Failed to connect %s: %s\n", scs, err)
   302  			return nil, err
   303  		}
   304  		err = pt.Start(conn)
   305  		if err != nil {
   306  			conn.Close()
   307  			fmt.Printf("Failed to start connection %s: %s\n", scs, err)
   308  			return nil, err
   309  		}
   310  		//		fmt.Printf("[go-easyops] WARNING client requested serviceid \"%s\", which is not resolvable\n", serviceid)
   311  		return pt, nil
   312  	}
   313  	pt.lastused = time.Now()
   314  	return pt, nil
   315  }
   316  
   317  // send the initialisation sequence
   318  func (p *ProxyTarget) Start(c net.Conn) error {
   319  	//	tc := tls.Client(c, &tls.Config{InsecureSkipVerify: true})
   320  
   321  	s, err := utils.Marshal(p.Target.RoutingInfo)
   322  	if err != nil {
   323  		return err
   324  	}
   325  	buf := []byte("C" + s + "\n")
   326  	_, err = c.Write(buf)
   327  	if err != nil {
   328  		return err
   329  	}
   330  	//	p.tlsConn = tc
   331  	p.tcpConn = c
   332  	fmt.Printf("Started tcp connection for %s\n", p.Target.ServiceName)
   333  	return err
   334  }
// ServiceName returns a fixed name identifying the builder itself; it does
// not depend on any target.
func (f *FancyResolverBuilder) ServiceName() string {
	return "fancyresolverbuilder"
}
// ServiceName returns the service name this resolver was built for (the
// target field set by Build).
func (f *FancyResolver) ServiceName() string {
	return f.target
}
   341  

View as plain text