...
1 package utils
2
3 import (
4 "fmt"
5 "io"
6 "sync"
7 )
8
9
26 func NewPacketReader(r io.Reader, start, escape, stop byte) (*PacketReader, error) {
27 err := check_valid(start, escape, stop)
28 if err != nil {
29 return nil, err
30 }
31 res := &PacketReader{
32 start: start,
33 stop: stop,
34 esc: escape,
35 debug: *debug_packetizer,
36 reader: r,
37 packet_chan: make(chan *readerEvent, 10),
38 }
39 go res.packet_reader_loop()
40 return res, nil
41 }
42
43 type PacketReader struct {
44 start byte
45 stop byte
46 esc byte
47 reader io.Reader
48 debug bool
49 packet_chan chan *readerEvent
50 reader_error error
51 stopped bool
52 stop_lock sync.Mutex
53 }
54
// readerEvent is one item travelling over PacketReader.packet_chan. Senders in
// this file set exactly one of the two fields: payload for a decoded packet,
// err for a condition that ends the stream.
type readerEvent struct {
	payload []byte // packet contents
	err     error  // error to surface from Read
}
59
60
61 func (pr *PacketReader) Read(buf []byte) (int, error) {
62 if pr.reader_error != nil {
63 return 0, pr.reader_error
64 }
65
66 packet := <-pr.packet_chan
67 if packet.err != nil {
68 pr.reader_error = packet.err
69 return 0, pr.reader_error
70 }
71 n := len(packet.payload)
72 if n > len(buf) {
73 return 0, fmt.Errorf("packetreader buf too small. got %d bytes, but need at least %d bytes", len(buf), n)
74 }
75 for i := 0; i < n; i++ {
76 buf[i] = packet.payload[i]
77 }
78 pr.debugf("Got packet %d bytes\n", n)
79 return n, nil
80 }
81
82
83 func (pr *PacketReader) Close() {
84 if pr.stopped {
85 return
86 }
87 rc, cast := pr.reader.(io.ReadCloser)
88 if cast {
89 rc.Close()
90 }
91 pr.packet_chan <- &readerEvent{err: io.EOF}
92 pr.stopped = true
93 pr.stop_lock.Lock()
94 if !pr.stopped {
95 close(pr.packet_chan)
96 pr.stopped = true
97 }
98 pr.stop_lock.Unlock()
99 }
100
101
102 func (pr *PacketReader) packet_reader_loop() {
103 buf := make([]byte, 200)
104 stp, err := NewStreamToPacket(pr.start, pr.esc, pr.stop)
105 if err != nil {
106 pr.reader_error = err
107 pr.packet_chan <- &readerEvent{err: err}
108 return
109 }
110
111 for {
112 if pr.stopped {
113 break
114 }
115 n, err := pr.reader.Read(buf)
116 if pr.stopped {
117 break
118 }
119 if n > 0 {
120 payload := buf[:n]
121 pr.debugf("Read %d bytes:\n%s\n", n, Hexdump("payload: ", payload))
122 for _, b := range payload {
123 if stp.AddByte(b) {
124 pkt := stp.ReadPacket()
125 pr.debugf("%s\n", Hexdump("packet:", pkt))
126 pr.stop_lock.Lock()
127 if pr.stopped {
128 pr.stop_lock.Unlock()
129 break
130 }
131 pr.packet_chan <- &readerEvent{payload: pkt}
132 pr.stop_lock.Unlock()
133 }
134 }
135 }
136
137 if err != nil {
138 pr.stop_lock.Lock()
139 if pr.stopped {
140 pr.stop_lock.Unlock()
141 break
142 }
143 pr.packet_chan <- &readerEvent{err: err}
144 pr.stop_lock.Unlock()
145 break
146 }
147 }
148 pr.debugf("read_loop stopped\n")
149 }
150
151 func (stp *PacketReader) debugf(format string, args ...interface{}) {
152 if !stp.debug {
153 return
154 }
155 s := fmt.Sprintf(format, args...)
156 fmt.Printf("[packetreader] %s", s)
157 }
158
View as plain text