diff options
Diffstat (limited to 'pcap_importer.go')
-rw-r--r-- | pcap_importer.go | 29 |
1 files changed, 16 insertions, 13 deletions
diff --git a/pcap_importer.go b/pcap_importer.go index cd6fdfa..1739b3f 100644 --- a/pcap_importer.go +++ b/pcap_importer.go @@ -74,7 +74,7 @@ func NewPcapImporter(storage Storage, serverNet net.IPNet, rulesManager RulesMan // going to be imported or if it has been already imported in the past the function returns an error. Otherwise it // create a new session and starts to import the pcap, and returns immediately the session name (that is the sha256 // of the pcap). -func (pi *PcapImporter) ImportPcap(fileName string) (string, error) { +func (pi *PcapImporter) ImportPcap(fileName string, flushAll bool) (string, error) { switch filepath.Ext(fileName) { case ".pcap": case ".pcapng": @@ -109,7 +109,7 @@ func (pi *PcapImporter) ImportPcap(fileName string) (string, error) { pi.sessions[hash] = session pi.mSessions.Unlock() - go pi.parsePcap(session, fileName, ctx) + go pi.parsePcap(session, fileName, flushAll, ctx) return hash, nil } @@ -141,8 +141,18 @@ func (pi *PcapImporter) CancelSession(sessionID string) bool { return isPresent } +func (pi *PcapImporter) FlushConnections(olderThen time.Time, closeAll bool) (flushed, closed int) { + assembler := pi.takeAssembler() + flushed, closed = assembler.FlushWithOptions(tcpassembly.FlushOptions{ + T: olderThen, + CloseAll: closeAll, + }) + pi.releaseAssembler(assembler) + return +} + // Read the pcap and save the tcp stream flow to the database -func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx context.Context) { +func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flushAll bool, ctx context.Context) { handle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName) if err != nil { pi.progressUpdate(session, fileName, false, "failed to process pcap") @@ -155,7 +165,6 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx packetSource.NoCopy = true assembler := pi.takeAssembler() packets := packetSource.Packets() - firstPacketTime := time.Time{} updateProgressInterval := time.Tick(importUpdateProgressInterval) for { @@ -171,9 +180,8 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx select { case packet := <-packets: if packet == nil { // completed - if !firstPacketTime.IsZero() { - // TODO: - //assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan)) + if flushAll { + assembler.FlushAll() } handle.Close() pi.releaseAssembler(assembler) @@ -181,11 +189,6 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx return } - timestamp := packet.Metadata().Timestamp - if firstPacketTime.IsZero() { - firstPacketTime = timestamp - } - session.ProcessedPackets++ if packet.NetworkLayer() == nil || packet.TransportLayer() == nil || @@ -217,7 +220,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx fCount[index]++ session.PacketsPerService[servicePort] = fCount - assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, timestamp) + assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp) case <-updateProgressInterval: pi.progressUpdate(session, fileName, false, "") } |