diff options
Diffstat (limited to 'pcap_importer.go')
-rw-r--r-- | pcap_importer.go | 63 |
1 files changed, 44 insertions, 19 deletions
diff --git a/pcap_importer.go b/pcap_importer.go index 1739b3f..1c80b3f 100644 --- a/pcap_importer.go +++ b/pcap_importer.go @@ -1,3 +1,20 @@ +/* + * This file is part of caronte (https://github.com/eciavatta/caronte). + * Copyright (c) 2020 Emiliano Ciavatta. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + package main import ( @@ -12,6 +29,7 @@ import ( "os" "path" "path/filepath" + "sort" "sync" "time" ) @@ -19,17 +37,17 @@ import ( const PcapsBasePath = "pcaps/" const ProcessingPcapsBasePath = PcapsBasePath + "processing/" const initialAssemblerPoolSize = 16 -const flushOlderThan = 5 * time.Minute const importUpdateProgressInterval = 100 * time.Millisecond type PcapImporter struct { - storage Storage - streamPool *tcpassembly.StreamPool - assemblers []*tcpassembly.Assembler - sessions map[string]ImportingSession - mAssemblers sync.Mutex - mSessions sync.Mutex - serverNet net.IPNet + storage Storage + streamPool *tcpassembly.StreamPool + assemblers []*tcpassembly.Assembler + sessions map[string]ImportingSession + mAssemblers sync.Mutex + mSessions sync.Mutex + serverNet net.IPNet + notificationController *NotificationController } type ImportingSession struct { @@ -47,7 +65,8 @@ type ImportingSession struct { type flowCount [2]int -func NewPcapImporter(storage Storage, serverNet net.IPNet, rulesManager RulesManager) *PcapImporter { +func NewPcapImporter(storage Storage, serverNet net.IPNet, rulesManager RulesManager, + notificationController *NotificationController) *PcapImporter { streamPool := tcpassembly.NewStreamPool(NewBiDirectionalStreamFactory(storage, serverNet, rulesManager)) var result []ImportingSession @@ -60,13 +79,14 @@ func NewPcapImporter(storage Storage, serverNet net.IPNet, rulesManager RulesMan } return &PcapImporter{ - storage: storage, - streamPool: streamPool, - assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize), - sessions: sessions, - mAssemblers: sync.Mutex{}, - mSessions: sync.Mutex{}, - serverNet: serverNet, + storage: storage, + streamPool: streamPool, + assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize), + sessions: sessions, + mAssemblers: sync.Mutex{}, + mSessions: sync.Mutex{}, + serverNet: serverNet, + notificationController: notificationController, } } @@ -120,6 +140,9 @@ func (pi *PcapImporter) GetSessions() []ImportingSession { for _, session := range pi.sessions { sessions = append(sessions, session) } + sort.Slice(sessions, func(i, j int) bool { + return sessions[i].StartedAt.Before(sessions[j].StartedAt) + }) pi.mSessions.Unlock() return sessions } @@ -186,6 +209,8 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu handle.Close() pi.releaseAssembler(assembler) pi.progressUpdate(session, fileName, true, "") + pi.notificationController.Notify("pcap.completed", session) + return } @@ -201,8 +226,8 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flu var servicePort uint16 var index int - isDstServer := pi.serverNet.Contains(packet.NetworkLayer().NetworkFlow().Dst().Raw()) - isSrcServer := pi.serverNet.Contains(packet.NetworkLayer().NetworkFlow().Src().Raw()) + isDstServer := pi.serverNet.Contains(packet.NetworkLayer().NetworkFlow().Dst().Raw()) + isSrcServer := pi.serverNet.Contains(packet.NetworkLayer().NetworkFlow().Src().Raw()) if isDstServer && !isSrcServer { servicePort = uint16(tcp.DstPort) index = 0 @@ -284,7 +309,7 @@ func deleteProcessingFile(fileName string) { } func moveProcessingFile(sessionID string, fileName string) { - if err := os.Rename(ProcessingPcapsBasePath + fileName, PcapsBasePath + sessionID + path.Ext(fileName)); err != nil { + if err := os.Rename(ProcessingPcapsBasePath+fileName, PcapsBasePath+sessionID+path.Ext(fileName)); err != nil { log.WithError(err).Error("failed to move processed file") } } |