...
1 package utils
2
3 import (
4 "bytes"
5 "fmt"
6 "os"
7 "path/filepath"
8 "reflect"
9 "strings"
10 "sync"
11 )
12
// ByteStreamReceiver reassembles a stream of StreamData chunks into files.
// Depending on how it was constructed, received bytes are written to disk
// below path, buffered in memory for custom_function, or both.
type ByteStreamReceiver struct {
	// Embedded mutex; taken by FileCount, Close, get_file_by_name and
	// TotalBytesReceived while they access open_files.
	sync.Mutex
	// open_files holds every file seen so far, keyed by filename.
	open_files map[string]*open_file
	// last_file is the file selected by the most recent chunk that carried a
	// filename; chunks without a filename are appended to it.
	last_file *open_file
	// path is the base directory for on-disk output; open_file.Write skips
	// disk output when it is empty.
	path string
	// custom_function, if set, is called from open_file.Close with the
	// filename and the buffered content of that file.
	custom_function func(filename string, content []byte) error
	// files maps filename to byte count; entries are recorded by open_file.Close.
	files map[string]uint64
}
21
22
// StreamData is one chunk of an incoming byte stream as consumed by
// ByteStreamReceiver.NewData.
type StreamData interface {
	// GetFilename returns the name of the file this chunk selects. NewData
	// treats an empty string as "keep writing to the previously selected file".
	GetFilename() string
	// GetData returns the payload bytes of this chunk; NewData accepts an
	// empty payload.
	GetData() []byte
}
27
28
29
30 func NewByteStreamReceiverWithFunction(newfile func(filename string, content []byte) error) *ByteStreamReceiver {
31 res := NewByteStreamReceiver("")
32 res.custom_function = newfile
33 return res
34 }
35
36 func NewByteStreamReceiver(path string) *ByteStreamReceiver {
37 p, err := filepath.Abs(path)
38 if err != nil {
39 fmt.Printf("[go-easyops] byte-stream receiver failed filepath.Abs(%s): %s", path, err)
40 return nil
41 }
42 for strings.HasSuffix(p, "/") {
43 p = p[:len(p)-1]
44 }
45 res := &ByteStreamReceiver{
46 path: p,
47 open_files: make(map[string]*open_file),
48 files: make(map[string]uint64),
49 }
50 return res
51 }
52 func (bsr *ByteStreamReceiver) Files() map[string]uint64 {
53 return bsr.files
54 }
55
56
57 func (bsr *ByteStreamReceiver) NewData(data StreamData) error {
58 if data == nil || reflect.ValueOf(data).IsNil() {
59 return nil
60 }
61 write_to := bsr.last_file
62 if data.GetFilename() != "" {
63
64 write_to = bsr.get_file_by_name(data.GetFilename())
65 bsr.last_file = write_to
66 err := write_to.Write(bsr.path, make([]byte, 0))
67 if err != nil {
68 return err
69 }
70 }
71 if len(data.GetData()) == 0 {
72 return nil
73 }
74 if write_to == nil {
75 return fmt.Errorf("premature data received without filename")
76 }
77 b := data.GetData()
78 err := write_to.Write(bsr.path, b)
79 if err != nil {
80 return err
81 }
82 return nil
83 }
84
85
86 func (bsr *ByteStreamReceiver) FileCount() int {
87 bsr.Lock()
88 defer bsr.Unlock()
89 return len(bsr.open_files)
90 }
91 func (bsr *ByteStreamReceiver) Close() error {
92 bsr.Lock()
93 defer bsr.Unlock()
94 var err error
95 for _, of := range bsr.open_files {
96 xerr := of.Close()
97 if xerr != nil {
98 err = xerr
99 }
100 }
101 return err
102 }
103
104 func (bsr *ByteStreamReceiver) get_file_by_name(name string) *open_file {
105 bsr.Lock()
106 defer bsr.Unlock()
107 of, fd := bsr.open_files[name]
108 if fd {
109 return of
110 }
111 of = &open_file{bsr: bsr, filename: name, content: &bytes.Buffer{}}
112 bsr.open_files[name] = of
113 return of
114
115 }
116 func (bsr *ByteStreamReceiver) TotalBytesReceived() uint64 {
117 bsr.Lock()
118 defer bsr.Unlock()
119 res := uint64(0)
120 for _, of := range bsr.open_files {
121 res = res + of.size
122 }
123 return res
124 }
125
// open_file is the per-file state kept by a ByteStreamReceiver.
type open_file struct {
	// bsr is the owning receiver; its path and custom_function decide where
	// Write sends data.
	bsr *ByteStreamReceiver
	// filename is the name as received from the stream.
	filename string
	// size is the running byte counter maintained by Write.
	size uint64
	// fd is the on-disk file, created by Write on first use; nil when no
	// file has been opened or after Close.
	fd *os.File
	// content buffers the data in memory when the receiver has a
	// custom_function.
	content *bytes.Buffer
}
133
134 func (of *open_file) Write(path string, buf []byte) error {
135 if of.bsr.custom_function != nil {
136 of.size = of.size + uint64(len(buf))
137 _, err := of.content.Write(buf)
138 if err != nil {
139 return err
140 }
141 }
142 if of.bsr.path != "" {
143 if of.fd == nil {
144 if strings.Contains(of.filename, "..") {
145 return fmt.Errorf("Error: filename contains '..'")
146 }
147 os.MkdirAll(filepath.Dir(path+"/"+of.filename), 0777)
148 f, err := os.Create(path + "/" + of.filename)
149 if err != nil {
150 return err
151 }
152 of.fd = f
153 }
154 of.size = of.size + uint64(len(buf))
155 n, err := of.fd.Write(buf)
156 if n != len(buf) {
157 return fmt.Errorf("short write")
158 }
159 if err != nil {
160 return err
161 }
162 }
163 return nil
164 }
165 func (of *open_file) Close() error {
166 of.bsr.files[of.filename] = of.size
167 cf := of.bsr.custom_function
168 if cf != nil {
169 err := cf(of.filename, of.content.Bytes())
170 if err != nil {
171 return err
172 }
173 }
174 if of.fd != nil {
175 err := of.fd.Close()
176 of.fd = nil
177 return err
178 }
179 return nil
180 }
181
View as plain text