...

Source file src/golang.conradwood.net/go-easyops/server/stream_interceptor.go

Documentation: golang.conradwood.net/go-easyops/server

     1  package server
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	fw "golang.conradwood.net/apis/framework"
     8  
     9  	//	"golang.conradwood.net/go-easyops/auth"
    10  	"golang.conradwood.net/go-easyops/auth"
    11  	"golang.conradwood.net/go-easyops/cmdline"
    12  
    13  	//	"golang.conradwood.net/go-easyops/ctx"
    14  	"strings"
    15  	"time"
    16  
    17  	"golang.conradwood.net/go-easyops/errors"
    18  	pp "golang.conradwood.net/go-easyops/profiling"
    19  	"golang.conradwood.net/go-easyops/prometheus"
    20  	"google.golang.org/grpc"
    21  	"google.golang.org/grpc/codes"
    22  	"google.golang.org/grpc/metadata"
    23  	"google.golang.org/grpc/status"
    24  )
    25  
    26  func (sd *serverDef) StreamAuthInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    27  	var err error
    28  	pp.ServerRpcEntered()
    29  	defer pp.ServerRpcDone()
    30  	startRPC()
    31  	defer stopRPC()
    32  
    33  	name := ServiceNameFromStreamInfo(info)
    34  	method := MethodNameFromStreamInfo(info)
    35  	rc := &rpccall{ServiceName: name, MethodName: method, Started: time.Now()}
    36  	stdMetrics.concurrent_server_requests.With(prometheus.Labels{
    37  		"method":      method,
    38  		"servicename": name,
    39  	}).Inc()
    40  	defer stdMetrics.concurrent_server_requests.With(prometheus.Labels{
    41  		"method":      method,
    42  		"servicename": name,
    43  	}).Dec()
    44  
    45  	if cmdline.IsDebugRPCServer() {
    46  		fmt.Printf("[go-easyops] Debug-rpc: called streaming rpc %s/%s\n", name, method)
    47  	}
    48  	//fmt.Printf("Method: \"%s\"\n", method)
    49  	if isInternalService(name) {
    50  		if cmdline.IsDebugRPCServer() {
    51  			fmt.Printf("Invoking internal service stream handler\n")
    52  		}
    53  		res := handler(srv, stream)
    54  		if cmdline.IsDebugRPCServer() {
    55  			fmt.Printf("internal service stream handler returned: %s\n", res)
    56  		}
    57  		return res
    58  	}
    59  
    60  	def := getServerDefByName(name)
    61  	if def == nil {
    62  		s := fmt.Sprintf("[go-easyops] Service not registered! %s", name)
    63  		fmt.Println(s)
    64  		return errors.Error(stream.Context(), codes.Unimplemented, "service unavailable", "service %s is not known here", rc.ServiceName)
    65  	}
    66  
    67  	grpc_server_requests.With(prometheus.Labels{
    68  		"method":      method,
    69  		"servicename": def.name,
    70  	}).Inc()
    71  
    72  	var out_ctx context.Context
    73  
    74  	if cmdline.ContextWithBuilder() {
    75  		out_ctx, _, err = sd.V1inbound2outbound(stream.Context(), rc)
    76  		if err != nil {
    77  			return err
    78  		}
    79  	} else {
    80  		panic("obsolete code path")
    81  	}
    82  	track_inbound_call(name, method, auth.GetService(out_ctx))
    83  	print_inbound_debug(rc, out_ctx)
    84  
    85  	nstream := newServerStream(stream, out_ctx)
    86  	err = handler(srv, nstream)
    87  	if err == nil {
    88  		return nil
    89  	}
    90  	if cmdline.IsDebugRPCServer() || *print_errs {
    91  		fmt.Printf("[go-easyops] Call %s.%s failed: %s\n", def.name, method, errors.ErrorStringWithStackTrace(err))
    92  	}
    93  	incFailure(def.name, method, err)
    94  
    95  	// get status from error
    96  	st := status.Convert(err)
    97  	fm := fw.CallTrace{
    98  		Message: fmt.Sprintf("[go-easyops] GRPC error in method %s.%s()", def.name, method),
    99  		Method:  method,
   100  		Service: def.name,
   101  	}
   102  
   103  	// add details
   104  	st, errx := st.WithDetails(&fm)
   105  
   106  	// if adding details failed, just return the undecorated error message
   107  	if errx != nil {
   108  		sd.logError(out_ctx, rc, err)
   109  		return err
   110  	}
   111  
   112  	re := st.Err()
   113  	sd.logError(out_ctx, rc, re)
   114  	return re
   115  }
   116  func MethodNameFromStreamInfo(info *grpc.StreamServerInfo) string {
   117  	full := info.FullMethod
   118  	if full[0] == '/' {
   119  		full = full[1:]
   120  	}
   121  	ns := strings.SplitN(full, "/", 2)
   122  	if len(ns) < 2 {
   123  		return ""
   124  	}
   125  	res := ns[1]
   126  	if res[0] == '/' {
   127  		res = res[1:]
   128  	}
   129  	return ns[1]
   130  }
   131  func ServiceNameFromStreamInfo(info *grpc.StreamServerInfo) string {
   132  	full := info.FullMethod
   133  	if full[0] == '/' {
   134  		full = full[1:]
   135  	}
   136  	ns := strings.SplitN(full, "/", 2)
   137  	return ns[0]
   138  }
   139  
   140  type customServerStream struct {
   141  	stream grpc.ServerStream
   142  	ctx    context.Context
   143  }
   144  
   145  func newServerStream(in grpc.ServerStream, ctx context.Context) grpc.ServerStream {
   146  	res := &customServerStream{stream: in, ctx: ctx}
   147  	return res
   148  }
   149  
   150  func (c *customServerStream) SetHeader(m metadata.MD) error {
   151  	return c.stream.SetHeader(m)
   152  }
   153  func (c *customServerStream) SendHeader(m metadata.MD) error {
   154  	return c.stream.SendHeader(m)
   155  }
   156  func (c *customServerStream) SetTrailer(m metadata.MD) {
   157  	c.stream.SetTrailer(m)
   158  }
   159  func (c *customServerStream) Context() context.Context {
   160  	//	return c.stream.Context()
   161  	return c.ctx
   162  }
   163  func (c *customServerStream) SendMsg(m interface{}) error {
   164  	return c.stream.SendMsg(m)
   165  }
   166  func (c *customServerStream) RecvMsg(m interface{}) error {
   167  	return c.stream.RecvMsg(m)
   168  }
   169  

View as plain text