...

Source file src/golang.conradwood.net/go-easyops/utils/packet_reader.go

Documentation: golang.conradwood.net/go-easyops/utils

     1  package utils
     2  
     3  import (
     4  	"fmt"
     5  	"io"
     6  	"sync"
     7  )
     8  
     9  /*
    10  This is part of the Packetizer toolset.
    11  
    12  A PacketReader reads from any stream (specifically, an io.Reader). It reads from the stream until any one of the following conditions occur:
    13  
    14   1. the io.Reader signals io.EOF
    15   2. the io.Reader signals some other error
    16   3. a complete packet, starting with the start byte and ending with the stop byte has been received through the io.Reader
    17   4. the start of a packet has been read, but its size exceeds the buffer size (specifically: the of the array passed to PacketReader.Read([]byte))
    18  
    19  A packet is defined as the data between start and stop byte. As a consequence, Any start and stop bytes in the payload must be
    20  escaped with the escape byte
    21  
    22  The data returned by the packet reader is guaranteed to be equal to the payload send by the PacketWriter. In other words:
    23  any escaping and wrapping and unescaping and unwrapping is handled by the reader and writer.
    24  see also [NewPacketWriter]
    25  */
    26  func NewPacketReader(r io.Reader, start, escape, stop byte) (*PacketReader, error) {
    27  	err := check_valid(start, escape, stop)
    28  	if err != nil {
    29  		return nil, err
    30  	}
    31  	res := &PacketReader{
    32  		start:       start,
    33  		stop:        stop,
    34  		esc:         escape,
    35  		debug:       *debug_packetizer,
    36  		reader:      r,
    37  		packet_chan: make(chan *readerEvent, 10),
    38  	}
    39  	go res.packet_reader_loop()
    40  	return res, nil
    41  }
    42  
    43  type PacketReader struct {
    44  	start        byte
    45  	stop         byte
    46  	esc          byte
    47  	reader       io.Reader
    48  	debug        bool
    49  	packet_chan  chan *readerEvent
    50  	reader_error error // any error the io.reader has returned
    51  	stopped      bool
    52  	stop_lock    sync.Mutex
    53  }
    54  
    55  type readerEvent struct {
    56  	payload []byte
    57  	err     error
    58  }
    59  
    60  // this will return the number of bytes or an error. it is guaranteed to either return an error OR a non-zero number of bytes, but never both
    61  func (pr *PacketReader) Read(buf []byte) (int, error) {
    62  	if pr.reader_error != nil {
    63  		return 0, pr.reader_error
    64  	}
    65  
    66  	packet := <-pr.packet_chan
    67  	if packet.err != nil {
    68  		pr.reader_error = packet.err
    69  		return 0, pr.reader_error
    70  	}
    71  	n := len(packet.payload)
    72  	if n > len(buf) {
    73  		return 0, fmt.Errorf("packetreader buf too small. got %d bytes, but need at least %d bytes", len(buf), n)
    74  	}
    75  	for i := 0; i < n; i++ {
    76  		buf[i] = packet.payload[i]
    77  	}
    78  	pr.debugf("Got packet %d bytes\n", n)
    79  	return n, nil
    80  }
    81  
    82  // closes underlying reader as well AND aborts a current read
    83  func (pr *PacketReader) Close() {
    84  	if pr.stopped {
    85  		return
    86  	}
    87  	rc, cast := pr.reader.(io.ReadCloser)
    88  	if cast {
    89  		rc.Close()
    90  	}
    91  	pr.packet_chan <- &readerEvent{err: io.EOF}
    92  	pr.stopped = true
    93  	pr.stop_lock.Lock()
    94  	if !pr.stopped {
    95  		close(pr.packet_chan)
    96  		pr.stopped = true
    97  	}
    98  	pr.stop_lock.Unlock()
    99  }
   100  
   101  // copy from io.reader to packets
   102  func (pr *PacketReader) packet_reader_loop() {
   103  	buf := make([]byte, 200)
   104  	stp, err := NewStreamToPacket(pr.start, pr.esc, pr.stop)
   105  	if err != nil {
   106  		pr.reader_error = err
   107  		pr.packet_chan <- &readerEvent{err: err}
   108  		return
   109  	}
   110  
   111  	for {
   112  		if pr.stopped {
   113  			break
   114  		}
   115  		n, err := pr.reader.Read(buf)
   116  		if pr.stopped {
   117  			break
   118  		}
   119  		if n > 0 {
   120  			payload := buf[:n]
   121  			pr.debugf("Read %d bytes:\n%s\n", n, Hexdump("payload: ", payload))
   122  			for _, b := range payload {
   123  				if stp.AddByte(b) {
   124  					pkt := stp.ReadPacket()
   125  					pr.debugf("%s\n", Hexdump("packet:", pkt))
   126  					pr.stop_lock.Lock()
   127  					if pr.stopped {
   128  						pr.stop_lock.Unlock()
   129  						break
   130  					}
   131  					pr.packet_chan <- &readerEvent{payload: pkt}
   132  					pr.stop_lock.Unlock()
   133  				}
   134  			}
   135  		}
   136  
   137  		if err != nil {
   138  			pr.stop_lock.Lock()
   139  			if pr.stopped {
   140  				pr.stop_lock.Unlock()
   141  				break
   142  			}
   143  			pr.packet_chan <- &readerEvent{err: err}
   144  			pr.stop_lock.Unlock()
   145  			break
   146  		}
   147  	}
   148  	pr.debugf("read_loop stopped\n")
   149  }
   150  
   151  func (stp *PacketReader) debugf(format string, args ...interface{}) {
   152  	if !stp.debug {
   153  		return
   154  	}
   155  	s := fmt.Sprintf(format, args...)
   156  	fmt.Printf("[packetreader] %s", s)
   157  }
   158  

View as plain text