...

Source file src/golang.conradwood.net/go-easyops/objectstore/objectstore.go

Documentation: golang.conradwood.net/go-easyops/objectstore

     1  /*
     2  wrapper around objectstore service.
     3  */
     4  package objectstore
     5  
     6  import (
     7  	"context"
     8  	"github.com/golang/protobuf/proto"
     9  	os "golang.conradwood.net/apis/objectstore"
    10  	"golang.conradwood.net/go-easyops/client"
    11  	"golang.conradwood.net/go-easyops/errors"
    12  	"golang.conradwood.net/go-easyops/utils"
    13  	"io"
    14  	"time"
    15  )
    16  
    17  var (
    18  	ostore os.ObjectStoreClient
    19  )
    20  
    21  // store a proto in the objectstore
    22  func StoreProto(ctx context.Context, key string, p proto.Message) error {
    23  	b, err := utils.MarshalBytes(p)
    24  	if err != nil {
    25  		return err
    26  	}
    27  	return PutWithID(ctx, key, b)
    28  }
    29  
    30  // retrieve a proto in the objectstore
    31  func RetrieveProto(ctx context.Context, key string, p proto.Message) error {
    32  	b, err := Get(ctx, key)
    33  	if err != nil {
    34  		return err
    35  	}
    36  	err = utils.UnmarshalBytes(b, p)
    37  	if err != nil {
    38  		return err
    39  	}
    40  	return nil
    41  }
    42  
    43  func getostore() {
    44  	if ostore != nil {
    45  		return
    46  	}
    47  	ostore = os.NewObjectStoreClient(client.Connect("objectstore.ObjectStore"))
    48  }
    49  
    50  func PutWithID(ctx context.Context, key string, buf []byte) error {
    51  	if key == "" {
    52  		return errors.InvalidArgs(ctx, "missing key to store in objectstore", "missing key to store in objectstore")
    53  	}
    54  	return PutWithIDAndExpiry(ctx, key, buf, time.Time{})
    55  }
    56  func PutWithIDAndExpiry(ctx context.Context, key string, buf []byte, expiry time.Time) error {
    57  	if key == "" {
    58  		return errors.InvalidArgs(ctx, "missing key to store in objectstore", "missing key to store in objectstore")
    59  	}
    60  	getostore()
    61  	stream, err := ostore.LPutWithID(ctx)
    62  	if err != nil {
    63  		return err
    64  	}
    65  	size := 8192
    66  	repeat := true
    67  	offset := 0
    68  	for repeat {
    69  		if offset+size > len(buf) {
    70  			size = len(buf) - offset
    71  			repeat = false
    72  		}
    73  		n := buf[offset : offset+size]
    74  		offset = offset + size
    75  		pwr := &os.PutWithIDRequest{ID: key, Content: n}
    76  		if !expiry.IsZero() {
    77  			pwr.Expiry = uint32(expiry.Unix())
    78  		}
    79  		err := stream.Send(pwr)
    80  		if err != nil {
    81  			return err
    82  		}
    83  	}
    84  	_, err = stream.CloseAndRecv()
    85  	if err != nil {
    86  		return err
    87  	}
    88  	return err
    89  }
    90  
    91  // evict (remove) an object from the objectstore by key
    92  func Evict(ctx context.Context, key string) ([]byte, error) {
    93  	if key == "" {
    94  		return nil, errors.InvalidArgs(ctx, "missing key to evict from objectstore", "missing key to evict from objectstore")
    95  	}
    96  	getostore()
    97  	gr := &os.EvictRequest{ID: key}
    98  	_, err := ostore.Evict(ctx, gr)
    99  	return nil, err
   100  }
   101  
   102  // get an object from the objectstore by key
   103  func Get(ctx context.Context, key string) ([]byte, error) {
   104  	if key == "" {
   105  		return nil, errors.InvalidArgs(ctx, "missing key to retrieve from objectstore", "missing key to retrieve from objectstore")
   106  	}
   107  	getostore()
   108  	gr := &os.GetRequest{ID: key}
   109  	stream, err := ostore.LGet(ctx, gr)
   110  	if err != nil {
   111  		return nil, err
   112  	}
   113  	var buf []byte
   114  	for {
   115  		ct, err := stream.Recv()
   116  		if err == nil {
   117  			buf = append(buf, ct.Content...)
   118  			continue
   119  		}
   120  		if err == io.EOF {
   121  			break
   122  		}
   123  		return nil, err
   124  
   125  	}
   126  	return buf, nil
   127  }
   128  

View as plain text