...

Source file src/golang.conradwood.net/go-easyops/utils/stream_sender.go

Documentation: golang.conradwood.net/go-easyops/utils

     1  package utils
     2  
     3  import (
     4  	"fmt"
     5  )
     6  
     // Callback signatures through which a ByteStreamSender talks to the stream.
     type (
     	// Send_data_type sends one slice of bytes on the stream and reports any failure.
     	Send_data_type func(b []byte) error

     	// Send_new_file_type sends a message on the stream indicating the start of a
     	// new file, identified by key and filename, and reports any failure.
     	Send_new_file_type func(key, filename string) error
     )
     9  
    10  // sends a bunch of bytes down a grpc stream
    11  type ByteStreamSender struct {
    12  	new_file     Send_new_file_type
    13  	send_data    Send_data_type
    14  	file_counter int
    15  }
    16  
    17  /*
    18  create a new bytestream sender.
    19  (key and filename are opaque to this sender)
    20  f1 - a function that sends a message on the stream to indicate start of a new file and key
    21  f2 - a function that sends a bunch of data on the stream
    22  
    23  once stream sender is created, call SendBytes with filename and content.
    24  it will break the file into small pieces and call f2() with small arrays suitable for sending in a packet
    25  */
    26  func NewByteStreamSender(f1 Send_new_file_type, f2 Send_data_type) *ByteStreamSender {
    27  	res := &ByteStreamSender{
    28  		new_file:  f1,
    29  		send_data: f2,
    30  	}
    31  	return res
    32  }
    33  
    34  // how many files were sent?
    35  func (bss *ByteStreamSender) FileCount() int {
    36  	return bss.file_counter
    37  }
    38  
    39  func (bss *ByteStreamSender) SendBytes(key, filename string, b []byte) error {
    40  	err := bss.new_file(key, filename)
    41  	if err != nil {
    42  		return err
    43  	}
    44  	bss.file_counter++
    45  	size := 8192
    46  	offset := 0
    47  	for {
    48  		if size+offset > len(b) {
    49  			size = len(b) - offset
    50  		}
    51  		if size == 0 {
    52  			break
    53  		}
    54  		//bss.debugf("Sending %s [%d - %d]\n", filename, offset, size)
    55  		err := bss.send_data(b[offset : offset+size])
    56  		if err != nil {
    57  			return err
    58  		}
    59  		offset = offset + size
    60  	}
    61  	return nil
    62  }
    63  
    64  func (bss *ByteStreamSender) debugf(format string, args ...interface{}) {
    65  	fmt.Printf("[bss] "+format, args...)
    66  }
    67  

View as plain text