...

Source file src/golang.conradwood.net/go-easyops/router/connection_manager.go

Documentation: golang.conradwood.net/go-easyops/router

     1  package router
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"sync"
     7  
     8  	"golang.conradwood.net/apis/registry"
     9  	"golang.conradwood.net/go-easyops/client"
    10  	"google.golang.org/grpc"
    11  )
    12  
    13  type ConnectionManager struct {
    14  	use_all     bool // don't follow registry recommendations, just all registered ones
    15  	one_per_ip  bool // but at most one per ip
    16  	servicename string
    17  }
    18  type ConnectionTarget struct {
    19  	lock       sync.Mutex
    20  	ip         string
    21  	port       uint32
    22  	connection *Connection
    23  }
    24  type Connection struct {
    25  	lock    sync.Mutex
    26  	address string
    27  	gcon    *grpc.ClientConn
    28  }
    29  
    30  func NewConnectionManager(servicename string) *ConnectionManager {
    31  	return &ConnectionManager{use_all: true, one_per_ip: true, servicename: servicename}
    32  }
    33  func (cm *ConnectionManager) ServiceName() string {
    34  	return cm.servicename
    35  }
    36  func (cm *ConnectionManager) AllowMultipleInstancesPerIP() {
    37  	cm.one_per_ip = false
    38  }
    39  func (cm *ConnectionManager) GetCurrentTargets(ctx context.Context) []*ConnectionTarget {
    40  	var res []*ConnectionTarget
    41  	if cm.use_all {
    42  		res = cm.getCurrentRegistrationsAsTargets(ctx)
    43  		res = append(res, cm.getCurrentTargets(ctx)...)
    44  	} else {
    45  		res = cm.getCurrentTargets(ctx)
    46  	}
    47  	res = cm.filter(res)
    48  	return res
    49  }
    50  func (cm *ConnectionManager) getCurrentRegistrationsAsTargets(ctx context.Context) []*ConnectionTarget {
    51  	req := &registry.V2ListRequest{NameMatch: cm.servicename}
    52  	targetlist, err := client.GetRegistryClient().ListRegistrations(ctx, req)
    53  	if err != nil {
    54  		fmt.Printf("Failed to get registrations for \"%s\": %s\n", req.NameMatch, err)
    55  		return nil
    56  	}
    57  	var res []*ConnectionTarget
    58  	for _, regi := range targetlist.Registrations {
    59  		if !regi.Targetable {
    60  			continue
    61  		}
    62  		t := regi.Target
    63  		found := false
    64  		for _, at := range t.ApiType {
    65  			if at == registry.Apitype_grpc {
    66  				found = true
    67  				break
    68  			}
    69  		}
    70  		if !found {
    71  			// not a grpc targettype
    72  			continue
    73  		}
    74  		ct := &ConnectionTarget{ip: t.IP, port: t.Port}
    75  		res = append(res, ct)
    76  	}
    77  	return res
    78  }
    79  func (cm *ConnectionManager) getCurrentTargets(ctx context.Context) []*ConnectionTarget {
    80  	req := &registry.V2GetTargetRequest{
    81  		ServiceName: []string{cm.servicename},
    82  		ApiType:     registry.Apitype_grpc,
    83  	}
    84  	targetlist, err := client.GetRegistryClient().V2GetTarget(ctx, req)
    85  	if err != nil {
    86  		fmt.Printf("Failed to get targets for \"%s\": %s\n", req.ServiceName, err)
    87  		return nil
    88  	}
    89  	var res []*ConnectionTarget
    90  	for _, t := range targetlist.Targets {
    91  		ct := &ConnectionTarget{ip: t.IP, port: t.Port}
    92  		res = append(res, ct)
    93  	}
    94  	return res
    95  }
    96  func (ct *ConnectionTarget) Address() string {
    97  	return fmt.Sprintf("%s:%d", ct.ip, ct.port)
    98  }
    99  
   100  func (ct *ConnectionTarget) Connection() (*Connection, error) {
   101  	ct.lock.Lock()
   102  	defer ct.lock.Unlock()
   103  	if ct.connection != nil {
   104  		return ct.connection, nil
   105  	}
   106  	c := &Connection{address: ct.Address()}
   107  	ct.connection = c
   108  	return ct.connection, nil
   109  }
   110  func (c *Connection) GRPCConnection() (*grpc.ClientConn, error) {
   111  	c.lock.Lock()
   112  	defer c.lock.Unlock()
   113  	if c.gcon != nil {
   114  		return c.gcon, nil
   115  	}
   116  	gcon, err := client.ConnectWithIP(c.address)
   117  	if err != nil {
   118  		return nil, err
   119  	}
   120  	c.gcon = gcon
   121  	return c.gcon, nil
   122  }
   123  func (c *Connection) Close() {
   124  	c.lock.Lock()
   125  	defer c.lock.Unlock()
   126  	if c.gcon != nil {
   127  		c.gcon.Close()
   128  		c.gcon = nil
   129  	}
   130  }
   131  
   132  func (cm *ConnectionManager) filter(input []*ConnectionTarget) []*ConnectionTarget {
   133  	ipmap := make(map[string]*ConnectionTarget)
   134  	for _, ct := range input {
   135  		key := ct.ip
   136  		if !cm.one_per_ip {
   137  			key = fmt.Sprintf("%s:%d", ct.ip, ct.port)
   138  		}
   139  
   140  		_, fd := ipmap[key]
   141  		if fd {
   142  			continue
   143  		}
   144  		ipmap[key] = ct
   145  	}
   146  	var res []*ConnectionTarget
   147  	for _, v := range ipmap {
   148  		res = append(res, v)
   149  	}
   150  	return res
   151  }
   152  
   153  func (cm *ConnectionManager) debugf(format string, args ...interface{}) {
   154  	if !*debug {
   155  		return
   156  	}
   157  	prefix := fmt.Sprintf("[go-easyops router/cntmgr %s]", cm.servicename)
   158  	txt := fmt.Sprintf(format, args...)
   159  	fmt.Print(prefix + txt)
   160  }
   161  func (ct *ConnectionTarget) Close() {
   162  	ct.lock.Lock()
   163  	defer ct.lock.Unlock()
   164  	if ct.connection == nil {
   165  		return
   166  	}
   167  	ct.connection.Close()
   168  }
   169  

View as plain text