...

Source file src/golang.conradwood.net/go-easyops/router/fanoutrouter.go

Documentation: golang.conradwood.net/go-easyops/router

     1  /*
     2  	The FanoutRouter distributes work (evenly) between available instances, dynamically adjusting to instances coming and going.
     3  
     4  fanoutrouter maintains a go-routine per instance. Each go-routine listens on a channel for work. Any work received triggers a function with a grpcConnection as parameter. The result of that function is passed to a second channel. The number of go-routines changes dynamically as instances come and go. Each go-routine, if it has work to do, will call a "processor" (a user-defined function) with a ProcessRequest. Once the processor has completed its work, the result will be sent to a function (perhaps even multi-threaded!)
     5  */
     6  package router
     7  
     8  import (
     9  	"fmt"
    10  	"sync"
    11  	"time"
    12  
    13  	"flag"
    14  
    15  	"golang.conradwood.net/go-easyops/authremote"
    16  	"golang.conradwood.net/go-easyops/common"
    17  	"google.golang.org/grpc"
    18  )
    19  
    20  const (
    21  	state_Starting = 1
    22  	state_Started  = 2
    23  	state_Stopped  = 3
    24  )
    25  
    26  var (
    27  	debug = flag.Bool("ge_debug_router", false, "debug the fanoutrouter")
    28  )
    29  
    30  func init() {
    31  	common.RegisterInfoProvider("fanoutrouter", infoprovider)
    32  }
    33  
    34  type FanoutRouter struct {
    35  	cm             *ConnectionManager
    36  	requests       chan *fanout_router_process_request
    37  	cn             func(*CompletionNotification)
    38  	proc           func(*ProcessRequest) error
    39  	processor_wg   *sync.WaitGroup
    40  	stopping       bool
    41  	cur_processors []*fanout_router_processor
    42  	stop_lock      sync.Mutex
    43  }
    44  type CompletionNotification struct {
    45  	pr  *ProcessRequest
    46  	err error
    47  }
    48  type ProcessRequest struct {
    49  	req  *fanout_router_process_request
    50  	proc *fanout_router_processor
    51  }
    52  
    53  // one processor per target
    54  type fanout_router_processor struct {
    55  	state           int
    56  	fr              *FanoutRouter
    57  	target          *ConnectionTarget
    58  	control_channel chan *fanout_router_control_request
    59  	processed       int
    60  }
    61  
    62  type fanout_router_process_request struct {
    63  	o    interface{} // whatever the user wants to process
    64  	quit bool        // special flag to stop a go-routine from processing more
    65  }
    66  type fanout_router_control_request struct {
    67  	quit bool
    68  }
    69  
    70  func NewFanoutRouter(cm *ConnectionManager, processor func(*ProcessRequest) error, consumer func(*CompletionNotification)) *FanoutRouter {
    71  	res := &FanoutRouter{
    72  		cm:           cm,
    73  		requests:     make(chan *fanout_router_process_request, 1),
    74  		proc:         processor,
    75  		cn:           consumer,
    76  		processor_wg: &sync.WaitGroup{},
    77  	}
    78  	go res.poll_target_list()
    79  	return res
    80  }
    81  func (fr *FanoutRouter) SubmitWork(object interface{}) {
    82  	if fr.stopping {
    83  		fr.debugf("WARNING - submitted work to fanoutrouter, which is in the process of stopping\n")
    84  	}
    85  	if len(fr.cur_processors) == 0 {
    86  		fr.debugf("WARNING - submitted work to fanoutrouter, which has no backends atm\n")
    87  	}
    88  	pr := &fanout_router_process_request{o: object}
    89  	fr.requests <- pr
    90  }
    91  
    92  // this can take a long time, because we wait for all pending requests to finish before returning
    93  func (fr *FanoutRouter) Stop() {
    94  	fr.debugf("Stopping...\n")
    95  	fr.stopping = true
    96  	fr.stop_lock.Lock()
    97  	defer fr.stop_lock.Unlock()
    98  	for i := 0; i < len(fr.cur_processors); i++ {
    99  		fr.requests <- &fanout_router_process_request{quit: true}
   100  	}
   101  	fr.processor_wg.Wait()
   102  	fr.debugf("Stopped\n")
   103  }
   104  
   105  func (fr *FanoutRouter) poll_target_list() {
   106  	fr.stop_lock.Lock()
   107  	fr.debugf("starting polling...\n")
   108  	ctx := authremote.Context()
   109  	ct := fr.cm.GetCurrentTargets(ctx)
   110  	fr.debugf("first polling got %d targets\n", len(ct))
   111  	fr.compare_current_targets(ct)
   112  	fr.stop_lock.Unlock()
   113  
   114  	for {
   115  		if fr.stopping {
   116  			break
   117  		}
   118  		time.Sleep(time.Duration(15) * time.Second)
   119  		fr.stop_lock.Lock()
   120  		if fr.stopping {
   121  			fr.stop_lock.Unlock()
   122  			break
   123  		}
   124  		ctx := authremote.Context()
   125  		fr.debugf("polling...\n")
   126  		ct := fr.cm.GetCurrentTargets(ctx)
   127  		fr.debugf("got %d targets\n", len(ct))
   128  		fr.compare_current_targets(ct)
   129  		fr.stop_lock.Unlock()
   130  	}
   131  	fr.debugf("polling stopped\n")
   132  }
   133  func (fr *FanoutRouter) compare_current_targets(ct []*ConnectionTarget) {
   134  	targets := make(map[string]*ConnectionTarget)
   135  	for _, c := range ct {
   136  		targets[c.Address()] = c
   137  	}
   138  	// find new ones to start
   139  	for _, proc := range fr.cur_processors {
   140  		proc_adr := proc.address()
   141  		delete(targets, proc_adr)
   142  	}
   143  	//now start those in targets
   144  	for _, v := range targets {
   145  		fp := &fanout_router_processor{fr: fr, target: v, control_channel: make(chan *fanout_router_control_request, 10)}
   146  		fr.start_processor(fp)
   147  	}
   148  
   149  	// find ones to stop
   150  	targets = make(map[string]*ConnectionTarget)
   151  	for _, c := range ct {
   152  		targets[c.Address()] = c
   153  	}
   154  	for _, proc := range fr.cur_processors {
   155  		_, valid := targets[proc.address()]
   156  		if !valid {
   157  			proc.control_channel <- &fanout_router_control_request{quit: true}
   158  		}
   159  	}
   160  }
   161  func (fr *FanoutRouter) start_processor(pr *fanout_router_processor) {
   162  	pr.state = state_Starting
   163  	fr.cur_processors = append(fr.cur_processors, pr)
   164  	go pr.process_requests()
   165  	fr.processor_wg.Add(1)
   166  }
   167  func (fp *fanout_router_processor) process_requests() {
   168  	prefix := fmt.Sprintf("[%s] ", fp.address())
   169  	fp.fr.debugf("%sstarted\n", prefix)
   170  	fp.state = state_Started
   171  	for {
   172  		select {
   173  		case ctrl := <-fp.control_channel:
   174  			if ctrl.quit {
   175  				goto out
   176  			}
   177  		case req := <-fp.fr.requests:
   178  			if req.quit {
   179  				goto out
   180  			}
   181  			pr := &ProcessRequest{proc: fp, req: req}
   182  			fp.fr.debugf("%sprocessing...\n", prefix)
   183  			err := fp.fr.proc(pr)
   184  			fp.fr.debugf("%scomplete...\n", prefix)
   185  			cn := &CompletionNotification{pr: pr, err: err}
   186  			fp.processed++
   187  			fp.fr.cn(cn)
   188  			//
   189  		}
   190  
   191  	}
   192  out:
   193  	fp.target.Close()
   194  	fp.fr.debugf("%sFinished (after %d requests)\n", prefix, fp.processed)
   195  	fp.fr.processor_wg.Done()
   196  	fp.state = state_Stopped
   197  }
   198  func (fp *fanout_router_processor) address() string {
   199  	return fp.target.Address()
   200  }
   201  func (p *ProcessRequest) Object() interface{} {
   202  	return p.req.o
   203  }
   204  func (p *ProcessRequest) GRPCConnection() *grpc.ClientConn {
   205  	rcon, err := p.proc.target.Connection()
   206  	if err != nil {
   207  		fmt.Printf("Failed to get Connection: %s\n", err)
   208  		return nil
   209  	}
   210  	gcon, err := rcon.GRPCConnection()
   211  	if err != nil {
   212  		fmt.Printf("Failed to get GRPCConnection: %s\n", err)
   213  		return nil
   214  	}
   215  	return gcon
   216  
   217  }
   218  func (p *CompletionNotification) Error() error {
   219  	return p.err
   220  }
   221  func (p *CompletionNotification) Object() interface{} {
   222  	return p.pr.req.o
   223  }
   224  
   225  /**************** debugf *********************/
   226  
   227  func (fr *FanoutRouter) debugf(format string, args ...interface{}) {
   228  	if !*debug {
   229  		return
   230  	}
   231  	prefix := fmt.Sprintf("[go-easyops router/fanout %s]", fr.cm.ServiceName())
   232  	txt := fmt.Sprintf(format, args...)
   233  	fmt.Print(prefix + txt)
   234  }
   235  
   236  func infoprovider() []*common.InfoValue {
   237  	return nil
   238  }
   239  

View as plain text