1 package server
2
3 import (
4 "context"
5 "fmt"
6
7 fw "golang.conradwood.net/apis/framework"
8
9
10 "golang.conradwood.net/go-easyops/auth"
11 "golang.conradwood.net/go-easyops/cmdline"
12
13
14 "strings"
15 "time"
16
17 "golang.conradwood.net/go-easyops/errors"
18 pp "golang.conradwood.net/go-easyops/profiling"
19 "golang.conradwood.net/go-easyops/prometheus"
20 "google.golang.org/grpc"
21 "google.golang.org/grpc/codes"
22 "google.golang.org/grpc/metadata"
23 "google.golang.org/grpc/status"
24 )
25
26 func (sd *serverDef) StreamAuthInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
27 var err error
28 pp.ServerRpcEntered()
29 defer pp.ServerRpcDone()
30 startRPC()
31 defer stopRPC()
32
33 name := ServiceNameFromStreamInfo(info)
34 method := MethodNameFromStreamInfo(info)
35 rc := &rpccall{ServiceName: name, MethodName: method, Started: time.Now()}
36 stdMetrics.concurrent_server_requests.With(prometheus.Labels{
37 "method": method,
38 "servicename": name,
39 }).Inc()
40 defer stdMetrics.concurrent_server_requests.With(prometheus.Labels{
41 "method": method,
42 "servicename": name,
43 }).Dec()
44
45 if cmdline.IsDebugRPCServer() {
46 fmt.Printf("[go-easyops] Debug-rpc: called streaming rpc %s/%s\n", name, method)
47 }
48
49 if isInternalService(name) {
50 if cmdline.IsDebugRPCServer() {
51 fmt.Printf("Invoking internal service stream handler\n")
52 }
53 res := handler(srv, stream)
54 if cmdline.IsDebugRPCServer() {
55 fmt.Printf("internal service stream handler returned: %s\n", res)
56 }
57 return res
58 }
59
60 def := getServerDefByName(name)
61 if def == nil {
62 s := fmt.Sprintf("[go-easyops] Service not registered! %s", name)
63 fmt.Println(s)
64 return errors.Error(stream.Context(), codes.Unimplemented, "service unavailable", "service %s is not known here", rc.ServiceName)
65 }
66
67 grpc_server_requests.With(prometheus.Labels{
68 "method": method,
69 "servicename": def.name,
70 }).Inc()
71
72 var out_ctx context.Context
73
74 if cmdline.ContextWithBuilder() {
75 out_ctx, _, err = sd.V1inbound2outbound(stream.Context(), rc)
76 if err != nil {
77 return err
78 }
79 } else {
80 panic("obsolete code path")
81 }
82 track_inbound_call(name, method, auth.GetService(out_ctx))
83 print_inbound_debug(rc, out_ctx)
84
85 nstream := newServerStream(stream, out_ctx)
86 err = handler(srv, nstream)
87 if err == nil {
88 return nil
89 }
90 if cmdline.IsDebugRPCServer() || *print_errs {
91 fmt.Printf("[go-easyops] Call %s.%s failed: %s\n", def.name, method, errors.ErrorStringWithStackTrace(err))
92 }
93 incFailure(def.name, method, err)
94
95
96 st := status.Convert(err)
97 fm := fw.CallTrace{
98 Message: fmt.Sprintf("[go-easyops] GRPC error in method %s.%s()", def.name, method),
99 Method: method,
100 Service: def.name,
101 }
102
103
104 st, errx := st.WithDetails(&fm)
105
106
107 if errx != nil {
108 sd.logError(out_ctx, rc, err)
109 return err
110 }
111
112 re := st.Err()
113 sd.logError(out_ctx, rc, re)
114 return re
115 }
116 func MethodNameFromStreamInfo(info *grpc.StreamServerInfo) string {
117 full := info.FullMethod
118 if full[0] == '/' {
119 full = full[1:]
120 }
121 ns := strings.SplitN(full, "/", 2)
122 if len(ns) < 2 {
123 return ""
124 }
125 res := ns[1]
126 if res[0] == '/' {
127 res = res[1:]
128 }
129 return ns[1]
130 }
131 func ServiceNameFromStreamInfo(info *grpc.StreamServerInfo) string {
132 full := info.FullMethod
133 if full[0] == '/' {
134 full = full[1:]
135 }
136 ns := strings.SplitN(full, "/", 2)
137 return ns[0]
138 }
139
140 type customServerStream struct {
141 stream grpc.ServerStream
142 ctx context.Context
143 }
144
145 func newServerStream(in grpc.ServerStream, ctx context.Context) grpc.ServerStream {
146 res := &customServerStream{stream: in, ctx: ctx}
147 return res
148 }
149
150 func (c *customServerStream) SetHeader(m metadata.MD) error {
151 return c.stream.SetHeader(m)
152 }
153 func (c *customServerStream) SendHeader(m metadata.MD) error {
154 return c.stream.SendHeader(m)
155 }
156 func (c *customServerStream) SetTrailer(m metadata.MD) {
157 c.stream.SetTrailer(m)
158 }
159 func (c *customServerStream) Context() context.Context {
160
161 return c.ctx
162 }
163 func (c *customServerStream) SendMsg(m interface{}) error {
164 return c.stream.SendMsg(m)
165 }
166 func (c *customServerStream) RecvMsg(m interface{}) error {
167 return c.stream.RecvMsg(m)
168 }
169
View as plain text