From 0520dab47d61e2c4de246459bf4f5c72d69182d3 Mon Sep 17 00:00:00 2001 From: Emiliano Ciavatta Date: Thu, 9 Apr 2020 10:26:15 +0200 Subject: Refactor storage --- stream_handler.go | 71 +++++++++++++++++++++++-------------------------------- 1 file changed, 30 insertions(+), 41 deletions(-) (limited to 'stream_handler.go') diff --git a/stream_handler.go b/stream_handler.go index ce580fc..3fafa21 100644 --- a/stream_handler.go +++ b/stream_handler.go @@ -2,11 +2,9 @@ package main import ( "bytes" - "encoding/binary" - "fmt" "github.com/flier/gohs/hyperscan" "github.com/google/gopacket/tcpassembly" - "log" + log "github.com/sirupsen/logrus" "time" ) @@ -28,30 +26,28 @@ type StreamHandler struct { currentIndex int firstPacketSeen time.Time lastPacketSeen time.Time - documentsKeys []string + documentsKeys []RowID streamLength int patternStream hyperscan.Stream patternMatches map[uint][]PatternSlice } -type PatternSlice [2]uint64 - // NewReaderStream returns a new StreamHandler object. func NewStreamHandler(connection ConnectionHandler, key StreamKey, scratch *hyperscan.Scratch) StreamHandler { handler := StreamHandler{ - connection: connection, - streamKey: key, - buffer: new(bytes.Buffer), - indexes: make([]int, 0, InitialBlockCount), - timestamps: make([]time.Time, 0, InitialBlockCount), - lossBlocks: make([]bool, 0, InitialBlockCount), - documentsKeys: make([]string, 0, 1), // most of the time the stream fit in one document + connection: connection, + streamKey: key, + buffer: new(bytes.Buffer), + indexes: make([]int, 0, InitialBlockCount), + timestamps: make([]time.Time, 0, InitialBlockCount), + lossBlocks: make([]bool, 0, InitialBlockCount), + documentsKeys: make([]RowID, 0, 1), // most of the time the stream fit in one document patternMatches: make(map[uint][]PatternSlice, 10), // TODO: change with exactly value } stream, err := connection.Patterns().Open(0, scratch, handler.onMatch, nil) if err != nil { - log.Println("failed to create a stream: ", err) + log.WithField("streamKey", key).WithError(err).Error("failed to create a stream") } handler.patternStream = stream @@ -81,7 +77,7 @@ func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) { } n, err := sh.buffer.Write(r.Bytes[skip:]) if err != nil { - log.Println("error while copying bytes from Reassemble in stream_handler") + log.WithError(err).Error("failed to copy bytes from a Reassemble") return } sh.indexes = append(sh.indexes, sh.currentIndex) @@ -92,7 +88,7 @@ func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) { err = sh.patternStream.Scan(r.Bytes) if err != nil { - log.Println("failed to scan packet buffer: ", err) + log.WithError(err).Error("failed to scan packet buffer") } } } @@ -101,7 +97,7 @@ func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) { func (sh *StreamHandler) ReassemblyComplete() { err := sh.patternStream.Close() if err != nil { - log.Println("failed to close pattern stream: ", err) + log.WithError(err).Error("failed to close pattern stream") } if sh.currentIndex > 0 { @@ -144,33 +140,26 @@ func (sh *StreamHandler) onMatch(id uint, from uint64, to uint64, flags uint, co } func (sh *StreamHandler) storageCurrentDocument() { - streamKey := sh.generateDocumentKey() - - _, err := sh.connection.Storage().InsertOne(sh.connection.Context(), "connection_streams", OrderedDocument{ - {"_id", streamKey}, - {"connection_id", nil}, - {"document_index", len(sh.documentsKeys)}, - {"payload", sh.buffer.Bytes()}, - {"blocks_indexes", sh.indexes}, - {"blocks_timestamps", sh.timestamps}, - {"blocks_loss", sh.lossBlocks}, - {"pattern_matches", sh.patternMatches}, - }) + payload := (sh.streamKey[0].FastHash()^sh.streamKey[1].FastHash()^sh.streamKey[2].FastHash()^ + sh.streamKey[3].FastHash())&uint64(0xffffffffffffff00) | uint64(len(sh.documentsKeys)) // LOL + streamKey := sh.connection.Storage().NewCustomRowID(payload, sh.firstPacketSeen) + + _, err := sh.connection.Storage().Insert(ConnectionStreams). + Context(sh.connection.Context()). + One(ConnectionStream{ + ID: streamKey, + ConnectionID: ZeroRowID, + DocumentIndex: len(sh.documentsKeys), + Payload: sh.buffer.Bytes(), + BlocksIndexes: sh.indexes, + BlocksTimestamps: sh.timestamps, + BlocksLoss: sh.lossBlocks, + PatternMatches: sh.patternMatches, + }) if err != nil { - log.Println("failed to insert connection stream: ", err) + log.WithError(err).Error("failed to insert connection stream") } sh.documentsKeys = append(sh.documentsKeys, streamKey) } - -func (sh *StreamHandler) generateDocumentKey() string { - hash := make([]byte, 16) - endpointsHash := sh.streamKey[0].FastHash() ^ sh.streamKey[1].FastHash() ^ - sh.streamKey[2].FastHash() ^ sh.streamKey[3].FastHash() - binary.BigEndian.PutUint64(hash, endpointsHash) - binary.BigEndian.PutUint64(hash[8:], uint64(sh.firstPacketSeen.UnixNano())) - binary.BigEndian.PutUint16(hash[8:], uint16(len(sh.documentsKeys))) - - return fmt.Sprintf("%x", hash) -} -- cgit v1.2.3-70-g09d2