...

Source file src/golang.conradwood.net/go-easyops/client/targetlist.go

Documentation: golang.conradwood.net/go-easyops/client

     1  package client
     2  
     3  import (
     4  	"fmt"
     5  	"golang.conradwood.net/apis/registry"
     6  	"golang.conradwood.net/go-easyops/cmdline"
     7  	"golang.conradwood.net/go-easyops/utils"
     8  	"google.golang.org/grpc"
     9  	"sync"
    10  	"time"
    11  )
    12  
    13  type TargetList struct {
    14  	reg      string
    15  	svcname  string
    16  	runOnce  bool
    17  	tstarted bool
    18  	targets  map[string]*TargetWithConnection
    19  	rlock    sync.Mutex
    20  }
    21  type TargetWithConnection struct {
    22  	target *registry.Target
    23  	con    *grpc.ClientConn
    24  }
    25  
    26  func NewTargetList(servicename string) *TargetList {
    27  	t := &TargetList{svcname: servicename, reg: cmdline.GetClientRegistryAddress()}
    28  	t.targets = make(map[string]*TargetWithConnection)
    29  	return t
    30  }
    31  func (t *TargetWithConnection) Connection() *grpc.ClientConn {
    32  	return t.con
    33  }
    34  func (t *TargetWithConnection) String() string {
    35  	return fmt.Sprintf("%s@%s:%d", t.target.ServiceName, t.target.IP, t.target.Port)
    36  }
    37  
    38  func (t *TargetList) Targets() []*TargetWithConnection {
    39  	if !t.runOnce {
    40  		err := t.refresh()
    41  		if err != nil {
    42  			fmt.Printf("[go-easyops] failed to refresh %s: %s\n", t.svcname, utils.ErrorString(err))
    43  			return nil
    44  		}
    45  	}
    46  	var res []*TargetWithConnection
    47  	for _, v := range t.targets {
    48  		if v.con == nil {
    49  			continue
    50  		}
    51  		res = append(res, v)
    52  	}
    53  	return res
    54  }
    55  
    56  // returns a connection to exactly one target
    57  func (t *TargetList) Connections() []*grpc.ClientConn {
    58  	if !t.runOnce {
    59  		err := t.refresh()
    60  		if err != nil {
    61  			fmt.Printf("[go-easyops] failed to refresh %s: %s\n", t.svcname, utils.ErrorString(err))
    62  			return nil
    63  		}
    64  	}
    65  	var res []*grpc.ClientConn
    66  	for _, v := range t.targets {
    67  		if v.con != nil {
    68  			res = append(res, v.con)
    69  		}
    70  	}
    71  	return res
    72  }
    73  func (t *TargetList) ByAddress(address string) []*TargetWithConnection {
    74  	var res []*TargetWithConnection
    75  	for _, v := range t.targets {
    76  		if v.target.IP != address {
    77  			continue
    78  		}
    79  		res = append(res, v)
    80  	}
    81  	return res
    82  }
    83  func (t *TargetList) refresh_loop() {
    84  	for {
    85  		time.Sleep(30 * time.Second)
    86  		err := t.refresh()
    87  		if err != nil {
    88  			fmt.Printf("refresh failed: %s\n", utils.ErrorString(err))
    89  		}
    90  	}
    91  }
    92  func (t *TargetList) refresh() error {
    93  	t.rlock.Lock()
    94  	defer t.rlock.Unlock()
    95  	if !t.tstarted {
    96  		go t.refresh_loop()
    97  		t.tstarted = true
    98  	}
    99  	rc := GetRegistryClient()
   100  	treq := &registry.V2GetTargetRequest{ApiType: registry.Apitype_grpc, ServiceName: []string{t.svcname}}
   101  	ctx := getContext()
   102  	tr, err := rc.V2GetTarget(ctx, treq)
   103  	if err != nil {
   104  		return err
   105  	}
   106  	for _, target := range tr.Targets {
   107  		t.add(target)
   108  	}
   109  	for _, tc := range t.targets {
   110  		target := tc.target
   111  		url := fmt.Sprintf("%s:%d", target.IP, target.Port)
   112  		found := false
   113  		for _, yestarget := range tr.Targets {
   114  			yesurl := fmt.Sprintf("%s:%d", yestarget.IP, yestarget.Port)
   115  			if yesurl == url {
   116  				found = true
   117  				break
   118  			}
   119  		}
   120  		if !found {
   121  			t.remove(target)
   122  		}
   123  	}
   124  	t.runOnce = true
   125  	return nil
   126  }
   127  
   128  // if target does not exist, it's a noop. otherwise close connection and remove
   129  func (t *TargetList) remove(target *registry.Target) {
   130  	url := fmt.Sprintf("%s:%d", target.IP, target.Port)
   131  	tc, k := t.targets[url]
   132  	if !k {
   133  		return
   134  	}
   135  	if tc.con != nil {
   136  		tc.con.Close()
   137  		tc.con = nil
   138  	}
   139  	delete(t.targets, url)
   140  }
   141  
   142  // if target exists, it's a noop
   143  func (t *TargetList) add(target *registry.Target) {
   144  	url := fmt.Sprintf("%s:%d", target.IP, target.Port)
   145  	_, k := t.targets[url]
   146  	if k {
   147  		return
   148  	}
   149  	tc := &TargetWithConnection{target: target}
   150  	t.targets[url] = tc
   151  	c, err := ConnectWithIP(url)
   152  	if err != nil {
   153  		fmt.Printf("[go-easyops] could not connect to %s: %s\n", url, utils.ErrorString(err))
   154  		return
   155  	}
   156  	tc.con = c
   157  }
   158  

View as plain text