...

Source file src/golang.conradwood.net/go-easyops/client/fancy_balancer.go

Documentation: golang.conradwood.net/go-easyops/client

     1  package client
     2  
     3  import (
     4  	"flag"
     5  	"fmt"
     6  	"strings"
     7  	"sync"
     8  	"time"
     9  
    10  	"golang.conradwood.net/apis/registry"
    11  	"golang.conradwood.net/go-easyops/auth"
    12  	"golang.conradwood.net/go-easyops/utils"
    13  	"google.golang.org/grpc/balancer"
    14  	"google.golang.org/grpc/connectivity"
    15  	"google.golang.org/grpc/resolver"
    16  )
    17  
    18  func init() {
    19  	balancer.Register(&FancyBuilder{})
    20  	go balancer_thread()
    21  }
    22  
    23  var (
    24  	balancers      []*FancyBalancer
    25  	bal_lock       sync.Mutex
    26  	bal_state_lock sync.Mutex
    27  	maxblock       = flag.Float64("ge_max_block", 30, "max `seconds` to block rpcs for if backends are not available (fail afterwards)")
    28  )
    29  
    30  /*********** the builder for our balancer *****************/
    31  type FancyBuilder struct {
    32  }
    33  
    34  // Build creates a new balancer for the (new) ClientConn.
    35  func (f *FancyBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
    36  	fancyPrintf(f, "Building Balancer for %s\n", opts.Target.Endpoint())
    37  	// name is the dialler address, e.g. "helloworld.HelloWorld@localhost:5000"
    38  	fal := &FancyAddressList{Name: opts.Target.Endpoint()}
    39  	cc.UpdateState(balancer.State{
    40  		ConnectivityState: connectivity.Ready,
    41  		Picker:            &FancyPicker{addresslist: fal}, // not failing - initially we wait
    42  	})
    43  	// target is the servicename, e.g. "helloworld.HelloWorld"
    44  	idx := strings.Index(fal.Name, "@")
    45  	if idx == -1 {
    46  		panic(fmt.Sprintf("unsupported target: %s", fal.Name))
    47  	}
    48  	target := fal.Name[:idx]
    49  	res := &FancyBalancer{cc: cc,
    50  		target:       target, //opts.Target.Authority,
    51  		blockedSince: time.Now(),
    52  		addresslist:  fal,
    53  	}
    54  	fancyPrintf(f, "Built balancer for target \"%s\"\n", res.target)
    55  	if res.target == "" {
    56  		s := fmt.Sprintf("cannot create fancy-balancer without servicename (opts=%#v). Dial must be in the format 'go-easyops://servicename/servicename@registry'", opts)
    57  		panic(s)
    58  	}
    59  	// looks a bit dumb. we really should reuse slots from closed ones
    60  	bal_lock.Lock()
    61  	balancers = append(balancers, res)
    62  	bal_lock.Unlock()
    63  	return res
    64  }
    65  
    66  // Name returns the name of balancers built by this builder.
    67  // It will be used to pick balancers (for example in service config).
    68  func (f *FancyBuilder) Name() string {
    69  	return "fancybalancer"
    70  }
    71  
    72  /*********** the balancer *****************/
    73  
    74  type FancyBalancer struct {
    75  	cc           balancer.ClientConn
    76  	target       string
    77  	addresslist  *FancyAddressList
    78  	closed       bool
    79  	failing      bool
    80  	blockedSince time.Time
    81  }
    82  
    83  // EXPERIMENTAL: this is the new-style grpc callback
    84  func (f *FancyBalancer) ResolverError(err error) {
    85  	if *debug_fancy {
    86  		utils.NotImpl("resolver error")
    87  	}
    88  	fmt.Printf("[go-easyops] Resolver reported an error, which is not handled yet: %s\n", err)
    89  	//	panic("[go-easyops] compiled with incompatible grpc library version")
    90  }
    91  
    92  // EXPERIMENTAL: this is the new-style grpc callback, called by the resolver when a state changes
    93  // it feeds us new addresses
    94  func (f *FancyBalancer) UpdateClientConnState(bc balancer.ClientConnState) error {
    95  	fancyPrintf(f, "balancer: updateclientconnstate (ResolverState: %d addresses)\n", len(bc.ResolverState.Addresses))
    96  	f.HandleResolvedAddrs(bc.ResolverState.Addresses, nil)
    97  	/*
    98  		f.cc.UpdateState(balancer.State{
    99  			ConnectivityState: connectivity.Ready,
   100  			Picker:            f.Picker(),
   101  		})
   102  	*/
   103  	return nil
   104  }
   105  
   106  // EXPERIMENTAL: this is the new-style grpc callback
   107  func (f *FancyBalancer) UpdateSubConnState(sc balancer.SubConn, bc balancer.SubConnState) {
   108  	fancyPrintf(f, "balancer: updatesubconnstate\n")
   109  	f.HandleSubConnStateChange(sc, bc.ConnectivityState)
   110  }
   111  
   112  // DEPRECATED - old version of grpc
   113  // HandleSubConnStateChange is called by gRPC when the connectivity state
   114  // of sc has changed.
   115  // Balancer is expected to aggregate all the state of SubConn and report
   116  // that back to gRPC.
   117  // Balancer should also generate and update Pickers when its internal state has
   118  // been changed by the new state.
   119  func (f *FancyBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
   120  	fa := f.addresslist.BySubCon(sc)
   121  	if fa == nil {
   122  		fancyPrintf(f, "balancer: SubConnState on a subconnection we don't know (%#v)!\n", sc)
   123  		//		panic("what to do with this subconnection??")
   124  		f.cc.UpdateState(balancer.State{
   125  			ConnectivityState: connectivity.Ready,
   126  			Picker:            f.Picker(),
   127  		})
   128  
   129  		return
   130  	}
   131  
   132  	bal_state_lock.Lock()
   133  	oldstate := fa.state
   134  	fa.state = state
   135  	bal_state_lock.Unlock()
   136  
   137  	fancyPrintf(f, "balancer: Handlesubstate service %s at %s transitioned from %s to %s\n", f.target, fa.addr, oldstate.String(), state.String())
   138  	f.failing = false
   139  	f.cc.UpdateState(balancer.State{
   140  		ConnectivityState: connectivity.Ready,
   141  		Picker:            f.Picker(),
   142  	})
   143  }
   144  
   145  // DEPRECATED - old version of grpc
   146  // HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
   147  // balancers.
   148  // Balancer can create new SubConn or remove SubConn with the addresses.
   149  // An empty address slice and a non-nil error will be passed if the resolver returns
   150  // non-nil error to gRPC.
   151  // Note that each address MUST have an attribute with a registry.ServiceAddress
   152  // because we cannot transport all the information in just ip/port
   153  func (f *FancyBalancer) HandleResolvedAddrs(addresses []resolver.Address, err error) {
   154  	//	fancyPrintf(f,"balancer: HandleResolveAddrs addressed = %d (err=%v)\n", len(addresses), err)
   155  
   156  	// create new ones:
   157  	added := false
   158  	for _, resolverAddr := range addresses {
   159  		var sa *registry.Target
   160  		if resolverAddr.Attributes != nil {
   161  			o := resolverAddr.Attributes.Value(RESOLVER_ATTRIBUTE_SERVICE_ADDRESS)
   162  			if o != nil {
   163  				sa = o.(*registry.Target)
   164  			}
   165  		}
   166  		if sa == nil {
   167  			// see note above. serviceAddress is required!
   168  			s := fmt.Sprintf("fancy balancer received a very unfancy address without registry.ServiceAddress attribute for \"%s\". Weird resolver?", f.target)
   169  			panic(s)
   170  		}
   171  		rf := ""
   172  		ri := sa.RoutingInfo
   173  		if ri != nil {
   174  			u := ri.RunningAs
   175  			if u != nil {
   176  				rf = fmt.Sprintf("runningas=%s/#%s", auth.Description(u), u.ID)
   177  			}
   178  		}
   179  		fr := f.addresslist.ByAddr(resolverAddr.Addr)
   180  		if fr != nil {
   181  			fr.Target = sa
   182  			f.addresslist.Updated()
   183  			fancyPrintf(f, "balancer: %s, conn %s known as state %s\n", f.target, resolverAddr.Addr, fr.state.String())
   184  			continue
   185  		}
   186  		fancyPrintf(f, "balancer: New Address %s (%s)\n", resolverAddr.Addr, rf)
   187  		// not yet known - create a new sub connection
   188  		sco, err := f.cc.NewSubConn([]resolver.Address{resolverAddr}, balancer.NewSubConnOptions{})
   189  		if err != nil {
   190  			fancyPrintf(f, "Failed to create subconn: %s\n", err)
   191  			continue
   192  		}
   193  		//	sc = append(sc, sco)
   194  		f.addresslist.Add(&FancyAddr{
   195  			state: connectivity.Ready, // docs say use CONNECTING here, but that never calls the picker nor the stateupdate. how does that work?
   196  			//			state:  connectivity.Idle,
   197  			//state:  connectivity.Connecting,
   198  			addr:   resolverAddr.Addr,
   199  			subcon: sco,
   200  			Target: sa,
   201  		})
   202  		added = true
   203  	}
   204  	// we also need to remove connections which are no longer valid for this service:
   205  	remlist := f.addresslist.RequiredList(addresses)
   206  	for _, fa := range remlist {
   207  		f.cc.RemoveSubConn(fa.subcon)
   208  	}
   209  	removed := len(remlist) != 0
   210  	if !added && !removed {
   211  		fancyPrintf(f, "balancer: no state change for \"%s\"\n", f.target)
   212  		f.cc.UpdateState(balancer.State{ // got to send it anyway (connections might have changed)
   213  			ConnectivityState: connectivity.Ready,
   214  			Picker:            f.Picker(),
   215  		})
   216  		return
   217  	}
   218  	f.failing = false
   219  	fancyPrintf(f, "balancer: Sending state update for \"%s\", we got %d subconnections now\n", f.target, f.addresslist.Count())
   220  
   221  	if f.addresslist.IsEmpty() {
   222  		f.blockedSince = time.Now()
   223  		f.cc.UpdateState(balancer.State{
   224  			ConnectivityState: connectivity.Ready,
   225  			//ConnectivityState: connectivity.TransientFailure,
   226  			Picker: f.Picker(),
   227  		})
   228  		return
   229  	}
   230  	f.cc.UpdateState(balancer.State{
   231  		ConnectivityState: connectivity.Ready,
   232  		Picker:            f.Picker(),
   233  	})
   234  
   235  }
   236  
   237  // Close closes the balancer. The balancer is not required to call
   238  // ClientConn.RemoveSubConn for its existing SubConns.
   239  func (f *FancyBalancer) Close() {
   240  	f.closed = true
   241  	bal_lock.Lock()
   242  	defer bal_lock.Unlock()
   243  	var res []*FancyBalancer
   244  	// looks a bit dumb. we really should reuse slots from closed ones
   245  	for _, b := range balancers {
   246  		if b.closed {
   247  			continue
   248  		}
   249  		res = append(res, b)
   250  	}
   251  	balancers = res
   252  	fancyPrintf(f, "Close\n")
   253  }
   254  
   255  // create a new picker
   256  func (f *FancyBalancer) Picker() *FancyPicker {
   257  	res := &FancyPicker{addresslist: f.addresslist}
   258  	return res
   259  }
   260  
   261  /********************************* thread checking for hung pickers ************************
   262  we want RPCs to fail (rather than indefinitely hang)
   263  Behaviour should be like this:
   264  * if we have a "transient failure" we report it as such
   265  * if the "transient failure" remains for some time, we start failing RPCs
   266  * (this is to avoid them all queueing up and blocking up our service(s))
   267  */
   268  
   269  func balancer_thread() {
   270  	for {
   271  		bal_lock.Lock()
   272  		x := balancers
   273  		bal_lock.Unlock()
   274  		for _, b := range x {
   275  			b.Check()
   276  		}
   277  		time.Sleep(time.Duration(1) * time.Second)
   278  	}
   279  
   280  }
   281  
   282  // periodically called by go routine, checks if it's blocking for too long
   283  func (f *FancyBalancer) Check() {
   284  	for _, a := range f.addresslist.addresses {
   285  		if a.state == connectivity.Idle {
   286  			fancyPrintf(f, "connect()")
   287  			a.subcon.Connect()
   288  		}
   289  	}
   290  	if !f.addresslist.IsEmpty() {
   291  		return // not blocked
   292  	}
   293  	if f.failing {
   294  		return // already failing
   295  	}
   296  	sc := time.Since(f.blockedSince)
   297  	fancyPrintf(f, "Blocked since: %v (%v)\n", f.blockedSince, sc)
   298  	if sc < (time.Duration(*maxblock) * time.Second) {
   299  		return // not failing for long enough to take action yet
   300  	}
   301  
   302  	f.failing = true
   303  	fp := f.Picker()
   304  	fp.failAll = true
   305  	f.cc.UpdateState(balancer.State{
   306  		ConnectivityState: connectivity.Ready,
   307  		Picker:            fp,
   308  	})
   309  
   310  }
   311  func (f *FancyBalancer) ServiceName() string {
   312  	return f.target
   313  }
   314  func (f *FancyBalancer) ExitIdle() {
   315  	/*
   316  		fmt.Printf("[go-easyops] *********** warning ******** \n")
   317  		fmt.Printf("[go-easyops] client->FancyBalancer.ExitIdle() called but not implemented\n")
   318  		fmt.Printf("[go-easyops] *********** end warning ******** \n")
   319  	*/
   320  }
   321  
   322  func (f *FancyBuilder) ServiceName() string {
   323  	return "fancy_builder.go"
   324  }
   325  

View as plain text