...
1 package client
2
import (
	"context"
	"fmt"
	"sync/atomic"

	ge "golang.conradwood.net/apis/goeasyops"
	"golang.conradwood.net/go-easyops/auth"
	"golang.conradwood.net/go-easyops/cmdline"
	gectx "golang.conradwood.net/go-easyops/ctx"
	"golang.conradwood.net/go-easyops/rpc"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
)
16
17 type FancyPicker struct {
18 addresslist *FancyAddressList
19 failAll bool
20 ctr uint32
21 }
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 func (f *FancyPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
44 if f.failAll {
45
46
47
48 fancyPrintf(f, "Picker - failing connections for \"%s\" w/o instance\n", info.FullMethodName)
49 sn := "[unknown rpc]"
50 cs := rpc.CallStateFromContext(info.Ctx)
51 if cs != nil {
52 sn = fmt.Sprintf("%s.%s()", cs.ServiceName, cs.MethodName)
53 }
54 return balancer.PickResult{}, fmt.Errorf("failure in %s whilst calling %s - no backend available", sn, info.FullMethodName)
55 }
56 if f.addresslist.IsEmpty() {
57
58 fancyPrintf(f, "Picker - No connections for %s\n", info.FullMethodName)
59 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
60 }
61
62 lf := f.addresslist
63
64 cri := tags_from_context(info.Ctx)
65 if cri == nil {
66 fancyPrintf(f, "picking for tag-less context\n")
67 } else {
68 fancyPrintf(f, "Picking by tags (%v)\n", cri.Tags)
69
70 adrs := lf.ByMatchingTags(cri.Tags)
71 if len(adrs) == 0 {
72 fancyPrintf(f, "Picker - No connection matched all required tags (%v)\n", cri.Tags)
73 if !cri.FallbackToPlain {
74 return balancer.PickResult{}, fmt.Errorf("No addresses matched all supplied tags (%v) for %s", cri.Tags, info.FullMethodName)
75 } else {
76 lf = f.addresslist
77 lf = &FancyAddressList{Name: lf.Name, addresses: lf.ByWithoutTags()}
78 }
79 } else {
80 lf = &FancyAddressList{Name: lf.Name, addresses: adrs}
81 }
82 }
83
84
85 matching := lf.SelectValid(info.Ctx)
86
87 if len(matching) == 0 {
88 bal_state_lock.Lock()
89 for _, a := range lf.addresses {
90
91
92
93 if a.state == connectivity.Idle {
94
95
96
97 }
98 fancyPrintf(f, "picker address: %s\n", a.String())
99 }
100 bal_state_lock.Unlock()
101 fancyPrintf(f, "Picker - No valid connections for %s\n", info.FullMethodName)
102
103 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
104 }
105
106 f.ctr++
107
108 indx := f.ctr % uint32(len(matching))
109 fa := matching[indx]
110 if *debug_fancy {
111 u := auth.GetUser(info.Ctx)
112 fancyPrintf(f, "Picking: %s [%s] for user %s to serve %s from %d connections (%d matching) (ctr=%d))\n",
113 fa.addr, fa.state.String(),
114 auth.Description(u),
115 info.FullMethodName,
116 lf.Count(), len(matching), f.ctr)
117 fancyPrintf(f, " RoutingInfo: %#v\n", fa.Target.RoutingInfo)
118 }
119
120 res := balancer.PickResult{SubConn: fa.subcon}
121 fa.subcon.Connect()
122 return res, nil
123 }
124
125 func (f *FancyPicker) ServiceName() string {
126 if f.addresslist != nil {
127 return f.addresslist.Name
128 }
129 return "fancy_picker.go"
130 }
131
132 func tags_from_context(ctx context.Context) *ge.CTXRoutingTags {
133 if cmdline.ContextWithBuilder() {
134 ls := gectx.GetLocalState(ctx)
135 lr := ls.RoutingTags()
136 return lr
137 }
138 panic("obsolete codepath")
139 }
140
View as plain text