...

Source file src/golang.conradwood.net/go-easyops/utils/stream_receiver.go

Documentation: golang.conradwood.net/go-easyops/utils

     1  package utils
     2  
     3  import (
     4  	"bytes"
     5  	"fmt"
     6  	"os"
     7  	"path/filepath"
     8  	"reflect"
     9  	"strings"
    10  	"sync"
    11  )
    12  
    13  type ByteStreamReceiver struct {
    14  	sync.Mutex
    15  	open_files      map[string]*open_file
    16  	last_file       *open_file
    17  	path            string
    18  	custom_function func(filename string, content []byte) error
    19  	files           map[string]uint64 //filename->size
    20  }
    21  
    22  // the proto must be compatible with this interface
    23  type StreamData interface {
    24  	GetFilename() string
    25  	GetData() []byte
    26  }
    27  
    28  // report each file to the function. This usually happens when Close() is called, prior to that the
    29  // receiver has no means of telling if the file is completed. Perhaps a flaw in the protocol?
    30  func NewByteStreamReceiverWithFunction(newfile func(filename string, content []byte) error) *ByteStreamReceiver {
    31  	res := NewByteStreamReceiver("")
    32  	res.custom_function = newfile
    33  	return res
    34  }
    35  
    36  func NewByteStreamReceiver(path string) *ByteStreamReceiver {
    37  	p, err := filepath.Abs(path)
    38  	if err != nil {
    39  		fmt.Printf("[go-easyops] byte-stream receiver failed filepath.Abs(%s): %s", path, err)
    40  		return nil
    41  	}
    42  	for strings.HasSuffix(p, "/") {
    43  		p = p[:len(p)-1]
    44  	}
    45  	res := &ByteStreamReceiver{
    46  		path:       p,
    47  		open_files: make(map[string]*open_file),
    48  		files:      make(map[string]uint64),
    49  	}
    50  	return res
    51  }
    52  func (bsr *ByteStreamReceiver) Files() map[string]uint64 {
    53  	return bsr.files
    54  }
    55  
    56  // the result of srv.Recv()
    57  func (bsr *ByteStreamReceiver) NewData(data StreamData) error {
    58  	if data == nil || reflect.ValueOf(data).IsNil() {
    59  		return nil
    60  	}
    61  	write_to := bsr.last_file
    62  	if data.GetFilename() != "" {
    63  		//		fmt.Printf("Receiving: \"%s\"\n", data.GetFilename())
    64  		write_to = bsr.get_file_by_name(data.GetFilename())
    65  		bsr.last_file = write_to
    66  		err := write_to.Write(bsr.path, make([]byte, 0)) //create file
    67  		if err != nil {
    68  			return err
    69  		}
    70  	}
    71  	if len(data.GetData()) == 0 {
    72  		return nil
    73  	}
    74  	if write_to == nil {
    75  		return fmt.Errorf("premature data received without filename")
    76  	}
    77  	b := data.GetData()
    78  	err := write_to.Write(bsr.path, b)
    79  	if err != nil {
    80  		return err
    81  	}
    82  	return nil
    83  }
    84  
    85  // how many files were retrieved?
    86  func (bsr *ByteStreamReceiver) FileCount() int {
    87  	bsr.Lock()
    88  	defer bsr.Unlock()
    89  	return len(bsr.open_files)
    90  }
    91  func (bsr *ByteStreamReceiver) Close() error {
    92  	bsr.Lock()
    93  	defer bsr.Unlock()
    94  	var err error
    95  	for _, of := range bsr.open_files {
    96  		xerr := of.Close()
    97  		if xerr != nil {
    98  			err = xerr
    99  		}
   100  	}
   101  	return err
   102  }
   103  
   104  func (bsr *ByteStreamReceiver) get_file_by_name(name string) *open_file {
   105  	bsr.Lock()
   106  	defer bsr.Unlock()
   107  	of, fd := bsr.open_files[name]
   108  	if fd {
   109  		return of
   110  	}
   111  	of = &open_file{bsr: bsr, filename: name, content: &bytes.Buffer{}}
   112  	bsr.open_files[name] = of
   113  	return of
   114  
   115  }
   116  func (bsr *ByteStreamReceiver) TotalBytesReceived() uint64 {
   117  	bsr.Lock()
   118  	defer bsr.Unlock()
   119  	res := uint64(0)
   120  	for _, of := range bsr.open_files {
   121  		res = res + of.size
   122  	}
   123  	return res
   124  }
   125  
   126  type open_file struct {
   127  	bsr      *ByteStreamReceiver
   128  	filename string
   129  	size     uint64
   130  	fd       *os.File
   131  	content  *bytes.Buffer
   132  }
   133  
   134  func (of *open_file) Write(path string, buf []byte) error {
   135  	if of.bsr.custom_function != nil {
   136  		of.size = of.size + uint64(len(buf))
   137  		_, err := of.content.Write(buf)
   138  		if err != nil {
   139  			return err
   140  		}
   141  	}
   142  	if of.bsr.path != "" {
   143  		if of.fd == nil {
   144  			if strings.Contains(of.filename, "..") {
   145  				return fmt.Errorf("Error: filename contains '..'")
   146  			}
   147  			os.MkdirAll(filepath.Dir(path+"/"+of.filename), 0777)
   148  			f, err := os.Create(path + "/" + of.filename)
   149  			if err != nil {
   150  				return err
   151  			}
   152  			of.fd = f
   153  		}
   154  		of.size = of.size + uint64(len(buf))
   155  		n, err := of.fd.Write(buf)
   156  		if n != len(buf) {
   157  			return fmt.Errorf("short write")
   158  		}
   159  		if err != nil {
   160  			return err
   161  		}
   162  	}
   163  	return nil
   164  }
   165  func (of *open_file) Close() error {
   166  	of.bsr.files[of.filename] = of.size
   167  	cf := of.bsr.custom_function
   168  	if cf != nil {
   169  		err := cf(of.filename, of.content.Bytes())
   170  		if err != nil {
   171  			return err
   172  		}
   173  	}
   174  	if of.fd != nil {
   175  		err := of.fd.Close()
   176  		of.fd = nil
   177  		return err
   178  	}
   179  	return nil
   180  }
   181  

View as plain text