aboutsummaryrefslogtreecommitdiff
path: root/stream_handler_test.go
diff options
context:
space:
mode:
authorEmiliano Ciavatta2020-04-09 08:26:15 +0000
committerEmiliano Ciavatta2020-04-09 08:26:15 +0000
commit0520dab47d61e2c4de246459bf4f5c72d69182d3 (patch)
treed87df19c87a300d1022324f2ecad66380643d2f1 /stream_handler_test.go
parent468690c60ee2e57ed2ccb4375e9ada5d2fed9473 (diff)
Refactor storage
Diffstat (limited to 'stream_handler_test.go')
-rw-r--r--stream_handler_test.go219
1 files changed, 107 insertions, 112 deletions
diff --git a/stream_handler_test.go b/stream_handler_test.go
index cb5ecc7..ece3190 100644
--- a/stream_handler_test.go
+++ b/stream_handler_test.go
@@ -2,7 +2,6 @@ package main
import (
"context"
- "fmt"
"github.com/flier/gohs/hyperscan"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/tcpassembly"
@@ -19,13 +18,14 @@ const testDstIp = "10.10.10.1"
const srcPort = 44444
const dstPort = 8080
-
func TestReassemblingEmptyStream(t *testing.T) {
+ wrapper := NewTestStorageWrapper(t)
+ wrapper.AddCollection(ConnectionStreams)
patterns, err := hyperscan.NewStreamDatabase(hyperscan.NewPattern("/nope/", 0))
- require.Nil(t, err)
+ require.NoError(t, err)
scratch, err := hyperscan.NewScratch(patterns)
- require.Nil(t, err)
- streamHandler := createTestStreamHandler(testStorage{}, patterns, scratch)
+ require.NoError(t, err)
+ streamHandler := createTestStreamHandler(wrapper, patterns, scratch)
streamHandler.Reassembled([]tcpassembly.Reassembly{{
Bytes: []byte{},
@@ -51,19 +51,20 @@ func TestReassemblingEmptyStream(t *testing.T) {
assert.Equal(t, true, completed)
err = scratch.Free()
- require.Nil(t, err, "free scratch")
+ require.NoError(t, err, "free scratch")
err = patterns.Close()
- require.Nil(t, err, "close stream database")
+ require.NoError(t, err, "close stream database")
+ wrapper.Destroy(t)
}
-
func TestReassemblingSingleDocument(t *testing.T) {
+ wrapper := NewTestStorageWrapper(t)
+ wrapper.AddCollection(ConnectionStreams)
patterns, err := hyperscan.NewStreamDatabase(hyperscan.NewPattern("/impossible_to_match/", 0))
- require.Nil(t, err)
+ require.NoError(t, err)
scratch, err := hyperscan.NewScratch(patterns)
- require.Nil(t, err)
- storage := &testStorage{}
- streamHandler := createTestStreamHandler(storage, patterns, scratch)
+ require.NoError(t, err)
+ streamHandler := createTestStreamHandler(wrapper, patterns, scratch)
payloadLen := 256
firstTime := time.Unix(0, 0)
@@ -71,10 +72,10 @@ func TestReassemblingSingleDocument(t *testing.T) {
lastTime := time.Unix(20, 0)
data := make([]byte, MaxDocumentSize)
rand.Read(data)
- reassembles := make([]tcpassembly.Reassembly, MaxDocumentSize / payloadLen)
- indexes := make([]int, MaxDocumentSize / payloadLen)
- timestamps := make([]time.Time, MaxDocumentSize / payloadLen)
- lossBlocks := make([]bool, MaxDocumentSize / payloadLen)
+ reassembles := make([]tcpassembly.Reassembly, MaxDocumentSize/payloadLen)
+ indexes := make([]int, MaxDocumentSize/payloadLen)
+ timestamps := make([]time.Time, MaxDocumentSize/payloadLen)
+ lossBlocks := make([]bool, MaxDocumentSize/payloadLen)
for i := 0; i < len(reassembles); i++ {
var seen time.Time
if i == 0 {
@@ -86,36 +87,22 @@ func TestReassemblingSingleDocument(t *testing.T) {
}
reassembles[i] = tcpassembly.Reassembly{
- Bytes: data[i*payloadLen:(i+1)*payloadLen],
+ Bytes: data[i*payloadLen : (i+1)*payloadLen],
Skip: 0,
Start: i == 0,
End: i == len(reassembles)-1,
Seen: seen,
}
- indexes[i] = i*payloadLen
+ indexes[i] = i * payloadLen
timestamps[i] = seen
}
- inserted := false
- storage.insertFunc = func(ctx context.Context, collectionName string, document interface{}) (i interface{}, err error) {
- od := document.(OrderedDocument)
- assert.Equal(t, "connection_streams", collectionName)
- assert.Equal(t, "bb41a60281cfae830000000000000000", od[0].Value)
- assert.Equal(t, nil, od[1].Value)
- assert.Equal(t, 0, od[2].Value)
- assert.Equal(t, data, od[3].Value)
- assert.Equal(t, indexes, od[4].Value)
- assert.Equal(t, timestamps, od[5].Value)
- assert.Equal(t, lossBlocks, od[6].Value)
- assert.Len(t, od[7].Value, 0)
- inserted = true
- return nil, nil
- }
+ var results []ConnectionStream
streamHandler.Reassembled(reassembles)
- if !assert.Equal(t, false, inserted) {
- inserted = false
- }
+ err = wrapper.Storage.Find(ConnectionStreams).Context(wrapper.Context).All(&results)
+ require.NoError(t, err)
+ assert.Len(t, results, 0)
completed := false
streamHandler.connection.(*testConnectionHandler).onComplete = func(handler *StreamHandler) {
@@ -123,6 +110,18 @@ func TestReassemblingSingleDocument(t *testing.T) {
}
streamHandler.ReassemblyComplete()
+ err = wrapper.Storage.Find(ConnectionStreams).Context(wrapper.Context).All(&results)
+ require.NoError(t, err)
+ assert.Len(t, results, 1)
+ assert.Equal(t, firstTime.Unix(), results[0].ID.Timestamp().Unix())
+ assert.Zero(t, results[0].ConnectionID)
+ assert.Equal(t, 0, results[0].DocumentIndex)
+ assert.Equal(t, data, results[0].Payload)
+ assert.Equal(t, indexes, results[0].BlocksIndexes)
+ assert.Len(t, results[0].BlocksTimestamps, len(timestamps)) // should be compared one by one
+ assert.Equal(t, lossBlocks, results[0].BlocksLoss)
+ assert.Len(t, results[0].PatternMatches, 0)
+
assert.Equal(t, len(data), streamHandler.currentIndex)
assert.Equal(t, firstTime, streamHandler.firstPacketSeen)
assert.Equal(t, lastTime, streamHandler.lastPacketSeen)
@@ -130,35 +129,35 @@ func TestReassemblingSingleDocument(t *testing.T) {
assert.Equal(t, len(data), streamHandler.streamLength)
assert.Len(t, streamHandler.patternMatches, 0)
- assert.Equal(t, true, inserted, "inserted")
assert.Equal(t, true, completed, "completed")
err = scratch.Free()
- require.Nil(t, err, "free scratch")
+ require.NoError(t, err, "free scratch")
err = patterns.Close()
- require.Nil(t, err, "close stream database")
+ require.NoError(t, err, "close stream database")
+ wrapper.Destroy(t)
}
-
func TestReassemblingMultipleDocuments(t *testing.T) {
+ wrapper := NewTestStorageWrapper(t)
+ wrapper.AddCollection(ConnectionStreams)
patterns, err := hyperscan.NewStreamDatabase(hyperscan.NewPattern("/impossible_to_match/", 0))
- require.Nil(t, err)
+ require.NoError(t, err)
scratch, err := hyperscan.NewScratch(patterns)
- require.Nil(t, err)
- storage := &testStorage{}
- streamHandler := createTestStreamHandler(storage, patterns, scratch)
+ require.NoError(t, err)
+ streamHandler := createTestStreamHandler(wrapper, patterns, scratch)
payloadLen := 256
firstTime := time.Unix(0, 0)
middleTime := time.Unix(10, 0)
lastTime := time.Unix(20, 0)
- dataSize := MaxDocumentSize*2
+ dataSize := MaxDocumentSize * 2
data := make([]byte, dataSize)
rand.Read(data)
- reassembles := make([]tcpassembly.Reassembly, dataSize / payloadLen)
- indexes := make([]int, dataSize / payloadLen)
- timestamps := make([]time.Time, dataSize / payloadLen)
- lossBlocks := make([]bool, dataSize / payloadLen)
+ reassembles := make([]tcpassembly.Reassembly, dataSize/payloadLen)
+ indexes := make([]int, dataSize/payloadLen)
+ timestamps := make([]time.Time, dataSize/payloadLen)
+ lossBlocks := make([]bool, dataSize/payloadLen)
for i := 0; i < len(reassembles); i++ {
var seen time.Time
if i == 0 {
@@ -170,38 +169,22 @@ func TestReassemblingMultipleDocuments(t *testing.T) {
}
reassembles[i] = tcpassembly.Reassembly{
- Bytes: data[i*payloadLen:(i+1)*payloadLen],
+ Bytes: data[i*payloadLen : (i+1)*payloadLen],
Skip: 0,
Start: i == 0,
End: i == len(reassembles)-1,
Seen: seen,
}
- indexes[i] = i*payloadLen % MaxDocumentSize
+ indexes[i] = i * payloadLen % MaxDocumentSize
timestamps[i] = seen
}
- inserted := 0
- storage.insertFunc = func(ctx context.Context, collectionName string, document interface{}) (i interface{}, err error) {
- od := document.(OrderedDocument)
- blockLen := MaxDocumentSize / payloadLen
- assert.Equal(t, "connection_streams", collectionName)
- assert.Equal(t, fmt.Sprintf("bb41a60281cfae83000%v000000000000", inserted), od[0].Value)
- assert.Equal(t, nil, od[1].Value)
- assert.Equal(t, inserted, od[2].Value)
- assert.Equal(t, data[MaxDocumentSize*inserted:MaxDocumentSize*(inserted+1)], od[3].Value)
- assert.Equal(t, indexes[blockLen*inserted:blockLen*(inserted+1)], od[4].Value)
- assert.Equal(t, timestamps[blockLen*inserted:blockLen*(inserted+1)], od[5].Value)
- assert.Equal(t, lossBlocks[blockLen*inserted:blockLen*(inserted+1)], od[6].Value)
- assert.Len(t, od[7].Value, 0)
- inserted += 1
-
- return nil, nil
- }
-
streamHandler.Reassembled(reassembles)
- if !assert.Equal(t, 1, inserted) {
- inserted = 1
- }
+
+ var results []ConnectionStream
+ err = wrapper.Storage.Find(ConnectionStreams).Context(wrapper.Context).All(&results)
+ require.NoError(t, err)
+ assert.Len(t, results, 1)
completed := false
streamHandler.connection.(*testConnectionHandler).onComplete = func(handler *StreamHandler) {
@@ -209,6 +192,21 @@ func TestReassemblingMultipleDocuments(t *testing.T) {
}
streamHandler.ReassemblyComplete()
+ err = wrapper.Storage.Find(ConnectionStreams).Context(wrapper.Context).All(&results)
+ require.NoError(t, err)
+ assert.Len(t, results, 2)
+ for i := 0; i < 2; i++ {
+ blockLen := MaxDocumentSize / payloadLen
+ assert.Equal(t, firstTime.Unix(), results[i].ID.Timestamp().Unix())
+ assert.Zero(t, results[i].ConnectionID)
+ assert.Equal(t, i, results[i].DocumentIndex)
+ assert.Equal(t, data[MaxDocumentSize*i:MaxDocumentSize*(i+1)], results[i].Payload)
+ assert.Equal(t, indexes[blockLen*i:blockLen*(i+1)], results[i].BlocksIndexes)
+ assert.Len(t, results[i].BlocksTimestamps, len(timestamps[blockLen*i:blockLen*(i+1)])) // should be compared one by one
+ assert.Equal(t, lossBlocks[blockLen*i:blockLen*(i+1)], results[i].BlocksLoss)
+ assert.Len(t, results[i].PatternMatches, 0)
+ }
+
assert.Equal(t, MaxDocumentSize, streamHandler.currentIndex)
assert.Equal(t, firstTime, streamHandler.firstPacketSeen)
assert.Equal(t, lastTime, streamHandler.lastPacketSeen)
@@ -216,26 +214,28 @@ func TestReassemblingMultipleDocuments(t *testing.T) {
assert.Equal(t, len(data), streamHandler.streamLength)
assert.Len(t, streamHandler.patternMatches, 0)
- assert.Equal(t, 2, inserted, "inserted")
assert.Equal(t, true, completed, "completed")
err = scratch.Free()
- require.Nil(t, err, "free scratch")
+ require.NoError(t, err, "free scratch")
err = patterns.Close()
- require.Nil(t, err, "close stream database")
+ require.NoError(t, err, "close stream database")
+ wrapper.Destroy(t)
}
func TestReassemblingPatternMatching(t *testing.T) {
+ wrapper := NewTestStorageWrapper(t)
+ wrapper.AddCollection(ConnectionStreams)
a, err := hyperscan.ParsePattern("/a{8}/i")
- require.Nil(t, err)
+ require.NoError(t, err)
a.Id = 0
a.Flags |= hyperscan.SomLeftMost
b, err := hyperscan.ParsePattern("/b[c]+b/i")
- require.Nil(t, err)
+ require.NoError(t, err)
b.Id = 1
b.Flags |= hyperscan.SomLeftMost
d, err := hyperscan.ParsePattern("/[d]+e[d]+/i")
- require.Nil(t, err)
+ require.NoError(t, err)
d.Id = 2
d.Flags |= hyperscan.SomLeftMost
@@ -247,30 +247,12 @@ func TestReassemblingPatternMatching(t *testing.T) {
}
patterns, err := hyperscan.NewStreamDatabase(a, b, d)
- require.Nil(t, err)
+ require.NoError(t, err)
scratch, err := hyperscan.NewScratch(patterns)
- require.Nil(t, err)
- storage := &testStorage{}
- streamHandler := createTestStreamHandler(storage, patterns, scratch)
+ require.NoError(t, err)
+ streamHandler := createTestStreamHandler(wrapper, patterns, scratch)
seen := time.Unix(0, 0)
- inserted := false
- storage.insertFunc = func(ctx context.Context, collectionName string, document interface{}) (i interface{}, err error) {
- od := document.(OrderedDocument)
- assert.Equal(t, "connection_streams", collectionName)
- assert.Equal(t, "bb41a60281cfae830000000000000000", od[0].Value)
- assert.Equal(t, nil, od[1].Value)
- assert.Equal(t, 0, od[2].Value)
- assert.Equal(t, []byte(payload), od[3].Value)
- assert.Equal(t, []int{0}, od[4].Value)
- assert.Equal(t, []time.Time{seen}, od[5].Value)
- assert.Equal(t, []bool{false}, od[6].Value)
- assert.Equal(t, expected, od[7].Value)
- inserted = true
-
- return nil, nil
- }
-
streamHandler.Reassembled([]tcpassembly.Reassembly{{
Bytes: []byte(payload),
Skip: 0,
@@ -278,7 +260,11 @@ func TestReassemblingPatternMatching(t *testing.T) {
End: true,
Seen: seen,
}})
- assert.Equal(t, false, inserted)
+
+ var results []ConnectionStream
+ err = wrapper.Storage.Find(ConnectionStreams).Context(wrapper.Context).All(&results)
+ require.NoError(t, err)
+ assert.Len(t, results, 0)
completed := false
streamHandler.connection.(*testConnectionHandler).onComplete = func(handler *StreamHandler) {
@@ -286,26 +272,36 @@ func TestReassemblingPatternMatching(t *testing.T) {
}
streamHandler.ReassemblyComplete()
+ err = wrapper.Storage.Find(ConnectionStreams).Context(wrapper.Context).All(&results)
+ require.NoError(t, err)
+ assert.Len(t, results, 1)
+ assert.Equal(t, seen.Unix(), results[0].ID.Timestamp().Unix())
+ assert.Zero(t, results[0].ConnectionID)
+ assert.Equal(t, 0, results[0].DocumentIndex)
+ assert.Equal(t, []byte(payload), results[0].Payload)
+ assert.Equal(t, []int{0}, results[0].BlocksIndexes)
+ assert.Len(t, results[0].BlocksTimestamps, 1) // should be compared one by one
+ assert.Equal(t, []bool{false}, results[0].BlocksLoss)
+ assert.Equal(t, expected, results[0].PatternMatches)
+
assert.Equal(t, len(payload), streamHandler.currentIndex)
assert.Equal(t, seen, streamHandler.firstPacketSeen)
assert.Equal(t, seen, streamHandler.lastPacketSeen)
assert.Len(t, streamHandler.documentsKeys, 1)
assert.Equal(t, len(payload), streamHandler.streamLength)
- assert.Equal(t, true, inserted, "inserted")
assert.Equal(t, true, completed, "completed")
err = scratch.Free()
- require.Nil(t, err, "free scratch")
+ require.NoError(t, err, "free scratch")
err = patterns.Close()
- require.Nil(t, err, "close stream database")
+ require.NoError(t, err, "close stream database")
+ wrapper.Destroy(t)
}
-
-func createTestStreamHandler(storage Storage, patterns hyperscan.StreamDatabase, scratch *hyperscan.Scratch) StreamHandler {
+func createTestStreamHandler(wrapper *TestStorageWrapper, patterns hyperscan.StreamDatabase, scratch *hyperscan.Scratch) StreamHandler {
testConnectionHandler := &testConnectionHandler{
- storage: storage,
- context: context.Background(),
+ wrapper: wrapper,
patterns: patterns,
}
@@ -318,18 +314,17 @@ func createTestStreamHandler(storage Storage, patterns hyperscan.StreamDatabase,
}
type testConnectionHandler struct {
- storage Storage
- context context.Context
- patterns hyperscan.StreamDatabase
+ wrapper *TestStorageWrapper
+ patterns hyperscan.StreamDatabase
onComplete func(*StreamHandler)
}
func (tch *testConnectionHandler) Storage() Storage {
- return tch.storage
+ return tch.wrapper.Storage
}
func (tch *testConnectionHandler) Context() context.Context {
- return tch.context
+ return tch.wrapper.Context
}
func (tch *testConnectionHandler) Patterns() hyperscan.StreamDatabase {