...
1package fanoutrouter
2
3import (
4 "fmt"
5 "sync"
6 "time"
7
8 "golang.conradwood.net/go-easyops/authremote"
9 "google.golang.org/grpc"
10)
11
12const (
13 state_Starting = 1
14 state_Started = 2
15 state_Stopped = 3
16)
17
18/*
19 a fanoutrouter maintains a go-routine per instance. each go-routine listens on a channel for work, if received it calls a function
20 with a grpcConnection as parameter. The result is passed to another channel
21 the number of go-routines changes dynamically as instances come and go
22 each go-routine, if it has work to do, will call a "processor" (a user defined function) with a ProcessRequest.
23 Once the processor completed its work, the result will be send to a function (perhaps even multi-threaded!)
24*/
25type FanoutRouter struct {
26 cm *ConnectionManager
27 requests chan *fanout_router_process_request
28 cn func(*CompletionNotification)
29 proc func(*ProcessRequest) error
30 processor_wg *sync.WaitGroup
31 stopping bool
32 cur_processors []*fanout_router_processor
33 stop_lock sync.Mutex
34}
35type CompletionNotification struct {
36 pr *ProcessRequest
37 err error
38}
39type ProcessRequest struct {
40 req *fanout_router_process_request
41 proc *fanout_router_processor
42}
43
44// one processor per target
45type fanout_router_processor struct {
46 state int
47 fr *FanoutRouter
48 target *ConnectionTarget
49 control_channel chan *fanout_router_control_request
50 processed int
51}
52
53type fanout_router_process_request struct {
54 o interface{} // whatever the user wants to process
55 quit bool // special flag to stop a go-routine from processing more
56}
57type fanout_router_control_request struct {
58 quit bool
59}
60
61func NewFanoutRouter(cm *ConnectionManager, processor func(*ProcessRequest) error, consumer func(*CompletionNotification)) *FanoutRouter {
62 res := &FanoutRouter{
63 cm: cm,
64 requests: make(chan *fanout_router_process_request, 1),
65 proc: processor,
66 cn: consumer,
67 processor_wg: &sync.WaitGroup{},
68 }
69 go res.poll_target_list()
70 return res
71}
72func (fr *FanoutRouter) SubmitWork(object interface{}) {
73 pr := &fanout_router_process_request{o: object}
74 fr.requests <- pr
75}
76
77// this can take a long time, because we wait for all pending requests to finish before returning
78func (fr *FanoutRouter) Stop() {
79 fr.debugf("Stopping...\n")
80 fr.stopping = true
81 fr.stop_lock.Lock()
82 defer fr.stop_lock.Unlock()
83 for i := 0; i < len(fr.cur_processors); i++ {
84 fr.requests <- &fanout_router_process_request{quit: true}
85 }
86 fr.processor_wg.Wait()
87 fr.debugf("Stopped\n")
88}
89
90func (fr *FanoutRouter) poll_target_list() {
91 fr.stop_lock.Lock()
92 fr.debugf("starting polling...\n")
93 ctx := authremote.Context()
94 ct := fr.cm.GetCurrentTargets(ctx)
95 fr.debugf("first polling got %d targets\n", len(ct))
96 fr.compare_current_targets(ct)
97 fr.stop_lock.Unlock()
98
99 for {
100 if fr.stopping {
101 break
102 }
103 time.Sleep(time.Duration(15) * time.Second)
104 fr.stop_lock.Lock()
105 if fr.stopping {
106 fr.stop_lock.Unlock()
107 break
108 }
109 ctx := authremote.Context()
110 fr.debugf("polling...\n")
111 ct := fr.cm.GetCurrentTargets(ctx)
112 fr.debugf("got %d targets\n", len(ct))
113 fr.compare_current_targets(ct)
114 fr.stop_lock.Unlock()
115
116 }
117}
118func (fr *FanoutRouter) compare_current_targets(ct []*ConnectionTarget) {
119 targets := make(map[string]*ConnectionTarget)
120 for _, c := range ct {
121 targets[c.Address()] = c
122 }
123 // find new ones to start
124 for _, proc := range fr.cur_processors {
125 proc_adr := proc.address()
126 delete(targets, proc_adr)
127 }
128 //now start those in targets
129 for _, v := range targets {
130 fp := &fanout_router_processor{fr: fr, target: v, control_channel: make(chan *fanout_router_control_request, 10)}
131 fr.start_processor(fp)
132 }
133
134 // find ones to stop
135 targets = make(map[string]*ConnectionTarget)
136 for _, c := range ct {
137 targets[c.Address()] = c
138 }
139 for _, proc := range fr.cur_processors {
140 _, valid := targets[proc.address()]
141 if !valid {
142 proc.control_channel <- &fanout_router_control_request{quit: true}
143 }
144 }
145}
146func (fr *FanoutRouter) start_processor(pr *fanout_router_processor) {
147 pr.state = state_Starting
148 fr.cur_processors = append(fr.cur_processors, pr)
149 go pr.process_requests()
150 fr.processor_wg.Add(1)
151}
152func (fp *fanout_router_processor) process_requests() {
153 prefix := fmt.Sprintf("[%s] ", fp.address())
154 fmt.Printf("%sstarted\n", prefix)
155 fp.state = state_Started
156 for {
157 select {
158 case ctrl := <-fp.control_channel:
159 if ctrl.quit {
160 goto out
161 }
162 case req := <-fp.fr.requests:
163 if req.quit {
164 goto out
165 }
166 pr := &ProcessRequest{proc: fp, req: req}
167 fmt.Printf("%sprocessing...\n", prefix)
168 err := fp.fr.proc(pr)
169 fmt.Printf("%scomplete...\n", prefix)
170 cn := &CompletionNotification{pr: pr, err: err}
171 fp.processed++
172 fp.fr.cn(cn)
173 //
174 }
175
176 }
177out:
178 fmt.Printf("%sFinished (after %d requests)\n", prefix, fp.processed)
179 fp.fr.processor_wg.Done()
180 fp.state = state_Stopped
181}
182func (fp *fanout_router_processor) address() string {
183 return fp.target.Address()
184}
185func (p *ProcessRequest) GRPCConnection() *grpc.ClientConn {
186 rcon, err := p.proc.target.Connection()
187 if err != nil {
188 fmt.Printf("Failed to get Connection: %s\n", err)
189 return nil
190 }
191 gcon, err := rcon.GRPCConnection()
192 if err != nil {
193 fmt.Printf("Failed to get GRPCConnection: %s\n", err)
194 return nil
195 }
196 return gcon
197
198}
199
200/**************** debugf *********************/
201func (fr *FanoutRouter) debugf(format string, args ...interface{}) {
202 s := "[fanoutrouter] "
203 s2 := fmt.Sprintf(format, args...)
204 fmt.Printf("%s%s", s, s2)
205}
View as plain text