diff options
Diffstat (limited to 'pcap_importer.go')
-rw-r--r-- | pcap_importer.go | 55 |
1 files changed, 42 insertions, 13 deletions
diff --git a/pcap_importer.go b/pcap_importer.go index 476d8d7..4cca916 100644 --- a/pcap_importer.go +++ b/pcap_importer.go @@ -20,11 +20,6 @@ package main import ( "context" "errors" - "github.com/google/gopacket" - "github.com/google/gopacket/layers" - "github.com/google/gopacket/pcap" - "github.com/google/gopacket/tcpassembly" - log "github.com/sirupsen/logrus" "net" "os" "path" @@ -32,12 +27,19 @@ import ( "sort" "sync" "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcap" + "github.com/google/gopacket/tcpassembly" + log "github.com/sirupsen/logrus" ) const PcapsBasePath = "pcaps/" const ProcessingPcapsBasePath = PcapsBasePath + "processing/" const initialAssemblerPoolSize = 16 const importUpdateProgressInterval = 100 * time.Millisecond +const MAX_PCAPS = 100 type PcapImporter struct { storage Storage @@ -176,7 +178,7 @@ func (pi *PcapImporter) FlushConnections(olderThen time.Time, closeAll bool) (fl // Read the pcap and save the tcp stream flow to the database func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flushAll bool, ctx context.Context) { - handle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName) + pcapHandle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName) if err != nil { pi.progressUpdate(session, fileName, false, "failed to process pcap") log.WithError(err).WithFields(log.Fields{"session": session, "fileName": fileName}). @@ -184,7 +186,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu return } - packetSource := gopacket.NewPacketSource(handle, handle.LinkType()) + packetSource := gopacket.NewPacketSource(pcapHandle, pcapHandle.LinkType()) packetSource.NoCopy = true assembler := pi.takeAssembler() packets := packetSource.Packets() @@ -193,7 +195,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu for { select { case <-ctx.Done(): - handle.Close() + pcapHandle.Close() pi.releaseAssembler(assembler) pi.progressUpdate(session, fileName, false, "import process cancelled") return @@ -202,12 +204,15 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu select { case packet := <-packets: - if packet == nil { // completed + + if packet == nil { + // we read all the packets if flushAll { connectionsClosed := assembler.FlushAll() log.Debugf("connections closed after flush: %v", connectionsClosed) } - handle.Close() + pcapHandle.Close() + pi.tryDeleteOldPcaps() pi.releaseAssembler(assembler) pi.progressUpdate(session, fileName, true, "") pi.notificationController.Notify("pcap.completed", session) @@ -253,6 +258,17 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu } } +func (pi *PcapImporter) tryDeleteOldPcaps() { + sessions := pi.GetSessions() + size := len(sessions) + + if size > MAX_PCAPS { + hash := sessions[0].ID + // delete the oldest session pcap file + deletePcapFile(hash) + } +} + func (pi *PcapImporter) progressUpdate(session ImportingSession, fileName string, completed bool, err string) { if completed { session.CompletedAt = time.Now() @@ -304,13 +320,26 @@ func (pi *PcapImporter) releaseAssembler(assembler *tcpassembly.Assembler) { } func deleteProcessingFile(fileName string) { - if err := os.Remove(ProcessingPcapsBasePath + fileName); err != nil { + err := os.Remove(ProcessingPcapsBasePath + fileName) + if err != nil { log.WithError(err).Error("failed to delete processing file") } } -func moveProcessingFile(sessionID string, fileName string) { - if err := os.Rename(ProcessingPcapsBasePath+fileName, PcapsBasePath+sessionID+path.Ext(fileName)); err != nil { +func deletePcapFile(fileName string) { + err := os.Remove(PcapsBasePath + fileName) + if err != nil { + log.WithError(err).Error("failed to delete pcap file") + } +} + +func moveProcessingFile(sessionID string, oldFileName string) { + oldExt := path.Ext(oldFileName) + oldpath := ProcessingPcapsBasePath + oldFileName + newpath := PcapsBasePath + sessionID + oldExt + + err := os.Rename(oldpath, newpath) + if err != nil { log.WithError(err).Error("failed to move processed file") } } |