aboutsummaryrefslogtreecommitdiff
path: root/pcap_importer.go
diff options
context:
space:
mode:
authorVaiTon2023-04-29 18:17:01 +0000
committerVaiTon2023-04-29 18:17:01 +0000
commit195450507328055200337c1138cdaeaab47c18d5 (patch)
tree957b687ce013e3339d255fc093257a99ae1d1cc8 /pcap_importer.go
parentcb1ee4f3a9ff09b11c99fd802225594a35497110 (diff)
Delete old pcaps to avoid infinite space consumption.
For the moment a const is used, namely MAX_PCAPS in pcap_importer.go. It should, however, be then refactored to a runtime config.
Diffstat (limited to 'pcap_importer.go')
-rw-r--r--pcap_importer.go55
1 files changed, 42 insertions, 13 deletions
diff --git a/pcap_importer.go b/pcap_importer.go
index 476d8d7..4cca916 100644
--- a/pcap_importer.go
+++ b/pcap_importer.go
@@ -20,11 +20,6 @@ package main
import (
"context"
"errors"
- "github.com/google/gopacket"
- "github.com/google/gopacket/layers"
- "github.com/google/gopacket/pcap"
- "github.com/google/gopacket/tcpassembly"
- log "github.com/sirupsen/logrus"
"net"
"os"
"path"
@@ -32,12 +27,19 @@ import (
"sort"
"sync"
"time"
+
+ "github.com/google/gopacket"
+ "github.com/google/gopacket/layers"
+ "github.com/google/gopacket/pcap"
+ "github.com/google/gopacket/tcpassembly"
+ log "github.com/sirupsen/logrus"
)
const PcapsBasePath = "pcaps/"
const ProcessingPcapsBasePath = PcapsBasePath + "processing/"
const initialAssemblerPoolSize = 16
const importUpdateProgressInterval = 100 * time.Millisecond
+const MAX_PCAPS = 100
type PcapImporter struct {
storage Storage
@@ -176,7 +178,7 @@ func (pi *PcapImporter) FlushConnections(olderThen time.Time, closeAll bool) (fl
// Read the pcap and save the tcp stream flow to the database
func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flushAll bool, ctx context.Context) {
- handle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName)
+ pcapHandle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName)
if err != nil {
pi.progressUpdate(session, fileName, false, "failed to process pcap")
log.WithError(err).WithFields(log.Fields{"session": session, "fileName": fileName}).
@@ -184,7 +186,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu
return
}
- packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
+ packetSource := gopacket.NewPacketSource(pcapHandle, pcapHandle.LinkType())
packetSource.NoCopy = true
assembler := pi.takeAssembler()
packets := packetSource.Packets()
@@ -193,7 +195,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu
for {
select {
case <-ctx.Done():
- handle.Close()
+ pcapHandle.Close()
pi.releaseAssembler(assembler)
pi.progressUpdate(session, fileName, false, "import process cancelled")
return
@@ -202,12 +204,15 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu
select {
case packet := <-packets:
- if packet == nil { // completed
+
+ if packet == nil {
+ // we read all the packets
if flushAll {
connectionsClosed := assembler.FlushAll()
log.Debugf("connections closed after flush: %v", connectionsClosed)
}
- handle.Close()
+ pcapHandle.Close()
+ pi.tryDeleteOldPcaps()
pi.releaseAssembler(assembler)
pi.progressUpdate(session, fileName, true, "")
pi.notificationController.Notify("pcap.completed", session)
@@ -253,6 +258,17 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu
}
}
+func (pi *PcapImporter) tryDeleteOldPcaps() {
+ sessions := pi.GetSessions()
+ size := len(sessions)
+
+ if size > MAX_PCAPS {
+ hash := sessions[0].ID
+ // delete the oldest session pcap file
+ deletePcapFile(hash)
+ }
+}
+
func (pi *PcapImporter) progressUpdate(session ImportingSession, fileName string, completed bool, err string) {
if completed {
session.CompletedAt = time.Now()
@@ -304,13 +320,26 @@ func (pi *PcapImporter) releaseAssembler(assembler *tcpassembly.Assembler) {
}
func deleteProcessingFile(fileName string) {
- if err := os.Remove(ProcessingPcapsBasePath + fileName); err != nil {
+ err := os.Remove(ProcessingPcapsBasePath + fileName)
+ if err != nil {
log.WithError(err).Error("failed to delete processing file")
}
}
-func moveProcessingFile(sessionID string, fileName string) {
- if err := os.Rename(ProcessingPcapsBasePath+fileName, PcapsBasePath+sessionID+path.Ext(fileName)); err != nil {
+func deletePcapFile(fileName string) {
+ err := os.Remove(PcapsBasePath + fileName)
+ if err != nil {
+ log.WithError(err).Error("failed to delete pcap file")
+ }
+}
+
+func moveProcessingFile(sessionID string, oldFileName string) {
+ oldExt := path.Ext(oldFileName)
+ oldpath := ProcessingPcapsBasePath + oldFileName
+ newpath := PcapsBasePath + sessionID + oldExt
+
+ err := os.Rename(oldpath, newpath)
+ if err != nil {
log.WithError(err).Error("failed to move processed file")
}
}