...
1 package client
2
3 import (
4 "fmt"
5 "golang.conradwood.net/apis/registry"
6 "golang.conradwood.net/go-easyops/cmdline"
7 "golang.conradwood.net/go-easyops/utils"
8 "google.golang.org/grpc"
9 "sync"
10 "time"
11 )
12
13 type TargetList struct {
14 reg string
15 svcname string
16 runOnce bool
17 tstarted bool
18 targets map[string]*TargetWithConnection
19 rlock sync.Mutex
20 }
21 type TargetWithConnection struct {
22 target *registry.Target
23 con *grpc.ClientConn
24 }
25
26 func NewTargetList(servicename string) *TargetList {
27 t := &TargetList{svcname: servicename, reg: cmdline.GetClientRegistryAddress()}
28 t.targets = make(map[string]*TargetWithConnection)
29 return t
30 }
31 func (t *TargetWithConnection) Connection() *grpc.ClientConn {
32 return t.con
33 }
34 func (t *TargetWithConnection) String() string {
35 return fmt.Sprintf("%s@%s:%d", t.target.ServiceName, t.target.IP, t.target.Port)
36 }
37
38 func (t *TargetList) Targets() []*TargetWithConnection {
39 if !t.runOnce {
40 err := t.refresh()
41 if err != nil {
42 fmt.Printf("[go-easyops] failed to refresh %s: %s\n", t.svcname, utils.ErrorString(err))
43 return nil
44 }
45 }
46 var res []*TargetWithConnection
47 for _, v := range t.targets {
48 if v.con == nil {
49 continue
50 }
51 res = append(res, v)
52 }
53 return res
54 }
55
56
57 func (t *TargetList) Connections() []*grpc.ClientConn {
58 if !t.runOnce {
59 err := t.refresh()
60 if err != nil {
61 fmt.Printf("[go-easyops] failed to refresh %s: %s\n", t.svcname, utils.ErrorString(err))
62 return nil
63 }
64 }
65 var res []*grpc.ClientConn
66 for _, v := range t.targets {
67 if v.con != nil {
68 res = append(res, v.con)
69 }
70 }
71 return res
72 }
73 func (t *TargetList) ByAddress(address string) []*TargetWithConnection {
74 var res []*TargetWithConnection
75 for _, v := range t.targets {
76 if v.target.IP != address {
77 continue
78 }
79 res = append(res, v)
80 }
81 return res
82 }
83 func (t *TargetList) refresh_loop() {
84 for {
85 time.Sleep(30 * time.Second)
86 err := t.refresh()
87 if err != nil {
88 fmt.Printf("refresh failed: %s\n", utils.ErrorString(err))
89 }
90 }
91 }
92 func (t *TargetList) refresh() error {
93 t.rlock.Lock()
94 defer t.rlock.Unlock()
95 if !t.tstarted {
96 go t.refresh_loop()
97 t.tstarted = true
98 }
99 rc := GetRegistryClient()
100 treq := ®istry.V2GetTargetRequest{ApiType: registry.Apitype_grpc, ServiceName: []string{t.svcname}}
101 ctx := getContext()
102 tr, err := rc.V2GetTarget(ctx, treq)
103 if err != nil {
104 return err
105 }
106 for _, target := range tr.Targets {
107 t.add(target)
108 }
109 for _, tc := range t.targets {
110 target := tc.target
111 url := fmt.Sprintf("%s:%d", target.IP, target.Port)
112 found := false
113 for _, yestarget := range tr.Targets {
114 yesurl := fmt.Sprintf("%s:%d", yestarget.IP, yestarget.Port)
115 if yesurl == url {
116 found = true
117 break
118 }
119 }
120 if !found {
121 t.remove(target)
122 }
123 }
124 t.runOnce = true
125 return nil
126 }
127
128
129 func (t *TargetList) remove(target *registry.Target) {
130 url := fmt.Sprintf("%s:%d", target.IP, target.Port)
131 tc, k := t.targets[url]
132 if !k {
133 return
134 }
135 if tc.con != nil {
136 tc.con.Close()
137 tc.con = nil
138 }
139 delete(t.targets, url)
140 }
141
142
143 func (t *TargetList) add(target *registry.Target) {
144 url := fmt.Sprintf("%s:%d", target.IP, target.Port)
145 _, k := t.targets[url]
146 if k {
147 return
148 }
149 tc := &TargetWithConnection{target: target}
150 t.targets[url] = tc
151 c, err := ConnectWithIP(url)
152 if err != nil {
153 fmt.Printf("[go-easyops] could not connect to %s: %s\n", url, utils.ErrorString(err))
154 return
155 }
156 tc.con = c
157 }
158
View as plain text