aboutsummaryrefslogtreecommitdiff
path: root/stream_handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'stream_handler.go')
-rw-r--r--stream_handler.go71
1 files changed, 30 insertions, 41 deletions
diff --git a/stream_handler.go b/stream_handler.go
index ce580fc..3fafa21 100644
--- a/stream_handler.go
+++ b/stream_handler.go
@@ -2,11 +2,9 @@ package main
import (
"bytes"
- "encoding/binary"
- "fmt"
"github.com/flier/gohs/hyperscan"
"github.com/google/gopacket/tcpassembly"
- "log"
+ log "github.com/sirupsen/logrus"
"time"
)
@@ -28,30 +26,28 @@ type StreamHandler struct {
currentIndex int
firstPacketSeen time.Time
lastPacketSeen time.Time
- documentsKeys []string
+ documentsKeys []RowID
streamLength int
patternStream hyperscan.Stream
patternMatches map[uint][]PatternSlice
}
-type PatternSlice [2]uint64
-
// NewReaderStream returns a new StreamHandler object.
func NewStreamHandler(connection ConnectionHandler, key StreamKey, scratch *hyperscan.Scratch) StreamHandler {
handler := StreamHandler{
- connection: connection,
- streamKey: key,
- buffer: new(bytes.Buffer),
- indexes: make([]int, 0, InitialBlockCount),
- timestamps: make([]time.Time, 0, InitialBlockCount),
- lossBlocks: make([]bool, 0, InitialBlockCount),
- documentsKeys: make([]string, 0, 1), // most of the time the stream fit in one document
+ connection: connection,
+ streamKey: key,
+ buffer: new(bytes.Buffer),
+ indexes: make([]int, 0, InitialBlockCount),
+ timestamps: make([]time.Time, 0, InitialBlockCount),
+ lossBlocks: make([]bool, 0, InitialBlockCount),
+ documentsKeys: make([]RowID, 0, 1), // most of the time the stream fit in one document
patternMatches: make(map[uint][]PatternSlice, 10), // TODO: change with exactly value
}
stream, err := connection.Patterns().Open(0, scratch, handler.onMatch, nil)
if err != nil {
- log.Println("failed to create a stream: ", err)
+ log.WithField("streamKey", key).WithError(err).Error("failed to create a stream")
}
handler.patternStream = stream
@@ -81,7 +77,7 @@ func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) {
}
n, err := sh.buffer.Write(r.Bytes[skip:])
if err != nil {
- log.Println("error while copying bytes from Reassemble in stream_handler")
+ log.WithError(err).Error("failed to copy bytes from a Reassemble")
return
}
sh.indexes = append(sh.indexes, sh.currentIndex)
@@ -92,7 +88,7 @@ func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) {
err = sh.patternStream.Scan(r.Bytes)
if err != nil {
- log.Println("failed to scan packet buffer: ", err)
+ log.WithError(err).Error("failed to scan packet buffer")
}
}
}
@@ -101,7 +97,7 @@ func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) {
func (sh *StreamHandler) ReassemblyComplete() {
err := sh.patternStream.Close()
if err != nil {
- log.Println("failed to close pattern stream: ", err)
+ log.WithError(err).Error("failed to close pattern stream")
}
if sh.currentIndex > 0 {
@@ -144,33 +140,26 @@ func (sh *StreamHandler) onMatch(id uint, from uint64, to uint64, flags uint, co
}
func (sh *StreamHandler) storageCurrentDocument() {
- streamKey := sh.generateDocumentKey()
-
- _, err := sh.connection.Storage().InsertOne(sh.connection.Context(), "connection_streams", OrderedDocument{
- {"_id", streamKey},
- {"connection_id", nil},
- {"document_index", len(sh.documentsKeys)},
- {"payload", sh.buffer.Bytes()},
- {"blocks_indexes", sh.indexes},
- {"blocks_timestamps", sh.timestamps},
- {"blocks_loss", sh.lossBlocks},
- {"pattern_matches", sh.patternMatches},
- })
+ payload := (sh.streamKey[0].FastHash()^sh.streamKey[1].FastHash()^sh.streamKey[2].FastHash()^
+ sh.streamKey[3].FastHash())&uint64(0xffffffffffffff00) | uint64(len(sh.documentsKeys)) // LOL
+ streamKey := sh.connection.Storage().NewCustomRowID(payload, sh.firstPacketSeen)
+
+ _, err := sh.connection.Storage().Insert(ConnectionStreams).
+ Context(sh.connection.Context()).
+ One(ConnectionStream{
+ ID: streamKey,
+ ConnectionID: ZeroRowID,
+ DocumentIndex: len(sh.documentsKeys),
+ Payload: sh.buffer.Bytes(),
+ BlocksIndexes: sh.indexes,
+ BlocksTimestamps: sh.timestamps,
+ BlocksLoss: sh.lossBlocks,
+ PatternMatches: sh.patternMatches,
+ })
if err != nil {
- log.Println("failed to insert connection stream: ", err)
+ log.WithError(err).Error("failed to insert connection stream")
}
sh.documentsKeys = append(sh.documentsKeys, streamKey)
}
-
-func (sh *StreamHandler) generateDocumentKey() string {
- hash := make([]byte, 16)
- endpointsHash := sh.streamKey[0].FastHash() ^ sh.streamKey[1].FastHash() ^
- sh.streamKey[2].FastHash() ^ sh.streamKey[3].FastHash()
- binary.BigEndian.PutUint64(hash, endpointsHash)
- binary.BigEndian.PutUint64(hash[8:], uint64(sh.firstPacketSeen.UnixNano()))
- binary.BigEndian.PutUint16(hash[8:], uint16(len(sh.documentsKeys)))
-
- return fmt.Sprintf("%x", hash)
-}