diff options
-rw-r--r-- | application_router.go | 3 | ||||
-rw-r--r-- | caronte_test.go | 8 | ||||
-rw-r--r-- | docker-compose.yml | 1 | ||||
-rw-r--r-- | pcap_importer.go | 55 |
4 files changed, 50 insertions, 17 deletions
diff --git a/application_router.go b/application_router.go index d51843a..08988ea 100644 --- a/application_router.go +++ b/application_router.go @@ -233,7 +233,8 @@ func CreateApplicationRouter(applicationContext *ApplicationContext, } else if FileExists(PcapsBasePath + sessionID + ".pcapng") { c.FileAttachment(PcapsBasePath+sessionID+".pcapng", sessionID[:16]+".pcapng") } else { - log.WithField("sessionID", sessionID).Panic("pcap file not exists") + log.WithField("sessionID", sessionID).Warn("requested pcap file not found") + notFound(c, gin.H{"session": sessionID}) } } else { notFound(c, gin.H{"session": sessionID}) diff --git a/caronte_test.go b/caronte_test.go index 8935ea3..0628e72 100644 --- a/caronte_test.go +++ b/caronte_test.go @@ -20,12 +20,13 @@ package main import ( "context" "fmt" - log "github.com/sirupsen/logrus" - "github.com/stretchr/testify/require" "os" "strconv" "testing" "time" + + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" ) type TestStorageWrapper struct { @@ -51,7 +52,8 @@ func NewTestStorageWrapper(t *testing.T) *TestStorageWrapper { storage, err := NewMongoStorage(mongoHost, port, dbName) require.NoError(t, err) - ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() return &TestStorageWrapper{ DbName: dbName, diff --git a/docker-compose.yml b/docker-compose.yml index d2f0abf..484755a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,6 +9,7 @@ services: caronte: image: ghcr.io/eciavatta/caronte:latest + build: . ports: - "3333:3333" environment: diff --git a/pcap_importer.go b/pcap_importer.go index 476d8d7..4cca916 100644 --- a/pcap_importer.go +++ b/pcap_importer.go @@ -20,11 +20,6 @@ package main import ( "context" "errors" - "github.com/google/gopacket" - "github.com/google/gopacket/layers" - "github.com/google/gopacket/pcap" - "github.com/google/gopacket/tcpassembly" - log "github.com/sirupsen/logrus" "net" "os" "path" @@ -32,12 +27,19 @@ import ( "sort" "sync" "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcap" + "github.com/google/gopacket/tcpassembly" + log "github.com/sirupsen/logrus" ) const PcapsBasePath = "pcaps/" const ProcessingPcapsBasePath = PcapsBasePath + "processing/" const initialAssemblerPoolSize = 16 const importUpdateProgressInterval = 100 * time.Millisecond +const MAX_PCAPS = 100 type PcapImporter struct { storage Storage @@ -176,7 +178,7 @@ func (pi *PcapImporter) FlushConnections(olderThen time.Time, closeAll bool) (fl // Read the pcap and save the tcp stream flow to the database func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flushAll bool, ctx context.Context) { - handle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName) + pcapHandle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName) if err != nil { pi.progressUpdate(session, fileName, false, "failed to process pcap") log.WithError(err).WithFields(log.Fields{"session": session, "fileName": fileName}). @@ -184,7 +186,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu return } - packetSource := gopacket.NewPacketSource(handle, handle.LinkType()) + packetSource := gopacket.NewPacketSource(pcapHandle, pcapHandle.LinkType()) packetSource.NoCopy = true assembler := pi.takeAssembler() packets := packetSource.Packets() @@ -193,7 +195,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu for { select { case <-ctx.Done(): - handle.Close() + pcapHandle.Close() pi.releaseAssembler(assembler) pi.progressUpdate(session, fileName, false, "import process cancelled") return @@ -202,12 +204,15 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu select { case packet := <-packets: - if packet == nil { // completed + + if packet == nil { + // we read all the packets if flushAll { connectionsClosed := assembler.FlushAll() log.Debugf("connections closed after flush: %v", connectionsClosed) } - handle.Close() + pcapHandle.Close() + pi.tryDeleteOldPcaps() pi.releaseAssembler(assembler) pi.progressUpdate(session, fileName, true, "") pi.notificationController.Notify("pcap.completed", session) @@ -253,6 +258,17 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu } } +func (pi *PcapImporter) tryDeleteOldPcaps() { + sessions := pi.GetSessions() + size := len(sessions) + + if size > MAX_PCAPS { + hash := sessions[0].ID + // delete the oldest session pcap file + deletePcapFile(hash) + } +} + func (pi *PcapImporter) progressUpdate(session ImportingSession, fileName string, completed bool, err string) { if completed { session.CompletedAt = time.Now() @@ -304,13 +320,26 @@ func (pi *PcapImporter) releaseAssembler(assembler *tcpassembly.Assembler) { } func deleteProcessingFile(fileName string) { - if err := os.Remove(ProcessingPcapsBasePath + fileName); err != nil { + err := os.Remove(ProcessingPcapsBasePath + fileName) + if err != nil { log.WithError(err).Error("failed to delete processing file") } } -func moveProcessingFile(sessionID string, fileName string) { - if err := os.Rename(ProcessingPcapsBasePath+fileName, PcapsBasePath+sessionID+path.Ext(fileName)); err != nil { +func deletePcapFile(fileName string) { + err := os.Remove(PcapsBasePath + fileName) + if err != nil { + log.WithError(err).Error("failed to delete pcap file") + } +} + +func moveProcessingFile(sessionID string, oldFileName string) { + oldExt := path.Ext(oldFileName) + oldpath := ProcessingPcapsBasePath + oldFileName + newpath := PcapsBasePath + sessionID + oldExt + + err := os.Rename(oldpath, newpath) + if err != nil { log.WithError(err).Error("failed to move processed file") } } |