...

Source file src/golang.conradwood.net/go-easyops/client/fancy_addresslist.go

Documentation: golang.conradwood.net/go-easyops/client

     1  package client
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"golang.conradwood.net/apis/registry"
     7  	"golang.conradwood.net/go-easyops/auth"
     8  	"google.golang.org/grpc"
     9  	"google.golang.org/grpc/balancer"
    10  	"google.golang.org/grpc/connectivity"
    11  	"google.golang.org/grpc/resolver"
    12  	"strings"
    13  	"sync"
    14  	"time"
    15  )
    16  
    17  /*
    18  this is essentially a list of addresses.
    19  the balancer removes/adds/updates addresses and the
    20  picker reads/chooses/sorts them.
    21  this struct synchronises access between them.
    22  
    23  The list is semi-uptodate, that means, it is cached, but updated if go-easyops determines that the registry
    24  has better information than its cache.
    25  The Addresses in this list are still subject to the filtering done in the registry. The Registry "prefers" certain targets, for example, higher buildids
    26  */
    27  type FancyAddressList struct {
    28  	Name      string
    29  	addresses []*FancyAddr
    30  }
    31  
    32  type FancyAddr struct {
    33  	addr      string
    34  	subcon    balancer.SubConn
    35  	state     connectivity.State
    36  	removed   bool
    37  	Target    *registry.Target
    38  	grpc_con  *grpc.ClientConn // only used if client calls Connect() on this
    39  	dial_lock sync.Mutex
    40  }
    41  
    42  // a key that can be used in maps to find this particular fancyaddress.
    43  func (fa *FancyAddr) Key() string {
    44  	return "faddr_" + fa.addr
    45  }
    46  
    47  // address, including port, e.g. 10.1.1.1:6000
    48  func (fa *FancyAddr) Address() string {
    49  	return fa.addr
    50  }
    51  func (fa *FancyAddr) String() string {
    52  	return fmt.Sprintf("%s: %s[%s] removed=%v", fa.Target.ServiceName, fa.addr, fa.state.String(), fa.removed)
    53  }
    54  
    55  // return true if this is _actually_ available. e.g. a TCP reset will cause this connection to be "not ready", but still be listed in the registry and caches
    56  func (fa *FancyAddr) IsReady() bool {
    57  	return fa.state == connectivity.Ready
    58  }
    59  
    60  func (fa *FancyAddr) disconnect() {
    61  	if fa.grpc_con == nil {
    62  		return
    63  	}
    64  	fa.grpc_con.Close()
    65  	fa.grpc_con = nil
    66  }
    67  
    68  /*
    69   open and maintain a connection to this peer. This can help to build custom load-balancers, but is not intented for general-use.
    70   use with caution - using this method required in-depth knowledge of grpc and go-easyops
    71  */
    72  
    73  func (fa *FancyAddr) Connection() (*grpc.ClientConn, error) {
    74  	if fa.grpc_con != nil {
    75  		return fa.grpc_con, nil
    76  	}
    77  	fa.dial_lock.Lock()
    78  	defer fa.dial_lock.Unlock()
    79  	if fa.grpc_con != nil {
    80  		return fa.grpc_con, nil
    81  	}
    82  	dialstring := fmt.Sprintf("%s", fa.addr)
    83  	fmt.Printf("[go-easyops] dialling \"%s\"...\n", dialstring)
    84  	ctx, cnc := context.WithTimeout(context.Background(), time.Duration(2)*time.Second)
    85  	defer cnc()
    86  	gc, err := grpc.DialContext(ctx, dialstring, grpc.WithBlock(),
    87  		grpc.WithTransportCredentials(GetClientCreds()),
    88  		grpc.WithUnaryInterceptor(ClientMetricsUnaryInterceptor),
    89  		grpc.WithStreamInterceptor(unaryStreamInterceptor),
    90  	)
    91  	if err != nil {
    92  		return nil, err
    93  	}
    94  	fa.grpc_con = gc
    95  	return gc, nil
    96  }
    97  func (fal *FancyAddressList) Count() int {
    98  	return len(fal.addresses)
    99  }
   100  func (fal *FancyAddressList) IsEmpty() bool {
   101  	return len(fal.addresses) == 0
   102  }
   103  
   104  // called by the balancer when a FancyAddr has been updated. (or anyone updating FancyAddr)
   105  // we may need to clear some caches (now or in future...)
   106  func (fal *FancyAddressList) Updated() {
   107  }
   108  
   109  // perhaps should check/panic on duplicates here?
   110  func (fal *FancyAddressList) Add(f *FancyAddr) {
   111  	fal.addresses = append(fal.addresses, f)
   112  	fal.Updated()
   113  }
   114  func (fal *FancyAddressList) remove(f *FancyAddr) {
   115  	var nr []*FancyAddr
   116  	for _, fa := range fal.addresses {
   117  		if fa.Key() == f.Key() {
   118  			continue
   119  		}
   120  		nr = append(nr, fa)
   121  	}
   122  	fal.addresses = nr
   123  }
   124  
   125  // returns the fancyaddress that matches the key. see fancyaddress.Key(). this might return nil if no such fancyaddress is known (any more)
   126  func (fal *FancyAddressList) ByKey(key string) *FancyAddr {
   127  	for _, fa := range fal.addresses {
   128  		if fa.Key() == key {
   129  			return fa
   130  		}
   131  	}
   132  	return nil
   133  }
   134  
   135  // removes all addresses which are NOT in the array and returns the removed ones
   136  func (fal *FancyAddressList) RequiredList(addresses []resolver.Address) []*FancyAddr {
   137  	var res []*FancyAddr
   138  	removed := false
   139  	for _, fa := range fal.addresses {
   140  		stillgood := false
   141  		for _, r := range addresses {
   142  			if r.Addr == fa.addr {
   143  				stillgood = true
   144  				break
   145  			}
   146  		}
   147  		if stillgood {
   148  			continue
   149  		}
   150  		fancyPrintf(fal, "balancer: removed %s\n", fa.addr)
   151  		fa.removed = true
   152  		removed = true
   153  		fa.disconnect()
   154  	}
   155  	if removed {
   156  		var fa []*FancyAddr
   157  		for _, foa := range fal.addresses {
   158  			if foa.removed {
   159  				res = append(res, foa)
   160  				continue
   161  			}
   162  			fa = append(fa, foa)
   163  		}
   164  		fal.addresses = fa
   165  		fal.Updated()
   166  	}
   167  	return res
   168  }
   169  
   170  /******************************** find entries by various keys *************************/
   171  
   172  func (fal *FancyAddressList) ByAddr(adr string) *FancyAddr {
   173  	for _, fa := range fal.addresses {
   174  		if fa.addr == adr {
   175  			return fa
   176  		}
   177  	}
   178  	return nil
   179  }
   180  
   181  func (fal *FancyAddressList) BySubCon(sc balancer.SubConn) *FancyAddr {
   182  	var fa *FancyAddr
   183  	for _, a := range fal.addresses {
   184  		if a.subcon == sc {
   185  			fa = a
   186  			break
   187  		}
   188  	}
   189  	return fa
   190  }
   191  
   192  // return all addresses the fancyaddresslist knows about.
   193  func (fal *FancyAddressList) AllAddresses() []*FancyAddr {
   194  	var valids []*FancyAddr
   195  	for _, a := range fal.addresses {
   196  		valids = append(valids, a)
   197  	}
   198  	return valids
   199  }
   200  
   201  // return all "ready" addresses (those with a TCP connection in ready state)
   202  func (fal *FancyAddressList) AllReadyAddresses() []*FancyAddr {
   203  	var valids []*FancyAddr
   204  	for _, a := range fal.addresses {
   205  		if !a.IsReady() {
   206  			continue
   207  		}
   208  		valids = append(valids, a)
   209  	}
   210  	return valids
   211  }
   212  
   213  // return addresses with 0 tags
   214  func (fal *FancyAddressList) ByWithoutTags() []*FancyAddr {
   215  	var valids []*FancyAddr
   216  	// filter addresses to include only those which contain required all tags
   217  	for _, a := range fal.addresses {
   218  		if a.Target == nil {
   219  			continue
   220  		}
   221  		if a.Target.RoutingInfo == nil || a.Target.RoutingInfo.Tags == nil || len(a.Target.RoutingInfo.Tags) == 0 {
   222  			valids = append(valids, a)
   223  		}
   224  	}
   225  	return valids
   226  }
   227  
   228  /*
   229  called for _every_ rpc call when ge_honour_tags flag is true, adjusts the
   230  list of matches by checking whether the addresses matches all the routing tags
   231  supplied.
   232  if no tags are supplied, return _ALL_ targets (including those with tags)
   233  */
   234  
   235  func (fal *FancyAddressList) ByMatchingTags(tags map[string]string) []*FancyAddr {
   236  	fancyPrintf(fal, "Filtering (%d) addresses by tags\n", len(fal.addresses))
   237  	if len(tags) == 0 {
   238  		// no point iterating over all addresses if we have no tags
   239  		// also - we treat "empty list" the same as "no tags specified", that is, return all connections instead of none
   240  		fancyPrintf(fal, "empty list for filterbytags!")
   241  		return fal.addresses
   242  	}
   243  	var valids []*FancyAddr
   244  	// filter addresses to include only those which contain required all tags
   245  	for _, a := range fal.addresses {
   246  		valid := true
   247  		if a.Target == nil || a.Target.RoutingInfo == nil || a.Target.RoutingInfo.Tags == nil {
   248  			fancyPrintf(fal, "tag in %s does have special routing\n", a.addr)
   249  			continue
   250  		}
   251  		for k, v := range tags {
   252  			tgv := a.Target.RoutingInfo.Tags[k]
   253  			if tgv != v {
   254  				fancyPrintf(fal, "tag in %s does not match. \"%s\" != \"%s\"\n", a.addr, tgv, v)
   255  				valid = false
   256  				break
   257  			}
   258  		}
   259  		if valid {
   260  			valids = append(valids, a)
   261  		}
   262  	}
   263  	return valids
   264  }
   265  
   266  // get all those without routinginfo or no routinginfo.user
   267  func (fal *FancyAddressList) ByNoUserRoutingInfo() []*FancyAddr {
   268  	var res []*FancyAddr
   269  	for _, a := range fal.addresses {
   270  		ri := a.Target.RoutingInfo
   271  		if ri != nil && ri.RunningAs != nil {
   272  			continue
   273  		}
   274  		res = append(res, a)
   275  	}
   276  	return res
   277  }
   278  
   279  // get all those with a routinginfo RunningAs user
   280  func (fal *FancyAddressList) ByUser(userid string) []*FancyAddr {
   281  	var res []*FancyAddr
   282  	for _, a := range fal.addresses {
   283  		ri := a.Target.RoutingInfo
   284  		if ri == nil || ri.RunningAs == nil {
   285  			continue
   286  		}
   287  		if ri.RunningAs.ID != userid {
   288  			continue
   289  		}
   290  		res = append(res, a)
   291  	}
   292  	return res
   293  }
   294  
   295  func (fal *FancyAddressList) readyOnly(in []*FancyAddr) []*FancyAddr {
   296  	var valids []*FancyAddr
   297  	bal_state_lock.Lock()
   298  	for _, fa := range in {
   299  		if fa.state != connectivity.Ready {
   300  			continue
   301  		}
   302  		valids = append(valids, fa)
   303  	}
   304  	bal_state_lock.Unlock()
   305  	return valids
   306  
   307  }
   308  
   309  /*
   310  	this is called for _every_ rpc call. it should be performance optimised
   311  	this returns a list of addresses for the picker to pick from.
   312  	this function is what the picker calls. if loadbalancing is implemented by the user, this
   313  	function should be used
   314  
   315  the rules are:
   316  First: Never return any addresses which are not in connectivty state READY.
   317  Then from the remaining addresses (in ready state), follow these rules:
   318  1. If we have 0 addresses with routinginfo for a user, return all.
   319  2. if context has no user, return those without routinginfo.user
   320  3. if 1 or more addresses have a routinginfo.user that matches user in context, return only those
   321   4. otherwise return those without routinguser.info
   322  */
   323  func (fal *FancyAddressList) SelectValid(ctx context.Context) []*FancyAddr {
   324  	nro := fal.ByNoUserRoutingInfo()
   325  	if len(nro) == len(fal.addresses) {
   326  		// ALL addresses have routinginfo, so we have 0 addresses WITHOUT routinginfo
   327  		fancyPrintf(fal, "all addresses for %s have routinginfo\n", fal.Name)
   328  		return fal.readyOnly(nro)
   329  	}
   330  	u := auth.GetUser(ctx)
   331  	if u == nil {
   332  		// user has no context, return those without routinginfo
   333  		fancyPrintf(fal, "user-less rpc\n")
   334  		if len(nro) == 0 && len(fal.addresses) > 0 {
   335  			fmt.Printf("[go-easyops] Warning - of %d targets, all require a user in outbound context (but none provided)\n", len(fal.addresses))
   336  		}
   337  		return fal.readyOnly(nro)
   338  	}
   339  	bu := fal.ByUser(u.ID)
   340  	if len(bu) == 0 {
   341  		// none for this user, return those without routinginfo
   342  		fancyPrintf(fal, "no connections specifically for user %s\n", u.Email)
   343  		return fal.readyOnly(nro)
   344  	}
   345  	fancyPrintf(fal, "%d connections specifically for user %s\n", len(bu), u.Email)
   346  	return fal.readyOnly(bu)
   347  }
   348  
   349  // servicename, e.g. "registry.Registry"
   350  func (fal *FancyAddressList) ServiceName() string {
   351  	s := fal.Name
   352  	idx := strings.Index(s, "@")
   353  	if idx != -1 {
   354  		s = s[:idx]
   355  	}
   356  	return s
   357  }
   358  
   359  func GetAllFancyAddressLists() []*FancyAddressList {
   360  	var res []*FancyAddressList
   361  	for _, bal := range balancers {
   362  		if bal.addresslist != nil {
   363  			res = append(res, bal.addresslist)
   364  		}
   365  	}
   366  	return res
   367  }
   368  

View as plain text