...

Text file src/golang.conradwood.net/go-easyops/client/connection_manager.go~

Documentation: golang.conradwood.net/go-easyops/client

     1package fanoutrouter
     2
     3import (
     4	"context"
     5	"fmt"
     6	"sync"
     7
     8	"golang.conradwood.net/apis/registry"
     9	"golang.conradwood.net/go-easyops/client"
    10	"google.golang.org/grpc"
    11)
    12
    13type ConnectionManager struct {
    14	use_all     bool // don't follow registry recommendations, just all registered ones
    15	one_per_ip  bool // but at most one per ip
    16	servicename string
    17}
    18type ConnectionTarget struct {
    19	lock       sync.Mutex
    20	ip         string
    21	port       uint32
    22	connection *Connection
    23}
    24type Connection struct {
    25	lock    sync.Mutex
    26	address string
    27	gcon    *grpc.ClientConn
    28}
    29
    30func NewConnectionManager(servicename string) *ConnectionManager {
    31	return &ConnectionManager{use_all: true, one_per_ip: true, servicename: servicename}
    32}
    33func (cm *ConnectionManager) AllowMultipleInstancesPerIP() {
    34	cm.one_per_ip = false
    35}
    36func (cm *ConnectionManager) GetCurrentTargets(ctx context.Context) []*ConnectionTarget {
    37	var res []*ConnectionTarget
    38	if cm.use_all {
    39		res = cm.getCurrentRegistrationsAsTargets(ctx)
    40	} else {
    41		res = cm.getCurrentTargets(ctx)
    42	}
    43	res = cm.filter(res)
    44	return res
    45}
    46func (cm *ConnectionManager) getCurrentRegistrationsAsTargets(ctx context.Context) []*ConnectionTarget {
    47	req := &registry.V2ListRequest{NameMatch: cm.servicename}
    48	targetlist, err := client.GetRegistryClient().ListRegistrations(ctx, req)
    49	if err != nil {
    50		fmt.Printf("Failed to get registrations for \"%s\": %s\n", req.NameMatch, err)
    51		return nil
    52	}
    53	var res []*ConnectionTarget
    54	for _, regi := range targetlist.Registrations {
    55		if !regi.Targetable {
    56			continue
    57		}
    58		t := regi.Target
    59		found := false
    60		for _, at := range t.ApiType {
    61			if at == registry.Apitype_grpc {
    62				found = true
    63				break
    64			}
    65		}
    66		if !found {
    67			// not a grpc targettype
    68			continue
    69		}
    70		ct := &ConnectionTarget{ip: t.IP, port: t.Port}
    71		res = append(res, ct)
    72	}
    73	return res
    74}
    75func (cm *ConnectionManager) getCurrentTargets(ctx context.Context) []*ConnectionTarget {
    76	req := &registry.V2GetTargetRequest{
    77		ServiceName: []string{cm.servicename},
    78		ApiType:     registry.Apitype_grpc,
    79	}
    80	targetlist, err := client.GetRegistryClient().V2GetTarget(ctx, req)
    81	if err != nil {
    82		fmt.Printf("Failed to get targets for \"%s\": %s\n", req.ServiceName, err)
    83		return nil
    84	}
    85	var res []*ConnectionTarget
    86	for _, t := range targetlist.Targets {
    87		ct := &ConnectionTarget{ip: t.IP, port: t.Port}
    88		res = append(res, ct)
    89	}
    90	return res
    91}
    92func (ct *ConnectionTarget) Address() string {
    93	return fmt.Sprintf("%s:%d", ct.ip, ct.port)
    94}
    95
    96func (ct *ConnectionTarget) Connection() (*Connection, error) {
    97	ct.lock.Lock()
    98	defer ct.lock.Unlock()
    99	if ct.connection != nil {
   100		return ct.connection, nil
   101	}
   102	c := &Connection{address: ct.Address()}
   103	ct.connection = c
   104	return ct.connection, nil
   105}
   106func (c *Connection) GRPCConnection() (*grpc.ClientConn, error) {
   107	c.lock.Lock()
   108	defer c.lock.Unlock()
   109	gcon, err := client.ConnectWithIP(c.address)
   110	if err != nil {
   111		return nil, err
   112	}
   113	c.gcon = gcon
   114	return c.gcon, nil
   115}
   116func (c *Connection) Close() {
   117	c.lock.Lock()
   118	defer c.lock.Unlock()
   119	if c.gcon != nil {
   120		c.gcon.Close()
   121		c.gcon = nil
   122	}
   123}
   124
   125func (cm *ConnectionManager) filter(input []*ConnectionTarget) []*ConnectionTarget {
   126	if !cm.one_per_ip {
   127		return input
   128	}
   129	ipmap := make(map[string]*ConnectionTarget)
   130	for _, ct := range input {
   131		_, fd := ipmap[ct.ip]
   132		if fd {
   133			continue
   134		}
   135		ipmap[ct.ip] = ct
   136	}
   137	var res []*ConnectionTarget
   138	for _, v := range ipmap {
   139		res = append(res, v)
   140	}
   141	return res
   142}

View as plain text