...

Source file src/golang.conradwood.net/go-easyops/client/fancy_picker.go

Documentation: golang.conradwood.net/go-easyops/client

     1  package client
     2  
import (
	"context"
	"fmt"
	"sync/atomic"

	ge "golang.conradwood.net/apis/goeasyops"
	"golang.conradwood.net/go-easyops/auth"
	//	"golang.conradwood.net/go-easyops/common"
	"golang.conradwood.net/go-easyops/cmdline"
	gectx "golang.conradwood.net/go-easyops/ctx"
	"golang.conradwood.net/go-easyops/rpc"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
	//	"google.golang.org/grpc/metadata"
)
    16  
    17  type FancyPicker struct {
    18  	addresslist *FancyAddressList
    19  	failAll     bool // if true all RPCs will fail
    20  	ctr         uint32
    21  }
    22  
    23  // Pick returns the connection to use for this RPC and related information.
    24  //
    25  // Pick should not block.  If the balancer needs to do I/O or any blocking
    26  // or time-consuming work to service this call, it should return
    27  // ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when
    28  // the Picker is updated (using ClientConn.UpdateState).
    29  //
    30  // If an error is returned:
    31  //
    32  //   - If the error is ErrNoSubConnAvailable, gRPC will block until a new
    33  //     Picker is provided by the balancer (using ClientConn.UpdateState).
    34  //
    35  //   - If the error implements IsTransientFailure() bool, returning true,
    36  //     wait for ready RPCs will wait, but non-wait for ready RPCs will be
    37  //     terminated with this error's Error() string and status code
    38  //     Unavailable.
    39  //
    40  //   - Any other errors terminate all RPCs with the code and message
    41  //     provided.  If the error is not a status error, it will be converted by
    42  //     gRPC to a status error with code Unknown.
    43  func (f *FancyPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
    44  	if f.failAll {
    45  		// the balancer created a special "failing" picker because it did not have any
    46  		// instances for this service for a long time (so it is not transient anymore, is it?)
    47  		// in this case we don't want to build up a queue of RPCs, we just want to fail-fast them
    48  		fancyPrintf(f, "Picker - failing connections for \"%s\" w/o instance\n", info.FullMethodName)
    49  		sn := "[unknown rpc]"
    50  		cs := rpc.CallStateFromContext(info.Ctx)
    51  		if cs != nil {
    52  			sn = fmt.Sprintf("%s.%s()", cs.ServiceName, cs.MethodName)
    53  		}
    54  		return balancer.PickResult{}, fmt.Errorf("failure in %s whilst calling %s - no backend available", sn, info.FullMethodName)
    55  	}
    56  	if f.addresslist.IsEmpty() {
    57  		// no instances, transient problem though. we tell gRPC to retry the call once we got a new picker
    58  		fancyPrintf(f, "Picker - No connections for %s\n", info.FullMethodName)
    59  		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
    60  	}
    61  
    62  	lf := f.addresslist
    63  
    64  	cri := tags_from_context(info.Ctx)
    65  	if cri == nil {
    66  		fancyPrintf(f, "picking for tag-less context\n")
    67  	} else {
    68  		fancyPrintf(f, "Picking by tags (%v)\n", cri.Tags)
    69  		// convert tags to map[string]string, returning empty if invalid type assertion
    70  		adrs := lf.ByMatchingTags(cri.Tags)
    71  		if len(adrs) == 0 {
    72  			fancyPrintf(f, "Picker - No connection matched all required tags (%v)\n", cri.Tags)
    73  			if !cri.FallbackToPlain {
    74  				return balancer.PickResult{}, fmt.Errorf("No addresses matched all supplied tags (%v) for %s", cri.Tags, info.FullMethodName)
    75  			} else {
    76  				lf = f.addresslist
    77  				lf = &FancyAddressList{Name: lf.Name, addresses: lf.ByWithoutTags()}
    78  			}
    79  		} else {
    80  			lf = &FancyAddressList{Name: lf.Name, addresses: adrs}
    81  		}
    82  	}
    83  
    84  	// build up a list of valid (e.g. state Ready, match user/context/routing) connections
    85  	matching := lf.SelectValid(info.Ctx)
    86  
    87  	if len(matching) == 0 {
    88  		bal_state_lock.Lock()
    89  		for _, a := range lf.addresses {
    90  			// this is not right here. We probably should periodically keep them alive rather than wait until
    91  			// we have no more READY ones
    92  			// but this is a 'hotfix' to stop breakage
    93  			if a.state == connectivity.Idle {
    94  				// this doesn't do the trick. it just makes it worse actually,
    95  				// it covers for quick reconnects on the same port only, but breaks after long periods too
    96  				//a.subcon.Connect()
    97  			}
    98  			fancyPrintf(f, "picker address: %s\n", a.String())
    99  		}
   100  		bal_state_lock.Unlock()
   101  		fancyPrintf(f, "Picker - No valid connections for %s\n", info.FullMethodName)
   102  		//fmt.Printf("[go-easyops] picker: No valid connections for %s\n", info.FullMethodName)
   103  		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
   104  	}
   105  
   106  	f.ctr++ // overflows at 0xFFFFFFFF, that's ok
   107  
   108  	indx := f.ctr % uint32(len(matching))
   109  	fa := matching[indx]
   110  	if *debug_fancy {
   111  		u := auth.GetUser(info.Ctx)
   112  		fancyPrintf(f, "Picking: %s [%s] for user %s to serve %s from %d connections (%d matching) (ctr=%d))\n",
   113  			fa.addr, fa.state.String(),
   114  			auth.Description(u),
   115  			info.FullMethodName,
   116  			lf.Count(), len(matching), f.ctr)
   117  		fancyPrintf(f, "         RoutingInfo: %#v\n", fa.Target.RoutingInfo)
   118  	}
   119  
   120  	res := balancer.PickResult{SubConn: fa.subcon}
   121  	fa.subcon.Connect()
   122  	return res, nil
   123  }
   124  
   125  func (f *FancyPicker) ServiceName() string {
   126  	if f.addresslist != nil {
   127  		return f.addresslist.Name
   128  	}
   129  	return "fancy_picker.go"
   130  }
   131  
   132  func tags_from_context(ctx context.Context) *ge.CTXRoutingTags {
   133  	if cmdline.ContextWithBuilder() {
   134  		ls := gectx.GetLocalState(ctx)
   135  		lr := ls.RoutingTags()
   136  		return lr
   137  	}
   138  	panic("obsolete codepath")
   139  }
   140  

View as plain text