aboutsummaryrefslogtreecommitdiff
path: root/pcap_importer.go
diff options
context:
space:
mode:
Diffstat (limited to 'pcap_importer.go')
-rw-r--r--pcap_importer.go22
1 files changed, 10 insertions, 12 deletions
diff --git a/pcap_importer.go b/pcap_importer.go
index c6260de..ac7c0b5 100644
--- a/pcap_importer.go
+++ b/pcap_importer.go
@@ -22,7 +22,6 @@ const importUpdateProgressInterval = 3 * time.Second
const initialPacketPerServicesMapSize = 16
const importedPcapsCollectionName = "imported_pcaps"
-
type PcapImporter struct {
storage Storage
streamPool *tcpassembly.StreamPool
@@ -35,11 +34,10 @@ type PcapImporter struct {
type flowCount [2]int
-
func NewPcapImporter(storage Storage, serverIp net.IP) *PcapImporter {
serverEndpoint := layers.NewIPEndpoint(serverIp)
streamFactory := &BiDirectionalStreamFactory{
- storage: storage,
+ storage: storage,
serverIp: serverEndpoint,
}
streamPool := tcpassembly.NewStreamPool(streamFactory)
@@ -82,7 +80,7 @@ func (pi *PcapImporter) ImportPcap(fileName string) (string, error) {
{"importing_error", err},
}
ctx, canc := context.WithCancel(context.Background())
- _, err = pi.storage.InsertOne(ctx, importedPcapsCollectionName, doc)
+ _, err = pi.storage.Insert(importedPcapsCollectionName).Context(ctx).One(doc)
if err != nil {
pi.mSessions.Unlock()
_, alreadyProcessed := err.(mongo.WriteException)
@@ -133,18 +131,18 @@ func (pi *PcapImporter) parsePcap(sessionId, fileName string, ctx context.Contex
progressUpdate := func(completed bool, err error) {
update := UnorderedDocument{
- "processed_packets": processedPackets,
- "invalid_packets": invalidPackets,
+ "processed_packets": processedPackets,
+ "invalid_packets": invalidPackets,
"packets_per_services": packetsPerService,
- "importing_error": err,
+ "importing_error": err,
}
if completed {
update["completed_at"] = time.Now()
}
- _, _err := pi.storage.UpdateOne(nil, importedPcapsCollectionName, OrderedDocument{{"_id", sessionId}},
- completed, false)
-
+ _, _err := pi.storage.Update(importedPcapsCollectionName).
+ Filter(OrderedDocument{{"_id", sessionId}}).
+ One(nil)
if _err != nil {
log.Println("can't update importing statistics : ", _err)
}
@@ -158,10 +156,10 @@ func (pi *PcapImporter) parsePcap(sessionId, fileName string, ctx context.Contex
for {
select {
- case <- ctx.Done():
+ case <-ctx.Done():
handle.Close()
deleteSession()
- progressUpdate(false, errors.New("import process cancelled"))
+ progressUpdate(false, errors.New("import process cancelled"))
return
default:
}