diff options
author | Emiliano Ciavatta | 2020-04-17 13:19:51 +0000 |
---|---|---|
committer | Emiliano Ciavatta | 2020-04-17 13:19:51 +0000 |
commit | eceb7772bcd5006352087a10d38faafae561f0af (patch) | |
tree | a20c79bc091aab4017579fbdb9981fa90a19138c | |
parent | c118d899081bc62e28d47a5a0479fb16a24878ec (diff) |
Add pcap import optimizations
-rw-r--r-- | pcap_importer.go | 24 | ||||
-rw-r--r-- | pcap_importer_test.go | 12 |
2 files changed, 19 insertions, 17 deletions
diff --git a/pcap_importer.go b/pcap_importer.go index 628b25d..830ac38 100644 --- a/pcap_importer.go +++ b/pcap_importer.go @@ -35,6 +35,7 @@ type ImportingSession struct { PacketsPerService map[uint16]flowCount `json:"packets_per_service" bson:"packets_per_service"` ImportingError error `json:"importing_error" bson:"importing_error,omitempty"` cancelFunc context.CancelFunc + completed chan error } type flowCount [2]int @@ -76,6 +77,7 @@ func (pi *PcapImporter) ImportPcap(fileName string) (string, error) { ID: hash, PacketsPerService: make(map[uint16]flowCount), cancelFunc: cancelFunc, + completed: make(chan error), } if result, err := pi.storage.Insert(ImportingSessions).Context(ctx).One(session); err != nil { @@ -129,15 +131,20 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx pi.sessions[session.ID] = dupSession pi.mSessions.Unlock() - if _, err = pi.storage.Update(ImportingSessions). - Filter(OrderedDocument{{"_id", session.ID}}).One(session); err != nil { - log.WithError(err).WithField("session", session).Error("failed to update importing stats") + if completed || err != nil { + if _, err = pi.storage.Update(ImportingSessions). + Filter(OrderedDocument{{"_id", session.ID}}).One(session); err != nil { + log.WithError(err).WithField("session", session).Error("failed to update importing stats") + } + session.completed <- err } } handle, err := pcap.OpenOffline(fileName) if err != nil { progressUpdate(false, errors.New("failed to process pcap")) + log.WithError(err).WithFields(log.Fields{"session": session, "fileName": fileName}). + Error("failed to open pcap") return } @@ -148,15 +155,11 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx firstPacketTime := time.Time{} updateProgressInterval := time.Tick(importUpdateProgressInterval) - terminate := func() { - handle.Close() - pi.releaseAssembler(assembler) - } - for { select { case <-ctx.Done(): - terminate() + handle.Close() + pi.releaseAssembler(assembler) progressUpdate(false, errors.New("import process cancelled")) return default: @@ -168,7 +171,8 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx if !firstPacketTime.IsZero() { assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan)) } - terminate() + handle.Close() + pi.releaseAssembler(assembler) progressUpdate(true, nil) return } diff --git a/pcap_importer_test.go b/pcap_importer_test.go index 234988e..b38d2c9 100644 --- a/pcap_importer_test.go +++ b/pcap_importer_test.go @@ -6,14 +6,12 @@ import ( "github.com/google/gopacket/layers" "github.com/google/gopacket/tcpassembly" "github.com/google/gopacket/tcpassembly/tcpreader" - // log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" "net" "sync" "testing" - "time" ) func TestImportPcap(t *testing.T) { @@ -44,13 +42,13 @@ func TestImportPcap(t *testing.T) { _, isPresent := pcapImporter.GetSession("invalid") assert.False(t, isPresent) - var session ImportingSession + session, isPresent := pcapImporter.GetSession(sessionID) + require.True(t, isPresent) + err, _ = <- session.completed + session, isPresent = pcapImporter.GetSession(sessionID) require.True(t, isPresent) - for session.CompletedAt.IsZero() && session.ImportingError == nil { - time.Sleep(importUpdateProgressInterval) - session, isPresent = pcapImporter.GetSession(sessionID) - } + assert.NoError(t, err) assert.Equal(t, sessionID, session.ID) assert.Equal(t, 15008, session.ProcessedPackets) assert.Equal(t, 0, session.InvalidPackets) |