From c118d899081bc62e28d47a5a0479fb16a24878ec Mon Sep 17 00:00:00 2001 From: Emiliano Ciavatta Date: Fri, 17 Apr 2020 15:01:20 +0200 Subject: Add import pcaps test --- caronte_test.go | 4 +- go.mod | 3 +- go.sum | 7 ++ pcap_importer.go | 192 +++++++++++++++++++++++------------------------ pcap_importer_test.go | 75 ++++++++++++++++++ scripts/generate_ping.py | 49 ++++++++++++ storage.go | 13 ++-- utils.go | 11 ++- 8 files changed, 242 insertions(+), 112 deletions(-) create mode 100644 pcap_importer_test.go create mode 100755 scripts/generate_ping.py diff --git a/caronte_test.go b/caronte_test.go index 2766640..b248ce3 100644 --- a/caronte_test.go +++ b/caronte_test.go @@ -2,7 +2,6 @@ package main import ( "context" - "crypto/sha256" "fmt" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" @@ -30,8 +29,7 @@ func NewTestStorageWrapper(t *testing.T) *TestStorageWrapper { port, err := strconv.Atoi(mongoPort) require.NoError(t, err, "invalid port") - uniqueDatabaseName := sha256.Sum256([]byte(time.Now().String())) - dbName := fmt.Sprintf("%x", uniqueDatabaseName[:31]) + dbName := fmt.Sprintf("%x", time.Now().UnixNano()) log.WithField("dbName", dbName).Info("creating new storage") storage := NewMongoStorage(mongoHost, port, dbName) diff --git a/go.mod b/go.mod index ce12c15..15bb85c 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/sirupsen/logrus v1.4.2 github.com/stretchr/testify v1.4.0 go.mongodb.org/mongo-driver v1.3.1 - golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 + go.uber.org/atomic v1.6.0 + golang.org/x/net v0.0.0-20190620200207-3b0461eec859 golang.org/x/sys v0.0.0-20200406155108-e3b113bbe6a4 // indirect ) diff --git a/go.sum b/go.sum index d2863ce..8df4fb7 100644 --- a/go.sum +++ b/go.sum @@ -130,14 +130,18 @@ github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc h1:n+nNi93yXLkJvKwX github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= go.mongodb.org/mongo-driver v1.3.1 h1:op56IfTQiaY2679w922KVWa3qcHdml2K/Io8ayAOUEQ= go.mongodb.org/mongo-driver v1.3.1/go.mod h1:MSWZXKOynuguX+JSvwP8i+58jYCXxbia8HS3gZBapIE= +go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 h1:8dUaAV7K4uHsF56JQWkprecIQKdPHtR9jCHF5nB8uzc= golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= @@ -160,10 +164,13 @@ golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190329151228-23e29df326fe/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190416151739-9c9e1878f421/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190420181800-aa740d480789/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190531172133-b3315ee88b7d/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= diff --git a/pcap_importer.go b/pcap_importer.go index 00c84bd..628b25d 100644 --- a/pcap_importer.go +++ b/pcap_importer.go @@ -7,46 +7,47 @@ import ( "github.com/google/gopacket/layers" "github.com/google/gopacket/pcap" "github.com/google/gopacket/tcpassembly" - "go.mongodb.org/mongo-driver/mongo" - "log" + log "github.com/sirupsen/logrus" "net" - "strconv" "sync" "time" ) const initialAssemblerPoolSize = 16 const flushOlderThan = 5 * time.Minute -const invalidSessionID = "invalid_id" -const importUpdateProgressInterval = 3 * time.Second -const initialPacketPerServicesMapSize = 16 -const importedPcapsCollectionName = "imported_pcaps" +const importUpdateProgressInterval = 100 * time.Millisecond type PcapImporter struct { storage Storage streamPool *tcpassembly.StreamPool assemblers []*tcpassembly.Assembler - sessions map[string]context.CancelFunc + sessions map[string]ImportingSession mAssemblers sync.Mutex mSessions sync.Mutex serverIP gopacket.Endpoint } +type ImportingSession struct { + ID string `json:"id" bson:"_id"` + CompletedAt time.Time `json:"completed_at" bson:"completed_at,omitempty"` + ProcessedPackets int `json:"processed_packets" bson:"processed_packets"` + InvalidPackets int `json:"invalid_packets" bson:"invalid_packets"` + PacketsPerService map[uint16]flowCount `json:"packets_per_service" bson:"packets_per_service"` + ImportingError error `json:"importing_error" bson:"importing_error,omitempty"` + cancelFunc context.CancelFunc +} + type flowCount [2]int -func NewPcapImporter(storage Storage, serverIP net.IP) *PcapImporter { +func NewPcapImporter(storage Storage, serverIP net.IP, rulesManager RulesManager) *PcapImporter { serverEndpoint := layers.NewIPEndpoint(serverIP) - streamFactory := &BiDirectionalStreamFactory{ - storage: storage, - serverIP: serverEndpoint, - } - streamPool := tcpassembly.NewStreamPool(streamFactory) + streamPool := tcpassembly.NewStreamPool(NewBiDirectionalStreamFactory(storage, serverEndpoint, rulesManager)) return &PcapImporter{ storage: storage, streamPool: streamPool, assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize), - sessions: make(map[string]context.CancelFunc), + sessions: make(map[string]ImportingSession), mAssemblers: sync.Mutex{}, mSessions: sync.Mutex{}, serverIP: serverEndpoint, @@ -60,61 +61,83 @@ func NewPcapImporter(storage Storage, serverIP net.IP) *PcapImporter { func (pi *PcapImporter) ImportPcap(fileName string) (string, error) { hash, err := Sha256Sum(fileName) if err != nil { - return invalidSessionID, err + return "", err } pi.mSessions.Lock() - _, ok := pi.sessions[hash] - if ok { + _, isPresent := pi.sessions[hash] + if isPresent { pi.mSessions.Unlock() - return hash, errors.New("another equal session in progress") + return hash, errors.New("pcap already processed") } - doc := OrderedDocument{ - {"_id", hash}, - {"started_at", time.Now()}, - {"completed_at", nil}, - {"processed_packets", 0}, - {"invalid_packets", 0}, - {"packets_per_services", nil}, - {"importing_error", err}, + ctx, cancelFunc := context.WithCancel(context.Background()) + session := ImportingSession{ + ID: hash, + PacketsPerService: make(map[uint16]flowCount), + cancelFunc: cancelFunc, } - ctx, canc := context.WithCancel(context.Background()) - _, err = pi.storage.Insert(importedPcapsCollectionName).Context(ctx).One(doc) - if err != nil { + + if result, err := pi.storage.Insert(ImportingSessions).Context(ctx).One(session); err != nil { pi.mSessions.Unlock() - _, alreadyProcessed := err.(mongo.WriteException) - if alreadyProcessed { - return hash, errors.New("pcap already processed") - } - return hash, err + log.WithError(err).WithField("session", session).Panic("failed to insert a session into database") + } else if result == nil { + pi.mSessions.Unlock() + return hash, errors.New("pcap already processed") } - pi.sessions[hash] = canc + pi.sessions[hash] = session pi.mSessions.Unlock() - go pi.parsePcap(hash, fileName, ctx) + go pi.parsePcap(session, fileName, ctx) return hash, nil } -func (pi *PcapImporter) CancelImport(sessionID string) error { +func (pi *PcapImporter) GetSession(sessionID string) (ImportingSession, bool) { pi.mSessions.Lock() defer pi.mSessions.Unlock() - cancel, ok := pi.sessions[sessionID] - if ok { - delete(pi.sessions, sessionID) - cancel() - return nil - } else { + session, isPresent := pi.sessions[sessionID] + return session, isPresent +} + +func (pi *PcapImporter) CancelSession(sessionID string) error { + pi.mSessions.Lock() + defer pi.mSessions.Unlock() + if session, isPresent := pi.sessions[sessionID]; !isPresent { return errors.New("session " + sessionID + " not found") + } else { + session.cancelFunc() + return nil } } // Read the pcap and save the tcp stream flow to the database -func (pi *PcapImporter) parsePcap(sessionID, fileName string, ctx context.Context) { +func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx context.Context) { + progressUpdate := func(completed bool, err error) { + if completed { + session.CompletedAt = time.Now() + } + session.ImportingError = err + + dupSession := session + dupSession.PacketsPerService = make(map[uint16]flowCount, len(session.PacketsPerService)) + for key, value := range session.PacketsPerService { + dupSession.PacketsPerService[key] = value + } + + pi.mSessions.Lock() + pi.sessions[session.ID] = dupSession + pi.mSessions.Unlock() + + if _, err = pi.storage.Update(ImportingSessions). + Filter(OrderedDocument{{"_id", session.ID}}).One(session); err != nil { + log.WithError(err).WithField("session", session).Error("failed to update importing stats") + } + } + handle, err := pcap.OpenOffline(fileName) if err != nil { - // TODO: update db and set error + progressUpdate(false, errors.New("failed to process pcap")) return } @@ -125,40 +148,15 @@ func (pi *PcapImporter) parsePcap(sessionID, fileName string, ctx context.Contex firstPacketTime := time.Time{} updateProgressInterval := time.Tick(importUpdateProgressInterval) - processedPackets := 0 - invalidPackets := 0 - packetsPerService := make(map[int]*flowCount, initialPacketPerServicesMapSize) - - progressUpdate := func(completed bool, err error) { - update := UnorderedDocument{ - "processed_packets": processedPackets, - "invalid_packets": invalidPackets, - "packets_per_services": packetsPerService, - "importing_error": err, - } - if completed { - update["completed_at"] = time.Now() - } - - _, _err := pi.storage.Update(importedPcapsCollectionName). - Filter(OrderedDocument{{"_id", sessionID}}). - One(nil) - if _err != nil { - log.Println("can't update importing statistics : ", _err) - } - } - - deleteSession := func() { - pi.mSessions.Lock() - delete(pi.sessions, sessionID) - pi.mSessions.Unlock() + terminate := func() { + handle.Close() + pi.releaseAssembler(assembler) } for { select { case <-ctx.Done(): - handle.Close() - deleteSession() + terminate() progressUpdate(false, errors.New("import process cancelled")) return default: @@ -170,42 +168,44 @@ func (pi *PcapImporter) parsePcap(sessionID, fileName string, ctx context.Contex if !firstPacketTime.IsZero() { assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan)) } - pi.releaseAssembler(assembler) - handle.Close() - - deleteSession() + terminate() progressUpdate(true, nil) - return } - processedPackets++ - - if packet.NetworkLayer() == nil || packet.TransportLayer() == nil || - packet.TransportLayer().LayerType() != layers.LayerTypeTCP { // invalid packet - invalidPackets++ - continue - } timestamp := packet.Metadata().Timestamp if firstPacketTime.IsZero() { firstPacketTime = timestamp } + if packet.NetworkLayer() == nil || packet.TransportLayer() == nil || + packet.TransportLayer().LayerType() != layers.LayerTypeTCP { // invalid packet + session.InvalidPackets++ + continue + } + session.ProcessedPackets++ + tcp := packet.TransportLayer().(*layers.TCP) - var servicePort, index int - if packet.NetworkLayer().NetworkFlow().Dst() == pi.serverIP { - servicePort, _ = strconv.Atoi(tcp.DstPort.String()) + var servicePort uint16 + var index int + isDstServer := packet.NetworkLayer().NetworkFlow().Dst() == pi.serverIP + isSrcServer := packet.NetworkLayer().NetworkFlow().Src() == pi.serverIP + if isDstServer && !isSrcServer { + servicePort = uint16(tcp.DstPort) index = 0 - } else { - servicePort, _ = strconv.Atoi(tcp.SrcPort.String()) + } else if isSrcServer && !isDstServer { + servicePort = uint16(tcp.SrcPort) index = 1 + } else { + session.InvalidPackets++ + continue } - fCount, ok := packetsPerService[servicePort] - if !ok { - fCount = &flowCount{0, 0} - packetsPerService[servicePort] = fCount + fCount, isPresent := session.PacketsPerService[servicePort] + if !isPresent { + fCount = flowCount{0, 0} } fCount[index]++ + session.PacketsPerService[servicePort] = fCount assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, timestamp) case <-updateProgressInterval: diff --git a/pcap_importer_test.go b/pcap_importer_test.go new file mode 100644 index 0000000..234988e --- /dev/null +++ b/pcap_importer_test.go @@ -0,0 +1,75 @@ +package main + +import ( + "bufio" + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/tcpassembly" + "github.com/google/gopacket/tcpassembly/tcpreader" + // log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "net" + "sync" + "testing" + "time" +) + +func TestImportPcap(t *testing.T) { + wrapper := NewTestStorageWrapper(t) + wrapper.AddCollection(ImportingSessions) + + serverEndpoint := layers.NewIPEndpoint(net.ParseIP("172.17.0.3")) + streamPool := tcpassembly.NewStreamPool(&testStreamFactory{}) + + pcapImporter := PcapImporter{ + storage: wrapper.Storage, + streamPool: streamPool, + assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize), + sessions: make(map[string]ImportingSession), + mAssemblers: sync.Mutex{}, + mSessions: sync.Mutex{}, + serverIP: serverEndpoint, + } + + sessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap") + require.NoError(t, err) + assert.NotZero(t, sessionID) + + duplicateSessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap") + require.Error(t, err) + assert.Equal(t, sessionID, duplicateSessionID) + + _, isPresent := pcapImporter.GetSession("invalid") + assert.False(t, isPresent) + + var session ImportingSession + session, isPresent = pcapImporter.GetSession(sessionID) + require.True(t, isPresent) + for session.CompletedAt.IsZero() && session.ImportingError == nil { + time.Sleep(importUpdateProgressInterval) + session, isPresent = pcapImporter.GetSession(sessionID) + } + assert.Equal(t, sessionID, session.ID) + assert.Equal(t, 15008, session.ProcessedPackets) + assert.Equal(t, 0, session.InvalidPackets) + assert.Equal(t, map[uint16]flowCount{9999: {10004, 5004}}, session.PacketsPerService) + assert.NoError(t, session.ImportingError) + + wrapper.Destroy(t) +} + +type testStreamFactory struct{ + counter atomic.Int32 +} + +func (sf *testStreamFactory) New(_, _ gopacket.Flow) tcpassembly.Stream { + sf.counter.Inc() + reader := tcpreader.NewReaderStream() + go func() { + buffer := bufio.NewReader(&reader) + tcpreader.DiscardBytesToEOF(buffer) + }() + return &reader +} diff --git a/scripts/generate_ping.py b/scripts/generate_ping.py new file mode 100755 index 0000000..153a3cb --- /dev/null +++ b/scripts/generate_ping.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 + +import struct +import socket +import socketserver +import sys +import time + +class PongHandler(socketserver.BaseRequestHandler): + + def handle(self): + def recv_data(): + return self.request.recv(1024).strip() + + data = recv_data() + while len(data) > 0: + counter = str(int(data) + 1).rjust(4, "0") + print(f"PONG! {counter}", end='\r') + self.request.sendall(counter.encode()) + data = recv_data() + + +if __name__ == "__main__": + if not ((sys.argv[1] == "server" and len(sys.argv) == 2) or + (sys.argv[1] == "client" and len(sys.argv) == 3)): + print(f"Usage: {sys.argv[0]} server") + print(f" {sys.argv[0]} client ") + exit(1) + + port = 9999 + n = 10000 + + if sys.argv[1] == "server": + # docker run -it --rm -p 9999:9999 -v "$PWD":/ping -w /ping python:3 python generate_ping.py server + with socketserver.TCPServer(("0.0.0.0", port), PongHandler) as server: + server.serve_forever() + else: + # docker run -it --rm -v "$PWD":/ping -w /ping python:3 python generate_ping.py client + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.connect((sys.argv[2], port)) + + counter = 0 + while counter < n: + counter = str(counter).rjust(4, "0") + print(f"PING! {counter}", end='\r') + time.sleep(0.05) + sock.sendall(counter.encode()) + counter = int(sock.recv(1024).strip()) + 1 + diff --git a/storage.go b/storage.go index 5ee9f3e..5c77c6c 100644 --- a/storage.go +++ b/storage.go @@ -14,7 +14,7 @@ import ( // Collections names const Connections = "connections" const ConnectionStreams = "connection_streams" -const ImportedPcaps = "imported_pcaps" +const ImportingSessions = "importing_sessions" const Rules = "rules" var ZeroRowID [12]byte @@ -44,15 +44,16 @@ func NewMongoStorage(uri string, port int, database string) *MongoStorage { } db := client.Database(database) - colls := map[string]*mongo.Collection{ - Connections: db.Collection(Connections), - ImportedPcaps: db.Collection(ImportedPcaps), - Rules: db.Collection(Rules), + collections := map[string]*mongo.Collection{ + Connections: db.Collection(Connections), + ConnectionStreams: db.Collection(ConnectionStreams), + ImportingSessions: db.Collection(ImportingSessions), + Rules: db.Collection(Rules), } return &MongoStorage{ client: client, - collections: colls, + collections: collections, } } diff --git a/utils.go b/utils.go index cb60ea6..b9cdd8c 100644 --- a/utils.go +++ b/utils.go @@ -4,6 +4,7 @@ import ( "crypto/sha256" "encoding/binary" "encoding/hex" + "fmt" log "github.com/sirupsen/logrus" "go.mongodb.org/mongo-driver/bson/primitive" "io" @@ -11,26 +12,24 @@ import ( "time" ) -const invalidHashString = "invalid" - func Sha256Sum(fileName string) (string, error) { f, err := os.Open(fileName) if err != nil { - return invalidHashString, err + return "", err } defer func() { err = f.Close() if err != nil { - log.Println("Cannot close file " + fileName) + log.WithError(err).WithField("filename", fileName).Error("failed to close file") } }() h := sha256.New() if _, err := io.Copy(h, f); err != nil { - return invalidHashString, err + return "", err } - return string(h.Sum(nil)), nil + return fmt.Sprintf("%x", h.Sum(nil)), nil } func CustomRowID(payload uint64, timestamp time.Time) RowID { -- cgit v1.2.3-70-g09d2