diff options
Diffstat (limited to 'pcap_importer_test.go')
-rw-r--r-- | pcap_importer_test.go | 112 |
1 files changed, 87 insertions, 25 deletions
diff --git a/pcap_importer_test.go b/pcap_importer_test.go index b38d2c9..bda2cb2 100644 --- a/pcap_importer_test.go +++ b/pcap_importer_test.go @@ -8,20 +8,86 @@ import ( "github.com/google/gopacket/tcpassembly/tcpreader" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/atomic" "net" "sync" "testing" + "time" ) func TestImportPcap(t *testing.T) { wrapper := NewTestStorageWrapper(t) + pcapImporter := newTestPcapImporter(wrapper, "172.17.0.3") + + pcapImporter.releaseAssembler(pcapImporter.takeAssembler()) + + sessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap") + require.NoError(t, err) + + duplicateSessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap") + require.Error(t, err) + assert.Equal(t, sessionID, duplicateSessionID) + + _, isPresent := pcapImporter.GetSession("invalid") + assert.False(t, isPresent) + + session := waitSessionCompletion(t, pcapImporter, sessionID) + assert.Equal(t, 15008, session.ProcessedPackets) + assert.Equal(t, 0, session.InvalidPackets) + assert.Equal(t, map[uint16]flowCount{9999: {10004, 5004}}, session.PacketsPerService) + assert.Zero(t, session.ImportingError) + + checkSessionEquals(t, wrapper, session) + + wrapper.Destroy(t) +} + +func TestCancelImportSession(t *testing.T) { + wrapper := NewTestStorageWrapper(t) + pcapImporter := newTestPcapImporter(wrapper, "172.17.0.3") + + sessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap") + require.NoError(t, err) + + assert.Error(t, pcapImporter.CancelSession("invalid")) + assert.NoError(t, pcapImporter.CancelSession(sessionID)) + + session := waitSessionCompletion(t, pcapImporter, sessionID) + assert.Zero(t, session.CompletedAt) + assert.Equal(t, 0, session.ProcessedPackets) + assert.Equal(t, 0, session.InvalidPackets) + assert.Equal(t, map[uint16]flowCount{}, session.PacketsPerService) + assert.NotZero(t, session.ImportingError) + + checkSessionEquals(t, wrapper, session) + + wrapper.Destroy(t) +} + +func TestImportNoTcpPackets(t *testing.T) { + wrapper := NewTestStorageWrapper(t) + pcapImporter := newTestPcapImporter(wrapper, "172.17.0.4") + + sessionID, err := pcapImporter.ImportPcap("test_data/icmp.pcap") + require.NoError(t, err) + + session := waitSessionCompletion(t, pcapImporter, sessionID) + assert.Equal(t, 2000, session.ProcessedPackets) + assert.Equal(t, 2000, session.InvalidPackets) + assert.Equal(t, map[uint16]flowCount{}, session.PacketsPerService) + assert.Zero(t, session.ImportingError) + + checkSessionEquals(t, wrapper, session) + + wrapper.Destroy(t) +} + +func newTestPcapImporter(wrapper *TestStorageWrapper, serverIP string) *PcapImporter { wrapper.AddCollection(ImportingSessions) - serverEndpoint := layers.NewIPEndpoint(net.ParseIP("172.17.0.3")) + serverEndpoint := layers.NewIPEndpoint(net.ParseIP(serverIP)) streamPool := tcpassembly.NewStreamPool(&testStreamFactory{}) - pcapImporter := PcapImporter{ + return &PcapImporter{ storage: wrapper.Storage, streamPool: streamPool, assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize), @@ -30,40 +96,36 @@ func TestImportPcap(t *testing.T) { mSessions: sync.Mutex{}, serverIP: serverEndpoint, } +} - sessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap") - require.NoError(t, err) - assert.NotZero(t, sessionID) - - duplicateSessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap") - require.Error(t, err) - assert.Equal(t, sessionID, duplicateSessionID) - - _, isPresent := pcapImporter.GetSession("invalid") - assert.False(t, isPresent) - +func waitSessionCompletion(t *testing.T, pcapImporter *PcapImporter, sessionID string) ImportingSession { session, isPresent := pcapImporter.GetSession(sessionID) require.True(t, isPresent) - err, _ = <- session.completed + <-session.completed session, isPresent = pcapImporter.GetSession(sessionID) - require.True(t, isPresent) - assert.NoError(t, err) + assert.True(t, isPresent) assert.Equal(t, sessionID, session.ID) - assert.Equal(t, 15008, session.ProcessedPackets) - assert.Equal(t, 0, session.InvalidPackets) - assert.Equal(t, map[uint16]flowCount{9999: {10004, 5004}}, session.PacketsPerService) - assert.NoError(t, session.ImportingError) - wrapper.Destroy(t) + return session +} + +func checkSessionEquals(t *testing.T, wrapper *TestStorageWrapper, session ImportingSession) { + var result ImportingSession + assert.NoError(t, wrapper.Storage.Find(ImportingSessions).Filter(OrderedDocument{{"_id", session.ID}}). + Context(wrapper.Context).First(&result)) + assert.Equal(t, session.CompletedAt.Unix(), result.CompletedAt.Unix()) + session.CompletedAt = time.Time{} + result.CompletedAt = time.Time{} + session.cancelFunc = nil + session.completed = nil + assert.Equal(t, session, result) } -type testStreamFactory struct{ - counter atomic.Int32 +type testStreamFactory struct { } func (sf *testStreamFactory) New(_, _ gopacket.Flow) tcpassembly.Stream { - sf.counter.Inc() reader := tcpreader.NewReaderStream() go func() { buffer := bufio.NewReader(&reader) |