aboutsummaryrefslogtreecommitdiff
path: root/pcap_importer_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'pcap_importer_test.go')
-rw-r--r--pcap_importer_test.go75
1 files changed, 75 insertions, 0 deletions
diff --git a/pcap_importer_test.go b/pcap_importer_test.go
new file mode 100644
index 0000000..234988e
--- /dev/null
+++ b/pcap_importer_test.go
@@ -0,0 +1,75 @@
+package main
+
+import (
+ "bufio"
+ "github.com/google/gopacket"
+ "github.com/google/gopacket/layers"
+ "github.com/google/gopacket/tcpassembly"
+ "github.com/google/gopacket/tcpassembly/tcpreader"
+ // log "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "go.uber.org/atomic"
+ "net"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestImportPcap(t *testing.T) {
+ wrapper := NewTestStorageWrapper(t)
+ wrapper.AddCollection(ImportingSessions)
+
+ serverEndpoint := layers.NewIPEndpoint(net.ParseIP("172.17.0.3"))
+ streamPool := tcpassembly.NewStreamPool(&testStreamFactory{})
+
+ pcapImporter := PcapImporter{
+ storage: wrapper.Storage,
+ streamPool: streamPool,
+ assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize),
+ sessions: make(map[string]ImportingSession),
+ mAssemblers: sync.Mutex{},
+ mSessions: sync.Mutex{},
+ serverIP: serverEndpoint,
+ }
+
+ sessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
+ require.NoError(t, err)
+ assert.NotZero(t, sessionID)
+
+ duplicateSessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
+ require.Error(t, err)
+ assert.Equal(t, sessionID, duplicateSessionID)
+
+ _, isPresent := pcapImporter.GetSession("invalid")
+ assert.False(t, isPresent)
+
+ var session ImportingSession
+ session, isPresent = pcapImporter.GetSession(sessionID)
+ require.True(t, isPresent)
+ for session.CompletedAt.IsZero() && session.ImportingError == nil {
+ time.Sleep(importUpdateProgressInterval)
+ session, isPresent = pcapImporter.GetSession(sessionID)
+ }
+ assert.Equal(t, sessionID, session.ID)
+ assert.Equal(t, 15008, session.ProcessedPackets)
+ assert.Equal(t, 0, session.InvalidPackets)
+ assert.Equal(t, map[uint16]flowCount{9999: {10004, 5004}}, session.PacketsPerService)
+ assert.NoError(t, session.ImportingError)
+
+ wrapper.Destroy(t)
+}
+
+type testStreamFactory struct{
+ counter atomic.Int32
+}
+
+func (sf *testStreamFactory) New(_, _ gopacket.Flow) tcpassembly.Stream {
+ sf.counter.Inc()
+ reader := tcpreader.NewReaderStream()
+ go func() {
+ buffer := bufio.NewReader(&reader)
+ tcpreader.DiscardBytesToEOF(buffer)
+ }()
+ return &reader
+}