Diffstat (limited to 'pcap_importer.go')
-rw-r--r--  pcap_importer.go  67
1 file changed, 54 insertions(+), 13 deletions(-)
diff --git a/pcap_importer.go b/pcap_importer.go
index 2bdbd1a..129e60b 100644
--- a/pcap_importer.go
+++ b/pcap_importer.go
@@ -9,10 +9,15 @@ import (
"github.com/google/gopacket/tcpassembly"
log "github.com/sirupsen/logrus"
"net"
+ "os"
+ "path"
+ "path/filepath"
"sync"
"time"
)
+const PcapsBasePath = "pcaps/"
+const ProcessingPcapsBasePath = PcapsBasePath + "processing/"
const initialAssemblerPoolSize = 16
const flushOlderThan = 5 * time.Minute
const importUpdateProgressInterval = 100 * time.Millisecond
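
The two new constants split pcap storage into a staging area (pcaps/processing/) for files still being imported and a final location (pcaps/) for completed ones. This hunk does not show where those directories get created; a minimal sketch, assuming the same package and that the caller is responsible for creating them at startup:

// Hypothetical helper, not part of this commit: make sure both directories exist
// before any import is started. os is already imported by this change.
func ensurePcapDirs() error {
	for _, dir := range []string{PcapsBasePath, ProcessingPcapsBasePath} {
		if err := os.MkdirAll(dir, 0755); err != nil { // 0755: rwx for owner, rx for others
			return err
		}
	}
	return nil
}
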
@@ -46,11 +51,20 @@ func NewPcapImporter(storage Storage, serverIP net.IP, rulesManager RulesManager
 	serverEndpoint := layers.NewIPEndpoint(serverIP)
 	streamPool := tcpassembly.NewStreamPool(NewBiDirectionalStreamFactory(storage, serverEndpoint, rulesManager))
 
+	var result []ImportingSession
+	if err := storage.Find(ImportingSessions).All(&result); err != nil {
+		log.WithError(err).Panic("failed to retrieve importing sessions")
+	}
+	sessions := make(map[string]ImportingSession)
+	for _, session := range result {
+		sessions[session.ID] = session
+	}
+
 	return &PcapImporter{
 		storage:     storage,
 		streamPool:  streamPool,
 		assemblers:  make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize),
-		sessions:    make(map[string]ImportingSession),
+		sessions:    sessions,
 		mAssemblers: sync.Mutex{},
 		mSessions:   sync.Mutex{},
 		serverIP:    serverEndpoint,
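
With the ImportingSession records reloaded from storage into the sessions map, duplicate detection keyed on the pcap's sha256 now survives a restart of the process. A rough usage sketch, assuming storage and rulesManager are built elsewhere in the application and that the server IP shown is just a placeholder:

// Hypothetical illustration, same package: the importer is rebuilt from storage,
// so a capture whose hash was already recorded is rejected on the next attempt.
importer := NewPcapImporter(storage, net.ParseIP("10.10.10.10"), rulesManager)
if id, err := importer.ImportPcap("capture.pcap"); err != nil {
	// err is "pcap already processed" when the hash matches a restored session;
	// id still carries the existing session name (the sha256 of the file).
	log.WithError(err).WithField("session", id).Info("pcap skipped")
}
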
@@ -62,15 +76,24 @@ func NewPcapImporter(storage Storage, serverIP net.IP, rulesManager RulesManager
 // create a new session and starts to import the pcap, and returns immediately the session name (that is the sha256
 // of the pcap).
 func (pi *PcapImporter) ImportPcap(fileName string) (string, error) {
-	hash, err := Sha256Sum(fileName)
+	switch filepath.Ext(fileName) {
+	case ".pcap":
+	case ".pcapng":
+	default:
+		deleteProcessingFile(fileName)
+		return "", errors.New("invalid file extension")
+	}
+
+	hash, err := Sha256Sum(ProcessingPcapsBasePath + fileName)
 	if err != nil {
-		return "", err
+		deleteProcessingFile(fileName)
+		log.WithError(err).Panic("failed to calculate pcap sha256")
 	}
 
 	pi.mSessions.Lock()
-	_, isPresent := pi.sessions[hash]
-	if isPresent {
+	if _, isPresent := pi.sessions[hash]; isPresent {
 		pi.mSessions.Unlock()
+		deleteProcessingFile(fileName)
 		return hash, errors.New("pcap already processed")
 	}
 
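
The switch looks only at the final extension returned by filepath.Ext, so a name must end exactly in .pcap or .pcapng to pass; everything else is deleted from the processing directory and rejected. A few illustrative values, not taken from this commit:

// Illustrative only: what filepath.Ext returns for typical names.
filepath.Ext("dump.pcap")    // ".pcap"   -> accepted
filepath.Ext("dump.pcapng")  // ".pcapng" -> accepted
filepath.Ext("dump.cap")     // ".cap"    -> rejected, staged file is deleted
filepath.Ext("dump.pcap.gz") // ".gz"     -> rejected, staged file is deleted
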
@@ -78,7 +101,7 @@ func (pi *PcapImporter) ImportPcap(fileName string) (string, error) {
 	session := ImportingSession{
 		ID:                hash,
 		StartedAt:         time.Now(),
-		Size:              FileSize(fileName),
+		Size:              FileSize(ProcessingPcapsBasePath + fileName),
 		PacketsPerService: make(map[uint16]flowCount),
 		cancelFunc:        cancelFunc,
 		completed:         make(chan string),
@@ -121,9 +144,9 @@ func (pi *PcapImporter) CancelSession(sessionID string) bool {
 
 // Read the pcap and save the tcp stream flow to the database
 func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx context.Context) {
-	handle, err := pcap.OpenOffline(fileName)
+	handle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName)
 	if err != nil {
-		pi.progressUpdate(session, false, "failed to process pcap")
+		pi.progressUpdate(session, fileName, false, "failed to process pcap")
 		log.WithError(err).WithFields(log.Fields{"session": session, "fileName": fileName}).
 			Error("failed to open pcap")
 		return
@@ -141,7 +164,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
 		case <-ctx.Done():
 			handle.Close()
 			pi.releaseAssembler(assembler)
-			pi.progressUpdate(session, false, "import process cancelled")
+			pi.progressUpdate(session, fileName, false, "import process cancelled")
 			return
 		default:
 		}
@@ -150,11 +173,12 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
 		case packet := <-packets:
 			if packet == nil { // completed
 				if !firstPacketTime.IsZero() {
-					assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan))
+					// TODO:
+					//assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan))
 				}
 				handle.Close()
 				pi.releaseAssembler(assembler)
-				pi.progressUpdate(session, true, "")
+				pi.progressUpdate(session, fileName, true, "")
 				return
 			}
@@ -195,12 +219,12 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
 
 			assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, timestamp)
 		case <-updateProgressInterval:
-			pi.progressUpdate(session, false, "")
+			pi.progressUpdate(session, fileName, false, "")
 		}
 	}
 }
 
-func (pi *PcapImporter) progressUpdate(session ImportingSession, completed bool, err string) {
+func (pi *PcapImporter) progressUpdate(session ImportingSession, fileName string, completed bool, err string) {
 	if completed {
 		session.CompletedAt = time.Now()
 	}
@@ -220,6 +244,11 @@ func (pi *PcapImporter) progressUpdate(session ImportingSession, completed bool,
 		if _, _err := pi.storage.Insert(ImportingSessions).One(session); _err != nil {
 			log.WithError(_err).WithField("session", session).Error("failed to insert importing stats")
 		}
+		if completed {
+			moveProcessingFile(session.ID, fileName)
+		} else {
+			deleteProcessingFile(fileName)
+		}
 		close(session.completed)
 	}
 }
@@ -244,3 +273,15 @@ func (pi *PcapImporter) releaseAssembler(assembler *tcpassembly.Assembler) {
 	pi.assemblers = append(pi.assemblers, assembler)
 	pi.mAssemblers.Unlock()
 }
+
+func deleteProcessingFile(fileName string) {
+	if err := os.Remove(ProcessingPcapsBasePath + fileName); err != nil {
+		log.WithError(err).Error("failed to delete processing file")
+	}
+}
+
+func moveProcessingFile(sessionID string, fileName string) {
+	if err := os.Rename(ProcessingPcapsBasePath + fileName, PcapsBasePath + sessionID + path.Ext(fileName)); err != nil {
+		log.WithError(err).Error("failed to move processed file")
+	}
+}
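
Since ImportPcap, FileSize, Sha256Sum and pcap.OpenOffline all resolve the name relative to ProcessingPcapsBasePath, callers are expected to stage the capture in that directory first and pass only the base name. A minimal, hypothetical sketch of that flow (the stageAndImport helper and its io.Reader argument are assumptions, not part of this commit; io would need to be imported):

// Hypothetical caller, same package: stage an uploaded capture into the
// processing directory, then hand the bare file name to the importer.
func stageAndImport(pi *PcapImporter, name string, upload io.Reader) (string, error) {
	fileName := filepath.Base(name) // drop any client-supplied path components
	dst, err := os.Create(ProcessingPcapsBasePath + fileName)
	if err != nil {
		return "", err
	}
	if _, err := io.Copy(dst, upload); err != nil {
		_ = dst.Close()
		return "", err
	}
	if err := dst.Close(); err != nil {
		return "", err
	}
	// On success the importer later moves the file to pcaps/<sha256><ext>;
	// on failure or duplicate it deletes it from the processing directory.
	return pi.ImportPcap(fileName)
}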