diff options
Diffstat (limited to 'pcap_importer_test.go')
-rw-r--r-- | pcap_importer_test.go | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/pcap_importer_test.go b/pcap_importer_test.go new file mode 100644 index 0000000..234988e --- /dev/null +++ b/pcap_importer_test.go @@ -0,0 +1,75 @@ +package main + +import ( + "bufio" + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/tcpassembly" + "github.com/google/gopacket/tcpassembly/tcpreader" + // log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "net" + "sync" + "testing" + "time" +) + +func TestImportPcap(t *testing.T) { + wrapper := NewTestStorageWrapper(t) + wrapper.AddCollection(ImportingSessions) + + serverEndpoint := layers.NewIPEndpoint(net.ParseIP("172.17.0.3")) + streamPool := tcpassembly.NewStreamPool(&testStreamFactory{}) + + pcapImporter := PcapImporter{ + storage: wrapper.Storage, + streamPool: streamPool, + assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize), + sessions: make(map[string]ImportingSession), + mAssemblers: sync.Mutex{}, + mSessions: sync.Mutex{}, + serverIP: serverEndpoint, + } + + sessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap") + require.NoError(t, err) + assert.NotZero(t, sessionID) + + duplicateSessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap") + require.Error(t, err) + assert.Equal(t, sessionID, duplicateSessionID) + + _, isPresent := pcapImporter.GetSession("invalid") + assert.False(t, isPresent) + + var session ImportingSession + session, isPresent = pcapImporter.GetSession(sessionID) + require.True(t, isPresent) + for session.CompletedAt.IsZero() && session.ImportingError == nil { + time.Sleep(importUpdateProgressInterval) + session, isPresent = pcapImporter.GetSession(sessionID) + } + assert.Equal(t, sessionID, session.ID) + assert.Equal(t, 15008, session.ProcessedPackets) + assert.Equal(t, 0, session.InvalidPackets) + assert.Equal(t, map[uint16]flowCount{9999: {10004, 5004}}, session.PacketsPerService) + assert.NoError(t, session.ImportingError) + + wrapper.Destroy(t) +} + +type testStreamFactory struct{ + counter atomic.Int32 +} + +func (sf *testStreamFactory) New(_, _ gopacket.Flow) tcpassembly.Stream { + sf.counter.Inc() + reader := tcpreader.NewReaderStream() + go func() { + buffer := bufio.NewReader(&reader) + tcpreader.DiscardBytesToEOF(buffer) + }() + return &reader +} |