diff options
author | Emiliano Ciavatta | 2020-04-03 10:47:23 +0000 |
---|---|---|
committer | Emiliano Ciavatta | 2020-04-03 10:47:23 +0000 |
commit | 9883cd346f694ad09aac839f9ddc4a25df0e0b0a (patch) | |
tree | 7a5bfdf282b273ffc410a9e8c758fb8db73072c7 /stream_handler.go | |
parent | b02ee06a2dad56650f539f69df5660a88e442059 (diff) |
Add connection_handler and stream_handler
Diffstat (limited to 'stream_handler.go')
-rw-r--r-- | stream_handler.go | 310 |
1 file changed, 140 insertions, 170 deletions
diff --git a/stream_handler.go b/stream_handler.go index ad59856..80d91d6 100644 --- a/stream_handler.go +++ b/stream_handler.go @@ -1,206 +1,176 @@ -// Package tcpreader provides an implementation for tcpassembly.Stream which presents -// the caller with an io.Reader for easy processing. -// -// The assembly package handles packet data reordering, but its output is -// library-specific, thus not usable by the majority of external Go libraries. -// The io.Reader interface, on the other hand, is used throughout much of Go -// code as an easy mechanism for reading in data streams and decoding them. For -// example, the net/http package provides the ReadRequest function, which can -// parse an HTTP request from a live data stream, just what we'd want when -// sniffing HTTP traffic. Using StreamHandler, this is relatively easy to set -// up: -// -// // Create our StreamFactory -// type httpStreamFactory struct {} -// func (f *httpStreamFactory) New(a, b gopacket.Flow) { -// r := tcpreader.NewReaderStream(false) -// go printRequests(r) -// return &r -// } -// func printRequests(r io.Reader) { -// // Convert to bufio, since that's what ReadRequest wants. -// buf := bufio.NewReader(r) -// for { -// if req, err := http.ReadRequest(buf); err == io.EOF { -// return -// } else if err != nil { -// log.Println("Error parsing HTTP requests:", err) -// } else { -// fmt.Println("HTTP REQUEST:", req) -// fmt.Println("Body contains", tcpreader.DiscardBytesToEOF(req.Body), "bytes") -// } -// } -// } -// -// Using just this code, we're able to reference a powerful, built-in library -// for HTTP request parsing to do all the dirty-work of parsing requests from -// the wire in real-time. Pass this stream factory to an tcpassembly.StreamPool, -// start up an tcpassembly.Assembler, and you're good to go! 
// Package main: stream_handler accumulates reassembled TCP stream payload
// into size-bounded documents, records per-block metadata, and runs
// Hyperscan pattern matching over the stream.
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"github.com/flier/gohs/hyperscan"
	"github.com/google/gopacket/tcpassembly"
	"log"
	"time"
)

// MaxDocumentSize caps the payload stored in a single connection_streams
// document (1 MiB); longer streams are split across multiple documents.
const MaxDocumentSize = 1024 * 1024

// InitialBlockCount is the initial capacity of the per-document metadata
// slices (indexes, timestamps, loss flags).
const InitialBlockCount = 1024

// InitialPatternSliceSize is the initial capacity reserved for the match
// ranges of a single pattern id.
const InitialPatternSliceSize = 8

// IMPORTANT: If you use a StreamHandler, you MUST read ALL BYTES from it,
// quickly. Not reading available bytes will block TCP stream reassembly. It's
// a common pattern to do this by starting a goroutine in the factory's New
// method:
// NOTE(review): the paragraph above was written for the old io.Reader-based
// implementation and looks stale for this struct — confirm and update.
// StreamHandler implements tcpassembly.Stream. It buffers one direction of a
// TCP stream into documents of at most MaxDocumentSize bytes, tracks
// per-reassembly block offsets/timestamps/loss, and feeds payload to a
// Hyperscan stream whose matches are collected per pattern id.
type StreamHandler struct {
	connection      ConnectionHandler          // parent connection; provides patterns, storage and completion callback
	streamKey       StreamKey                  // the four flow endpoints identifying this stream
	buffer          *bytes.Buffer              // payload of the document currently being accumulated
	indexes         []int                      // start offset (within buffer) of each reassembly block
	timestamps      []time.Time                // capture time of each reassembly block
	lossBlocks      []bool                     // true where data was lost before the corresponding block
	currentIndex    int                        // next write offset in the current document
	firstPacketSeen time.Time                  // timestamp of the first packet of the stream
	lastPacketSeen  time.Time                  // timestamp of the last packet of the stream
	documentsKeys   []string                   // ids of documents already stored for this stream
	streamLength    int                        // total payload bytes across all documents
	patternStream   hyperscan.Stream           // open Hyperscan stream; may be nil if Open failed
	patternMatches  map[uint][]PatternSlice    // pattern id -> matched [from, to) ranges
}

// PatternSlice is a [from, to) byte range of a pattern match within the stream.
type PatternSlice [2]uint64

// NewStreamHandler returns a new StreamHandler object.
// NewStreamHandler builds a StreamHandler bound to the given connection and
// stream key, and opens a Hyperscan stream (using the provided scratch space)
// whose matches are delivered to the handler's onMatch callback.
func NewStreamHandler(connection ConnectionHandler, key StreamKey, scratch *hyperscan.Scratch) StreamHandler {
	handler := StreamHandler{
		connection:    connection,
		streamKey:     key,
		buffer:        new(bytes.Buffer),
		indexes:       make([]int, 0, InitialBlockCount),
		timestamps:    make([]time.Time, 0, InitialBlockCount),
		lossBlocks:    make([]bool, 0, InitialBlockCount),
		documentsKeys: make([]string, 0, 1), // most of the time the stream fit in one document
		patternMatches: make(map[uint][]PatternSlice, 10), // TODO: change with exactly value
	}

	// NOTE(review): handler.onMatch binds a pointer to this LOCAL variable,
	// while the struct is returned by value below. This only works because
	// onMatch mutates patternMatches, a map shared between the copies —
	// consider returning *StreamHandler to remove the hazard.
	stream, err := connection.Patterns().Open(0, scratch, handler.onMatch, nil)
	if err != nil {
		// NOTE(review): on failure patternStream stays nil; later calls to
		// Scan/Close on it will fail — confirm whether this should abort.
		log.Println("failed to create a stream: ", err)
	}
	handler.patternStream = stream

	return handler
}

// Reassembled implements tcpassembly.Stream's Reassembled function.
-func (r *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) { - if !r.initiated { - panic("StreamHandler not created via NewReaderStream") +func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) { + for _, r := range reassembly { + skip := r.Skip + if r.Start { + skip = 0 + sh.firstPacketSeen = r.Seen + } + if r.End { + sh.lastPacketSeen = r.Seen + } + + reassemblyLen := len(r.Bytes) + if reassemblyLen == 0 { + continue + } + + if sh.buffer.Len()+len(r.Bytes)-skip > MaxDocumentSize { + sh.storageCurrentDocument() + sh.resetCurrentDocument() + } + n, err := sh.buffer.Write(r.Bytes[skip:]) + if err != nil { + log.Println("error while copying bytes from Reassemble in stream_handler") + return + } + sh.indexes = append(sh.indexes, sh.currentIndex) + sh.timestamps = append(sh.timestamps, r.Seen) + sh.lossBlocks = append(sh.lossBlocks, skip != 0) + sh.currentIndex += n + sh.streamLength += n + + err = sh.patternStream.Scan(r.Bytes) + if err != nil { + log.Println("failed to scan packet buffer: ", err) + } } - r.reassembled <- reassembly - <-r.done } // ReassemblyComplete implements tcpassembly.Stream's ReassemblyComplete function. -func (r *StreamHandler) ReassemblyComplete() { - close(r.reassembled) - close(r.done) -} +func (sh *StreamHandler) ReassemblyComplete() { + err := sh.patternStream.Close() + if err != nil { + log.Println("failed to close pattern stream: ", err) + } -// stripEmpty strips empty reassembly slices off the front of its current set of -// slices. -func (r *StreamHandler) stripEmpty() { - for len(r.current) > 0 && len(r.current[0].Bytes) == 0 { - r.current = r.current[1:] - r.lossReported = false + if sh.currentIndex > 0 { + sh.storageCurrentDocument() } + sh.connection.Complete(sh) } -// DataLost is returned by the StreamHandler's Read function when it encounters -// a Reassembly with Skip != 0. -var DataLost = errors.New("lost data") - -// Read implements io.Reader's Read function. 
-// Given a byte slice, it will either copy a non-zero number of bytes into -// that slice and return the number of bytes and a nil error, or it will -// leave slice p as is and return 0, io.EOF. -func (r *StreamHandler) Read(p []byte) (int, error) { - if !r.initiated { - panic("StreamHandler not created via NewReaderStream") - } - var ok bool - r.stripEmpty() - for !r.closed && len(r.current) == 0 { - if r.first { - r.first = false - } else { - r.done <- true - } - if r.current, ok = <-r.reassembled; ok { - r.stripEmpty() - } else { - r.closed = true - } +func (sh *StreamHandler) resetCurrentDocument() { + sh.buffer.Reset() + sh.indexes = sh.indexes[:0] + sh.timestamps = sh.timestamps[:0] + sh.lossBlocks = sh.lossBlocks[:0] + sh.currentIndex = 0 + + for _, val := range sh.patternMatches { + val = val[:0] } - if len(r.current) > 0 { - current := &r.current[0] - if r.LossErrors && !r.lossReported && current.Skip != 0 { - r.lossReported = true - return 0, DataLost +} + +func (sh *StreamHandler) onMatch(id uint, from uint64, to uint64, flags uint, context interface{}) error { + patternSlices, isPresent := sh.patternMatches[id] + if isPresent { + if len(patternSlices) > 0 { + lastElement := &patternSlices[len(patternSlices)-1] + if lastElement[0] == from { // make the regex greedy to match the maximum number of chars + lastElement[1] = to + return nil + } } - length := copy(p, current.Bytes) - current.Bytes = current.Bytes[length:] - return length, nil + // new from == new match + sh.patternMatches[id] = append(patternSlices, PatternSlice{from, to}) + } else { + patternSlices = make([]PatternSlice, InitialPatternSliceSize) + patternSlices[0] = PatternSlice{from, to} + sh.patternMatches[id] = patternSlices } - return 0, io.EOF + + return nil } -// Close implements io.Closer's Close function, making StreamHandler a -// io.ReadCloser. It discards all remaining bytes in the reassembly in a -// manner that's safe for the assembler (IE: it doesn't block). 
// storageCurrentDocument persists the currently buffered document (payload
// plus per-block metadata and pattern matches) to the connection_streams
// collection and records its key in documentsKeys.
func (sh *StreamHandler) storageCurrentDocument() {
	streamKey := sh.generateDocumentKey()

	_, err := sh.connection.Storage().InsertOne(sh.connection.Context(), "connection_streams", OrderedDocument{
		{"_id", streamKey},
		// NOTE(review): connection_id is always nil here — presumably filled
		// in later by the connection handler; confirm against the caller.
		{"connection_id", nil},
		{"document_index", len(sh.documentsKeys)},
		{"payload", sh.buffer.Bytes()},
		{"blocks_indexes", sh.indexes},
		{"blocks_timestamps", sh.timestamps},
		{"blocks_loss", sh.lossBlocks},
		{"pattern_matches", sh.patternMatches},
	})

	if err != nil {
		log.Println("failed to insert connection stream: ", err)
	}

	sh.documentsKeys = append(sh.documentsKeys, streamKey)
}

// generateDocumentKey derives a 16-byte hex document id from the XOR of the
// four endpoint hashes, the first block's timestamp and the document count.
// Requires at least one recorded timestamp (callers flush only non-empty
// documents).
func (sh *StreamHandler) generateDocumentKey() string {
	hash := make([]byte, 16)
	endpointsHash := sh.streamKey[0].FastHash() ^ sh.streamKey[1].FastHash() ^
		sh.streamKey[2].FastHash() ^ sh.streamKey[3].FastHash()
	binary.BigEndian.PutUint64(hash, endpointsHash)
	binary.BigEndian.PutUint64(hash[8:], uint64(sh.timestamps[0].UnixNano()))
	// NOTE(review): this PutUint16 overwrites bytes 8-9 of the timestamp
	// written just above — likely an offset bug (hash[14:]?); confirm the
	// intended key layout before changing, since stored ids depend on it.
	binary.BigEndian.PutUint16(hash[8:], uint16(len(sh.documentsKeys)))
	return fmt.Sprintf("%x", hash)
}