...
1package fanoutrouter
2
import (
	"context"
	"fmt"
	"net"
	"strconv"
	"sync"

	"golang.conradwood.net/apis/registry"
	"golang.conradwood.net/go-easyops/client"
	"google.golang.org/grpc"
)
12
// ConnectionManager looks up the gRPC targets of a single service in the
// registry.
type ConnectionManager struct {
	// use_all ignores the registry's target recommendations and uses every
	// registered instance instead.
	use_all bool
	// one_per_ip limits the result to at most one target per IP address.
	one_per_ip bool
	// servicename is the service name sent to the registry.
	servicename string
}
18type ConnectionTarget struct {
19 lock sync.Mutex
20 ip string
21 port uint32
22 connection *Connection
23}
24type Connection struct {
25 lock sync.Mutex
26 address string
27 gcon *grpc.ClientConn
28}
29
30func NewConnectionManager(servicename string) *ConnectionManager {
31 return &ConnectionManager{use_all: true, one_per_ip: true, servicename: servicename}
32}
33func (cm *ConnectionManager) AllowMultipleInstancesPerIP() {
34 cm.one_per_ip = false
35}
36func (cm *ConnectionManager) GetCurrentTargets(ctx context.Context) []*ConnectionTarget {
37 var res []*ConnectionTarget
38 if cm.use_all {
39 res = cm.getCurrentRegistrationsAsTargets(ctx)
40 } else {
41 res = cm.getCurrentTargets(ctx)
42 }
43 res = cm.filter(res)
44 return res
45}
46func (cm *ConnectionManager) getCurrentRegistrationsAsTargets(ctx context.Context) []*ConnectionTarget {
47 req := ®istry.V2ListRequest{NameMatch: cm.servicename}
48 targetlist, err := client.GetRegistryClient().ListRegistrations(ctx, req)
49 if err != nil {
50 fmt.Printf("Failed to get registrations for \"%s\": %s\n", req.NameMatch, err)
51 return nil
52 }
53 var res []*ConnectionTarget
54 for _, regi := range targetlist.Registrations {
55 if !regi.Targetable {
56 continue
57 }
58 t := regi.Target
59 found := false
60 for _, at := range t.ApiType {
61 if at == registry.Apitype_grpc {
62 found = true
63 break
64 }
65 }
66 if !found {
67 // not a grpc targettype
68 continue
69 }
70 ct := &ConnectionTarget{ip: t.IP, port: t.Port}
71 res = append(res, ct)
72 }
73 return res
74}
75func (cm *ConnectionManager) getCurrentTargets(ctx context.Context) []*ConnectionTarget {
76 req := ®istry.V2GetTargetRequest{
77 ServiceName: []string{cm.servicename},
78 ApiType: registry.Apitype_grpc,
79 }
80 targetlist, err := client.GetRegistryClient().V2GetTarget(ctx, req)
81 if err != nil {
82 fmt.Printf("Failed to get targets for \"%s\": %s\n", req.ServiceName, err)
83 return nil
84 }
85 var res []*ConnectionTarget
86 for _, t := range targetlist.Targets {
87 ct := &ConnectionTarget{ip: t.IP, port: t.Port}
88 res = append(res, ct)
89 }
90 return res
91}
92func (ct *ConnectionTarget) Address() string {
93 return fmt.Sprintf("%s:%d", ct.ip, ct.port)
94}
95
96func (ct *ConnectionTarget) Connection() (*Connection, error) {
97 ct.lock.Lock()
98 defer ct.lock.Unlock()
99 if ct.connection != nil {
100 return ct.connection, nil
101 }
102 c := &Connection{address: ct.Address()}
103 ct.connection = c
104 return ct.connection, nil
105}
106func (c *Connection) GRPCConnection() (*grpc.ClientConn, error) {
107 c.lock.Lock()
108 defer c.lock.Unlock()
109 gcon, err := client.ConnectWithIP(c.address)
110 if err != nil {
111 return nil, err
112 }
113 c.gcon = gcon
114 return c.gcon, nil
115}
116func (c *Connection) Close() {
117 c.lock.Lock()
118 defer c.lock.Unlock()
119 if c.gcon != nil {
120 c.gcon.Close()
121 c.gcon = nil
122 }
123}
124
125func (cm *ConnectionManager) filter(input []*ConnectionTarget) []*ConnectionTarget {
126 if !cm.one_per_ip {
127 return input
128 }
129 ipmap := make(map[string]*ConnectionTarget)
130 for _, ct := range input {
131 _, fd := ipmap[ct.ip]
132 if fd {
133 continue
134 }
135 ipmap[ct.ip] = ct
136 }
137 var res []*ConnectionTarget
138 for _, v := range ipmap {
139 res = append(res, v)
140 }
141 return res
142}
View as plain text