package main import ( "bufio" "fmt" "github.com/google/gopacket" "github.com/google/gopacket/tcpassembly" "github.com/google/gopacket/tcpassembly/tcpreader" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "os" "sync" "testing" "time" ) func TestImportPcap(t *testing.T) { wrapper := NewTestStorageWrapper(t) pcapImporter := newTestPcapImporter(wrapper, "172.17.0.3") pcapImporter.releaseAssembler(pcapImporter.takeAssembler()) fileName := copyToProcessing(t, "ping_pong_10000.pcap") sessionID, err := pcapImporter.ImportPcap(fileName, false) require.NoError(t, err) duplicatePcapFileName := copyToProcessing(t, "ping_pong_10000.pcap") duplicateSessionID, err := pcapImporter.ImportPcap(duplicatePcapFileName, false) require.Error(t, err) assert.Equal(t, sessionID, duplicateSessionID) assert.Error(t, os.Remove(ProcessingPcapsBasePath + duplicatePcapFileName)) _, isPresent := pcapImporter.GetSession("invalid") assert.False(t, isPresent) session := waitSessionCompletion(t, pcapImporter, sessionID) assert.Equal(t, 15008, session.ProcessedPackets) assert.Equal(t, 0, session.InvalidPackets) assert.Equal(t, map[uint16]flowCount{9999: {10004, 5004}}, session.PacketsPerService) assert.Zero(t, session.ImportingError) checkSessionEquals(t, wrapper, session) assert.Error(t, os.Remove(ProcessingPcapsBasePath + fileName)) assert.NoError(t, os.Remove(PcapsBasePath + session.ID + ".pcap")) wrapper.Destroy(t) } func TestCancelImportSession(t *testing.T) { wrapper := NewTestStorageWrapper(t) pcapImporter := newTestPcapImporter(wrapper, "172.17.0.3") fileName := copyToProcessing(t, "ping_pong_10000.pcap") sessionID, err := pcapImporter.ImportPcap(fileName, false) require.NoError(t, err) assert.False(t, pcapImporter.CancelSession("invalid")) assert.True(t, pcapImporter.CancelSession(sessionID)) session := waitSessionCompletion(t, pcapImporter, sessionID) assert.Zero(t, session.CompletedAt) assert.Equal(t, int64(1270696), session.Size) assert.Equal(t, 0, session.ProcessedPackets) assert.Equal(t, 0, session.InvalidPackets) assert.Equal(t, map[uint16]flowCount{}, session.PacketsPerService) assert.NotZero(t, session.ImportingError) checkSessionEquals(t, wrapper, session) assert.Error(t, os.Remove(ProcessingPcapsBasePath + fileName)) assert.Error(t, os.Remove(PcapsBasePath + sessionID + ".pcap")) wrapper.Destroy(t) } func TestImportNoTcpPackets(t *testing.T) { wrapper := NewTestStorageWrapper(t) pcapImporter := newTestPcapImporter(wrapper, "172.17.0.4") fileName := copyToProcessing(t, "icmp.pcap") sessionID, err := pcapImporter.ImportPcap(fileName, false) require.NoError(t, err) session := waitSessionCompletion(t, pcapImporter, sessionID) assert.Equal(t, int64(228024), session.Size) assert.Equal(t, 2000, session.ProcessedPackets) assert.Equal(t, 2000, session.InvalidPackets) assert.Equal(t, map[uint16]flowCount{}, session.PacketsPerService) assert.Zero(t, session.ImportingError) checkSessionEquals(t, wrapper, session) assert.Error(t, os.Remove(ProcessingPcapsBasePath + fileName)) assert.NoError(t, os.Remove(PcapsBasePath + sessionID + ".pcap")) wrapper.Destroy(t) } func newTestPcapImporter(wrapper *TestStorageWrapper, serverAddress string) *PcapImporter { wrapper.AddCollection(ImportingSessions) streamPool := tcpassembly.NewStreamPool(&testStreamFactory{}) return &PcapImporter{ storage: wrapper.Storage, streamPool: streamPool, assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize), sessions: make(map[string]ImportingSession), mAssemblers: sync.Mutex{}, mSessions: sync.Mutex{}, serverNet: *ParseIPNet(serverAddress), } } func waitSessionCompletion(t *testing.T, pcapImporter *PcapImporter, sessionID string) ImportingSession { session, isPresent := pcapImporter.GetSession(sessionID) require.True(t, isPresent) <-session.completed session, isPresent = pcapImporter.GetSession(sessionID) assert.True(t, isPresent) assert.Equal(t, sessionID, session.ID) return session } func checkSessionEquals(t *testing.T, wrapper *TestStorageWrapper, session ImportingSession) { var result ImportingSession assert.NoError(t, wrapper.Storage.Find(ImportingSessions).Filter(OrderedDocument{{"_id", session.ID}}). Context(wrapper.Context).First(&result)) assert.Equal(t, session.StartedAt.Unix(), result.StartedAt.Unix()) assert.Equal(t, session.CompletedAt.Unix(), result.CompletedAt.Unix()) session.StartedAt = time.Time{} result.StartedAt = time.Time{} session.CompletedAt = time.Time{} result.CompletedAt = time.Time{} session.cancelFunc = nil session.completed = nil assert.Equal(t, session, result) } func copyToProcessing(t *testing.T, fileName string) string { newFile := fmt.Sprintf("test-%v-%s", time.Now().UnixNano(), fileName) require.NoError(t, CopyFile(ProcessingPcapsBasePath + newFile, "test_data/" + fileName)) return newFile } type testStreamFactory struct { } func (sf *testStreamFactory) New(_, _ gopacket.Flow) tcpassembly.Stream { reader := tcpreader.NewReaderStream() go func() { buffer := bufio.NewReader(&reader) tcpreader.DiscardBytesToEOF(buffer) }() return &reader }