diff options
Diffstat (limited to 'pcap_importer.go')
-rw-r--r-- | pcap_importer.go | 22 |
1 files changed, 10 insertions, 12 deletions
diff --git a/pcap_importer.go b/pcap_importer.go index c6260de..ac7c0b5 100644 --- a/pcap_importer.go +++ b/pcap_importer.go @@ -22,7 +22,6 @@ const importUpdateProgressInterval = 3 * time.Second const initialPacketPerServicesMapSize = 16 const importedPcapsCollectionName = "imported_pcaps" - type PcapImporter struct { storage Storage streamPool *tcpassembly.StreamPool @@ -35,11 +34,10 @@ type PcapImporter struct { type flowCount [2]int - func NewPcapImporter(storage Storage, serverIp net.IP) *PcapImporter { serverEndpoint := layers.NewIPEndpoint(serverIp) streamFactory := &BiDirectionalStreamFactory{ - storage: storage, + storage: storage, serverIp: serverEndpoint, } streamPool := tcpassembly.NewStreamPool(streamFactory) @@ -82,7 +80,7 @@ func (pi *PcapImporter) ImportPcap(fileName string) (string, error) { {"importing_error", err}, } ctx, canc := context.WithCancel(context.Background()) - _, err = pi.storage.InsertOne(ctx, importedPcapsCollectionName, doc) + _, err = pi.storage.Insert(importedPcapsCollectionName).Context(ctx).One(doc) if err != nil { pi.mSessions.Unlock() _, alreadyProcessed := err.(mongo.WriteException) @@ -133,18 +131,18 @@ func (pi *PcapImporter) parsePcap(sessionId, fileName string, ctx context.Contex progressUpdate := func(completed bool, err error) { update := UnorderedDocument{ - "processed_packets": processedPackets, - "invalid_packets": invalidPackets, + "processed_packets": processedPackets, + "invalid_packets": invalidPackets, "packets_per_services": packetsPerService, - "importing_error": err, + "importing_error": err, } if completed { update["completed_at"] = time.Now() } - _, _err := pi.storage.UpdateOne(nil, importedPcapsCollectionName, OrderedDocument{{"_id", sessionId}}, - completed, false) - + _, _err := pi.storage.Update(importedPcapsCollectionName). + Filter(OrderedDocument{{"_id", sessionId}}). + One(nil) if _err != nil { log.Println("can't update importing statistics : ", _err) } @@ -158,10 +156,10 @@ func (pi *PcapImporter) parsePcap(sessionId, fileName string, ctx context.Contex for { select { - case <- ctx.Done(): + case <-ctx.Done(): handle.Close() deleteSession() - progressUpdate(false, errors.New("import process cancelled")) + progressUpdate(false, errors.New("import process cancelled")) return default: } |