diff options
Diffstat (limited to 'pcap_importer.go')
-rw-r--r-- | pcap_importer.go | 73 |
1 files changed, 33 insertions, 40 deletions
diff --git a/pcap_importer.go b/pcap_importer.go index 830ac38..bb09867 100644 --- a/pcap_importer.go +++ b/pcap_importer.go @@ -33,9 +33,9 @@ type ImportingSession struct { ProcessedPackets int `json:"processed_packets" bson:"processed_packets"` InvalidPackets int `json:"invalid_packets" bson:"invalid_packets"` PacketsPerService map[uint16]flowCount `json:"packets_per_service" bson:"packets_per_service"` - ImportingError error `json:"importing_error" bson:"importing_error,omitempty"` + ImportingError string `json:"importing_error" bson:"importing_error,omitempty"` cancelFunc context.CancelFunc - completed chan error + completed chan string } type flowCount [2]int @@ -77,16 +77,9 @@ func (pi *PcapImporter) ImportPcap(fileName string) (string, error) { ID: hash, PacketsPerService: make(map[uint16]flowCount), cancelFunc: cancelFunc, - completed: make(chan error), + completed: make(chan string), } - if result, err := pi.storage.Insert(ImportingSessions).Context(ctx).One(session); err != nil { - pi.mSessions.Unlock() - log.WithError(err).WithField("session", session).Panic("failed to insert a session into database") - } else if result == nil { - pi.mSessions.Unlock() - return hash, errors.New("pcap already processed") - } pi.sessions[hash] = session pi.mSessions.Unlock() @@ -115,34 +108,9 @@ func (pi *PcapImporter) CancelSession(sessionID string) error { // Read the pcap and save the tcp stream flow to the database func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx context.Context) { - progressUpdate := func(completed bool, err error) { - if completed { - session.CompletedAt = time.Now() - } - session.ImportingError = err - - dupSession := session - dupSession.PacketsPerService = make(map[uint16]flowCount, len(session.PacketsPerService)) - for key, value := range session.PacketsPerService { - dupSession.PacketsPerService[key] = value - } - - pi.mSessions.Lock() - pi.sessions[session.ID] = dupSession - pi.mSessions.Unlock() - - if completed || err != nil { - if _, err = pi.storage.Update(ImportingSessions). - Filter(OrderedDocument{{"_id", session.ID}}).One(session); err != nil { - log.WithError(err).WithField("session", session).Error("failed to update importing stats") - } - session.completed <- err - } - } - handle, err := pcap.OpenOffline(fileName) if err != nil { - progressUpdate(false, errors.New("failed to process pcap")) + pi.progressUpdate(session, false, "failed to process pcap") log.WithError(err).WithFields(log.Fields{"session": session, "fileName": fileName}). Error("failed to open pcap") return @@ -160,7 +128,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx case <-ctx.Done(): handle.Close() pi.releaseAssembler(assembler) - progressUpdate(false, errors.New("import process cancelled")) + pi.progressUpdate(session, false, "import process cancelled") return default: } @@ -173,7 +141,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx } handle.Close() pi.releaseAssembler(assembler) - progressUpdate(true, nil) + pi.progressUpdate(session, true, "") return } @@ -182,12 +150,13 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx firstPacketTime = timestamp } + session.ProcessedPackets++ + if packet.NetworkLayer() == nil || packet.TransportLayer() == nil || packet.TransportLayer().LayerType() != layers.LayerTypeTCP { // invalid packet session.InvalidPackets++ continue } - session.ProcessedPackets++ tcp := packet.TransportLayer().(*layers.TCP) var servicePort uint16 @@ -213,8 +182,32 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, timestamp) case <-updateProgressInterval: - progressUpdate(false, nil) + pi.progressUpdate(session, false, "") + } + } +} + +func (pi *PcapImporter) progressUpdate(session ImportingSession, completed bool, err string) { + if completed { + session.CompletedAt = time.Now() + } + session.ImportingError = err + + packetsPerService := session.PacketsPerService + session.PacketsPerService = make(map[uint16]flowCount, len(packetsPerService)) + for key, value := range packetsPerService { + session.PacketsPerService[key] = value + } + + pi.mSessions.Lock() + pi.sessions[session.ID] = session + pi.mSessions.Unlock() + + if completed || session.ImportingError != "" { + if _, _err := pi.storage.Insert(ImportingSessions).One(session); _err != nil { + log.WithError(_err).WithField("session", session).Error("failed to insert importing stats") } + session.completed <- session.ImportingError } } |