Diffstat (limited to 'pcap_importer.go')
-rw-r--r--  pcap_importer.go  73
1 file changed, 33 insertions, 40 deletions
diff --git a/pcap_importer.go b/pcap_importer.go
index 830ac38..bb09867 100644
--- a/pcap_importer.go
+++ b/pcap_importer.go
@@ -33,9 +33,9 @@ type ImportingSession struct {
     ProcessedPackets int `json:"processed_packets" bson:"processed_packets"`
     InvalidPackets int `json:"invalid_packets" bson:"invalid_packets"`
     PacketsPerService map[uint16]flowCount `json:"packets_per_service" bson:"packets_per_service"`
-    ImportingError error `json:"importing_error" bson:"importing_error,omitempty"`
+    ImportingError string `json:"importing_error" bson:"importing_error,omitempty"`
     cancelFunc context.CancelFunc
-    completed chan error
+    completed chan string
 }

 type flowCount [2]int
@@ -77,16 +77,9 @@ func (pi *PcapImporter) ImportPcap(fileName string) (string, error) {
         ID: hash,
         PacketsPerService: make(map[uint16]flowCount),
         cancelFunc: cancelFunc,
-        completed: make(chan error),
+        completed: make(chan string),
     }
-    if result, err := pi.storage.Insert(ImportingSessions).Context(ctx).One(session); err != nil {
-        pi.mSessions.Unlock()
-        log.WithError(err).WithField("session", session).Panic("failed to insert a session into database")
-    } else if result == nil {
-        pi.mSessions.Unlock()
-        return hash, errors.New("pcap already processed")
-    }
     pi.sessions[hash] = session
     pi.mSessions.Unlock()
@@ -115,34 +108,9 @@ func (pi *PcapImporter) CancelSession(sessionID string) error {
 // Read the pcap and save the tcp stream flow to the database
 func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx context.Context) {
-    progressUpdate := func(completed bool, err error) {
-        if completed {
-            session.CompletedAt = time.Now()
-        }
-        session.ImportingError = err
-
-        dupSession := session
-        dupSession.PacketsPerService = make(map[uint16]flowCount, len(session.PacketsPerService))
-        for key, value := range session.PacketsPerService {
-            dupSession.PacketsPerService[key] = value
-        }
-
-        pi.mSessions.Lock()
-        pi.sessions[session.ID] = dupSession
-        pi.mSessions.Unlock()
-
-        if completed || err != nil {
-            if _, err = pi.storage.Update(ImportingSessions).
-                Filter(OrderedDocument{{"_id", session.ID}}).One(session); err != nil {
-                log.WithError(err).WithField("session", session).Error("failed to update importing stats")
-            }
-            session.completed <- err
-        }
-    }
-
     handle, err := pcap.OpenOffline(fileName)
     if err != nil {
-        progressUpdate(false, errors.New("failed to process pcap"))
+        pi.progressUpdate(session, false, "failed to process pcap")
         log.WithError(err).WithFields(log.Fields{"session": session, "fileName": fileName}).
             Error("failed to open pcap")
         return
@@ -160,7 +128,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
         case <-ctx.Done():
             handle.Close()
             pi.releaseAssembler(assembler)
-            progressUpdate(false, errors.New("import process cancelled"))
+            pi.progressUpdate(session, false, "import process cancelled")
             return
         default:
         }
@@ -173,7 +141,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
                 }
                 handle.Close()
                 pi.releaseAssembler(assembler)
-                progressUpdate(true, nil)
+                pi.progressUpdate(session, true, "")
                 return
             }
@@ -182,12 +150,13 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
                 firstPacketTime = timestamp
             }
+            session.ProcessedPackets++
+
             if packet.NetworkLayer() == nil || packet.TransportLayer() == nil ||
                 packet.TransportLayer().LayerType() != layers.LayerTypeTCP { // invalid packet
                 session.InvalidPackets++
                 continue
             }
-            session.ProcessedPackets++
             tcp := packet.TransportLayer().(*layers.TCP)
             var servicePort uint16
@@ -213,8 +182,32 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
             assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, timestamp)
         case <-updateProgressInterval:
-            progressUpdate(false, nil)
+            pi.progressUpdate(session, false, "")
+        }
+    }
+}
+
+func (pi *PcapImporter) progressUpdate(session ImportingSession, completed bool, err string) {
+    if completed {
+        session.CompletedAt = time.Now()
+    }
+    session.ImportingError = err
+
+    packetsPerService := session.PacketsPerService
+    session.PacketsPerService = make(map[uint16]flowCount, len(packetsPerService))
+    for key, value := range packetsPerService {
+        session.PacketsPerService[key] = value
+    }
+
+    pi.mSessions.Lock()
+    pi.sessions[session.ID] = session
+    pi.mSessions.Unlock()
+
+    if completed || session.ImportingError != "" {
+        if _, _err := pi.storage.Insert(ImportingSessions).One(session); _err != nil {
+            log.WithError(_err).WithField("session", session).Error("failed to insert importing stats")
         }
+        session.completed <- session.ImportingError
     }
 }