diff options
author | Emiliano Ciavatta | 2020-04-09 08:26:15 +0000 |
---|---|---|
committer | Emiliano Ciavatta | 2020-04-09 08:26:15 +0000 |
commit | 0520dab47d61e2c4de246459bf4f5c72d69182d3 (patch) | |
tree | d87df19c87a300d1022324f2ecad66380643d2f1 /stream_handler_test.go | |
parent | 468690c60ee2e57ed2ccb4375e9ada5d2fed9473 (diff) |
Refactor storage
Diffstat (limited to 'stream_handler_test.go')
-rw-r--r-- | stream_handler_test.go | 219 |
1 files changed, 107 insertions, 112 deletions
diff --git a/stream_handler_test.go b/stream_handler_test.go index cb5ecc7..ece3190 100644 --- a/stream_handler_test.go +++ b/stream_handler_test.go @@ -2,7 +2,6 @@ package main import ( "context" - "fmt" "github.com/flier/gohs/hyperscan" "github.com/google/gopacket/layers" "github.com/google/gopacket/tcpassembly" @@ -19,13 +18,14 @@ const testDstIp = "10.10.10.1" const srcPort = 44444 const dstPort = 8080 - func TestReassemblingEmptyStream(t *testing.T) { + wrapper := NewTestStorageWrapper(t) + wrapper.AddCollection(ConnectionStreams) patterns, err := hyperscan.NewStreamDatabase(hyperscan.NewPattern("/nope/", 0)) - require.Nil(t, err) + require.NoError(t, err) scratch, err := hyperscan.NewScratch(patterns) - require.Nil(t, err) - streamHandler := createTestStreamHandler(testStorage{}, patterns, scratch) + require.NoError(t, err) + streamHandler := createTestStreamHandler(wrapper, patterns, scratch) streamHandler.Reassembled([]tcpassembly.Reassembly{{ Bytes: []byte{}, @@ -51,19 +51,20 @@ func TestReassemblingEmptyStream(t *testing.T) { assert.Equal(t, true, completed) err = scratch.Free() - require.Nil(t, err, "free scratch") + require.NoError(t, err, "free scratch") err = patterns.Close() - require.Nil(t, err, "close stream database") + require.NoError(t, err, "close stream database") + wrapper.Destroy(t) } - func TestReassemblingSingleDocument(t *testing.T) { + wrapper := NewTestStorageWrapper(t) + wrapper.AddCollection(ConnectionStreams) patterns, err := hyperscan.NewStreamDatabase(hyperscan.NewPattern("/impossible_to_match/", 0)) - require.Nil(t, err) + require.NoError(t, err) scratch, err := hyperscan.NewScratch(patterns) - require.Nil(t, err) - storage := &testStorage{} - streamHandler := createTestStreamHandler(storage, patterns, scratch) + require.NoError(t, err) + streamHandler := createTestStreamHandler(wrapper, patterns, scratch) payloadLen := 256 firstTime := time.Unix(0, 0) @@ -71,10 +72,10 @@ func TestReassemblingSingleDocument(t *testing.T) { lastTime := time.Unix(20, 0) data := make([]byte, MaxDocumentSize) rand.Read(data) - reassembles := make([]tcpassembly.Reassembly, MaxDocumentSize / payloadLen) - indexes := make([]int, MaxDocumentSize / payloadLen) - timestamps := make([]time.Time, MaxDocumentSize / payloadLen) - lossBlocks := make([]bool, MaxDocumentSize / payloadLen) + reassembles := make([]tcpassembly.Reassembly, MaxDocumentSize/payloadLen) + indexes := make([]int, MaxDocumentSize/payloadLen) + timestamps := make([]time.Time, MaxDocumentSize/payloadLen) + lossBlocks := make([]bool, MaxDocumentSize/payloadLen) for i := 0; i < len(reassembles); i++ { var seen time.Time if i == 0 { @@ -86,36 +87,22 @@ func TestReassemblingSingleDocument(t *testing.T) { } reassembles[i] = tcpassembly.Reassembly{ - Bytes: data[i*payloadLen:(i+1)*payloadLen], + Bytes: data[i*payloadLen : (i+1)*payloadLen], Skip: 0, Start: i == 0, End: i == len(reassembles)-1, Seen: seen, } - indexes[i] = i*payloadLen + indexes[i] = i * payloadLen timestamps[i] = seen } - inserted := false - storage.insertFunc = func(ctx context.Context, collectionName string, document interface{}) (i interface{}, err error) { - od := document.(OrderedDocument) - assert.Equal(t, "connection_streams", collectionName) - assert.Equal(t, "bb41a60281cfae830000000000000000", od[0].Value) - assert.Equal(t, nil, od[1].Value) - assert.Equal(t, 0, od[2].Value) - assert.Equal(t, data, od[3].Value) - assert.Equal(t, indexes, od[4].Value) - assert.Equal(t, timestamps, od[5].Value) - assert.Equal(t, lossBlocks, od[6].Value) - assert.Len(t, od[7].Value, 0) - inserted = true - return nil, nil - } + var results []ConnectionStream streamHandler.Reassembled(reassembles) - if !assert.Equal(t, false, inserted) { - inserted = false - } + err = wrapper.Storage.Find(ConnectionStreams).Context(wrapper.Context).All(&results) + require.NoError(t, err) + assert.Len(t, results, 0) completed := false streamHandler.connection.(*testConnectionHandler).onComplete = func(handler *StreamHandler) { @@ -123,6 +110,18 @@ func TestReassemblingSingleDocument(t *testing.T) { } streamHandler.ReassemblyComplete() + err = wrapper.Storage.Find(ConnectionStreams).Context(wrapper.Context).All(&results) + require.NoError(t, err) + assert.Len(t, results, 1) + assert.Equal(t, firstTime.Unix(), results[0].ID.Timestamp().Unix()) + assert.Zero(t, results[0].ConnectionID) + assert.Equal(t, 0, results[0].DocumentIndex) + assert.Equal(t, data, results[0].Payload) + assert.Equal(t, indexes, results[0].BlocksIndexes) + assert.Len(t, results[0].BlocksTimestamps, len(timestamps)) // should be compared one by one + assert.Equal(t, lossBlocks, results[0].BlocksLoss) + assert.Len(t, results[0].PatternMatches, 0) + assert.Equal(t, len(data), streamHandler.currentIndex) assert.Equal(t, firstTime, streamHandler.firstPacketSeen) assert.Equal(t, lastTime, streamHandler.lastPacketSeen) @@ -130,35 +129,35 @@ func TestReassemblingSingleDocument(t *testing.T) { assert.Equal(t, len(data), streamHandler.streamLength) assert.Len(t, streamHandler.patternMatches, 0) - assert.Equal(t, true, inserted, "inserted") assert.Equal(t, true, completed, "completed") err = scratch.Free() - require.Nil(t, err, "free scratch") + require.NoError(t, err, "free scratch") err = patterns.Close() - require.Nil(t, err, "close stream database") + require.NoError(t, err, "close stream database") + wrapper.Destroy(t) } - func TestReassemblingMultipleDocuments(t *testing.T) { + wrapper := NewTestStorageWrapper(t) + wrapper.AddCollection(ConnectionStreams) patterns, err := hyperscan.NewStreamDatabase(hyperscan.NewPattern("/impossible_to_match/", 0)) - require.Nil(t, err) + require.NoError(t, err) scratch, err := hyperscan.NewScratch(patterns) - require.Nil(t, err) - storage := &testStorage{} - streamHandler := createTestStreamHandler(storage, patterns, scratch) + require.NoError(t, err) + streamHandler := createTestStreamHandler(wrapper, patterns, scratch) payloadLen := 256 firstTime := time.Unix(0, 0) middleTime := time.Unix(10, 0) lastTime := time.Unix(20, 0) - dataSize := MaxDocumentSize*2 + dataSize := MaxDocumentSize * 2 data := make([]byte, dataSize) rand.Read(data) - reassembles := make([]tcpassembly.Reassembly, dataSize / payloadLen) - indexes := make([]int, dataSize / payloadLen) - timestamps := make([]time.Time, dataSize / payloadLen) - lossBlocks := make([]bool, dataSize / payloadLen) + reassembles := make([]tcpassembly.Reassembly, dataSize/payloadLen) + indexes := make([]int, dataSize/payloadLen) + timestamps := make([]time.Time, dataSize/payloadLen) + lossBlocks := make([]bool, dataSize/payloadLen) for i := 0; i < len(reassembles); i++ { var seen time.Time if i == 0 { @@ -170,38 +169,22 @@ func TestReassemblingMultipleDocuments(t *testing.T) { } reassembles[i] = tcpassembly.Reassembly{ - Bytes: data[i*payloadLen:(i+1)*payloadLen], + Bytes: data[i*payloadLen : (i+1)*payloadLen], Skip: 0, Start: i == 0, End: i == len(reassembles)-1, Seen: seen, } - indexes[i] = i*payloadLen % MaxDocumentSize + indexes[i] = i * payloadLen % MaxDocumentSize timestamps[i] = seen } - inserted := 0 - storage.insertFunc = func(ctx context.Context, collectionName string, document interface{}) (i interface{}, err error) { - od := document.(OrderedDocument) - blockLen := MaxDocumentSize / payloadLen - assert.Equal(t, "connection_streams", collectionName) - assert.Equal(t, fmt.Sprintf("bb41a60281cfae83000%v000000000000", inserted), od[0].Value) - assert.Equal(t, nil, od[1].Value) - assert.Equal(t, inserted, od[2].Value) - assert.Equal(t, data[MaxDocumentSize*inserted:MaxDocumentSize*(inserted+1)], od[3].Value) - assert.Equal(t, indexes[blockLen*inserted:blockLen*(inserted+1)], od[4].Value) - assert.Equal(t, timestamps[blockLen*inserted:blockLen*(inserted+1)], od[5].Value) - assert.Equal(t, lossBlocks[blockLen*inserted:blockLen*(inserted+1)], od[6].Value) - assert.Len(t, od[7].Value, 0) - inserted += 1 - - return nil, nil - } - streamHandler.Reassembled(reassembles) - if !assert.Equal(t, 1, inserted) { - inserted = 1 - } + + var results []ConnectionStream + err = wrapper.Storage.Find(ConnectionStreams).Context(wrapper.Context).All(&results) + require.NoError(t, err) + assert.Len(t, results, 1) completed := false streamHandler.connection.(*testConnectionHandler).onComplete = func(handler *StreamHandler) { @@ -209,6 +192,21 @@ func TestReassemblingMultipleDocuments(t *testing.T) { } streamHandler.ReassemblyComplete() + err = wrapper.Storage.Find(ConnectionStreams).Context(wrapper.Context).All(&results) + require.NoError(t, err) + assert.Len(t, results, 2) + for i := 0; i < 2; i++ { + blockLen := MaxDocumentSize / payloadLen + assert.Equal(t, firstTime.Unix(), results[i].ID.Timestamp().Unix()) + assert.Zero(t, results[i].ConnectionID) + assert.Equal(t, i, results[i].DocumentIndex) + assert.Equal(t, data[MaxDocumentSize*i:MaxDocumentSize*(i+1)], results[i].Payload) + assert.Equal(t, indexes[blockLen*i:blockLen*(i+1)], results[i].BlocksIndexes) + assert.Len(t, results[i].BlocksTimestamps, len(timestamps[blockLen*i:blockLen*(i+1)])) // should be compared one by one + assert.Equal(t, lossBlocks[blockLen*i:blockLen*(i+1)], results[i].BlocksLoss) + assert.Len(t, results[i].PatternMatches, 0) + } + assert.Equal(t, MaxDocumentSize, streamHandler.currentIndex) assert.Equal(t, firstTime, streamHandler.firstPacketSeen) assert.Equal(t, lastTime, streamHandler.lastPacketSeen) @@ -216,26 +214,28 @@ func TestReassemblingMultipleDocuments(t *testing.T) { assert.Equal(t, len(data), streamHandler.streamLength) assert.Len(t, streamHandler.patternMatches, 0) - assert.Equal(t, 2, inserted, "inserted") assert.Equal(t, true, completed, "completed") err = scratch.Free() - require.Nil(t, err, "free scratch") + require.NoError(t, err, "free scratch") err = patterns.Close() - require.Nil(t, err, "close stream database") + require.NoError(t, err, "close stream database") + wrapper.Destroy(t) } func TestReassemblingPatternMatching(t *testing.T) { + wrapper := NewTestStorageWrapper(t) + wrapper.AddCollection(ConnectionStreams) a, err := hyperscan.ParsePattern("/a{8}/i") - require.Nil(t, err) + require.NoError(t, err) a.Id = 0 a.Flags |= hyperscan.SomLeftMost b, err := hyperscan.ParsePattern("/b[c]+b/i") - require.Nil(t, err) + require.NoError(t, err) b.Id = 1 b.Flags |= hyperscan.SomLeftMost d, err := hyperscan.ParsePattern("/[d]+e[d]+/i") - require.Nil(t, err) + require.NoError(t, err) d.Id = 2 d.Flags |= hyperscan.SomLeftMost @@ -247,30 +247,12 @@ func TestReassemblingPatternMatching(t *testing.T) { } patterns, err := hyperscan.NewStreamDatabase(a, b, d) - require.Nil(t, err) + require.NoError(t, err) scratch, err := hyperscan.NewScratch(patterns) - require.Nil(t, err) - storage := &testStorage{} - streamHandler := createTestStreamHandler(storage, patterns, scratch) + require.NoError(t, err) + streamHandler := createTestStreamHandler(wrapper, patterns, scratch) seen := time.Unix(0, 0) - inserted := false - storage.insertFunc = func(ctx context.Context, collectionName string, document interface{}) (i interface{}, err error) { - od := document.(OrderedDocument) - assert.Equal(t, "connection_streams", collectionName) - assert.Equal(t, "bb41a60281cfae830000000000000000", od[0].Value) - assert.Equal(t, nil, od[1].Value) - assert.Equal(t, 0, od[2].Value) - assert.Equal(t, []byte(payload), od[3].Value) - assert.Equal(t, []int{0}, od[4].Value) - assert.Equal(t, []time.Time{seen}, od[5].Value) - assert.Equal(t, []bool{false}, od[6].Value) - assert.Equal(t, expected, od[7].Value) - inserted = true - - return nil, nil - } - streamHandler.Reassembled([]tcpassembly.Reassembly{{ Bytes: []byte(payload), Skip: 0, @@ -278,7 +260,11 @@ func TestReassemblingPatternMatching(t *testing.T) { End: true, Seen: seen, }}) - assert.Equal(t, false, inserted) + + var results []ConnectionStream + err = wrapper.Storage.Find(ConnectionStreams).Context(wrapper.Context).All(&results) + require.NoError(t, err) + assert.Len(t, results, 0) completed := false streamHandler.connection.(*testConnectionHandler).onComplete = func(handler *StreamHandler) { @@ -286,26 +272,36 @@ func TestReassemblingPatternMatching(t *testing.T) { } streamHandler.ReassemblyComplete() + err = wrapper.Storage.Find(ConnectionStreams).Context(wrapper.Context).All(&results) + require.NoError(t, err) + assert.Len(t, results, 1) + assert.Equal(t, seen.Unix(), results[0].ID.Timestamp().Unix()) + assert.Zero(t, results[0].ConnectionID) + assert.Equal(t, 0, results[0].DocumentIndex) + assert.Equal(t, []byte(payload), results[0].Payload) + assert.Equal(t, []int{0}, results[0].BlocksIndexes) + assert.Len(t, results[0].BlocksTimestamps, 1) // should be compared one by one + assert.Equal(t, []bool{false}, results[0].BlocksLoss) + assert.Equal(t, expected, results[0].PatternMatches) + assert.Equal(t, len(payload), streamHandler.currentIndex) assert.Equal(t, seen, streamHandler.firstPacketSeen) assert.Equal(t, seen, streamHandler.lastPacketSeen) assert.Len(t, streamHandler.documentsKeys, 1) assert.Equal(t, len(payload), streamHandler.streamLength) - assert.Equal(t, true, inserted, "inserted") assert.Equal(t, true, completed, "completed") err = scratch.Free() - require.Nil(t, err, "free scratch") + require.NoError(t, err, "free scratch") err = patterns.Close() - require.Nil(t, err, "close stream database") + require.NoError(t, err, "close stream database") + wrapper.Destroy(t) } - -func createTestStreamHandler(storage Storage, patterns hyperscan.StreamDatabase, scratch *hyperscan.Scratch) StreamHandler { +func createTestStreamHandler(wrapper *TestStorageWrapper, patterns hyperscan.StreamDatabase, scratch *hyperscan.Scratch) StreamHandler { testConnectionHandler := &testConnectionHandler{ - storage: storage, - context: context.Background(), + wrapper: wrapper, patterns: patterns, } @@ -318,18 +314,17 @@ func createTestStreamHandler(storage Storage, patterns hyperscan.StreamDatabase, } type testConnectionHandler struct { - storage Storage - context context.Context - patterns hyperscan.StreamDatabase + wrapper *TestStorageWrapper + patterns hyperscan.StreamDatabase onComplete func(*StreamHandler) } func (tch *testConnectionHandler) Storage() Storage { - return tch.storage + return tch.wrapper.Storage } func (tch *testConnectionHandler) Context() context.Context { - return tch.context + return tch.wrapper.Context } func (tch *testConnectionHandler) Patterns() hyperscan.StreamDatabase { |