...

Text file src/golang.conradwood.net/go-easyops/router/fanoutrouter.go~

Documentation: golang.conradwood.net/go-easyops/router

     1package fanoutrouter
     2
     3import (
     4	"fmt"
     5	"sync"
     6	"time"
     7
     8	"golang.conradwood.net/go-easyops/authremote"
     9	"google.golang.org/grpc"
    10)
    11
    12const (
    13	state_Starting = 1
    14	state_Started  = 2
    15	state_Stopped  = 3
    16)
    17
    18/*
    19			a fanoutrouter maintains a go-routine per instance. each go-routine listens on a channel for work, if received it calls a function
    20			with a grpcConnection as parameter. The result is passed to another channel
    21		        the number of go-routines changes dynamically as instances come and go
    22	                each go-routine, if it has work to do, will call a "processor" (a user defined function) with a ProcessRequest.
    23	                Once the processor completed its work, the result will be send to a function (perhaps even multi-threaded!)
    24*/
    25type FanoutRouter struct {
    26	cm             *ConnectionManager
    27	requests       chan *fanout_router_process_request
    28	cn             func(*CompletionNotification)
    29	proc           func(*ProcessRequest) error
    30	processor_wg   *sync.WaitGroup
    31	stopping       bool
    32	cur_processors []*fanout_router_processor
    33	stop_lock      sync.Mutex
    34}
    35type CompletionNotification struct {
    36	pr  *ProcessRequest
    37	err error
    38}
    39type ProcessRequest struct {
    40	req  *fanout_router_process_request
    41	proc *fanout_router_processor
    42}
    43
    44// one processor per target
    45type fanout_router_processor struct {
    46	state           int
    47	fr              *FanoutRouter
    48	target          *ConnectionTarget
    49	control_channel chan *fanout_router_control_request
    50	processed       int
    51}
    52
    53type fanout_router_process_request struct {
    54	o    interface{} // whatever the user wants to process
    55	quit bool        // special flag to stop a go-routine from processing more
    56}
    57type fanout_router_control_request struct {
    58	quit bool
    59}
    60
    61func NewFanoutRouter(cm *ConnectionManager, processor func(*ProcessRequest) error, consumer func(*CompletionNotification)) *FanoutRouter {
    62	res := &FanoutRouter{
    63		cm:           cm,
    64		requests:     make(chan *fanout_router_process_request, 1),
    65		proc:         processor,
    66		cn:           consumer,
    67		processor_wg: &sync.WaitGroup{},
    68	}
    69	go res.poll_target_list()
    70	return res
    71}
    72func (fr *FanoutRouter) SubmitWork(object interface{}) {
    73	pr := &fanout_router_process_request{o: object}
    74	fr.requests <- pr
    75}
    76
    77// this can take a long time, because we wait for all pending requests to finish before returning
    78func (fr *FanoutRouter) Stop() {
    79	fr.debugf("Stopping...\n")
    80	fr.stopping = true
    81	fr.stop_lock.Lock()
    82	defer fr.stop_lock.Unlock()
    83	for i := 0; i < len(fr.cur_processors); i++ {
    84		fr.requests <- &fanout_router_process_request{quit: true}
    85	}
    86	fr.processor_wg.Wait()
    87	fr.debugf("Stopped\n")
    88}
    89
    90func (fr *FanoutRouter) poll_target_list() {
    91	fr.stop_lock.Lock()
    92	fr.debugf("starting polling...\n")
    93	ctx := authremote.Context()
    94	ct := fr.cm.GetCurrentTargets(ctx)
    95	fr.debugf("first polling got %d targets\n", len(ct))
    96	fr.compare_current_targets(ct)
    97	fr.stop_lock.Unlock()
    98
    99	for {
   100		if fr.stopping {
   101			break
   102		}
   103		time.Sleep(time.Duration(15) * time.Second)
   104		fr.stop_lock.Lock()
   105		if fr.stopping {
   106			fr.stop_lock.Unlock()
   107			break
   108		}
   109		ctx := authremote.Context()
   110		fr.debugf("polling...\n")
   111		ct := fr.cm.GetCurrentTargets(ctx)
   112		fr.debugf("got %d targets\n", len(ct))
   113		fr.compare_current_targets(ct)
   114		fr.stop_lock.Unlock()
   115
   116	}
   117}
   118func (fr *FanoutRouter) compare_current_targets(ct []*ConnectionTarget) {
   119	targets := make(map[string]*ConnectionTarget)
   120	for _, c := range ct {
   121		targets[c.Address()] = c
   122	}
   123	// find new ones to start
   124	for _, proc := range fr.cur_processors {
   125		proc_adr := proc.address()
   126		delete(targets, proc_adr)
   127	}
   128	//now start those in targets
   129	for _, v := range targets {
   130		fp := &fanout_router_processor{fr: fr, target: v, control_channel: make(chan *fanout_router_control_request, 10)}
   131		fr.start_processor(fp)
   132	}
   133
   134	// find ones to stop
   135	targets = make(map[string]*ConnectionTarget)
   136	for _, c := range ct {
   137		targets[c.Address()] = c
   138	}
   139	for _, proc := range fr.cur_processors {
   140		_, valid := targets[proc.address()]
   141		if !valid {
   142			proc.control_channel <- &fanout_router_control_request{quit: true}
   143		}
   144	}
   145}
   146func (fr *FanoutRouter) start_processor(pr *fanout_router_processor) {
   147	pr.state = state_Starting
   148	fr.cur_processors = append(fr.cur_processors, pr)
   149	go pr.process_requests()
   150	fr.processor_wg.Add(1)
   151}
   152func (fp *fanout_router_processor) process_requests() {
   153	prefix := fmt.Sprintf("[%s] ", fp.address())
   154	fmt.Printf("%sstarted\n", prefix)
   155	fp.state = state_Started
   156	for {
   157		select {
   158		case ctrl := <-fp.control_channel:
   159			if ctrl.quit {
   160				goto out
   161			}
   162		case req := <-fp.fr.requests:
   163			if req.quit {
   164				goto out
   165			}
   166			pr := &ProcessRequest{proc: fp, req: req}
   167			fmt.Printf("%sprocessing...\n", prefix)
   168			err := fp.fr.proc(pr)
   169			fmt.Printf("%scomplete...\n", prefix)
   170			cn := &CompletionNotification{pr: pr, err: err}
   171			fp.processed++
   172			fp.fr.cn(cn)
   173			//
   174		}
   175
   176	}
   177out:
   178	fmt.Printf("%sFinished (after %d requests)\n", prefix, fp.processed)
   179	fp.fr.processor_wg.Done()
   180	fp.state = state_Stopped
   181}
   182func (fp *fanout_router_processor) address() string {
   183	return fp.target.Address()
   184}
   185func (p *ProcessRequest) GRPCConnection() *grpc.ClientConn {
   186	rcon, err := p.proc.target.Connection()
   187	if err != nil {
   188		fmt.Printf("Failed to get Connection: %s\n", err)
   189		return nil
   190	}
   191	gcon, err := rcon.GRPCConnection()
   192	if err != nil {
   193		fmt.Printf("Failed to get GRPCConnection: %s\n", err)
   194		return nil
   195	}
   196	return gcon
   197
   198}
   199
   200/**************** debugf *********************/
   201func (fr *FanoutRouter) debugf(format string, args ...interface{}) {
   202	s := "[fanoutrouter] "
   203	s2 := fmt.Sprintf(format, args...)
   204	fmt.Printf("%s%s", s, s2)
   205}

View as plain text