aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmiliano Ciavatta2020-04-17 13:19:51 +0000
committerEmiliano Ciavatta2020-04-17 13:19:51 +0000
commiteceb7772bcd5006352087a10d38faafae561f0af (patch)
treea20c79bc091aab4017579fbdb9981fa90a19138c
parentc118d899081bc62e28d47a5a0479fb16a24878ec (diff)
Add pcap import optimizations
-rw-r--r--pcap_importer.go24
-rw-r--r--pcap_importer_test.go12
2 files changed, 19 insertions, 17 deletions
diff --git a/pcap_importer.go b/pcap_importer.go
index 628b25d..830ac38 100644
--- a/pcap_importer.go
+++ b/pcap_importer.go
@@ -35,6 +35,7 @@ type ImportingSession struct {
PacketsPerService map[uint16]flowCount `json:"packets_per_service" bson:"packets_per_service"`
ImportingError error `json:"importing_error" bson:"importing_error,omitempty"`
cancelFunc context.CancelFunc
+ completed chan error
}
type flowCount [2]int
@@ -76,6 +77,7 @@ func (pi *PcapImporter) ImportPcap(fileName string) (string, error) {
ID: hash,
PacketsPerService: make(map[uint16]flowCount),
cancelFunc: cancelFunc,
+ completed: make(chan error),
}
if result, err := pi.storage.Insert(ImportingSessions).Context(ctx).One(session); err != nil {
@@ -129,15 +131,20 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
pi.sessions[session.ID] = dupSession
pi.mSessions.Unlock()
- if _, err = pi.storage.Update(ImportingSessions).
- Filter(OrderedDocument{{"_id", session.ID}}).One(session); err != nil {
- log.WithError(err).WithField("session", session).Error("failed to update importing stats")
+ if completed || err != nil {
+ if _, err = pi.storage.Update(ImportingSessions).
+ Filter(OrderedDocument{{"_id", session.ID}}).One(session); err != nil {
+ log.WithError(err).WithField("session", session).Error("failed to update importing stats")
+ }
+ session.completed <- err
}
}
handle, err := pcap.OpenOffline(fileName)
if err != nil {
progressUpdate(false, errors.New("failed to process pcap"))
+ log.WithError(err).WithFields(log.Fields{"session": session, "fileName": fileName}).
+ Error("failed to open pcap")
return
}
@@ -148,15 +155,11 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
firstPacketTime := time.Time{}
updateProgressInterval := time.Tick(importUpdateProgressInterval)
- terminate := func() {
- handle.Close()
- pi.releaseAssembler(assembler)
- }
-
for {
select {
case <-ctx.Done():
- terminate()
+ handle.Close()
+ pi.releaseAssembler(assembler)
progressUpdate(false, errors.New("import process cancelled"))
return
default:
@@ -168,7 +171,8 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
if !firstPacketTime.IsZero() {
assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan))
}
- terminate()
+ handle.Close()
+ pi.releaseAssembler(assembler)
progressUpdate(true, nil)
return
}
diff --git a/pcap_importer_test.go b/pcap_importer_test.go
index 234988e..b38d2c9 100644
--- a/pcap_importer_test.go
+++ b/pcap_importer_test.go
@@ -6,14 +6,12 @@ import (
"github.com/google/gopacket/layers"
"github.com/google/gopacket/tcpassembly"
"github.com/google/gopacket/tcpassembly/tcpreader"
- // log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"net"
"sync"
"testing"
- "time"
)
func TestImportPcap(t *testing.T) {
@@ -44,13 +42,13 @@ func TestImportPcap(t *testing.T) {
_, isPresent := pcapImporter.GetSession("invalid")
assert.False(t, isPresent)
- var session ImportingSession
+ session, isPresent := pcapImporter.GetSession(sessionID)
+ require.True(t, isPresent)
+ err, _ = <- session.completed
+
session, isPresent = pcapImporter.GetSession(sessionID)
require.True(t, isPresent)
- for session.CompletedAt.IsZero() && session.ImportingError == nil {
- time.Sleep(importUpdateProgressInterval)
- session, isPresent = pcapImporter.GetSession(sessionID)
- }
+ assert.NoError(t, err)
assert.Equal(t, sessionID, session.ID)
assert.Equal(t, 15008, session.ProcessedPackets)
assert.Equal(t, 0, session.InvalidPackets)