author     Emiliano Ciavatta  2020-04-17 13:01:20 +0000
committer  Emiliano Ciavatta  2020-04-17 13:01:20 +0000
commit     c118d899081bc62e28d47a5a0479fb16a24878ec (patch)
tree       6a24c1ac6d37cd0dc8957345db740e2dbfd7d0e7
parent     819d8af4f57e78724a2de02707bd4883715763bc (diff)
Add pcap import test
-rw-r--r--   caronte_test.go             4
-rw-r--r--   go.mod                      3
-rw-r--r--   go.sum                      7
-rw-r--r--   pcap_importer.go          192
-rw-r--r--   pcap_importer_test.go      75
-rwxr-xr-x   scripts/generate_ping.py   49
-rw-r--r--   storage.go                 13
-rw-r--r--   utils.go                   11
8 files changed, 242 insertions(+), 112 deletions(-)
diff --git a/caronte_test.go b/caronte_test.go
index 2766640..b248ce3 100644
--- a/caronte_test.go
+++ b/caronte_test.go
@@ -2,7 +2,6 @@ package main
import (
"context"
- "crypto/sha256"
"fmt"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
@@ -30,8 +29,7 @@ func NewTestStorageWrapper(t *testing.T) *TestStorageWrapper {
port, err := strconv.Atoi(mongoPort)
require.NoError(t, err, "invalid port")
- uniqueDatabaseName := sha256.Sum256([]byte(time.Now().String()))
- dbName := fmt.Sprintf("%x", uniqueDatabaseName[:31])
+ dbName := fmt.Sprintf("%x", time.Now().UnixNano())
log.WithField("dbName", dbName).Info("creating new storage")
storage := NewMongoStorage(mongoHost, port, dbName)
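
The test database name is now just the hex-encoded nanosecond timestamp rather than a truncated SHA-256 of the current time; both are unique per test run, but the new form is shorter and skips a pointless hash. A standalone sketch of what it produces (illustrative, not part of this commit):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // e.g. "16073cbf8d9a4e20" -- 16 hex digits, unique per nanosecond
        fmt.Printf("%x\n", time.Now().UnixNano())
    }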
diff --git a/go.mod b/go.mod
index ce12c15..15bb85c 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,7 @@ require (
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.4.0
go.mongodb.org/mongo-driver v1.3.1
- golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
+ go.uber.org/atomic v1.6.0
+ golang.org/x/net v0.0.0-20190620200207-3b0461eec859
golang.org/x/sys v0.0.0-20200406155108-e3b113bbe6a4 // indirect
)
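
The new go.uber.org/atomic dependency backs the stream counter in the test added below; a minimal usage sketch (standalone, not from this commit):

    package main

    import (
        "fmt"

        "go.uber.org/atomic"
    )

    func main() {
        var counter atomic.Int32 // the zero value is ready to use
        counter.Inc()
        counter.Add(2)
        fmt.Println(counter.Load()) // prints 3
    }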
diff --git a/go.sum b/go.sum
index d2863ce..8df4fb7 100644
--- a/go.sum
+++ b/go.sum
@@ -130,14 +130,18 @@ github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc h1:n+nNi93yXLkJvKwX
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
go.mongodb.org/mongo-driver v1.3.1 h1:op56IfTQiaY2679w922KVWa3qcHdml2K/Io8ayAOUEQ=
go.mongodb.org/mongo-driver v1.3.1/go.mod h1:MSWZXKOynuguX+JSvwP8i+58jYCXxbia8HS3gZBapIE=
+go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
+go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 h1:8dUaAV7K4uHsF56JQWkprecIQKdPHtR9jCHF5nB8uzc=
golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
@@ -160,10 +164,13 @@ golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190329151228-23e29df326fe/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190416151739-9c9e1878f421/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190420181800-aa740d480789/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190531172133-b3315ee88b7d/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
diff --git a/pcap_importer.go b/pcap_importer.go
index 00c84bd..628b25d 100644
--- a/pcap_importer.go
+++ b/pcap_importer.go
@@ -7,46 +7,47 @@ import (
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/tcpassembly"
- "go.mongodb.org/mongo-driver/mongo"
- "log"
+ log "github.com/sirupsen/logrus"
"net"
- "strconv"
"sync"
"time"
)
const initialAssemblerPoolSize = 16
const flushOlderThan = 5 * time.Minute
-const invalidSessionID = "invalid_id"
-const importUpdateProgressInterval = 3 * time.Second
-const initialPacketPerServicesMapSize = 16
-const importedPcapsCollectionName = "imported_pcaps"
+const importUpdateProgressInterval = 100 * time.Millisecond
type PcapImporter struct {
storage Storage
streamPool *tcpassembly.StreamPool
assemblers []*tcpassembly.Assembler
- sessions map[string]context.CancelFunc
+ sessions map[string]ImportingSession
mAssemblers sync.Mutex
mSessions sync.Mutex
serverIP gopacket.Endpoint
}
+type ImportingSession struct {
+ ID string `json:"id" bson:"_id"`
+ CompletedAt time.Time `json:"completed_at" bson:"completed_at,omitempty"`
+ ProcessedPackets int `json:"processed_packets" bson:"processed_packets"`
+ InvalidPackets int `json:"invalid_packets" bson:"invalid_packets"`
+ PacketsPerService map[uint16]flowCount `json:"packets_per_service" bson:"packets_per_service"`
+ ImportingError error `json:"importing_error" bson:"importing_error,omitempty"`
+ cancelFunc context.CancelFunc
+}
+
type flowCount [2]int
-func NewPcapImporter(storage Storage, serverIP net.IP) *PcapImporter {
+func NewPcapImporter(storage Storage, serverIP net.IP, rulesManager RulesManager) *PcapImporter {
serverEndpoint := layers.NewIPEndpoint(serverIP)
- streamFactory := &BiDirectionalStreamFactory{
- storage: storage,
- serverIP: serverEndpoint,
- }
- streamPool := tcpassembly.NewStreamPool(streamFactory)
+ streamPool := tcpassembly.NewStreamPool(NewBiDirectionalStreamFactory(storage, serverEndpoint, rulesManager))
return &PcapImporter{
storage: storage,
streamPool: streamPool,
assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize),
- sessions: make(map[string]context.CancelFunc),
+ sessions: make(map[string]ImportingSession),
mAssemblers: sync.Mutex{},
mSessions: sync.Mutex{},
serverIP: serverEndpoint,
@@ -60,61 +61,83 @@ func NewPcapImporter(storage Storage, serverIP net.IP) *PcapImporter {
func (pi *PcapImporter) ImportPcap(fileName string) (string, error) {
hash, err := Sha256Sum(fileName)
if err != nil {
- return invalidSessionID, err
+ return "", err
}
pi.mSessions.Lock()
- _, ok := pi.sessions[hash]
- if ok {
+ _, isPresent := pi.sessions[hash]
+ if isPresent {
pi.mSessions.Unlock()
- return hash, errors.New("another equal session in progress")
+ return hash, errors.New("pcap already processed")
}
- doc := OrderedDocument{
- {"_id", hash},
- {"started_at", time.Now()},
- {"completed_at", nil},
- {"processed_packets", 0},
- {"invalid_packets", 0},
- {"packets_per_services", nil},
- {"importing_error", err},
+ ctx, cancelFunc := context.WithCancel(context.Background())
+ session := ImportingSession{
+ ID: hash,
+ PacketsPerService: make(map[uint16]flowCount),
+ cancelFunc: cancelFunc,
}
- ctx, canc := context.WithCancel(context.Background())
- _, err = pi.storage.Insert(importedPcapsCollectionName).Context(ctx).One(doc)
- if err != nil {
+
+ if result, err := pi.storage.Insert(ImportingSessions).Context(ctx).One(session); err != nil {
pi.mSessions.Unlock()
- _, alreadyProcessed := err.(mongo.WriteException)
- if alreadyProcessed {
- return hash, errors.New("pcap already processed")
- }
- return hash, err
+ log.WithError(err).WithField("session", session).Panic("failed to insert a session into database")
+ } else if result == nil {
+ pi.mSessions.Unlock()
+ return hash, errors.New("pcap already processed")
}
- pi.sessions[hash] = canc
+ pi.sessions[hash] = session
pi.mSessions.Unlock()
- go pi.parsePcap(hash, fileName, ctx)
+ go pi.parsePcap(session, fileName, ctx)
return hash, nil
}
-func (pi *PcapImporter) CancelImport(sessionID string) error {
+func (pi *PcapImporter) GetSession(sessionID string) (ImportingSession, bool) {
pi.mSessions.Lock()
defer pi.mSessions.Unlock()
- cancel, ok := pi.sessions[sessionID]
- if ok {
- delete(pi.sessions, sessionID)
- cancel()
- return nil
- } else {
+ session, isPresent := pi.sessions[sessionID]
+ return session, isPresent
+}
+
+func (pi *PcapImporter) CancelSession(sessionID string) error {
+ pi.mSessions.Lock()
+ defer pi.mSessions.Unlock()
+ if session, isPresent := pi.sessions[sessionID]; !isPresent {
return errors.New("session " + sessionID + " not found")
+ } else {
+ session.cancelFunc()
+ return nil
}
}
// Read the pcap and save the TCP stream flows to the database
-func (pi *PcapImporter) parsePcap(sessionID, fileName string, ctx context.Context) {
+func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx context.Context) {
+ progressUpdate := func(completed bool, err error) {
+ if completed {
+ session.CompletedAt = time.Now()
+ }
+ session.ImportingError = err
+
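+		// a plain struct copy would share the underlying map; duplicate it so
+		// readers going through GetSession never race with this goroutine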
+ dupSession := session
+ dupSession.PacketsPerService = make(map[uint16]flowCount, len(session.PacketsPerService))
+ for key, value := range session.PacketsPerService {
+ dupSession.PacketsPerService[key] = value
+ }
+
+ pi.mSessions.Lock()
+ pi.sessions[session.ID] = dupSession
+ pi.mSessions.Unlock()
+
+ if _, err = pi.storage.Update(ImportingSessions).
+ Filter(OrderedDocument{{"_id", session.ID}}).One(session); err != nil {
+ log.WithError(err).WithField("session", session).Error("failed to update importing stats")
+ }
+ }
+
handle, err := pcap.OpenOffline(fileName)
if err != nil {
- // TODO: update db and set error
+ progressUpdate(false, errors.New("failed to process pcap"))
return
}
@@ -125,40 +148,15 @@ func (pi *PcapImporter) parsePcap(sessionID, fileName string, ctx context.Contex
firstPacketTime := time.Time{}
updateProgressInterval := time.Tick(importUpdateProgressInterval)
- processedPackets := 0
- invalidPackets := 0
- packetsPerService := make(map[int]*flowCount, initialPacketPerServicesMapSize)
-
- progressUpdate := func(completed bool, err error) {
- update := UnorderedDocument{
- "processed_packets": processedPackets,
- "invalid_packets": invalidPackets,
- "packets_per_services": packetsPerService,
- "importing_error": err,
- }
- if completed {
- update["completed_at"] = time.Now()
- }
-
- _, _err := pi.storage.Update(importedPcapsCollectionName).
- Filter(OrderedDocument{{"_id", sessionID}}).
- One(nil)
- if _err != nil {
- log.Println("can't update importing statistics : ", _err)
- }
- }
-
- deleteSession := func() {
- pi.mSessions.Lock()
- delete(pi.sessions, sessionID)
- pi.mSessions.Unlock()
+ terminate := func() {
+ handle.Close()
+ pi.releaseAssembler(assembler)
}
for {
select {
case <-ctx.Done():
- handle.Close()
- deleteSession()
+ terminate()
progressUpdate(false, errors.New("import process cancelled"))
return
default:
@@ -170,42 +168,44 @@ func (pi *PcapImporter) parsePcap(sessionID, fileName string, ctx context.Contex
if !firstPacketTime.IsZero() {
assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan))
}
- pi.releaseAssembler(assembler)
- handle.Close()
-
- deleteSession()
+ terminate()
progressUpdate(true, nil)
-
return
}
- processedPackets++
-
- if packet.NetworkLayer() == nil || packet.TransportLayer() == nil ||
- packet.TransportLayer().LayerType() != layers.LayerTypeTCP { // invalid packet
- invalidPackets++
- continue
- }
timestamp := packet.Metadata().Timestamp
if firstPacketTime.IsZero() {
firstPacketTime = timestamp
}
+ if packet.NetworkLayer() == nil || packet.TransportLayer() == nil ||
+ packet.TransportLayer().LayerType() != layers.LayerTypeTCP { // invalid packet
+ session.InvalidPackets++
+ continue
+ }
+ session.ProcessedPackets++
+
tcp := packet.TransportLayer().(*layers.TCP)
- var servicePort, index int
- if packet.NetworkLayer().NetworkFlow().Dst() == pi.serverIP {
- servicePort, _ = strconv.Atoi(tcp.DstPort.String())
+ var servicePort uint16
+ var index int
+ isDstServer := packet.NetworkLayer().NetworkFlow().Dst() == pi.serverIP
+ isSrcServer := packet.NetworkLayer().NetworkFlow().Src() == pi.serverIP
+ if isDstServer && !isSrcServer {
+ servicePort = uint16(tcp.DstPort)
index = 0
- } else {
- servicePort, _ = strconv.Atoi(tcp.SrcPort.String())
+ } else if isSrcServer && !isDstServer {
+ servicePort = uint16(tcp.SrcPort)
index = 1
+ } else {
+ session.InvalidPackets++
+ continue
}
- fCount, ok := packetsPerService[servicePort]
- if !ok {
- fCount = &flowCount{0, 0}
- packetsPerService[servicePort] = fCount
+ fCount, isPresent := session.PacketsPerService[servicePort]
+ if !isPresent {
+ fCount = flowCount{0, 0}
}
fCount[index]++
+ session.PacketsPerService[servicePort] = fCount
assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, timestamp)
case <-updateProgressInterval:
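
Pulling the pieces together, the reworked import API is driven roughly like this (a sketch assuming an initialized Storage and RulesManager and a hypothetical capture file; not code from this commit):

    importer := NewPcapImporter(storage, net.ParseIP("10.10.10.10"), rulesManager)

    sessionID, err := importer.ImportPcap("capture.pcap")
    if err != nil {
        // e.g. "pcap already processed"
        log.WithError(err).Error("import refused")
        return
    }

    // parsing runs in a background goroutine; poll the in-memory session
    for {
        session, _ := importer.GetSession(sessionID)
        if !session.CompletedAt.IsZero() || session.ImportingError != nil {
            break
        }
        time.Sleep(importUpdateProgressInterval)
    }

CancelSession(sessionID) can interrupt a running import at any point; the test below uses the same polling loop.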
diff --git a/pcap_importer_test.go b/pcap_importer_test.go
new file mode 100644
index 0000000..234988e
--- /dev/null
+++ b/pcap_importer_test.go
@@ -0,0 +1,75 @@
+package main
+
+import (
+ "bufio"
+ "github.com/google/gopacket"
+ "github.com/google/gopacket/layers"
+ "github.com/google/gopacket/tcpassembly"
+ "github.com/google/gopacket/tcpassembly/tcpreader"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "go.uber.org/atomic"
+ "net"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestImportPcap(t *testing.T) {
+ wrapper := NewTestStorageWrapper(t)
+ wrapper.AddCollection(ImportingSessions)
+
+ serverEndpoint := layers.NewIPEndpoint(net.ParseIP("172.17.0.3"))
+ streamPool := tcpassembly.NewStreamPool(&testStreamFactory{})
+
+ pcapImporter := PcapImporter{
+ storage: wrapper.Storage,
+ streamPool: streamPool,
+ assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize),
+ sessions: make(map[string]ImportingSession),
+ mAssemblers: sync.Mutex{},
+ mSessions: sync.Mutex{},
+ serverIP: serverEndpoint,
+ }
+
+ sessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
+ require.NoError(t, err)
+ assert.NotZero(t, sessionID)
+
+ duplicateSessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
+ require.Error(t, err)
+ assert.Equal(t, sessionID, duplicateSessionID)
+
+ _, isPresent := pcapImporter.GetSession("invalid")
+ assert.False(t, isPresent)
+
+ var session ImportingSession
+ session, isPresent = pcapImporter.GetSession(sessionID)
+ require.True(t, isPresent)
+ for session.CompletedAt.IsZero() && session.ImportingError == nil {
+ time.Sleep(importUpdateProgressInterval)
+ session, isPresent = pcapImporter.GetSession(sessionID)
+ }
+ assert.Equal(t, sessionID, session.ID)
+ assert.Equal(t, 15008, session.ProcessedPackets)
+ assert.Equal(t, 0, session.InvalidPackets)
+ assert.Equal(t, map[uint16]flowCount{9999: {10004, 5004}}, session.PacketsPerService)
+ assert.NoError(t, session.ImportingError)
+
+ wrapper.Destroy(t)
+}
+
+type testStreamFactory struct {
+ counter atomic.Int32
+}
+
+func (sf *testStreamFactory) New(_, _ gopacket.Flow) tcpassembly.Stream {
+ sf.counter.Inc()
+ reader := tcpreader.NewReaderStream()
+ go func() {
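+		// drain every reassembled byte so the assembler never blocks on this stream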
+ buffer := bufio.NewReader(&reader)
+ tcpreader.DiscardBytesToEOF(buffer)
+ }()
+ return &reader
+}
diff --git a/scripts/generate_ping.py b/scripts/generate_ping.py
new file mode 100755
index 0000000..153a3cb
--- /dev/null
+++ b/scripts/generate_ping.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python3
+
+import struct
+import socket
+import socketserver
+import sys
+import time
+
+class PongHandler(socketserver.BaseRequestHandler):
+
+ def handle(self):
+ def recv_data():
+ return self.request.recv(1024).strip()
+
+ data = recv_data()
+ while len(data) > 0:
+ counter = str(int(data) + 1).rjust(4, "0")
+ print(f"PONG! {counter}", end='\r')
+ self.request.sendall(counter.encode())
+ data = recv_data()
+
+
+if __name__ == "__main__":
+    if len(sys.argv) < 2 or not ((sys.argv[1] == "server" and len(sys.argv) == 2) or
+                                 (sys.argv[1] == "client" and len(sys.argv) == 3)):
+ print(f"Usage: {sys.argv[0]} server")
+ print(f" {sys.argv[0]} client <server_address>")
+ exit(1)
+
+ port = 9999
+ n = 10000
+
+ if sys.argv[1] == "server":
+ # docker run -it --rm -p 9999:9999 -v "$PWD":/ping -w /ping python:3 python generate_ping.py server
+ with socketserver.TCPServer(("0.0.0.0", port), PongHandler) as server:
+ server.serve_forever()
+ else:
+ # docker run -it --rm -v "$PWD":/ping -w /ping python:3 python generate_ping.py client <server_address>
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+ sock.connect((sys.argv[2], port))
+
+ counter = 0
+ while counter < n:
+ counter = str(counter).rjust(4, "0")
+ print(f"PING! {counter}", end='\r')
+ time.sleep(0.05)
+ sock.sendall(counter.encode())
+ counter = int(sock.recv(1024).strip()) + 1
+
diff --git a/storage.go b/storage.go
index 5ee9f3e..5c77c6c 100644
--- a/storage.go
+++ b/storage.go
@@ -14,7 +14,7 @@ import (
// Collections names
const Connections = "connections"
const ConnectionStreams = "connection_streams"
-const ImportedPcaps = "imported_pcaps"
+const ImportingSessions = "importing_sessions"
const Rules = "rules"
var ZeroRowID [12]byte
@@ -44,15 +44,16 @@ func NewMongoStorage(uri string, port int, database string) *MongoStorage {
}
db := client.Database(database)
- colls := map[string]*mongo.Collection{
- Connections: db.Collection(Connections),
- ImportedPcaps: db.Collection(ImportedPcaps),
- Rules: db.Collection(Rules),
+ collections := map[string]*mongo.Collection{
+ Connections: db.Collection(Connections),
+ ConnectionStreams: db.Collection(ConnectionStreams),
+ ImportingSessions: db.Collection(ImportingSessions),
+ Rules: db.Collection(Rules),
}
return &MongoStorage{
client: client,
- collections: colls,
+ collections: collections,
}
}
diff --git a/utils.go b/utils.go
index cb60ea6..b9cdd8c 100644
--- a/utils.go
+++ b/utils.go
@@ -4,6 +4,7 @@ import (
"crypto/sha256"
"encoding/binary"
"encoding/hex"
+ "fmt"
log "github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/bson/primitive"
"io"
@@ -11,26 +12,24 @@ import (
"time"
)
-const invalidHashString = "invalid"
-
func Sha256Sum(fileName string) (string, error) {
f, err := os.Open(fileName)
if err != nil {
- return invalidHashString, err
+ return "", err
}
defer func() {
err = f.Close()
if err != nil {
- log.Println("Cannot close file " + fileName)
+ log.WithError(err).WithField("filename", fileName).Error("failed to close file")
}
}()
h := sha256.New()
if _, err := io.Copy(h, f); err != nil {
- return invalidHashString, err
+ return "", err
}
- return string(h.Sum(nil)), nil
+ return fmt.Sprintf("%x", h.Sum(nil)), nil
}
func CustomRowID(payload uint64, timestamp time.Time) RowID {
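
With this fix Sha256Sum returns the conventional 64-character hex digest instead of the raw 32 digest bytes coerced into a mostly unprintable string, which is what makes it suitable as the pcap session ID used by ImportPcap. A quick illustration (hypothetical file name):

    // inside this package
    hash, err := Sha256Sum("capture.pcap")
    if err != nil {
        log.WithError(err).Fatal("cannot hash file")
    }
    fmt.Println(len(hash)) // 64: hex-encoded SHA-256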