aboutsummaryrefslogtreecommitdiff
path: root/pcap_importer_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'pcap_importer_test.go')
-rw-r--r--pcap_importer_test.go112
1 files changed, 87 insertions, 25 deletions
diff --git a/pcap_importer_test.go b/pcap_importer_test.go
index b38d2c9..bda2cb2 100644
--- a/pcap_importer_test.go
+++ b/pcap_importer_test.go
@@ -8,20 +8,86 @@ import (
"github.com/google/gopacket/tcpassembly/tcpreader"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- "go.uber.org/atomic"
"net"
"sync"
"testing"
+ "time"
)
func TestImportPcap(t *testing.T) {
wrapper := NewTestStorageWrapper(t)
+ pcapImporter := newTestPcapImporter(wrapper, "172.17.0.3")
+
+ pcapImporter.releaseAssembler(pcapImporter.takeAssembler())
+
+ sessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
+ require.NoError(t, err)
+
+ duplicateSessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
+ require.Error(t, err)
+ assert.Equal(t, sessionID, duplicateSessionID)
+
+ _, isPresent := pcapImporter.GetSession("invalid")
+ assert.False(t, isPresent)
+
+ session := waitSessionCompletion(t, pcapImporter, sessionID)
+ assert.Equal(t, 15008, session.ProcessedPackets)
+ assert.Equal(t, 0, session.InvalidPackets)
+ assert.Equal(t, map[uint16]flowCount{9999: {10004, 5004}}, session.PacketsPerService)
+ assert.Zero(t, session.ImportingError)
+
+ checkSessionEquals(t, wrapper, session)
+
+ wrapper.Destroy(t)
+}
+
+func TestCancelImportSession(t *testing.T) {
+ wrapper := NewTestStorageWrapper(t)
+ pcapImporter := newTestPcapImporter(wrapper, "172.17.0.3")
+
+ sessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
+ require.NoError(t, err)
+
+ assert.Error(t, pcapImporter.CancelSession("invalid"))
+ assert.NoError(t, pcapImporter.CancelSession(sessionID))
+
+ session := waitSessionCompletion(t, pcapImporter, sessionID)
+ assert.Zero(t, session.CompletedAt)
+ assert.Equal(t, 0, session.ProcessedPackets)
+ assert.Equal(t, 0, session.InvalidPackets)
+ assert.Equal(t, map[uint16]flowCount{}, session.PacketsPerService)
+ assert.NotZero(t, session.ImportingError)
+
+ checkSessionEquals(t, wrapper, session)
+
+ wrapper.Destroy(t)
+}
+
+func TestImportNoTcpPackets(t *testing.T) {
+ wrapper := NewTestStorageWrapper(t)
+ pcapImporter := newTestPcapImporter(wrapper, "172.17.0.4")
+
+ sessionID, err := pcapImporter.ImportPcap("test_data/icmp.pcap")
+ require.NoError(t, err)
+
+ session := waitSessionCompletion(t, pcapImporter, sessionID)
+ assert.Equal(t, 2000, session.ProcessedPackets)
+ assert.Equal(t, 2000, session.InvalidPackets)
+ assert.Equal(t, map[uint16]flowCount{}, session.PacketsPerService)
+ assert.Zero(t, session.ImportingError)
+
+ checkSessionEquals(t, wrapper, session)
+
+ wrapper.Destroy(t)
+}
+
+func newTestPcapImporter(wrapper *TestStorageWrapper, serverIP string) *PcapImporter {
wrapper.AddCollection(ImportingSessions)
- serverEndpoint := layers.NewIPEndpoint(net.ParseIP("172.17.0.3"))
+ serverEndpoint := layers.NewIPEndpoint(net.ParseIP(serverIP))
streamPool := tcpassembly.NewStreamPool(&testStreamFactory{})
- pcapImporter := PcapImporter{
+ return &PcapImporter{
storage: wrapper.Storage,
streamPool: streamPool,
assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize),
@@ -30,40 +96,36 @@ func TestImportPcap(t *testing.T) {
mSessions: sync.Mutex{},
serverIP: serverEndpoint,
}
+}
- sessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
- require.NoError(t, err)
- assert.NotZero(t, sessionID)
-
- duplicateSessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
- require.Error(t, err)
- assert.Equal(t, sessionID, duplicateSessionID)
-
- _, isPresent := pcapImporter.GetSession("invalid")
- assert.False(t, isPresent)
-
+func waitSessionCompletion(t *testing.T, pcapImporter *PcapImporter, sessionID string) ImportingSession {
session, isPresent := pcapImporter.GetSession(sessionID)
require.True(t, isPresent)
- err, _ = <- session.completed
+ <-session.completed
session, isPresent = pcapImporter.GetSession(sessionID)
- require.True(t, isPresent)
- assert.NoError(t, err)
+ assert.True(t, isPresent)
assert.Equal(t, sessionID, session.ID)
- assert.Equal(t, 15008, session.ProcessedPackets)
- assert.Equal(t, 0, session.InvalidPackets)
- assert.Equal(t, map[uint16]flowCount{9999: {10004, 5004}}, session.PacketsPerService)
- assert.NoError(t, session.ImportingError)
- wrapper.Destroy(t)
+ return session
+}
+
+func checkSessionEquals(t *testing.T, wrapper *TestStorageWrapper, session ImportingSession) {
+ var result ImportingSession
+ assert.NoError(t, wrapper.Storage.Find(ImportingSessions).Filter(OrderedDocument{{"_id", session.ID}}).
+ Context(wrapper.Context).First(&result))
+ assert.Equal(t, session.CompletedAt.Unix(), result.CompletedAt.Unix())
+ session.CompletedAt = time.Time{}
+ result.CompletedAt = time.Time{}
+ session.cancelFunc = nil
+ session.completed = nil
+ assert.Equal(t, session, result)
}
-type testStreamFactory struct{
- counter atomic.Int32
+type testStreamFactory struct {
}
func (sf *testStreamFactory) New(_, _ gopacket.Flow) tcpassembly.Stream {
- sf.counter.Inc()
reader := tcpreader.NewReaderStream()
go func() {
buffer := bufio.NewReader(&reader)