...
1
6 package router
7
8 import (
9 "fmt"
10 "sync"
11 "time"
12
13 "flag"
14
15 "golang.conradwood.net/go-easyops/authremote"
16 "golang.conradwood.net/go-easyops/common"
17 "google.golang.org/grpc"
18 )
19
// Lifecycle states stored in fanout_router_processor.state.
// The values start at 1, so the zero value of the field matches none of them.
const (
	state_Starting = iota + 1 // set by start_processor before the goroutine is launched
	state_Started             // set by the processor goroutine once it is running
	state_Stopped             // set by the processor goroutine just before it returns
)
25
// debug enables the diagnostic output written by FanoutRouter.debugf.
// It is controlled by the -ge_debug_router command line flag and is off by default.
var debug = flag.Bool("ge_debug_router", false, "debug the fanoutrouter")
29
30 func init() {
31 common.RegisterInfoProvider("fanoutrouter", infoprovider)
32 }
33
34 type FanoutRouter struct {
35 cm *ConnectionManager
36 requests chan *fanout_router_process_request
37 cn func(*CompletionNotification)
38 proc func(*ProcessRequest) error
39 processor_wg *sync.WaitGroup
40 stopping bool
41 cur_processors []*fanout_router_processor
42 stop_lock sync.Mutex
43 }
44 type CompletionNotification struct {
45 pr *ProcessRequest
46 err error
47 }
48 type ProcessRequest struct {
49 req *fanout_router_process_request
50 proc *fanout_router_processor
51 }
52
53
54 type fanout_router_processor struct {
55 state int
56 fr *FanoutRouter
57 target *ConnectionTarget
58 control_channel chan *fanout_router_control_request
59 processed int
60 }
61
// fanout_router_process_request is one entry in the router's shared request queue.
type fanout_router_process_request struct {
	// o is the caller's work object as passed to SubmitWork.
	o interface{}
	// quit asks the processor that receives this entry to exit instead of processing it.
	quit bool
}
// fanout_router_control_request is a message sent to a single processor
// through its control channel.
type fanout_router_control_request struct {
	// quit asks the processor to exit.
	quit bool
}
69
70 func NewFanoutRouter(cm *ConnectionManager, processor func(*ProcessRequest) error, consumer func(*CompletionNotification)) *FanoutRouter {
71 res := &FanoutRouter{
72 cm: cm,
73 requests: make(chan *fanout_router_process_request, 1),
74 proc: processor,
75 cn: consumer,
76 processor_wg: &sync.WaitGroup{},
77 }
78 go res.poll_target_list()
79 return res
80 }
81 func (fr *FanoutRouter) SubmitWork(object interface{}) {
82 if fr.stopping {
83 fr.debugf("WARNING - submitted work to fanoutrouter, which is in the process of stopping\n")
84 }
85 if len(fr.cur_processors) == 0 {
86 fr.debugf("WARNING - submitted work to fanoutrouter, which has no backends atm\n")
87 }
88 pr := &fanout_router_process_request{o: object}
89 fr.requests <- pr
90 }
91
92
93 func (fr *FanoutRouter) Stop() {
94 fr.debugf("Stopping...\n")
95 fr.stopping = true
96 fr.stop_lock.Lock()
97 defer fr.stop_lock.Unlock()
98 for i := 0; i < len(fr.cur_processors); i++ {
99 fr.requests <- &fanout_router_process_request{quit: true}
100 }
101 fr.processor_wg.Wait()
102 fr.debugf("Stopped\n")
103 }
104
105 func (fr *FanoutRouter) poll_target_list() {
106 fr.stop_lock.Lock()
107 fr.debugf("starting polling...\n")
108 ctx := authremote.Context()
109 ct := fr.cm.GetCurrentTargets(ctx)
110 fr.debugf("first polling got %d targets\n", len(ct))
111 fr.compare_current_targets(ct)
112 fr.stop_lock.Unlock()
113
114 for {
115 if fr.stopping {
116 break
117 }
118 time.Sleep(time.Duration(15) * time.Second)
119 fr.stop_lock.Lock()
120 if fr.stopping {
121 fr.stop_lock.Unlock()
122 break
123 }
124 ctx := authremote.Context()
125 fr.debugf("polling...\n")
126 ct := fr.cm.GetCurrentTargets(ctx)
127 fr.debugf("got %d targets\n", len(ct))
128 fr.compare_current_targets(ct)
129 fr.stop_lock.Unlock()
130 }
131 fr.debugf("polling stopped\n")
132 }
133 func (fr *FanoutRouter) compare_current_targets(ct []*ConnectionTarget) {
134 targets := make(map[string]*ConnectionTarget)
135 for _, c := range ct {
136 targets[c.Address()] = c
137 }
138
139 for _, proc := range fr.cur_processors {
140 proc_adr := proc.address()
141 delete(targets, proc_adr)
142 }
143
144 for _, v := range targets {
145 fp := &fanout_router_processor{fr: fr, target: v, control_channel: make(chan *fanout_router_control_request, 10)}
146 fr.start_processor(fp)
147 }
148
149
150 targets = make(map[string]*ConnectionTarget)
151 for _, c := range ct {
152 targets[c.Address()] = c
153 }
154 for _, proc := range fr.cur_processors {
155 _, valid := targets[proc.address()]
156 if !valid {
157 proc.control_channel <- &fanout_router_control_request{quit: true}
158 }
159 }
160 }
161 func (fr *FanoutRouter) start_processor(pr *fanout_router_processor) {
162 pr.state = state_Starting
163 fr.cur_processors = append(fr.cur_processors, pr)
164 go pr.process_requests()
165 fr.processor_wg.Add(1)
166 }
167 func (fp *fanout_router_processor) process_requests() {
168 prefix := fmt.Sprintf("[%s] ", fp.address())
169 fp.fr.debugf("%sstarted\n", prefix)
170 fp.state = state_Started
171 for {
172 select {
173 case ctrl := <-fp.control_channel:
174 if ctrl.quit {
175 goto out
176 }
177 case req := <-fp.fr.requests:
178 if req.quit {
179 goto out
180 }
181 pr := &ProcessRequest{proc: fp, req: req}
182 fp.fr.debugf("%sprocessing...\n", prefix)
183 err := fp.fr.proc(pr)
184 fp.fr.debugf("%scomplete...\n", prefix)
185 cn := &CompletionNotification{pr: pr, err: err}
186 fp.processed++
187 fp.fr.cn(cn)
188
189 }
190
191 }
192 out:
193 fp.target.Close()
194 fp.fr.debugf("%sFinished (after %d requests)\n", prefix, fp.processed)
195 fp.fr.processor_wg.Done()
196 fp.state = state_Stopped
197 }
198 func (fp *fanout_router_processor) address() string {
199 return fp.target.Address()
200 }
201 func (p *ProcessRequest) Object() interface{} {
202 return p.req.o
203 }
204 func (p *ProcessRequest) GRPCConnection() *grpc.ClientConn {
205 rcon, err := p.proc.target.Connection()
206 if err != nil {
207 fmt.Printf("Failed to get Connection: %s\n", err)
208 return nil
209 }
210 gcon, err := rcon.GRPCConnection()
211 if err != nil {
212 fmt.Printf("Failed to get GRPCConnection: %s\n", err)
213 return nil
214 }
215 return gcon
216
217 }
218 func (p *CompletionNotification) Error() error {
219 return p.err
220 }
221 func (p *CompletionNotification) Object() interface{} {
222 return p.pr.req.o
223 }
224
225
226
227 func (fr *FanoutRouter) debugf(format string, args ...interface{}) {
228 if !*debug {
229 return
230 }
231 prefix := fmt.Sprintf("[go-easyops router/fanout %s]", fr.cm.ServiceName())
232 txt := fmt.Sprintf(format, args...)
233 fmt.Print(prefix + txt)
234 }
235
236 func infoprovider() []*common.InfoValue {
237 return nil
238 }
239
View as plain text