diff options
author | Emiliano Ciavatta | 2020-04-09 10:37:48 +0000 |
---|---|---|
committer | Emiliano Ciavatta | 2020-04-09 10:37:48 +0000 |
commit | 7ca2f30a0eb21e22071f4e6b04a5207fa273d283 (patch) | |
tree | 63acb98147ffda7606bdf81abe2894e5f8363bd9 /stream_handler.go | |
parent | 0520dab47d61e2c4de246459bf4f5c72d69182d3 (diff) |
Refactor connection_handler
Diffstat (limited to 'stream_handler.go')
-rw-r--r-- | stream_handler.go | 51 |
1 files changed, 29 insertions, 22 deletions
diff --git a/stream_handler.go b/stream_handler.go index 3fafa21..2d80f60 100644 --- a/stream_handler.go +++ b/stream_handler.go @@ -18,7 +18,7 @@ const InitialPatternSliceSize = 8 // method: type StreamHandler struct { connection ConnectionHandler - streamKey StreamKey + streamFlow StreamFlow buffer *bytes.Buffer indexes []int timestamps []time.Time @@ -26,28 +26,31 @@ type StreamHandler struct { currentIndex int firstPacketSeen time.Time lastPacketSeen time.Time - documentsKeys []RowID + documentsIDs []RowID streamLength int + packetsCount int patternStream hyperscan.Stream patternMatches map[uint][]PatternSlice + scanner Scanner } // NewReaderStream returns a new StreamHandler object. -func NewStreamHandler(connection ConnectionHandler, key StreamKey, scratch *hyperscan.Scratch) StreamHandler { +func NewStreamHandler(connection ConnectionHandler, streamFlow StreamFlow, scanner Scanner) StreamHandler { handler := StreamHandler{ connection: connection, - streamKey: key, + streamFlow: streamFlow, buffer: new(bytes.Buffer), indexes: make([]int, 0, InitialBlockCount), timestamps: make([]time.Time, 0, InitialBlockCount), lossBlocks: make([]bool, 0, InitialBlockCount), - documentsKeys: make([]RowID, 0, 1), // most of the time the stream fit in one document + documentsIDs: make([]RowID, 0, 1), // most of the time the stream fit in one document patternMatches: make(map[uint][]PatternSlice, 10), // TODO: change with exactly value + scanner: scanner, } - stream, err := connection.Patterns().Open(0, scratch, handler.onMatch, nil) + stream, err := connection.PatternsDatabase().Open(0, scanner.scratch, handler.onMatch, nil) if err != nil { - log.WithField("streamKey", key).WithError(err).Error("failed to create a stream") + log.WithField("streamFlow", streamFlow).WithError(err).Error("failed to create a stream") } handler.patternStream = stream @@ -57,6 +60,8 @@ func NewStreamHandler(connection ConnectionHandler, key StreamKey, scratch *hype // Reassembled implements tcpassembly.Stream's Reassembled function. func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) { for _, r := range reassembly { + sh.packetsCount++ + skip := r.Skip if r.Start { skip = 0 @@ -78,7 +83,7 @@ func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) { n, err := sh.buffer.Write(r.Bytes[skip:]) if err != nil { log.WithError(err).Error("failed to copy bytes from a Reassemble") - return + continue } sh.indexes = append(sh.indexes, sh.currentIndex) sh.timestamps = append(sh.timestamps, r.Seen) @@ -86,18 +91,22 @@ func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) { sh.currentIndex += n sh.streamLength += n - err = sh.patternStream.Scan(r.Bytes) - if err != nil { - log.WithError(err).Error("failed to scan packet buffer") + if sh.patternStream != nil { + err = sh.patternStream.Scan(r.Bytes) + if err != nil { + log.WithError(err).Error("failed to scan packet buffer") + } } } } // ReassemblyComplete implements tcpassembly.Stream's ReassemblyComplete function. func (sh *StreamHandler) ReassemblyComplete() { - err := sh.patternStream.Close() - if err != nil { - log.WithError(err).Error("failed to close pattern stream") + if sh.patternStream != nil { + err := sh.patternStream.Close() + if err != nil { + log.WithError(err).Error("failed to close pattern stream") + } } if sh.currentIndex > 0 { @@ -140,16 +149,14 @@ func (sh *StreamHandler) onMatch(id uint, from uint64, to uint64, flags uint, co } func (sh *StreamHandler) storageCurrentDocument() { - payload := (sh.streamKey[0].FastHash()^sh.streamKey[1].FastHash()^sh.streamKey[2].FastHash()^ - sh.streamKey[3].FastHash())&uint64(0xffffffffffffff00) | uint64(len(sh.documentsKeys)) // LOL - streamKey := sh.connection.Storage().NewCustomRowID(payload, sh.firstPacketSeen) + payload := sh.streamFlow.Hash()&uint64(0xffffffffffffff00) | uint64(len(sh.documentsIDs)) // LOL + streamID := sh.connection.Storage().NewCustomRowID(payload, sh.firstPacketSeen) _, err := sh.connection.Storage().Insert(ConnectionStreams). - Context(sh.connection.Context()). One(ConnectionStream{ - ID: streamKey, + ID: streamID, ConnectionID: ZeroRowID, - DocumentIndex: len(sh.documentsKeys), + DocumentIndex: len(sh.documentsIDs), Payload: sh.buffer.Bytes(), BlocksIndexes: sh.indexes, BlocksTimestamps: sh.timestamps, @@ -159,7 +166,7 @@ func (sh *StreamHandler) storageCurrentDocument() { if err != nil { log.WithError(err).Error("failed to insert connection stream") + } else { + sh.documentsIDs = append(sh.documentsIDs, streamID) } - - sh.documentsKeys = append(sh.documentsKeys, streamKey) } |