1
4 package objectstore
5
6 import (
7 "context"
8 "github.com/golang/protobuf/proto"
9 os "golang.conradwood.net/apis/objectstore"
10 "golang.conradwood.net/go-easyops/client"
11 "golang.conradwood.net/go-easyops/errors"
12 "golang.conradwood.net/go-easyops/utils"
13 "io"
14 "time"
15 )
16
17 var (
18 ostore os.ObjectStoreClient
19 )
20
21
22 func StoreProto(ctx context.Context, key string, p proto.Message) error {
23 b, err := utils.MarshalBytes(p)
24 if err != nil {
25 return err
26 }
27 return PutWithID(ctx, key, b)
28 }
29
30
31 func RetrieveProto(ctx context.Context, key string, p proto.Message) error {
32 b, err := Get(ctx, key)
33 if err != nil {
34 return err
35 }
36 err = utils.UnmarshalBytes(b, p)
37 if err != nil {
38 return err
39 }
40 return nil
41 }
42
43 func getostore() {
44 if ostore != nil {
45 return
46 }
47 ostore = os.NewObjectStoreClient(client.Connect("objectstore.ObjectStore"))
48 }
49
50 func PutWithID(ctx context.Context, key string, buf []byte) error {
51 if key == "" {
52 return errors.InvalidArgs(ctx, "missing key to store in objectstore", "missing key to store in objectstore")
53 }
54 return PutWithIDAndExpiry(ctx, key, buf, time.Time{})
55 }
56 func PutWithIDAndExpiry(ctx context.Context, key string, buf []byte, expiry time.Time) error {
57 if key == "" {
58 return errors.InvalidArgs(ctx, "missing key to store in objectstore", "missing key to store in objectstore")
59 }
60 getostore()
61 stream, err := ostore.LPutWithID(ctx)
62 if err != nil {
63 return err
64 }
65 size := 8192
66 repeat := true
67 offset := 0
68 for repeat {
69 if offset+size > len(buf) {
70 size = len(buf) - offset
71 repeat = false
72 }
73 n := buf[offset : offset+size]
74 offset = offset + size
75 pwr := &os.PutWithIDRequest{ID: key, Content: n}
76 if !expiry.IsZero() {
77 pwr.Expiry = uint32(expiry.Unix())
78 }
79 err := stream.Send(pwr)
80 if err != nil {
81 return err
82 }
83 }
84 _, err = stream.CloseAndRecv()
85 if err != nil {
86 return err
87 }
88 return err
89 }
90
91
92 func Evict(ctx context.Context, key string) ([]byte, error) {
93 if key == "" {
94 return nil, errors.InvalidArgs(ctx, "missing key to evict from objectstore", "missing key to evict from objectstore")
95 }
96 getostore()
97 gr := &os.EvictRequest{ID: key}
98 _, err := ostore.Evict(ctx, gr)
99 return nil, err
100 }
101
102
103 func Get(ctx context.Context, key string) ([]byte, error) {
104 if key == "" {
105 return nil, errors.InvalidArgs(ctx, "missing key to retrieve from objectstore", "missing key to retrieve from objectstore")
106 }
107 getostore()
108 gr := &os.GetRequest{ID: key}
109 stream, err := ostore.LGet(ctx, gr)
110 if err != nil {
111 return nil, err
112 }
113 var buf []byte
114 for {
115 ct, err := stream.Recv()
116 if err == nil {
117 buf = append(buf, ct.Content...)
118 continue
119 }
120 if err == io.EOF {
121 break
122 }
123 return nil, err
124
125 }
126 return buf, nil
127 }
128
View as plain text