From b33f14c35bb3a6d08fe095d3d20e1d40f7398500 Mon Sep 17 00:00:00 2001 From: Emiliano Ciavatta Date: Sun, 26 Apr 2020 17:41:01 +0200 Subject: Change pcap_importer api --- pcap_importer.go | 67 +++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 13 deletions(-) (limited to 'pcap_importer.go') diff --git a/pcap_importer.go b/pcap_importer.go index 2bdbd1a..129e60b 100644 --- a/pcap_importer.go +++ b/pcap_importer.go @@ -9,10 +9,15 @@ import ( "github.com/google/gopacket/tcpassembly" log "github.com/sirupsen/logrus" "net" + "os" + "path" + "path/filepath" "sync" "time" ) +const PcapsBasePath = "pcaps/" +const ProcessingPcapsBasePath = PcapsBasePath + "processing/" const initialAssemblerPoolSize = 16 const flushOlderThan = 5 * time.Minute const importUpdateProgressInterval = 100 * time.Millisecond @@ -46,11 +51,20 @@ func NewPcapImporter(storage Storage, serverIP net.IP, rulesManager RulesManager serverEndpoint := layers.NewIPEndpoint(serverIP) streamPool := tcpassembly.NewStreamPool(NewBiDirectionalStreamFactory(storage, serverEndpoint, rulesManager)) + var result []ImportingSession + if err := storage.Find(ImportingSessions).All(&result); err != nil { + log.WithError(err).Panic("failed to retrieve importing sessions") + } + sessions := make(map[string]ImportingSession) + for _, session := range result { + sessions[session.ID] = session + } + return &PcapImporter{ storage: storage, streamPool: streamPool, assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize), - sessions: make(map[string]ImportingSession), + sessions: sessions, mAssemblers: sync.Mutex{}, mSessions: sync.Mutex{}, serverIP: serverEndpoint, @@ -62,15 +76,24 @@ func NewPcapImporter(storage Storage, serverIP net.IP, rulesManager RulesManager // create a new session and starts to import the pcap, and returns immediately the session name (that is the sha256 // of the pcap). func (pi *PcapImporter) ImportPcap(fileName string) (string, error) { - hash, err := Sha256Sum(fileName) + switch filepath.Ext(fileName) { + case ".pcap": + case ".pcapng": + default: + deleteProcessingFile(fileName) + return "", errors.New("invalid file extension") + } + + hash, err := Sha256Sum(ProcessingPcapsBasePath + fileName) if err != nil { - return "", err + log.WithError(err).Panic("failed to calculate pcap sha256") + deleteProcessingFile(fileName) } pi.mSessions.Lock() - _, isPresent := pi.sessions[hash] - if isPresent { + if _, isPresent := pi.sessions[hash]; isPresent { pi.mSessions.Unlock() + deleteProcessingFile(fileName) return hash, errors.New("pcap already processed") } @@ -78,7 +101,7 @@ func (pi *PcapImporter) ImportPcap(fileName string) (string, error) { session := ImportingSession{ ID: hash, StartedAt: time.Now(), - Size: FileSize(fileName), + Size: FileSize(ProcessingPcapsBasePath + fileName), PacketsPerService: make(map[uint16]flowCount), cancelFunc: cancelFunc, completed: make(chan string), @@ -121,9 +144,9 @@ func (pi *PcapImporter) CancelSession(sessionID string) bool { // Read the pcap and save the tcp stream flow to the database func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx context.Context) { - handle, err := pcap.OpenOffline(fileName) + handle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName) if err != nil { - pi.progressUpdate(session, false, "failed to process pcap") + pi.progressUpdate(session, fileName, false, "failed to process pcap") log.WithError(err).WithFields(log.Fields{"session": session, "fileName": fileName}). Error("failed to open pcap") return @@ -141,7 +164,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx case <-ctx.Done(): handle.Close() pi.releaseAssembler(assembler) - pi.progressUpdate(session, false, "import process cancelled") + pi.progressUpdate(session, fileName, false, "import process cancelled") return default: } @@ -150,11 +173,12 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx case packet := <-packets: if packet == nil { // completed if !firstPacketTime.IsZero() { - assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan)) + // TODO: + //assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan)) } handle.Close() pi.releaseAssembler(assembler) - pi.progressUpdate(session, true, "") + pi.progressUpdate(session, fileName, true, "") return } @@ -195,12 +219,12 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, timestamp) case <-updateProgressInterval: - pi.progressUpdate(session, false, "") + pi.progressUpdate(session, fileName, false, "") } } } -func (pi *PcapImporter) progressUpdate(session ImportingSession, completed bool, err string) { +func (pi *PcapImporter) progressUpdate(session ImportingSession, fileName string, completed bool, err string) { if completed { session.CompletedAt = time.Now() } @@ -220,6 +244,11 @@ func (pi *PcapImporter) progressUpdate(session ImportingSession, completed bool, if _, _err := pi.storage.Insert(ImportingSessions).One(session); _err != nil { log.WithError(_err).WithField("session", session).Error("failed to insert importing stats") } + if completed { + moveProcessingFile(session.ID, fileName) + } else { + deleteProcessingFile(fileName) + } close(session.completed) } } @@ -244,3 +273,15 @@ func (pi *PcapImporter) releaseAssembler(assembler *tcpassembly.Assembler) { pi.assemblers = append(pi.assemblers, assembler) pi.mAssemblers.Unlock() } + +func deleteProcessingFile(fileName string) { + if err := os.Remove(ProcessingPcapsBasePath + fileName); err != nil { + log.WithError(err).Error("failed to delete processing file") + } +} + +func moveProcessingFile(sessionID string, fileName string) { + if err := os.Rename(ProcessingPcapsBasePath + fileName, PcapsBasePath + sessionID + path.Ext(fileName)); err != nil { + log.WithError(err).Error("failed to move processed file") + } +} -- cgit v1.2.3-70-g09d2