path: root/stream_handler.go
author    Emiliano Ciavatta    2020-04-09 10:37:48 +0000
committer Emiliano Ciavatta    2020-04-09 10:37:48 +0000
commit    7ca2f30a0eb21e22071f4e6b04a5207fa273d283 (patch)
tree      63acb98147ffda7606bdf81abe2894e5f8363bd9 /stream_handler.go
parent    0520dab47d61e2c4de246459bf4f5c72d69182d3 (diff)
Refactor connection_handler
Diffstat (limited to 'stream_handler.go')
-rw-r--r--    stream_handler.go    51
1 file changed, 29 insertions(+), 22 deletions(-)
diff --git a/stream_handler.go b/stream_handler.go
index 3fafa21..2d80f60 100644
--- a/stream_handler.go
+++ b/stream_handler.go
@@ -18,7 +18,7 @@ const InitialPatternSliceSize = 8
// method:
type StreamHandler struct {
connection ConnectionHandler
- streamKey StreamKey
+ streamFlow StreamFlow
buffer *bytes.Buffer
indexes []int
timestamps []time.Time
@@ -26,28 +26,31 @@ type StreamHandler struct {
currentIndex int
firstPacketSeen time.Time
lastPacketSeen time.Time
- documentsKeys []RowID
+ documentsIDs []RowID
streamLength int
+ packetsCount int
patternStream hyperscan.Stream
patternMatches map[uint][]PatternSlice
+ scanner Scanner
}
// NewStreamHandler returns a new StreamHandler object.
-func NewStreamHandler(connection ConnectionHandler, key StreamKey, scratch *hyperscan.Scratch) StreamHandler {
+func NewStreamHandler(connection ConnectionHandler, streamFlow StreamFlow, scanner Scanner) StreamHandler {
handler := StreamHandler{
connection: connection,
- streamKey: key,
+ streamFlow: streamFlow,
buffer: new(bytes.Buffer),
indexes: make([]int, 0, InitialBlockCount),
timestamps: make([]time.Time, 0, InitialBlockCount),
lossBlocks: make([]bool, 0, InitialBlockCount),
- documentsKeys: make([]RowID, 0, 1), // most of the time the stream fits in one document
+ documentsIDs: make([]RowID, 0, 1), // most of the time the stream fits in one document
patternMatches: make(map[uint][]PatternSlice, 10), // TODO: replace with the exact value
+ scanner: scanner,
}
- stream, err := connection.Patterns().Open(0, scratch, handler.onMatch, nil)
+ stream, err := connection.PatternsDatabase().Open(0, scanner.scratch, handler.onMatch, nil)
if err != nil {
- log.WithField("streamKey", key).WithError(err).Error("failed to create a stream")
+ log.WithField("streamFlow", streamFlow).WithError(err).Error("failed to create a stream")
}
handler.patternStream = stream
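
From the usage in this hunk one can infer rough shapes for the two new types: StreamFlow replaces the old StreamKey and must expose Hash() (used later in storageCurrentDocument), while Scanner carries the hyperscan scratch that NewStreamHandler previously received as a bare *hyperscan.Scratch. A minimal sketch under those assumptions; the field names and the gopacket-based 4-tuple are guesses, not the project's actual definitions:

// Hypothetical sketch inferred from this diff, not the project's code.
package sketch

import (
	"github.com/flier/gohs/hyperscan"
	"github.com/google/gopacket"
)

// StreamFlow mirrors the old StreamKey: the four endpoints identifying a TCP stream.
type StreamFlow [4]gopacket.Endpoint

// Hash folds the four endpoints into a single 64-bit value,
// replacing the inline XOR chain that storageCurrentDocument used before.
func (sf StreamFlow) Hash() uint64 {
	return sf[0].FastHash() ^ sf[1].FastHash() ^ sf[2].FastHash() ^ sf[3].FastHash()
}

// Scanner owns the per-worker scratch space passed to pattern-stream opens.
type Scanner struct {
	scratch *hyperscan.Scratch
}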
@@ -57,6 +60,8 @@ func NewStreamHandler(connection ConnectionHandler, key StreamKey, scratch *hype
// Reassembled implements tcpassembly.Stream's Reassembled function.
func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) {
for _, r := range reassembly {
+ sh.packetsCount++
+
skip := r.Skip
if r.Start {
skip = 0
@@ -78,7 +83,7 @@ func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) {
n, err := sh.buffer.Write(r.Bytes[skip:])
if err != nil {
log.WithError(err).Error("failed to copy bytes from a Reassemble")
- return
+ continue
}
sh.indexes = append(sh.indexes, sh.currentIndex)
sh.timestamps = append(sh.timestamps, r.Seen)
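
Two details sit in this hunk: a failed copy now skips only that Reassembly (continue) instead of abandoning the whole batch (return), and the appends keep indexes, timestamps, and lossBlocks as parallel slices, so entry i records the starting byte offset, arrival time, and loss flag of block i. A hypothetical helper, not part of the project, showing how that parallel layout lets a byte offset (such as a pattern-match position) be mapped back to its block:

// Hypothetical helper illustrating the parallel-slice invariant; not project code.
// blockForOffset returns the index of the block containing byte offset off,
// given block start offsets recorded in ascending order.
func blockForOffset(indexes []int, off int) int {
	for i := len(indexes) - 1; i >= 0; i-- {
		if off >= indexes[i] {
			return i // timestamps[i] and lossBlocks[i] describe the same block
		}
	}
	return -1 // offset precedes the first recorded block
}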
@@ -86,18 +91,22 @@ func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) {
sh.currentIndex += n
sh.streamLength += n
- err = sh.patternStream.Scan(r.Bytes)
- if err != nil {
- log.WithError(err).Error("failed to scan packet buffer")
+ if sh.patternStream != nil {
+ err = sh.patternStream.Scan(r.Bytes)
+ if err != nil {
+ log.WithError(err).Error("failed to scan packet buffer")
+ }
}
}
}
// ReassemblyComplete implements tcpassembly.Stream's ReassemblyComplete function.
func (sh *StreamHandler) ReassemblyComplete() {
- err := sh.patternStream.Close()
- if err != nil {
- log.WithError(err).Error("failed to close pattern stream")
+ if sh.patternStream != nil {
+ err := sh.patternStream.Close()
+ if err != nil {
+ log.WithError(err).Error("failed to close pattern stream")
+ }
}
if sh.currentIndex > 0 {
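
Since NewStreamHandler only logs when Open fails, patternStream can legitimately stay nil for the handler's whole lifetime; both Scan (above) and Close are therefore wrapped in the same guard, so a handler without a pattern stream degrades to plain reassembly instead of dereferencing nil. The pattern distilled into a sketch; the interface here is a hypothetical stand-in for hyperscan.Stream:

// Hypothetical distillation of the nil-guard pattern; not project code.
package sketch

import "log"

type patternStream interface {
	Scan(data []byte) error
	Close() error
}

// scanIfOpen skips pattern matching entirely when the stream never opened.
func scanIfOpen(ps patternStream, data []byte) {
	if ps == nil {
		return // Open failed earlier; matching is disabled for this handler
	}
	if err := ps.Scan(data); err != nil {
		log.Println("failed to scan packet buffer:", err)
	}
}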
@@ -140,16 +149,14 @@ func (sh *StreamHandler) onMatch(id uint, from uint64, to uint64, flags uint, co
}
func (sh *StreamHandler) storageCurrentDocument() {
- payload := (sh.streamKey[0].FastHash()^sh.streamKey[1].FastHash()^sh.streamKey[2].FastHash()^
- sh.streamKey[3].FastHash())&uint64(0xffffffffffffff00) | uint64(len(sh.documentsKeys)) // LOL
- streamKey := sh.connection.Storage().NewCustomRowID(payload, sh.firstPacketSeen)
+ payload := sh.streamFlow.Hash()&uint64(0xffffffffffffff00) | uint64(len(sh.documentsIDs)) // LOL
+ streamID := sh.connection.Storage().NewCustomRowID(payload, sh.firstPacketSeen)
_, err := sh.connection.Storage().Insert(ConnectionStreams).
- Context(sh.connection.Context()).
One(ConnectionStream{
- ID: streamKey,
+ ID: streamID,
ConnectionID: ZeroRowID,
- DocumentIndex: len(sh.documentsKeys),
+ DocumentIndex: len(sh.documentsIDs),
Payload: sh.buffer.Bytes(),
BlocksIndexes: sh.indexes,
BlocksTimestamps: sh.timestamps,
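
The new payload expression packs two values into one 64-bit ID: the upper 56 bits come from the flow hash and the low byte from the document index, so all documents of one stream share a prefix yet remain distinct (assuming a stream never splits into more than 256 documents). A small worked example of the packing, using a made-up hash value:

// Hypothetical illustration of the ID packing; not project code.
package main

import "fmt"

func main() {
	hash := uint64(0x1122334455667788) // pretend StreamFlow.Hash() result
	docIndex := uint64(2)              // len(sh.documentsIDs): third document
	payload := hash&uint64(0xffffffffffffff00) | docIndex
	// Upper 56 bits kept from the hash, low byte replaced by the index.
	fmt.Printf("%#x\n", payload) // prints 0x1122334455667702
}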
@@ -159,7 +166,7 @@ func (sh *StreamHandler) storageCurrentDocument() {
if err != nil {
log.WithError(err).Error("failed to insert connection stream")
+ } else {
+ sh.documentsIDs = append(sh.documentsIDs, streamID)
}
-
- sh.documentsKeys = append(sh.documentsKeys, streamKey)
}
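
A note on the last hunk's design choice: by moving the append into the else branch, documentsIDs grows only when the insert actually succeeded, so len(sh.documentsIDs) stays an accurate count of persisted documents and the low byte of the next payload cannot point at a document that was never written.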