1 package client
2
3
4
5 import (
6 "context"
7
8 "flag"
9 "fmt"
10 pb "golang.conradwood.net/apis/registry"
11 "golang.conradwood.net/go-easyops/common"
12 "golang.conradwood.net/go-easyops/prometheus"
13 "golang.conradwood.net/go-easyops/utils"
14 "google.golang.org/grpc"
15 "google.golang.org/grpc/attributes"
16 "google.golang.org/grpc/resolver"
17 "net"
18 "strconv"
19 "strings"
20 "sync"
21 "time"
22 )
23
24 const (
25 RESOLVER_ATTRIBUTE_SERVICE_ADDRESS = "service_address"
26 )
27
28 var (
29 query_for_proxies = flag.Bool("ge_support_proxies", true, "if true, supports routing via and to registrymultiplexer proxies")
30 reglock sync.Mutex
31 proxyTargetLock sync.Mutex
32 proxiedTargets = make(map[string]*ProxyTarget)
33 registryclients = make(map[string]pb.RegistryClient)
34 resolv_chan = make(chan int, 50)
35 resolvers []*FancyResolver
36 totalQueryCtr = prometheus.NewCounterVec(
37 prometheus.CounterOpts{
38 Name: "grpc_loadbalancer_registry_queries",
39 Help: "counter incremented each time the loadbalancer queries the registry",
40 },
41 []string{"servicename"},
42 )
43 )
44
45 type ProxyTarget struct {
46 Target *pb.Target
47 created time.Time
48 lastused time.Time
49 goingaway bool
50 tcpConn net.Conn
51 tlsConn net.Conn
52 }
53
54 func (p *ProxyTarget) key() string {
55 return fmt.Sprintf("%s_%s_%d_%s_%s",
56 p.Target.ServiceName,
57 p.Target.IP,
58 p.Target.Port,
59 p.Target.RoutingInfo.GatewayID,
60 p.Target.Partition,
61 )
62 }
63
64 func init() {
65 go resolver_thread()
66 resolver.Register(&FancyResolverBuilder{})
67 prometheus.MustRegister(totalQueryCtr)
68 }
69
70 type FancyResolverBuilder struct {
71 }
72
73
74 func (f *FancyResolverBuilder) Scheme() string {
75 return "go-easyops"
76 }
77 func (f *FancyResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
78
79 authority := target.URL.Host
80
81
82
83 if authority == "" {
84 fmt.Printf("[go-easyops] invalid build options\n")
85 fmt.Printf("[go-easyops] target: %#v\n", target)
86 fmt.Printf("[go-easyops] con: %#v\n", cc)
87 panic("no target authority")
88 }
89 var registry string
90 if !strings.Contains(target.Endpoint(), "@") {
91 panic(fmt.Sprintf("Invalid url - no registry in resolver. is \"%s\", missing @host:ip", target.Endpoint()))
92 }
93 rs := strings.Split(target.Endpoint(), "@")
94 registry = rs[len(rs)-1]
95 res := &FancyResolver{cc: cc, target: authority, registry: registry}
96 fancyPrintf(res, "fancy_resolver(): Request to build resolver for %#v\n", target)
97
98 common.AddServiceName(res.target)
99 resolvers = append(resolvers, res)
100 resolv_chan <- 1
101 return res, nil
102 }
103
104 type FancyResolver struct {
105 registry string
106 target string
107 cc resolver.ClientConn
108 noInstanceWarningPrinted bool
109 instances int
110 lastScanned time.Time
111 }
112
113 func (f *FancyResolver) ResolveNow(opts resolver.ResolveNowOptions) {
114 fancyPrintf(f, "ResolveNow() on target %s with opts: %#v\n", f.target, opts)
115 resolv_chan <- 1
116
117 }
118
119 func (f *FancyResolver) Close() {
120 return
121 }
122
123
124 func (f *FancyResolver) ActionResolve() {
125
126 fancyPrintf(f, "fancy_resolver(): Resolving %s\n", f.target)
127 regs, err := f.queryForInstances()
128 if err != nil {
129 fancyPrintf(f, "Error resolving: %s\n", err)
130 f.cc.ReportError(err)
131 return
132 }
133 f.instances = len(regs)
134 f.updateCounters(len(regs))
135 f.blockedWarning(len(regs))
136 var ra []resolver.Address
137 for _, a := range regs {
138 rad := resolver.Address{
139 ServerName: "go-easyops-server-name",
140 Addr: fmt.Sprintf("%s%s:%d", DIRECT_PREFIX, a.IP, a.Port),
141 Attributes: attributes.New(RESOLVER_ATTRIBUTE_SERVICE_ADDRESS, a),
142 }
143 if a.RoutingInfo != nil && a.RoutingInfo.GatewayID != "" && *query_for_proxies {
144 pt := &ProxyTarget{Target: a, created: time.Now()}
145 proxiedTargets[pt.key()] = pt
146 rad.Addr = fmt.Sprintf("%s%s", PROXY_PREFIX, pt.key())
147 }
148
149 ra = append(ra, rad)
150
151 }
152 f.cc.UpdateState(resolver.State{Addresses: ra})
153 }
154
155
156 func (f *FancyResolver) updateCounters(adrcount int) {
157 if adrcount == 0 {
158 blockCtr.With(prometheus.Labels{"servicename": f.target}).Inc()
159 }
160
161
162
163 }
164
165
166 func (f *FancyResolver) blockedWarning(adrcount int) {
167 if adrcount == 0 && !f.noInstanceWarningPrinted {
168 fmt.Printf("WARNING - no instances for \"%s\"\n", f.target)
169 f.noInstanceWarningPrinted = true
170 }
171 if adrcount != 0 && f.noInstanceWarningPrinted {
172 fmt.Printf("WARNING CANCEL - %d instances for \"%s\"\n", adrcount, f.target)
173 f.noInstanceWarningPrinted = false
174 }
175 }
176
177
178 func resolver_thread() {
179 interval := defaultInterval()
180 for {
181 select {
182 case _ = <-resolv_chan:
183
184 case <-time.After(interval):
185
186 }
187
188 if len(resolvers) == 0 {
189 continue
190 }
191
192 interval = defaultInterval()
193
194 for _, r := range resolvers {
195 if r.instances != 0 && (time.Since(r.lastScanned) < defaultInterval()) {
196
197 continue
198 }
199 r.ActionResolve()
200 if r.instances == 0 {
201
202 interval = time.Duration(1) * time.Second
203 }
204 }
205 }
206 }
207
208 func defaultInterval() time.Duration {
209 return *normal_sleep_time
210 }
211
212
213 func (f *FancyResolver) queryForInstances() ([]*pb.Target, error) {
214 return queryServiceInstances(f.registry, f.target)
215 }
216
217
227 func getRegistryClient(registryAddress string) (pb.RegistryClient, error) {
228 r := registryclients[registryAddress]
229 if r != nil {
230 return r, nil
231 }
232 reglock.Lock()
233 defer reglock.Unlock()
234
235
236
237 var err error
238
239 conn := withTLS(registryAddress)
240 if conn == nil {
241 fmt.Printf("[go-easyops] Failed to connect to registry (%s) via TLS, falling back to non-tls\n", registryAddress)
242 conn, err = grpc.Dial(
243 registryAddress,
244
245 grpc.WithInsecure(),
246
247
248 grpc.WithTimeout(time.Duration(CONST_CALL_TIMEOUT)*time.Second),
249 )
250 }
251 if err != nil {
252 fmt.Printf("Failed to connect to registry at %s: %s\n", registryAddress, utils.ErrorString(err))
253 return nil, err
254 }
255 registryClient := pb.NewRegistryClient(conn)
256 registryclients[registryAddress] = registryClient
257 return registryClient, nil
258 }
259
260
261
262
263 func withTLS(address string) *grpc.ClientConn {
264 xs := strings.Split(address, ":")
265 if len(xs) < 2 {
266 return nil
267 }
268 xx, err := strconv.Atoi(xs[1])
269 if err != nil {
270 fmt.Printf("weird registry, not a number \"%s\": %s\n", address, err)
271 return nil
272 }
273 np := fmt.Sprintf("%s:%d", xs[0], xx+1)
274 conn, err := grpc.Dial(
275 np,
276 grpc.WithTransportCredentials(GetClientCreds()),
277
278
279 grpc.WithTimeout(time.Duration(CONST_CALL_TIMEOUT)*time.Second),
280 )
281 if err != nil {
282 fmt.Printf("unable to dial registry with TLS: %s", err)
283 return nil
284 }
285 return conn
286 }
287
288 func GetProxyTarget(ctx context.Context, serviceid string) (*ProxyTarget, error) {
289 proxyTargetLock.Lock()
290 defer proxyTargetLock.Unlock()
291 pt := proxiedTargets[serviceid]
292 if pt == nil {
293 return nil, fmt.Errorf("Proxy ID %s is not known here", serviceid)
294 }
295 if pt.tlsConn == nil {
296 tcs := fmt.Sprintf("%s:%d", pt.Target.IP, pt.Target.Port)
297 scs := fmt.Sprintf("\"%s\"@%s", pt.Target.ServiceName, tcs)
298 fmt.Printf("Dialing proxy-connection %s\n", scs)
299 conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", tcs)
300 if err != nil {
301 fmt.Printf("Failed to connect %s: %s\n", scs, err)
302 return nil, err
303 }
304 err = pt.Start(conn)
305 if err != nil {
306 conn.Close()
307 fmt.Printf("Failed to start connection %s: %s\n", scs, err)
308 return nil, err
309 }
310
311 return pt, nil
312 }
313 pt.lastused = time.Now()
314 return pt, nil
315 }
316
317
318 func (p *ProxyTarget) Start(c net.Conn) error {
319
320
321 s, err := utils.Marshal(p.Target.RoutingInfo)
322 if err != nil {
323 return err
324 }
325 buf := []byte("C" + s + "\n")
326 _, err = c.Write(buf)
327 if err != nil {
328 return err
329 }
330
331 p.tcpConn = c
332 fmt.Printf("Started tcp connection for %s\n", p.Target.ServiceName)
333 return err
334 }
335 func (f *FancyResolverBuilder) ServiceName() string {
336 return "fancyresolverbuilder"
337 }
338 func (f *FancyResolver) ServiceName() string {
339 return f.target
340 }
341
View as plain text