1 package client
2
3 import (
4 "context"
5 "fmt"
6 "golang.conradwood.net/apis/registry"
7 "golang.conradwood.net/go-easyops/auth"
8 "google.golang.org/grpc"
9 "google.golang.org/grpc/balancer"
10 "google.golang.org/grpc/connectivity"
11 "google.golang.org/grpc/resolver"
12 "strings"
13 "sync"
14 "time"
15 )
16
17
27 type FancyAddressList struct {
28 Name string
29 addresses []*FancyAddr
30 }
31
32 type FancyAddr struct {
33 addr string
34 subcon balancer.SubConn
35 state connectivity.State
36 removed bool
37 Target *registry.Target
38 grpc_con *grpc.ClientConn
39 dial_lock sync.Mutex
40 }
41
42
43 func (fa *FancyAddr) Key() string {
44 return "faddr_" + fa.addr
45 }
46
47
48 func (fa *FancyAddr) Address() string {
49 return fa.addr
50 }
51 func (fa *FancyAddr) String() string {
52 return fmt.Sprintf("%s: %s[%s] removed=%v", fa.Target.ServiceName, fa.addr, fa.state.String(), fa.removed)
53 }
54
55
56 func (fa *FancyAddr) IsReady() bool {
57 return fa.state == connectivity.Ready
58 }
59
60 func (fa *FancyAddr) disconnect() {
61 if fa.grpc_con == nil {
62 return
63 }
64 fa.grpc_con.Close()
65 fa.grpc_con = nil
66 }
67
68
72
73 func (fa *FancyAddr) Connection() (*grpc.ClientConn, error) {
74 if fa.grpc_con != nil {
75 return fa.grpc_con, nil
76 }
77 fa.dial_lock.Lock()
78 defer fa.dial_lock.Unlock()
79 if fa.grpc_con != nil {
80 return fa.grpc_con, nil
81 }
82 dialstring := fmt.Sprintf("%s", fa.addr)
83 fmt.Printf("[go-easyops] dialling \"%s\"...\n", dialstring)
84 ctx, cnc := context.WithTimeout(context.Background(), time.Duration(2)*time.Second)
85 defer cnc()
86 gc, err := grpc.DialContext(ctx, dialstring, grpc.WithBlock(),
87 grpc.WithTransportCredentials(GetClientCreds()),
88 grpc.WithUnaryInterceptor(ClientMetricsUnaryInterceptor),
89 grpc.WithStreamInterceptor(unaryStreamInterceptor),
90 )
91 if err != nil {
92 return nil, err
93 }
94 fa.grpc_con = gc
95 return gc, nil
96 }
97 func (fal *FancyAddressList) Count() int {
98 return len(fal.addresses)
99 }
100 func (fal *FancyAddressList) IsEmpty() bool {
101 return len(fal.addresses) == 0
102 }
103
104
105
106 func (fal *FancyAddressList) Updated() {
107 }
108
109
110 func (fal *FancyAddressList) Add(f *FancyAddr) {
111 fal.addresses = append(fal.addresses, f)
112 fal.Updated()
113 }
114 func (fal *FancyAddressList) remove(f *FancyAddr) {
115 var nr []*FancyAddr
116 for _, fa := range fal.addresses {
117 if fa.Key() == f.Key() {
118 continue
119 }
120 nr = append(nr, fa)
121 }
122 fal.addresses = nr
123 }
124
125
126 func (fal *FancyAddressList) ByKey(key string) *FancyAddr {
127 for _, fa := range fal.addresses {
128 if fa.Key() == key {
129 return fa
130 }
131 }
132 return nil
133 }
134
135
136 func (fal *FancyAddressList) RequiredList(addresses []resolver.Address) []*FancyAddr {
137 var res []*FancyAddr
138 removed := false
139 for _, fa := range fal.addresses {
140 stillgood := false
141 for _, r := range addresses {
142 if r.Addr == fa.addr {
143 stillgood = true
144 break
145 }
146 }
147 if stillgood {
148 continue
149 }
150 fancyPrintf(fal, "balancer: removed %s\n", fa.addr)
151 fa.removed = true
152 removed = true
153 fa.disconnect()
154 }
155 if removed {
156 var fa []*FancyAddr
157 for _, foa := range fal.addresses {
158 if foa.removed {
159 res = append(res, foa)
160 continue
161 }
162 fa = append(fa, foa)
163 }
164 fal.addresses = fa
165 fal.Updated()
166 }
167 return res
168 }
169
170
171
172 func (fal *FancyAddressList) ByAddr(adr string) *FancyAddr {
173 for _, fa := range fal.addresses {
174 if fa.addr == adr {
175 return fa
176 }
177 }
178 return nil
179 }
180
181 func (fal *FancyAddressList) BySubCon(sc balancer.SubConn) *FancyAddr {
182 var fa *FancyAddr
183 for _, a := range fal.addresses {
184 if a.subcon == sc {
185 fa = a
186 break
187 }
188 }
189 return fa
190 }
191
192
193 func (fal *FancyAddressList) AllAddresses() []*FancyAddr {
194 var valids []*FancyAddr
195 for _, a := range fal.addresses {
196 valids = append(valids, a)
197 }
198 return valids
199 }
200
201
202 func (fal *FancyAddressList) AllReadyAddresses() []*FancyAddr {
203 var valids []*FancyAddr
204 for _, a := range fal.addresses {
205 if !a.IsReady() {
206 continue
207 }
208 valids = append(valids, a)
209 }
210 return valids
211 }
212
213
214 func (fal *FancyAddressList) ByWithoutTags() []*FancyAddr {
215 var valids []*FancyAddr
216
217 for _, a := range fal.addresses {
218 if a.Target == nil {
219 continue
220 }
221 if a.Target.RoutingInfo == nil || a.Target.RoutingInfo.Tags == nil || len(a.Target.RoutingInfo.Tags) == 0 {
222 valids = append(valids, a)
223 }
224 }
225 return valids
226 }
227
228
234
235 func (fal *FancyAddressList) ByMatchingTags(tags map[string]string) []*FancyAddr {
236 fancyPrintf(fal, "Filtering (%d) addresses by tags\n", len(fal.addresses))
237 if len(tags) == 0 {
238
239
240 fancyPrintf(fal, "empty list for filterbytags!")
241 return fal.addresses
242 }
243 var valids []*FancyAddr
244
245 for _, a := range fal.addresses {
246 valid := true
247 if a.Target == nil || a.Target.RoutingInfo == nil || a.Target.RoutingInfo.Tags == nil {
248 fancyPrintf(fal, "tag in %s does have special routing\n", a.addr)
249 continue
250 }
251 for k, v := range tags {
252 tgv := a.Target.RoutingInfo.Tags[k]
253 if tgv != v {
254 fancyPrintf(fal, "tag in %s does not match. \"%s\" != \"%s\"\n", a.addr, tgv, v)
255 valid = false
256 break
257 }
258 }
259 if valid {
260 valids = append(valids, a)
261 }
262 }
263 return valids
264 }
265
266
267 func (fal *FancyAddressList) ByNoUserRoutingInfo() []*FancyAddr {
268 var res []*FancyAddr
269 for _, a := range fal.addresses {
270 ri := a.Target.RoutingInfo
271 if ri != nil && ri.RunningAs != nil {
272 continue
273 }
274 res = append(res, a)
275 }
276 return res
277 }
278
279
280 func (fal *FancyAddressList) ByUser(userid string) []*FancyAddr {
281 var res []*FancyAddr
282 for _, a := range fal.addresses {
283 ri := a.Target.RoutingInfo
284 if ri == nil || ri.RunningAs == nil {
285 continue
286 }
287 if ri.RunningAs.ID != userid {
288 continue
289 }
290 res = append(res, a)
291 }
292 return res
293 }
294
295 func (fal *FancyAddressList) readyOnly(in []*FancyAddr) []*FancyAddr {
296 var valids []*FancyAddr
297 bal_state_lock.Lock()
298 for _, fa := range in {
299 if fa.state != connectivity.Ready {
300 continue
301 }
302 valids = append(valids, fa)
303 }
304 bal_state_lock.Unlock()
305 return valids
306
307 }
308
309
323 func (fal *FancyAddressList) SelectValid(ctx context.Context) []*FancyAddr {
324 nro := fal.ByNoUserRoutingInfo()
325 if len(nro) == len(fal.addresses) {
326
327 fancyPrintf(fal, "all addresses for %s have routinginfo\n", fal.Name)
328 return fal.readyOnly(nro)
329 }
330 u := auth.GetUser(ctx)
331 if u == nil {
332
333 fancyPrintf(fal, "user-less rpc\n")
334 if len(nro) == 0 && len(fal.addresses) > 0 {
335 fmt.Printf("[go-easyops] Warning - of %d targets, all require a user in outbound context (but none provided)\n", len(fal.addresses))
336 }
337 return fal.readyOnly(nro)
338 }
339 bu := fal.ByUser(u.ID)
340 if len(bu) == 0 {
341
342 fancyPrintf(fal, "no connections specifically for user %s\n", u.Email)
343 return fal.readyOnly(nro)
344 }
345 fancyPrintf(fal, "%d connections specifically for user %s\n", len(bu), u.Email)
346 return fal.readyOnly(bu)
347 }
348
349
350 func (fal *FancyAddressList) ServiceName() string {
351 s := fal.Name
352 idx := strings.Index(s, "@")
353 if idx != -1 {
354 s = s[:idx]
355 }
356 return s
357 }
358
359 func GetAllFancyAddressLists() []*FancyAddressList {
360 var res []*FancyAddressList
361 for _, bal := range balancers {
362 if bal.addresslist != nil {
363 res = append(res, bal.addresslist)
364 }
365 }
366 return res
367 }
368
View as plain text