aboutsummaryrefslogtreecommitdiff
path: root/pcap_importer.go
diff options
context:
space:
mode:
Diffstat (limited to 'pcap_importer.go')
-rw-r--r--pcap_importer.go39
1 files changed, 24 insertions, 15 deletions
diff --git a/pcap_importer.go b/pcap_importer.go
index 41ed082..1c80b3f 100644
--- a/pcap_importer.go
+++ b/pcap_importer.go
@@ -29,6 +29,7 @@ import (
"os"
"path"
"path/filepath"
+ "sort"
"sync"
"time"
)
@@ -39,13 +40,14 @@ const initialAssemblerPoolSize = 16
const importUpdateProgressInterval = 100 * time.Millisecond
type PcapImporter struct {
- storage Storage
- streamPool *tcpassembly.StreamPool
- assemblers []*tcpassembly.Assembler
- sessions map[string]ImportingSession
- mAssemblers sync.Mutex
- mSessions sync.Mutex
- serverNet net.IPNet
+ storage Storage
+ streamPool *tcpassembly.StreamPool
+ assemblers []*tcpassembly.Assembler
+ sessions map[string]ImportingSession
+ mAssemblers sync.Mutex
+ mSessions sync.Mutex
+ serverNet net.IPNet
+ notificationController *NotificationController
}
type ImportingSession struct {
@@ -63,7 +65,8 @@ type ImportingSession struct {
type flowCount [2]int
-func NewPcapImporter(storage Storage, serverNet net.IPNet, rulesManager RulesManager) *PcapImporter {
+func NewPcapImporter(storage Storage, serverNet net.IPNet, rulesManager RulesManager,
+ notificationController *NotificationController) *PcapImporter {
streamPool := tcpassembly.NewStreamPool(NewBiDirectionalStreamFactory(storage, serverNet, rulesManager))
var result []ImportingSession
@@ -76,13 +79,14 @@ func NewPcapImporter(storage Storage, serverNet net.IPNet, rulesManager RulesMan
}
return &PcapImporter{
- storage: storage,
- streamPool: streamPool,
- assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize),
- sessions: sessions,
- mAssemblers: sync.Mutex{},
- mSessions: sync.Mutex{},
- serverNet: serverNet,
+ storage: storage,
+ streamPool: streamPool,
+ assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize),
+ sessions: sessions,
+ mAssemblers: sync.Mutex{},
+ mSessions: sync.Mutex{},
+ serverNet: serverNet,
+ notificationController: notificationController,
}
}
@@ -136,6 +140,9 @@ func (pi *PcapImporter) GetSessions() []ImportingSession {
for _, session := range pi.sessions {
sessions = append(sessions, session)
}
+ sort.Slice(sessions, func(i, j int) bool {
+ return sessions[i].StartedAt.Before(sessions[j].StartedAt)
+ })
pi.mSessions.Unlock()
return sessions
}
@@ -202,6 +209,8 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu
handle.Close()
pi.releaseAssembler(assembler)
pi.progressUpdate(session, fileName, true, "")
+ pi.notificationController.Notify("pcap.completed", session)
+
return
}