aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--application_router.go3
-rw-r--r--caronte_test.go8
-rw-r--r--docker-compose.yml1
-rw-r--r--pcap_importer.go55
4 files changed, 50 insertions, 17 deletions
diff --git a/application_router.go b/application_router.go
index d51843a..08988ea 100644
--- a/application_router.go
+++ b/application_router.go
@@ -233,7 +233,8 @@ func CreateApplicationRouter(applicationContext *ApplicationContext,
} else if FileExists(PcapsBasePath + sessionID + ".pcapng") {
c.FileAttachment(PcapsBasePath+sessionID+".pcapng", sessionID[:16]+".pcapng")
} else {
- log.WithField("sessionID", sessionID).Panic("pcap file not exists")
+ log.WithField("sessionID", sessionID).Warn("requested pcap file not found")
+ notFound(c, gin.H{"session": sessionID})
}
} else {
notFound(c, gin.H{"session": sessionID})
diff --git a/caronte_test.go b/caronte_test.go
index 8935ea3..0628e72 100644
--- a/caronte_test.go
+++ b/caronte_test.go
@@ -20,12 +20,13 @@ package main
import (
"context"
"fmt"
- log "github.com/sirupsen/logrus"
- "github.com/stretchr/testify/require"
"os"
"strconv"
"testing"
"time"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/require"
)
type TestStorageWrapper struct {
@@ -51,7 +52,8 @@ func NewTestStorageWrapper(t *testing.T) *TestStorageWrapper {
storage, err := NewMongoStorage(mongoHost, port, dbName)
require.NoError(t, err)
- ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
return &TestStorageWrapper{
DbName: dbName,
diff --git a/docker-compose.yml b/docker-compose.yml
index d2f0abf..484755a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -9,6 +9,7 @@ services:
caronte:
image: ghcr.io/eciavatta/caronte:latest
+ build: .
ports:
- "3333:3333"
environment:
diff --git a/pcap_importer.go b/pcap_importer.go
index 476d8d7..4cca916 100644
--- a/pcap_importer.go
+++ b/pcap_importer.go
@@ -20,11 +20,6 @@ package main
import (
"context"
"errors"
- "github.com/google/gopacket"
- "github.com/google/gopacket/layers"
- "github.com/google/gopacket/pcap"
- "github.com/google/gopacket/tcpassembly"
- log "github.com/sirupsen/logrus"
"net"
"os"
"path"
@@ -32,12 +27,19 @@ import (
"sort"
"sync"
"time"
+
+ "github.com/google/gopacket"
+ "github.com/google/gopacket/layers"
+ "github.com/google/gopacket/pcap"
+ "github.com/google/gopacket/tcpassembly"
+ log "github.com/sirupsen/logrus"
)
const PcapsBasePath = "pcaps/"
const ProcessingPcapsBasePath = PcapsBasePath + "processing/"
const initialAssemblerPoolSize = 16
const importUpdateProgressInterval = 100 * time.Millisecond
+const MAX_PCAPS = 100
type PcapImporter struct {
storage Storage
@@ -176,7 +178,7 @@ func (pi *PcapImporter) FlushConnections(olderThen time.Time, closeAll bool) (fl
// Read the pcap and save the tcp stream flow to the database
func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flushAll bool, ctx context.Context) {
- handle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName)
+ pcapHandle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName)
if err != nil {
pi.progressUpdate(session, fileName, false, "failed to process pcap")
log.WithError(err).WithFields(log.Fields{"session": session, "fileName": fileName}).
@@ -184,7 +186,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu
return
}
- packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
+ packetSource := gopacket.NewPacketSource(pcapHandle, pcapHandle.LinkType())
packetSource.NoCopy = true
assembler := pi.takeAssembler()
packets := packetSource.Packets()
@@ -193,7 +195,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu
for {
select {
case <-ctx.Done():
- handle.Close()
+ pcapHandle.Close()
pi.releaseAssembler(assembler)
pi.progressUpdate(session, fileName, false, "import process cancelled")
return
@@ -202,12 +204,15 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu
select {
case packet := <-packets:
- if packet == nil { // completed
+
+ if packet == nil {
+ // we read all the packets
if flushAll {
connectionsClosed := assembler.FlushAll()
log.Debugf("connections closed after flush: %v", connectionsClosed)
}
- handle.Close()
+ pcapHandle.Close()
+ pi.tryDeleteOldPcaps()
pi.releaseAssembler(assembler)
pi.progressUpdate(session, fileName, true, "")
pi.notificationController.Notify("pcap.completed", session)
@@ -253,6 +258,17 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu
}
}
+func (pi *PcapImporter) tryDeleteOldPcaps() {
+ sessions := pi.GetSessions()
+ size := len(sessions)
+
+ if size > MAX_PCAPS {
+ hash := sessions[0].ID
+ // delete the oldest session pcap file
+ deletePcapFile(hash)
+ }
+}
+
func (pi *PcapImporter) progressUpdate(session ImportingSession, fileName string, completed bool, err string) {
if completed {
session.CompletedAt = time.Now()
@@ -304,13 +320,26 @@ func (pi *PcapImporter) releaseAssembler(assembler *tcpassembly.Assembler) {
}
func deleteProcessingFile(fileName string) {
- if err := os.Remove(ProcessingPcapsBasePath + fileName); err != nil {
+ err := os.Remove(ProcessingPcapsBasePath + fileName)
+ if err != nil {
log.WithError(err).Error("failed to delete processing file")
}
}
-func moveProcessingFile(sessionID string, fileName string) {
- if err := os.Rename(ProcessingPcapsBasePath+fileName, PcapsBasePath+sessionID+path.Ext(fileName)); err != nil {
+func deletePcapFile(fileName string) {
+ err := os.Remove(PcapsBasePath + fileName)
+ if err != nil {
+ log.WithError(err).Error("failed to delete pcap file")
+ }
+}
+
+func moveProcessingFile(sessionID string, oldFileName string) {
+ oldExt := path.Ext(oldFileName)
+ oldpath := ProcessingPcapsBasePath + oldFileName
+ newpath := PcapsBasePath + sessionID + oldExt
+
+ err := os.Rename(oldpath, newpath)
+ if err != nil {
log.WithError(err).Error("failed to move processed file")
}
}