aboutsummaryrefslogtreecommitdiff
path: root/pcap_importer.go
diff options
context:
space:
mode:
Diffstat (limited to 'pcap_importer.go')
-rw-r--r--pcap_importer.go29
1 files changed, 16 insertions, 13 deletions
diff --git a/pcap_importer.go b/pcap_importer.go
index cd6fdfa..1739b3f 100644
--- a/pcap_importer.go
+++ b/pcap_importer.go
@@ -74,7 +74,7 @@ func NewPcapImporter(storage Storage, serverNet net.IPNet, rulesManager RulesMan
// going to be imported or if it has been already imported in the past the function returns an error. Otherwise it
// create a new session and starts to import the pcap, and returns immediately the session name (that is the sha256
// of the pcap).
-func (pi *PcapImporter) ImportPcap(fileName string) (string, error) {
+func (pi *PcapImporter) ImportPcap(fileName string, flushAll bool) (string, error) {
switch filepath.Ext(fileName) {
case ".pcap":
case ".pcapng":
@@ -109,7 +109,7 @@ func (pi *PcapImporter) ImportPcap(fileName string) (string, error) {
pi.sessions[hash] = session
pi.mSessions.Unlock()
- go pi.parsePcap(session, fileName, ctx)
+ go pi.parsePcap(session, fileName, flushAll, ctx)
return hash, nil
}
@@ -141,8 +141,18 @@ func (pi *PcapImporter) CancelSession(sessionID string) bool {
return isPresent
}
+func (pi *PcapImporter) FlushConnections(olderThen time.Time, closeAll bool) (flushed, closed int) {
+ assembler := pi.takeAssembler()
+ flushed, closed = assembler.FlushWithOptions(tcpassembly.FlushOptions{
+ T: olderThen,
+ CloseAll: closeAll,
+ })
+ pi.releaseAssembler(assembler)
+ return
+}
+
// Read the pcap and save the tcp stream flow to the database
-func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx context.Context) {
+func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flushAll bool, ctx context.Context) {
handle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName)
if err != nil {
pi.progressUpdate(session, fileName, false, "failed to process pcap")
@@ -155,7 +165,6 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
packetSource.NoCopy = true
assembler := pi.takeAssembler()
packets := packetSource.Packets()
- firstPacketTime := time.Time{}
updateProgressInterval := time.Tick(importUpdateProgressInterval)
for {
@@ -171,9 +180,8 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
select {
case packet := <-packets:
if packet == nil { // completed
- if !firstPacketTime.IsZero() {
- // TODO:
- //assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan))
+ if flushAll {
+ assembler.FlushAll()
}
handle.Close()
pi.releaseAssembler(assembler)
@@ -181,11 +189,6 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
return
}
- timestamp := packet.Metadata().Timestamp
- if firstPacketTime.IsZero() {
- firstPacketTime = timestamp
- }
-
session.ProcessedPackets++
if packet.NetworkLayer() == nil || packet.TransportLayer() == nil ||
@@ -217,7 +220,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
fCount[index]++
session.PacketsPerService[servicePort] = fCount
- assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, timestamp)
+ assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp)
case <-updateProgressInterval:
pi.progressUpdate(session, fileName, false, "")
}