1 package client
2
3 import (
4 "flag"
5 "fmt"
6 "strings"
7 "sync"
8 "time"
9
10 "golang.conradwood.net/apis/registry"
11 "golang.conradwood.net/go-easyops/auth"
12 "golang.conradwood.net/go-easyops/utils"
13 "google.golang.org/grpc/balancer"
14 "google.golang.org/grpc/connectivity"
15 "google.golang.org/grpc/resolver"
16 )
17
18 func init() {
19 balancer.Register(&FancyBuilder{})
20 go balancer_thread()
21 }
22
23 var (
24 balancers []*FancyBalancer
25 bal_lock sync.Mutex
26 bal_state_lock sync.Mutex
27 maxblock = flag.Float64("ge_max_block", 30, "max `seconds` to block rpcs for if backends are not available (fail afterwards)")
28 )
29
30
// FancyBuilder creates FancyBalancer instances. It is stateless; an instance
// is handed to balancer.Register during package initialisation.
type FancyBuilder struct{}
33
34
35 func (f *FancyBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
36 fancyPrintf(f, "Building Balancer for %s\n", opts.Target.Endpoint())
37
38 fal := &FancyAddressList{Name: opts.Target.Endpoint()}
39 cc.UpdateState(balancer.State{
40 ConnectivityState: connectivity.Ready,
41 Picker: &FancyPicker{addresslist: fal},
42 })
43
44 idx := strings.Index(fal.Name, "@")
45 if idx == -1 {
46 panic(fmt.Sprintf("unsupported target: %s", fal.Name))
47 }
48 target := fal.Name[:idx]
49 res := &FancyBalancer{cc: cc,
50 target: target,
51 blockedSince: time.Now(),
52 addresslist: fal,
53 }
54 fancyPrintf(f, "Built balancer for target \"%s\"\n", res.target)
55 if res.target == "" {
56 s := fmt.Sprintf("cannot create fancy-balancer without servicename (opts=%#v). Dial must be in the format 'go-easyops://servicename/servicename@registry'", opts)
57 panic(s)
58 }
59
60 bal_lock.Lock()
61 balancers = append(balancers, res)
62 bal_lock.Unlock()
63 return res
64 }
65
66
67
68 func (f *FancyBuilder) Name() string {
69 return "fancybalancer"
70 }
71
72
73
74 type FancyBalancer struct {
75 cc balancer.ClientConn
76 target string
77 addresslist *FancyAddressList
78 closed bool
79 failing bool
80 blockedSince time.Time
81 }
82
83
84 func (f *FancyBalancer) ResolverError(err error) {
85 if *debug_fancy {
86 utils.NotImpl("resolver error")
87 }
88 fmt.Printf("[go-easyops] Resolver reported an error, which is not handled yet: %s\n", err)
89
90 }
91
92
93
94 func (f *FancyBalancer) UpdateClientConnState(bc balancer.ClientConnState) error {
95 fancyPrintf(f, "balancer: updateclientconnstate (ResolverState: %d addresses)\n", len(bc.ResolverState.Addresses))
96 f.HandleResolvedAddrs(bc.ResolverState.Addresses, nil)
97
103 return nil
104 }
105
106
107 func (f *FancyBalancer) UpdateSubConnState(sc balancer.SubConn, bc balancer.SubConnState) {
108 fancyPrintf(f, "balancer: updatesubconnstate\n")
109 f.HandleSubConnStateChange(sc, bc.ConnectivityState)
110 }
111
112
113
114
115
116
117
118
119 func (f *FancyBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
120 fa := f.addresslist.BySubCon(sc)
121 if fa == nil {
122 fancyPrintf(f, "balancer: SubConnState on a subconnection we don't know (%#v)!\n", sc)
123
124 f.cc.UpdateState(balancer.State{
125 ConnectivityState: connectivity.Ready,
126 Picker: f.Picker(),
127 })
128
129 return
130 }
131
132 bal_state_lock.Lock()
133 oldstate := fa.state
134 fa.state = state
135 bal_state_lock.Unlock()
136
137 fancyPrintf(f, "balancer: Handlesubstate service %s at %s transitioned from %s to %s\n", f.target, fa.addr, oldstate.String(), state.String())
138 f.failing = false
139 f.cc.UpdateState(balancer.State{
140 ConnectivityState: connectivity.Ready,
141 Picker: f.Picker(),
142 })
143 }
144
145
146
147
148
149
150
151
152
153 func (f *FancyBalancer) HandleResolvedAddrs(addresses []resolver.Address, err error) {
154
155
156
157 added := false
158 for _, resolverAddr := range addresses {
159 var sa *registry.Target
160 if resolverAddr.Attributes != nil {
161 o := resolverAddr.Attributes.Value(RESOLVER_ATTRIBUTE_SERVICE_ADDRESS)
162 if o != nil {
163 sa = o.(*registry.Target)
164 }
165 }
166 if sa == nil {
167
168 s := fmt.Sprintf("fancy balancer received a very unfancy address without registry.ServiceAddress attribute for \"%s\". Weird resolver?", f.target)
169 panic(s)
170 }
171 rf := ""
172 ri := sa.RoutingInfo
173 if ri != nil {
174 u := ri.RunningAs
175 if u != nil {
176 rf = fmt.Sprintf("runningas=%s/#%s", auth.Description(u), u.ID)
177 }
178 }
179 fr := f.addresslist.ByAddr(resolverAddr.Addr)
180 if fr != nil {
181 fr.Target = sa
182 f.addresslist.Updated()
183 fancyPrintf(f, "balancer: %s, conn %s known as state %s\n", f.target, resolverAddr.Addr, fr.state.String())
184 continue
185 }
186 fancyPrintf(f, "balancer: New Address %s (%s)\n", resolverAddr.Addr, rf)
187
188 sco, err := f.cc.NewSubConn([]resolver.Address{resolverAddr}, balancer.NewSubConnOptions{})
189 if err != nil {
190 fancyPrintf(f, "Failed to create subconn: %s\n", err)
191 continue
192 }
193
194 f.addresslist.Add(&FancyAddr{
195 state: connectivity.Ready,
196
197
198 addr: resolverAddr.Addr,
199 subcon: sco,
200 Target: sa,
201 })
202 added = true
203 }
204
205 remlist := f.addresslist.RequiredList(addresses)
206 for _, fa := range remlist {
207 f.cc.RemoveSubConn(fa.subcon)
208 }
209 removed := len(remlist) != 0
210 if !added && !removed {
211 fancyPrintf(f, "balancer: no state change for \"%s\"\n", f.target)
212 f.cc.UpdateState(balancer.State{
213 ConnectivityState: connectivity.Ready,
214 Picker: f.Picker(),
215 })
216 return
217 }
218 f.failing = false
219 fancyPrintf(f, "balancer: Sending state update for \"%s\", we got %d subconnections now\n", f.target, f.addresslist.Count())
220
221 if f.addresslist.IsEmpty() {
222 f.blockedSince = time.Now()
223 f.cc.UpdateState(balancer.State{
224 ConnectivityState: connectivity.Ready,
225
226 Picker: f.Picker(),
227 })
228 return
229 }
230 f.cc.UpdateState(balancer.State{
231 ConnectivityState: connectivity.Ready,
232 Picker: f.Picker(),
233 })
234
235 }
236
237
238
239 func (f *FancyBalancer) Close() {
240 f.closed = true
241 bal_lock.Lock()
242 defer bal_lock.Unlock()
243 var res []*FancyBalancer
244
245 for _, b := range balancers {
246 if b.closed {
247 continue
248 }
249 res = append(res, b)
250 }
251 balancers = res
252 fancyPrintf(f, "Close\n")
253 }
254
255
256 func (f *FancyBalancer) Picker() *FancyPicker {
257 res := &FancyPicker{addresslist: f.addresslist}
258 return res
259 }
260
261
268
269 func balancer_thread() {
270 for {
271 bal_lock.Lock()
272 x := balancers
273 bal_lock.Unlock()
274 for _, b := range x {
275 b.Check()
276 }
277 time.Sleep(time.Duration(1) * time.Second)
278 }
279
280 }
281
282
283 func (f *FancyBalancer) Check() {
284 for _, a := range f.addresslist.addresses {
285 if a.state == connectivity.Idle {
286 fancyPrintf(f, "connect()")
287 a.subcon.Connect()
288 }
289 }
290 if !f.addresslist.IsEmpty() {
291 return
292 }
293 if f.failing {
294 return
295 }
296 sc := time.Since(f.blockedSince)
297 fancyPrintf(f, "Blocked since: %v (%v)\n", f.blockedSince, sc)
298 if sc < (time.Duration(*maxblock) * time.Second) {
299 return
300 }
301
302 f.failing = true
303 fp := f.Picker()
304 fp.failAll = true
305 f.cc.UpdateState(balancer.State{
306 ConnectivityState: connectivity.Ready,
307 Picker: fp,
308 })
309
310 }
311 func (f *FancyBalancer) ServiceName() string {
312 return f.target
313 }
314 func (f *FancyBalancer) ExitIdle() {
315
320 }
321
322 func (f *FancyBuilder) ServiceName() string {
323 return "fancy_builder.go"
324 }
325
View as plain text