aboutsummaryrefslogtreecommitdiff
path: root/stream_handler.go
diff options
context:
space:
mode:
authorEmiliano Ciavatta2020-04-03 10:47:23 +0000
committerEmiliano Ciavatta2020-04-03 10:47:23 +0000
commit9883cd346f694ad09aac839f9ddc4a25df0e0b0a (patch)
tree7a5bfdf282b273ffc410a9e8c758fb8db73072c7 /stream_handler.go
parentb02ee06a2dad56650f539f69df5660a88e442059 (diff)
Add connection_handler and stream_handler
Diffstat (limited to 'stream_handler.go')
-rw-r--r--stream_handler.go310
1 files changed, 140 insertions, 170 deletions
diff --git a/stream_handler.go b/stream_handler.go
index ad59856..80d91d6 100644
--- a/stream_handler.go
+++ b/stream_handler.go
@@ -1,206 +1,176 @@
-// Package tcpreader provides an implementation for tcpassembly.Stream which presents
-// the caller with an io.Reader for easy processing.
-//
-// The assembly package handles packet data reordering, but its output is
-// library-specific, thus not usable by the majority of external Go libraries.
-// The io.Reader interface, on the other hand, is used throughout much of Go
-// code as an easy mechanism for reading in data streams and decoding them. For
-// example, the net/http package provides the ReadRequest function, which can
-// parse an HTTP request from a live data stream, just what we'd want when
-// sniffing HTTP traffic. Using StreamHandler, this is relatively easy to set
-// up:
-//
-// // Create our StreamFactory
-// type httpStreamFactory struct {}
-// func (f *httpStreamFactory) New(a, b gopacket.Flow) {
-// r := tcpreader.NewReaderStream(false)
-// go printRequests(r)
-// return &r
-// }
-// func printRequests(r io.Reader) {
-// // Convert to bufio, since that's what ReadRequest wants.
-// buf := bufio.NewReader(r)
-// for {
-// if req, err := http.ReadRequest(buf); err == io.EOF {
-// return
-// } else if err != nil {
-// log.Println("Error parsing HTTP requests:", err)
-// } else {
-// fmt.Println("HTTP REQUEST:", req)
-// fmt.Println("Body contains", tcpreader.DiscardBytesToEOF(req.Body), "bytes")
-// }
-// }
-// }
-//
-// Using just this code, we're able to reference a powerful, built-in library
-// for HTTP request parsing to do all the dirty-work of parsing requests from
-// the wire in real-time. Pass this stream factory to an tcpassembly.StreamPool,
-// start up an tcpassembly.Assembler, and you're good to go!
package main
import (
-"errors"
-"github.com/google/gopacket/tcpassembly"
-"io"
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "github.com/flier/gohs/hyperscan"
+ "github.com/google/gopacket/tcpassembly"
+ "log"
+ "time"
)
-var discardBuffer = make([]byte, 4096)
-
-// DiscardBytesToFirstError will read in all bytes up to the first error
-// reported by the given reader, then return the number of bytes discarded
-// and the error encountered.
-func DiscardBytesToFirstError(r io.Reader) (discarded int, err error) {
- for {
- n, e := r.Read(discardBuffer)
- discarded += n
- if e != nil {
- return discarded, e
- }
- }
-}
// MaxDocumentSize is the maximum number of payload bytes stored in a single
// "connection_streams" document; Reassembled flushes and starts a new
// document before exceeding it.
const MaxDocumentSize = 1024 * 1024

// InitialBlockCount is the initial capacity of the per-block metadata slices
// (indexes, timestamps, lossBlocks).
const InitialBlockCount = 1024

// InitialPatternSliceSize is the initial capacity of the match slice created
// the first time a pattern id produces a match.
const InitialPatternSliceSize = 8
-// DiscardBytesToEOF will read in all bytes from a Reader until it
-// encounters an io.EOF, then return the number of bytes. Be careful
-// of this... if used on a Reader that returns a non-io.EOF error
-// consistently, this will loop forever discarding that error while
-// it waits for an EOF.
-func DiscardBytesToEOF(r io.Reader) (discarded int) {
- for {
- n, e := DiscardBytesToFirstError(r)
- discarded += n
- if e == io.EOF {
- return
- }
- }
-}
-
-// StreamHandler implements both tcpassembly.Stream and io.Reader. You can use it
-// as a building block to make simple, easy stream handlers.
-//
// StreamHandler handles one direction of a reassembled TCP stream. It buffers
// the payload delivered by tcpassembly, records per-block metadata (offset,
// timestamp, loss flag), feeds the bytes to a hyperscan pattern stream, and
// flushes the buffer to storage in documents of at most MaxDocumentSize bytes.
//
// IMPORTANT: the assembler calls Reassembled/ReassemblyComplete; all buffered
// bytes must be consumed promptly or TCP stream reassembly will block.
type StreamHandler struct {
	connection      ConnectionHandler       // parent handler; supplies Patterns(), Storage(), Context() and Complete()
	streamKey       StreamKey               // endpoints identifying this unidirectional stream (4 flows, XOR-hashed in generateDocumentKey)
	buffer          *bytes.Buffer           // payload accumulated for the current storage document
	indexes         []int                   // start offset of each reassembled block within the current document
	timestamps      []time.Time             // capture time of each block (parallel to indexes)
	lossBlocks      []bool                  // true when data was lost immediately before the block (parallel to indexes)
	currentIndex    int                     // current write offset inside the document buffer
	firstPacketSeen time.Time               // Seen time of the segment flagged Start
	lastPacketSeen  time.Time               // Seen time of the segment flagged End
	documentsKeys   []string                // keys of documents already persisted for this stream
	streamLength    int                     // total payload bytes written across all documents
	patternStream   hyperscan.Stream        // streaming matcher fed by Reassembled; may be nil if Open failed
	patternMatches  map[uint][]PatternSlice // per-pattern-id match ranges reported by hyperscan
}
// PatternSlice holds the [from, to) byte offsets of a single pattern match,
// as reported by hyperscan's match callback.
type PatternSlice [2]uint64
// NewStreamHandler returns a new StreamHandler bound to the given connection
// and stream key, opening a hyperscan stream (with the provided scratch
// space) that reports matches through onMatch. If opening the pattern stream
// fails, the error is only logged and patternStream is left nil.
func NewStreamHandler(connection ConnectionHandler, key StreamKey, scratch *hyperscan.Scratch) StreamHandler {
	handler := StreamHandler{
		connection:     connection,
		streamKey:      key,
		buffer:         new(bytes.Buffer),
		indexes:        make([]int, 0, InitialBlockCount),
		timestamps:     make([]time.Time, 0, InitialBlockCount),
		lossBlocks:     make([]bool, 0, InitialBlockCount),
		documentsKeys:  make([]string, 0, 1),               // most of the time the stream fits in one document
		patternMatches: make(map[uint][]PatternSlice, 10), // TODO: change with exactly value
	}

	stream, err := connection.Patterns().Open(0, scratch, handler.onMatch, nil)
	if err != nil {
		log.Println("failed to create a stream: ", err)
	}
	handler.patternStream = stream

	return handler
}
// Reassembled implements tcpassembly.Stream's Reassembled function.
-func (r *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) {
- if !r.initiated {
- panic("StreamHandler not created via NewReaderStream")
+func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) {
+ for _, r := range reassembly {
+ skip := r.Skip
+ if r.Start {
+ skip = 0
+ sh.firstPacketSeen = r.Seen
+ }
+ if r.End {
+ sh.lastPacketSeen = r.Seen
+ }
+
+ reassemblyLen := len(r.Bytes)
+ if reassemblyLen == 0 {
+ continue
+ }
+
+ if sh.buffer.Len()+len(r.Bytes)-skip > MaxDocumentSize {
+ sh.storageCurrentDocument()
+ sh.resetCurrentDocument()
+ }
+ n, err := sh.buffer.Write(r.Bytes[skip:])
+ if err != nil {
+ log.Println("error while copying bytes from Reassemble in stream_handler")
+ return
+ }
+ sh.indexes = append(sh.indexes, sh.currentIndex)
+ sh.timestamps = append(sh.timestamps, r.Seen)
+ sh.lossBlocks = append(sh.lossBlocks, skip != 0)
+ sh.currentIndex += n
+ sh.streamLength += n
+
+ err = sh.patternStream.Scan(r.Bytes)
+ if err != nil {
+ log.Println("failed to scan packet buffer: ", err)
+ }
}
- r.reassembled <- reassembly
- <-r.done
}
// ReassemblyComplete implements tcpassembly.Stream's ReassemblyComplete function.
-func (r *StreamHandler) ReassemblyComplete() {
- close(r.reassembled)
- close(r.done)
-}
+func (sh *StreamHandler) ReassemblyComplete() {
+ err := sh.patternStream.Close()
+ if err != nil {
+ log.Println("failed to close pattern stream: ", err)
+ }
-// stripEmpty strips empty reassembly slices off the front of its current set of
-// slices.
-func (r *StreamHandler) stripEmpty() {
- for len(r.current) > 0 && len(r.current[0].Bytes) == 0 {
- r.current = r.current[1:]
- r.lossReported = false
+ if sh.currentIndex > 0 {
+ sh.storageCurrentDocument()
}
+ sh.connection.Complete(sh)
}
-// DataLost is returned by the StreamHandler's Read function when it encounters
-// a Reassembly with Skip != 0.
-var DataLost = errors.New("lost data")
-
-// Read implements io.Reader's Read function.
-// Given a byte slice, it will either copy a non-zero number of bytes into
-// that slice and return the number of bytes and a nil error, or it will
-// leave slice p as is and return 0, io.EOF.
-func (r *StreamHandler) Read(p []byte) (int, error) {
- if !r.initiated {
- panic("StreamHandler not created via NewReaderStream")
- }
- var ok bool
- r.stripEmpty()
- for !r.closed && len(r.current) == 0 {
- if r.first {
- r.first = false
- } else {
- r.done <- true
- }
- if r.current, ok = <-r.reassembled; ok {
- r.stripEmpty()
- } else {
- r.closed = true
- }
+func (sh *StreamHandler) resetCurrentDocument() {
+ sh.buffer.Reset()
+ sh.indexes = sh.indexes[:0]
+ sh.timestamps = sh.timestamps[:0]
+ sh.lossBlocks = sh.lossBlocks[:0]
+ sh.currentIndex = 0
+
+ for _, val := range sh.patternMatches {
+ val = val[:0]
}
- if len(r.current) > 0 {
- current := &r.current[0]
- if r.LossErrors && !r.lossReported && current.Skip != 0 {
- r.lossReported = true
- return 0, DataLost
+}
+
+func (sh *StreamHandler) onMatch(id uint, from uint64, to uint64, flags uint, context interface{}) error {
+ patternSlices, isPresent := sh.patternMatches[id]
+ if isPresent {
+ if len(patternSlices) > 0 {
+ lastElement := &patternSlices[len(patternSlices)-1]
+ if lastElement[0] == from { // make the regex greedy to match the maximum number of chars
+ lastElement[1] = to
+ return nil
+ }
}
- length := copy(p, current.Bytes)
- current.Bytes = current.Bytes[length:]
- return length, nil
+ // new from == new match
+ sh.patternMatches[id] = append(patternSlices, PatternSlice{from, to})
+ } else {
+ patternSlices = make([]PatternSlice, InitialPatternSliceSize)
+ patternSlices[0] = PatternSlice{from, to}
+ sh.patternMatches[id] = patternSlices
}
- return 0, io.EOF
+
+ return nil
}
// storageCurrentDocument persists the buffered payload and its per-block
// metadata as one document in the "connection_streams" collection, then
// appends the generated key to documentsKeys (which also advances the
// document_index used for the next chunk).
func (sh *StreamHandler) storageCurrentDocument() {
	streamKey := sh.generateDocumentKey()

	_, err := sh.connection.Storage().InsertOne(sh.connection.Context(), "connection_streams", OrderedDocument{
		{"_id", streamKey},
		{"connection_id", nil}, // NOTE(review): placeholder — presumably linked to the connection later; confirm against the connection handler
		{"document_index", len(sh.documentsKeys)},
		{"payload", sh.buffer.Bytes()},
		{"blocks_indexes", sh.indexes},
		{"blocks_timestamps", sh.timestamps},
		{"blocks_loss", sh.lossBlocks},
		{"pattern_matches", sh.patternMatches},
	})

	if err != nil {
		log.Println("failed to insert connection stream: ", err)
	}

	sh.documentsKeys = append(sh.documentsKeys, streamKey)
}
// generateDocumentKey builds a 16-byte key, hex-encoded: bytes 0-7 are the
// XOR of the four stream-endpoint FastHash values, bytes 8-15 start as the
// first block's timestamp in nanoseconds (big-endian). Requires at least one
// recorded timestamp (callers flush only when data was buffered).
func (sh *StreamHandler) generateDocumentKey() string {
	hash := make([]byte, 16)
	endpointsHash := sh.streamKey[0].FastHash() ^ sh.streamKey[1].FastHash() ^
		sh.streamKey[2].FastHash() ^ sh.streamKey[3].FastHash()
	binary.BigEndian.PutUint64(hash, endpointsHash)
	binary.BigEndian.PutUint64(hash[8:], uint64(sh.timestamps[0].UnixNano()))
	// NOTE(review): this overwrites the two most significant bytes of the
	// timestamp just written at hash[8:10] with the document index. If the
	// index was meant to occupy its own bytes, the offset looks wrong
	// (perhaps hash[14:]); confirm the intended key layout before changing
	// it, since existing stored keys depend on it.
	binary.BigEndian.PutUint16(hash[8:], uint16(len(sh.documentsKeys)))
	return fmt.Sprintf("%x", hash)
}