1
4
5 package ctxv2
6
7 import (
8 "context"
9 "flag"
10 "fmt"
11 "time"
12
13 "golang.conradwood.net/apis/auth"
14 ge "golang.conradwood.net/apis/goeasyops"
15 "golang.conradwood.net/go-easyops/cmdline"
16 "golang.conradwood.net/go-easyops/common"
17 "golang.conradwood.net/go-easyops/ctx/shared"
18 "golang.conradwood.net/go-easyops/utils"
19 "golang.yacloud.eu/apis/session"
20 "google.golang.org/grpc/metadata"
21 )
22
23
// METANAME is the gRPC metadata key under which the marshalled v2 context
// is attached to outgoing calls and looked up on incoming ones.
const METANAME = "goeasyops_meta_v2"
31
// ser_prefix is put in front of every context produced by Serialise and is
// verified by DeserialiseContextWithTimeout before the payload is decoded.
var ser_prefix = []byte("SER-CTX-V2")

// do_panic makes panic_if_service_account panic instead of only printing a
// warning.
var do_panic = flag.Bool("ge_panic_v2_on_error", false, "if true panic very often")
36
37
// contextBuilder collects everything needed to create a v2 context: the
// payload that is marshalled into gRPC metadata (ge_context) and the local
// settings that shape the resulting context.Context (parent and timeout).
type contextBuilder struct {
	// requestid is not referenced by the methods visible in this file;
	// WithRequestID stores the id inside ge_context instead.
	requestid string
	// timeout, when non-zero, makes the created context expire after that duration.
	timeout time.Duration
	// parent is the context set via WithParentContext; it is only consulted
	// when got_parent is true.
	parent context.Context
	// got_parent records that WithParentContext was called.
	got_parent bool

	// ge_context is the payload marshalled into the METANAME metadata entry.
	ge_context *ge.InContext
}
57
58
61 func (c *contextBuilder) Context() (context.Context, context.CancelFunc) {
62 ctx, cf, _ := c.contextWithLocalState()
63 return ctx, cf
64 }
65
66
69 func (c *contextBuilder) contextWithLocalState() (context.Context, context.CancelFunc, *localState) {
70 inctx := c.ge_context
71 b, err := utils.Marshal(inctx)
72 if err != nil {
73 panic(fmt.Sprintf("[go-easyops] unable to marshal context: %s", err))
74 }
75 ctx, cf := c.buildInitialContext()
76 ls := c.newLocalState()
77 ctx = context.WithValue(ctx, shared.LOCALSTATENAME, ls)
78 newmd := metadata.Pairs(METANAME, b)
79 ctx = metadata.NewOutgoingContext(ctx, newmd)
80 ls.callingservice = c.ge_context.MCtx.CallingService
81 panic_if_service_account(common.VerifySignedUser(inctx.ImCtx.User))
82 return ctx, cf, ls
83 }
84
85
86 func (c *contextBuilder) buildInitialContext() (context.Context, context.CancelFunc) {
87 var ctx context.Context
88 var cnc context.CancelFunc
89 octx := c.parent
90 if !c.got_parent {
91 octx = context.Background()
92 }
93 if c.timeout != 0 {
94 ctx, cnc = context.WithTimeout(context.Background(), c.timeout)
95 } else {
96 ctx, cnc = context.WithCancel(octx)
97 }
98 return ctx, cnc
99 }
100
101
102 func (c *contextBuilder) ContextWithAutoCancel() context.Context {
103 res, cnc := c.Context()
104 if c.timeout != 0 && cnc != nil {
105 go autocanceler(c.timeout, cnc)
106 }
107 return res
108 }
// autocanceler blocks for the duration t and then invokes cf.
// It is started in its own goroutine by ContextWithAutoCancel.
func autocanceler(t time.Duration, cf context.CancelFunc) {
	<-time.After(t)
	cf()
}
113
114
117 func (c *contextBuilder) WithUser(user *auth.SignedUser) {
118 panic_if_service_account(common.VerifySignedUser(user))
119 c.ge_context.ImCtx.User = user
120 }
121 func (c *contextBuilder) WithSudoUser(user *auth.SignedUser) {
122 panic_if_service_account(common.VerifySignedUser(user))
123 c.ge_context.ImCtx.SudoUser = user
124 }
125
126
129 func (c *contextBuilder) WithCreatorService(user *auth.SignedUser) {
130 if user != nil {
131 c.ge_context.ImCtx.CreatorService = user
132 }
133 }
134
135
138 func (c *contextBuilder) WithCallingService(user *auth.SignedUser) {
139 c.ge_context.MCtx.CallingService = user
140 }
141
142
145 func (c *contextBuilder) WithSession(sess *session.Session) {
146 c.ge_context.ImCtx.Session = sess
147 }
148
149
150 func (c *contextBuilder) WithDebug() {
151 c.ge_context.MCtx.Debug = true
152 }
153
154
155 func (c *contextBuilder) WithTrace() {
156 c.ge_context.MCtx.Trace = true
157 }
158 func (c *contextBuilder) EnableExperiment(name string) {
159 for _, e := range c.ge_context.MCtx.Experiments {
160 if e.Name == name {
161 return
162 }
163 }
164 c.ge_context.MCtx.Experiments = append(c.ge_context.MCtx.Experiments, &ge.Experiment{Name: name})
165 }
166 func (c *contextBuilder) WithRoutingTags(tags *ge.CTXRoutingTags) {
167 c.ge_context.MCtx.Tags = tags
168 }
169 func (c *contextBuilder) WithRequestID(reqid string) {
170 c.ge_context.ImCtx.RequestID = reqid
171 }
172 func (c *contextBuilder) WithParentContext(context context.Context) {
173 c.parent = context
174 c.got_parent = true
175 }
176 func (c *contextBuilder) WithTimeout(t time.Duration) {
177 c.timeout = t
178 }
179 func (c *contextBuilder) WithAuthTag(tag string) {
180 c.ge_context.ImCtx.AuthTags = append(c.ge_context.ImCtx.AuthTags, tag)
181 }
182 func (c *contextBuilder) newLocalState() *localState {
183 ls := &localState{builder: c}
184 return ls
185 }
186
187 func (c *contextBuilder) Inbound2Outbound(ctx context.Context, svc *auth.SignedUser) (context.Context, bool) {
188 cmdline.DebugfContext("v2 Inbound2Outbound()...\n")
189 if svc == nil {
190 cmdline.DebugfContext("WARNING - inbound2outbound called but not within a service (service==nil)\n")
191 }
192 md, ex := metadata.FromIncomingContext(ctx)
193 if !ex {
194
195 cmdline.DebugfContext("v2 Inbound2Outbound() -> no metadata...\n")
196 return nil, false
197 }
198 mdas, fd := md[METANAME]
199 if !fd || mdas == nil || len(mdas) != 1 {
200
201 cmdline.DebugfContext("v2 Inbound2Outbound() -> metadata without our key...\n")
202 return nil, false
203 }
204 mds := mdas[0]
205 res := &ge.InContext{}
206 err := utils.Unmarshal(mds, res)
207 if err != nil {
208 fmt.Printf("[go-easyops] warning invalid inbound v2 context (%s)\n", err)
209 return nil, false
210 }
211
212 calling_me := res.MCtx.CallingService
213
218 cmdline.DebugfContext("Unmarshalled context:\n%s\n", shared.ContextProto2string(" ", res))
219
220 cb := NewContextBuilder()
221 cb.ge_context = res
222
223 if svc != nil {
224 cb.ge_context.MCtx.CallingService = svc
225 svcu := common.VerifySignedUser(svc)
226 cb.ge_context.MCtx.Services = append(cb.ge_context.MCtx.Services, &ge.ServiceTrace{ID: svcu.ID})
227 cmdline.DebugfContext("added service \"%s\" to list of services\n", svcu.ID)
228 }
229
230 cb.WithParentContext(ctx)
231 ctx, _, ls := cb.contextWithLocalState()
232
233 ls.callingservice = calling_me
234 panic_if_service_account(common.VerifySignedUser(res.ImCtx.User))
235 return ctx, true
236 }
237 func NewContextBuilder() *contextBuilder {
238 cb := &contextBuilder{ge_context: &ge.InContext{
239 ImCtx: &ge.ImmutableContext{},
240 MCtx: &ge.MutableContext{},
241 }}
242 for _, ex := range cmdline.EnabledExperiments() {
243 cb.EnableExperiment(ex)
244 }
245 return cb
246 }
247
248 func metadata_to_ctx(md metadata.MD, found bool) (*ge.InContext, error) {
249 if !found {
250 return nil, nil
251 }
252 mdas, fd := md[METANAME]
253 if !fd || mdas == nil || len(mdas) != 1 {
254
255 return nil, nil
256 }
257 mds := mdas[0]
258 res := &ge.InContext{}
259 err := utils.Unmarshal(mds, res)
260 if err != nil {
261
262 return nil, err
263 }
264 panic_if_service_account(common.VerifySignedUser(res.ImCtx.User))
265 return res, nil
266
267 }
268 func get_metadata(ctx context.Context) (*ge.InContext, error) {
269 ic, err := metadata_to_ctx(metadata.FromIncomingContext(ctx))
270 if err == nil && ic != nil {
271 return ic, nil
272 }
273 ic, err = metadata_to_ctx(metadata.FromOutgoingContext(ctx))
274 return ic, err
275 }
276
277
280 func Serialise(ctx context.Context) ([]byte, error) {
281 ls := shared.GetLocalState(ctx)
282
283 ic := &ge.InContext{
284 ImCtx: &ge.ImmutableContext{
285 User: ls.User(),
286 SudoUser: ls.SudoUser(),
287 CreatorService: ls.CreatorService(),
288 RequestID: ls.RequestID(),
289 Session: ls.Session(),
290 AuthTags: ls.AuthTags(),
291 },
292 MCtx: &ge.MutableContext{
293 CallingService: ls.CallingService(),
294 Debug: ls.Debug(),
295 Trace: ls.Trace(),
296 Tags: ls.RoutingTags(),
297 Experiments: ls.Experiments(),
298 Services: ls.Services(),
299 },
300 }
301
302 panic_if_service_account(common.VerifySignedUser(ic.ImCtx.User))
303 var b []byte
304 var err error
305 b, err = utils.MarshalBytes(ic)
306 if err != nil {
307 return nil, err
308 }
309
310 prefix := ser_prefix
311 b = append(prefix, b...)
312 return b, nil
313 }
314
315
330 func DeserialiseContextWithTimeout(t time.Duration, buf []byte) (context.Context, error) {
331 if len(buf) < len(ser_prefix) {
332 return nil, fmt.Errorf("v1 context too short to deserialise (len=%d)", len(buf))
333 }
334 for i, b := range ser_prefix {
335 if buf[i] != b {
336 show := buf
337 if len(show) > 18 {
338 show = show[:18]
339 }
340 fmt.Printf("\nEXPECTED: %s\n", utils.HexStr(ser_prefix))
341 fmt.Printf("GOT : %s\n", utils.HexStr(buf))
342 return nil, fmt.Errorf("v2 context has invalid prefix at pos %d (first 10 bytes: %s)", i, utils.HexStr(show))
343 }
344 }
345 ud := buf[len(ser_prefix):]
346 cmdline.DebugfContext("a v2deserialise: %s", utils.HexStr(buf))
347 cmdline.DebugfContext("b v2deserialise: %s", utils.HexStr(ud))
348 ic := &ge.InContext{}
349 err := utils.UnmarshalBytes(ud, ic)
350 if err != nil {
351 return nil, err
352 }
353 cb := NewContextBuilder()
354
355 if ic.ImCtx != nil {
356 panic_if_service_account(common.VerifySignedUser(ic.ImCtx.User))
357 cb.ge_context.ImCtx = ic.ImCtx
358 } else {
359 panic("no imctx")
360 }
361 if ic.MCtx != nil {
362 cb.ge_context.MCtx = ic.MCtx
363 }
364 cb.WithTimeout(t)
365 return cb.ContextWithAutoCancel(), nil
366 }
367
368 func panic_if_service_account(u *auth.User) {
369 if u == nil {
370 return
371 }
372 if u.ServiceAccount {
373 if *do_panic {
374 panic(fmt.Sprintf("attempt to build context with serviceaccount as user %s (%s)", u.ID, u.Email))
375 }
376 fmt.Printf("[go-easyops] WARNING -- creating context with user as serviceaccount (%s) (%s)\n", u.ID, u.Email)
377 }
378 }
379
View as plain text