...
1 package router
2
3 import (
4 "context"
5 "fmt"
6 "sync"
7
8 "golang.conradwood.net/apis/registry"
9 "golang.conradwood.net/go-easyops/client"
10 "google.golang.org/grpc"
11 )
12
13 type ConnectionManager struct {
14 use_all bool
15 one_per_ip bool
16 servicename string
17 }
18 type ConnectionTarget struct {
19 lock sync.Mutex
20 ip string
21 port uint32
22 connection *Connection
23 }
24 type Connection struct {
25 lock sync.Mutex
26 address string
27 gcon *grpc.ClientConn
28 }
29
30 func NewConnectionManager(servicename string) *ConnectionManager {
31 return &ConnectionManager{use_all: true, one_per_ip: true, servicename: servicename}
32 }
33 func (cm *ConnectionManager) ServiceName() string {
34 return cm.servicename
35 }
36 func (cm *ConnectionManager) AllowMultipleInstancesPerIP() {
37 cm.one_per_ip = false
38 }
39 func (cm *ConnectionManager) GetCurrentTargets(ctx context.Context) []*ConnectionTarget {
40 var res []*ConnectionTarget
41 if cm.use_all {
42 res = cm.getCurrentRegistrationsAsTargets(ctx)
43 res = append(res, cm.getCurrentTargets(ctx)...)
44 } else {
45 res = cm.getCurrentTargets(ctx)
46 }
47 res = cm.filter(res)
48 return res
49 }
50 func (cm *ConnectionManager) getCurrentRegistrationsAsTargets(ctx context.Context) []*ConnectionTarget {
51 req := ®istry.V2ListRequest{NameMatch: cm.servicename}
52 targetlist, err := client.GetRegistryClient().ListRegistrations(ctx, req)
53 if err != nil {
54 fmt.Printf("Failed to get registrations for \"%s\": %s\n", req.NameMatch, err)
55 return nil
56 }
57 var res []*ConnectionTarget
58 for _, regi := range targetlist.Registrations {
59 if !regi.Targetable {
60 continue
61 }
62 t := regi.Target
63 found := false
64 for _, at := range t.ApiType {
65 if at == registry.Apitype_grpc {
66 found = true
67 break
68 }
69 }
70 if !found {
71
72 continue
73 }
74 ct := &ConnectionTarget{ip: t.IP, port: t.Port}
75 res = append(res, ct)
76 }
77 return res
78 }
79 func (cm *ConnectionManager) getCurrentTargets(ctx context.Context) []*ConnectionTarget {
80 req := ®istry.V2GetTargetRequest{
81 ServiceName: []string{cm.servicename},
82 ApiType: registry.Apitype_grpc,
83 }
84 targetlist, err := client.GetRegistryClient().V2GetTarget(ctx, req)
85 if err != nil {
86 fmt.Printf("Failed to get targets for \"%s\": %s\n", req.ServiceName, err)
87 return nil
88 }
89 var res []*ConnectionTarget
90 for _, t := range targetlist.Targets {
91 ct := &ConnectionTarget{ip: t.IP, port: t.Port}
92 res = append(res, ct)
93 }
94 return res
95 }
96 func (ct *ConnectionTarget) Address() string {
97 return fmt.Sprintf("%s:%d", ct.ip, ct.port)
98 }
99
100 func (ct *ConnectionTarget) Connection() (*Connection, error) {
101 ct.lock.Lock()
102 defer ct.lock.Unlock()
103 if ct.connection != nil {
104 return ct.connection, nil
105 }
106 c := &Connection{address: ct.Address()}
107 ct.connection = c
108 return ct.connection, nil
109 }
110 func (c *Connection) GRPCConnection() (*grpc.ClientConn, error) {
111 c.lock.Lock()
112 defer c.lock.Unlock()
113 if c.gcon != nil {
114 return c.gcon, nil
115 }
116 gcon, err := client.ConnectWithIP(c.address)
117 if err != nil {
118 return nil, err
119 }
120 c.gcon = gcon
121 return c.gcon, nil
122 }
123 func (c *Connection) Close() {
124 c.lock.Lock()
125 defer c.lock.Unlock()
126 if c.gcon != nil {
127 c.gcon.Close()
128 c.gcon = nil
129 }
130 }
131
132 func (cm *ConnectionManager) filter(input []*ConnectionTarget) []*ConnectionTarget {
133 ipmap := make(map[string]*ConnectionTarget)
134 for _, ct := range input {
135 key := ct.ip
136 if !cm.one_per_ip {
137 key = fmt.Sprintf("%s:%d", ct.ip, ct.port)
138 }
139
140 _, fd := ipmap[key]
141 if fd {
142 continue
143 }
144 ipmap[key] = ct
145 }
146 var res []*ConnectionTarget
147 for _, v := range ipmap {
148 res = append(res, v)
149 }
150 return res
151 }
152
153 func (cm *ConnectionManager) debugf(format string, args ...interface{}) {
154 if !*debug {
155 return
156 }
157 prefix := fmt.Sprintf("[go-easyops router/cntmgr %s]", cm.servicename)
158 txt := fmt.Sprintf(format, args...)
159 fmt.Print(prefix + txt)
160 }
161 func (ct *ConnectionTarget) Close() {
162 ct.lock.Lock()
163 defer ct.lock.Unlock()
164 if ct.connection == nil {
165 return
166 }
167 ct.connection.Close()
168 }
169
View as plain text