diff options
Diffstat (limited to 'pcap_importer.go')
-rw-r--r-- | pcap_importer.go | 39 |
1 files changed, 24 insertions, 15 deletions
diff --git a/pcap_importer.go b/pcap_importer.go index 41ed082..1c80b3f 100644 --- a/pcap_importer.go +++ b/pcap_importer.go @@ -29,6 +29,7 @@ import ( "os" "path" "path/filepath" + "sort" "sync" "time" ) @@ -39,13 +40,14 @@ const initialAssemblerPoolSize = 16 const importUpdateProgressInterval = 100 * time.Millisecond type PcapImporter struct { - storage Storage - streamPool *tcpassembly.StreamPool - assemblers []*tcpassembly.Assembler - sessions map[string]ImportingSession - mAssemblers sync.Mutex - mSessions sync.Mutex - serverNet net.IPNet + storage Storage + streamPool *tcpassembly.StreamPool + assemblers []*tcpassembly.Assembler + sessions map[string]ImportingSession + mAssemblers sync.Mutex + mSessions sync.Mutex + serverNet net.IPNet + notificationController *NotificationController } type ImportingSession struct { @@ -63,7 +65,8 @@ type ImportingSession struct { type flowCount [2]int -func NewPcapImporter(storage Storage, serverNet net.IPNet, rulesManager RulesManager) *PcapImporter { +func NewPcapImporter(storage Storage, serverNet net.IPNet, rulesManager RulesManager, + notificationController *NotificationController) *PcapImporter { streamPool := tcpassembly.NewStreamPool(NewBiDirectionalStreamFactory(storage, serverNet, rulesManager)) var result []ImportingSession @@ -76,13 +79,14 @@ func NewPcapImporter(storage Storage, serverNet net.IPNet, rulesManager RulesMan } return &PcapImporter{ - storage: storage, - streamPool: streamPool, - assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize), - sessions: sessions, - mAssemblers: sync.Mutex{}, - mSessions: sync.Mutex{}, - serverNet: serverNet, + storage: storage, + streamPool: streamPool, + assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize), + sessions: sessions, + mAssemblers: sync.Mutex{}, + mSessions: sync.Mutex{}, + serverNet: serverNet, + notificationController: notificationController, } } @@ -136,6 +140,9 @@ func (pi *PcapImporter) GetSessions() []ImportingSession { for _, session := range pi.sessions { sessions = append(sessions, session) } + sort.Slice(sessions, func(i, j int) bool { + return sessions[i].StartedAt.Before(sessions[j].StartedAt) + }) pi.mSessions.Unlock() return sessions } @@ -202,6 +209,8 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu handle.Close() pi.releaseAssembler(assembler) pi.progressUpdate(session, fileName, true, "") + pi.notificationController.Notify("pcap.completed", session) + return } |