author    | Emiliano Ciavatta | 2020-04-26 15:41:01 +0000
committer | Emiliano Ciavatta | 2020-04-26 15:41:01 +0000
commit    | b33f14c35bb3a6d08fe095d3d20e1d40f7398500 (patch)
tree      | fb6ee4cd9b667700f75203b9ea91c6f6bccac364
parent    | 8bd21ad9873690c52485e3581a8108c6f351e3a6 (diff)
Change pcap_importer api
-rw-r--r-- | application_router.go      | 62
-rw-r--r-- | application_router_test.go |  8
-rw-r--r-- | caronte_test.go            |  6
-rw-r--r-- | pcap_importer.go           | 67
-rw-r--r-- | pcap_importer_test.go      | 30
-rw-r--r-- | pcaps/.gitignore           |  2
-rw-r--r-- | pcaps/processing/.gitkeep  |  0
-rw-r--r-- | rules_manager.go           |  5
-rw-r--r-- | rules_manager_test.go      |  1
-rw-r--r-- | shared/.gitignore          |  2
-rw-r--r-- | storage.go                 |  2
-rw-r--r-- | stream_handler_test.go     |  2
-rw-r--r-- | utils.go                   | 21
13 files changed, 170 insertions, 38 deletions
diff --git a/application_router.go b/application_router.go
index 128b1ec..ea68e9d 100644
--- a/application_router.go
+++ b/application_router.go
@@ -2,14 +2,20 @@ package main
 
 import (
 	"errors"
+	"fmt"
 	"github.com/gin-gonic/gin"
+	log "github.com/sirupsen/logrus"
 	"net/http"
+	"os"
+	"path/filepath"
+	"time"
 )
 
 func CreateApplicationRouter(applicationContext *ApplicationContext) *gin.Engine {
 	router := gin.New()
 	router.Use(gin.Logger())
 	router.Use(gin.Recovery())
+	router.MaxMultipartMemory = 8 << 30
 
 	// engine.Static("/", "./frontend/build")
 
@@ -96,22 +102,49 @@ func CreateApplicationRouter(applicationContext *ApplicationContext) *gin.Engine
 		}
 	})
 
+	api.POST("/pcap/upload", func(c *gin.Context) {
+		fileHeader, err := c.FormFile("file")
+		if err != nil {
+			badRequest(c, err)
+			return
+		}
+		fileName := fmt.Sprintf("%v-%s", time.Now().UnixNano(), fileHeader.Filename)
+		if err := c.SaveUploadedFile(fileHeader, ProcessingPcapsBasePath + fileName); err != nil {
+			log.WithError(err).Panic("failed to save uploaded file")
+		}
+
+		if sessionID, err := applicationContext.PcapImporter.ImportPcap(fileName); err != nil {
+			unprocessableEntity(c, err)
+		} else {
+			c.JSON(http.StatusAccepted, gin.H{"session": sessionID})
+		}
+	})
+
 	api.POST("/pcap/file", func(c *gin.Context) {
-		var body struct {
-			Path string
+		var request struct {
+			File               string `json:"file"`
+			DeleteOriginalFile bool   `json:"delete_original_file"`
 		}
 
-		if err := c.ShouldBindJSON(&body); err != nil {
+		if err := c.ShouldBindJSON(&request); err != nil {
 			badRequest(c, err)
 			return
 		}
-
-		if !FileExists(body.Path) {
-			unprocessableEntity(c, errors.New("invalid path"))
+		if !FileExists(request.File) {
+			badRequest(c, errors.New("file not exists"))
 			return
 		}
 
-		if sessionID, err := applicationContext.PcapImporter.ImportPcap(body.Path); err != nil {
+		fileName := fmt.Sprintf("%v-%s", time.Now().UnixNano(), filepath.Base(request.File))
+		if err := CopyFile(ProcessingPcapsBasePath + fileName, request.File); err != nil {
+			log.WithError(err).Panic("failed to copy pcap file")
+		}
+		if sessionID, err := applicationContext.PcapImporter.ImportPcap(fileName); err != nil {
+			if request.DeleteOriginalFile {
+				if err := os.Remove(request.File); err != nil {
+					log.WithError(err).Panic("failed to remove processed file")
+				}
+			}
 			unprocessableEntity(c, err)
 		} else {
 			c.JSON(http.StatusAccepted, gin.H{"session": sessionID})
@@ -131,6 +164,21 @@ func CreateApplicationRouter(applicationContext *ApplicationContext) *gin.Engine
 		}
 	})
 
+	api.GET("/pcap/sessions/:id/download", func(c *gin.Context) {
+		sessionID := c.Param("id")
+		if _, isPresent := applicationContext.PcapImporter.GetSession(sessionID); isPresent {
+			if FileExists(PcapsBasePath + sessionID + ".pcap") {
+				c.FileAttachment(PcapsBasePath + sessionID + ".pcap", sessionID[:16] + ".pcap")
+			} else if FileExists(PcapsBasePath + sessionID + ".pcapng") {
+				c.FileAttachment(PcapsBasePath + sessionID + ".pcapng", sessionID[:16] + ".pcapng")
+			} else {
+				log.WithField("sessionID", sessionID).Panic("pcap file not exists")
+			}
+		} else {
+			notFound(c, gin.H{"session": sessionID})
+		}
+	})
+
 	api.DELETE("/pcap/sessions/:id", func(c *gin.Context) {
 		sessionID := c.Param("id")
 		session := gin.H{"session": sessionID}
diff --git a/application_router_test.go b/application_router_test.go
index c6f902c..c8e4474 100644
--- a/application_router_test.go
+++ b/application_router_test.go
@@ -102,15 +102,15 @@ func TestPcapImporterApi(t *testing.T) {
 
 	// Import pcap
 	assert.Equal(t, http.StatusBadRequest, toolkit.MakeRequest("POST",
 		"/api/pcap/file", nil).Code)
-	assert.Equal(t, http.StatusUnprocessableEntity, toolkit.MakeRequest("POST", "/api/pcap/file",
-		gin.H{"path": "invalidPath"}).Code)
-	w := toolkit.MakeRequest("POST", "/api/pcap/file", gin.H{"path": "test_data/ping_pong_10000.pcap"})
+	assert.Equal(t, http.StatusBadRequest, toolkit.MakeRequest("POST", "/api/pcap/file",
+		gin.H{"file": "invalidPath"}).Code)
+	w := toolkit.MakeRequest("POST", "/api/pcap/file", gin.H{"file": "test_data/ping_pong_10000.pcap"})
 	var sessionID struct{ Session string }
 	assert.Equal(t, http.StatusAccepted, w.Code)
 	assert.NoError(t, json.Unmarshal(w.Body.Bytes(), &sessionID))
 	assert.Equal(t, "369ef4b6abb6214b4ee2e0c81ecb93c49e275c26c85e30493b37727d408cf280", sessionID.Session)
 	assert.Equal(t, http.StatusUnprocessableEntity, toolkit.MakeRequest("POST", "/api/pcap/file",
-		gin.H{"path": "test_data/ping_pong_10000.pcap"}).Code) // duplicate
+		gin.H{"file": "test_data/ping_pong_10000.pcap"}).Code) // duplicate
 
 	// Get sessions
 	var sessions []ImportingSession
diff --git a/caronte_test.go b/caronte_test.go
index b248ce3..12ec50f 100644
--- a/caronte_test.go
+++ b/caronte_test.go
@@ -32,12 +32,10 @@ func NewTestStorageWrapper(t *testing.T) *TestStorageWrapper {
 	dbName := fmt.Sprintf("%x", time.Now().UnixNano())
 	log.WithField("dbName", dbName).Info("creating new storage")
 
-	storage := NewMongoStorage(mongoHost, port, dbName)
+	storage, err := NewMongoStorage(mongoHost, port, dbName)
+	require.NoError(t, err)
 
 	ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
-	err = storage.Connect(ctx)
-	require.NoError(t, err, "failed to connect to database")
-
 	return &TestStorageWrapper{
 		DbName:  dbName,
 		Storage: storage,
diff --git a/pcap_importer.go b/pcap_importer.go
index 2bdbd1a..129e60b 100644
--- a/pcap_importer.go
+++ b/pcap_importer.go
@@ -9,10 +9,15 @@ import (
 	"github.com/google/gopacket/tcpassembly"
 	log "github.com/sirupsen/logrus"
 	"net"
+	"os"
+	"path"
+	"path/filepath"
 	"sync"
 	"time"
 )
 
+const PcapsBasePath = "pcaps/"
+const ProcessingPcapsBasePath = PcapsBasePath + "processing/"
 const initialAssemblerPoolSize = 16
 const flushOlderThan = 5 * time.Minute
 const importUpdateProgressInterval = 100 * time.Millisecond
@@ -46,11 +51,20 @@ func NewPcapImporter(storage Storage, serverIP net.IP, rulesManager RulesManager
 	serverEndpoint := layers.NewIPEndpoint(serverIP)
 	streamPool := tcpassembly.NewStreamPool(NewBiDirectionalStreamFactory(storage, serverEndpoint, rulesManager))
 
+	var result []ImportingSession
+	if err := storage.Find(ImportingSessions).All(&result); err != nil {
+		log.WithError(err).Panic("failed to retrieve importing sessions")
+	}
+	sessions := make(map[string]ImportingSession)
+	for _, session := range result {
+		sessions[session.ID] = session
+	}
+
 	return &PcapImporter{
 		storage:     storage,
 		streamPool:  streamPool,
 		assemblers:  make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize),
-		sessions:    make(map[string]ImportingSession),
+		sessions:    sessions,
 		mAssemblers: sync.Mutex{},
 		mSessions:   sync.Mutex{},
 		serverIP:    serverEndpoint,
@@ -62,15 +76,24 @@ func NewPcapImporter(storage Storage, serverIP net.IP, rulesManager RulesManager
 // create a new session and starts to import the pcap, and returns immediately the session name (that is the sha256
 // of the pcap).
 func (pi *PcapImporter) ImportPcap(fileName string) (string, error) {
-	hash, err := Sha256Sum(fileName)
+	switch filepath.Ext(fileName) {
+	case ".pcap":
+	case ".pcapng":
+	default:
+		deleteProcessingFile(fileName)
+		return "", errors.New("invalid file extension")
+	}
+
+	hash, err := Sha256Sum(ProcessingPcapsBasePath + fileName)
 	if err != nil {
-		return "", err
+		log.WithError(err).Panic("failed to calculate pcap sha256")
+		deleteProcessingFile(fileName)
 	}
 
 	pi.mSessions.Lock()
-	_, isPresent := pi.sessions[hash]
-	if isPresent {
+	if _, isPresent := pi.sessions[hash]; isPresent {
 		pi.mSessions.Unlock()
+		deleteProcessingFile(fileName)
 		return hash, errors.New("pcap already processed")
 	}
 
@@ -78,7 +101,7 @@ func (pi *PcapImporter) ImportPcap(fileName string) (string, error) {
 	session := ImportingSession{
 		ID:                hash,
 		StartedAt:         time.Now(),
-		Size:              FileSize(fileName),
+		Size:              FileSize(ProcessingPcapsBasePath + fileName),
 		PacketsPerService: make(map[uint16]flowCount),
 		cancelFunc:        cancelFunc,
 		completed:         make(chan string),
@@ -121,9 +144,9 @@ func (pi *PcapImporter) CancelSession(sessionID string) bool {
 
 // Read the pcap and save the tcp stream flow to the database
 func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx context.Context) {
-	handle, err := pcap.OpenOffline(fileName)
+	handle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName)
 	if err != nil {
-		pi.progressUpdate(session, false, "failed to process pcap")
+		pi.progressUpdate(session, fileName, false, "failed to process pcap")
 		log.WithError(err).WithFields(log.Fields{"session": session, "fileName": fileName}).
 			Error("failed to open pcap")
 		return
@@ -141,7 +164,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
 		case <-ctx.Done():
 			handle.Close()
 			pi.releaseAssembler(assembler)
-			pi.progressUpdate(session, false, "import process cancelled")
+			pi.progressUpdate(session, fileName, false, "import process cancelled")
 			return
 		default:
 		}
@@ -150,11 +173,12 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
 		case packet := <-packets:
 			if packet == nil { // completed
 				if !firstPacketTime.IsZero() {
-					assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan))
+					// TODO:
+					//assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan))
 				}
 				handle.Close()
 				pi.releaseAssembler(assembler)
-				pi.progressUpdate(session, true, "")
+				pi.progressUpdate(session, fileName, true, "")
 				return
 			}
 
@@ -195,12 +219,12 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
 			assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, timestamp)
 		case <-updateProgressInterval:
-			pi.progressUpdate(session, false, "")
+			pi.progressUpdate(session, fileName, false, "")
 		}
 	}
 }
 
-func (pi *PcapImporter) progressUpdate(session ImportingSession, completed bool, err string) {
+func (pi *PcapImporter) progressUpdate(session ImportingSession, fileName string, completed bool, err string) {
 	if completed {
 		session.CompletedAt = time.Now()
 	}
@@ -220,6 +244,11 @@ func (pi *PcapImporter) progressUpdate(session ImportingSession, completed bool,
 		if _, _err := pi.storage.Insert(ImportingSessions).One(session); _err != nil {
 			log.WithError(_err).WithField("session", session).Error("failed to insert importing stats")
 		}
+		if completed {
+			moveProcessingFile(session.ID, fileName)
+		} else {
+			deleteProcessingFile(fileName)
+		}
 		close(session.completed)
 	}
 }
@@ -244,3 +273,15 @@ func (pi *PcapImporter) releaseAssembler(assembler *tcpassembly.Assembler) {
 	pi.assemblers = append(pi.assemblers, assembler)
 	pi.mAssemblers.Unlock()
 }
+
+func deleteProcessingFile(fileName string) {
+	if err := os.Remove(ProcessingPcapsBasePath + fileName); err != nil {
+		log.WithError(err).Error("failed to delete processing file")
+	}
+}
+
+func moveProcessingFile(sessionID string, fileName string) {
+	if err := os.Rename(ProcessingPcapsBasePath + fileName, PcapsBasePath + sessionID + path.Ext(fileName)); err != nil {
+		log.WithError(err).Error("failed to move processed file")
+	}
+}
diff --git a/pcap_importer_test.go b/pcap_importer_test.go
index c79556a..6f9d4a5 100644
--- a/pcap_importer_test.go
+++ b/pcap_importer_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bufio"
+	"fmt"
 	"github.com/google/gopacket"
 	"github.com/google/gopacket/layers"
 	"github.com/google/gopacket/tcpassembly"
@@ -9,6 +10,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"net"
+	"os"
 	"sync"
 	"testing"
 	"time"
@@ -20,12 +22,15 @@ func TestImportPcap(t *testing.T) {
 
 	pcapImporter.releaseAssembler(pcapImporter.takeAssembler())
 
-	sessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
+	fileName := copyToProcessing(t, "ping_pong_10000.pcap")
+	sessionID, err := pcapImporter.ImportPcap(fileName)
 	require.NoError(t, err)
 
-	duplicateSessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
+	duplicatePcapFileName := copyToProcessing(t, "ping_pong_10000.pcap")
+	duplicateSessionID, err := pcapImporter.ImportPcap(duplicatePcapFileName)
 	require.Error(t, err)
 	assert.Equal(t, sessionID, duplicateSessionID)
+	assert.Error(t, os.Remove(ProcessingPcapsBasePath + duplicatePcapFileName))
 
 	_, isPresent := pcapImporter.GetSession("invalid")
 	assert.False(t, isPresent)
@@ -38,6 +43,9 @@ func TestImportPcap(t *testing.T) {
 
 	checkSessionEquals(t, wrapper, session)
 
+	assert.Error(t, os.Remove(ProcessingPcapsBasePath + fileName))
+	assert.NoError(t, os.Remove(PcapsBasePath + session.ID + ".pcap"))
+
 	wrapper.Destroy(t)
 }
 
@@ -45,7 +53,8 @@ func TestCancelImportSession(t *testing.T) {
 	wrapper := NewTestStorageWrapper(t)
 	pcapImporter := newTestPcapImporter(wrapper, "172.17.0.3")
 
-	sessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
+	fileName := copyToProcessing(t, "ping_pong_10000.pcap")
+	sessionID, err := pcapImporter.ImportPcap(fileName)
 	require.NoError(t, err)
 
 	assert.False(t, pcapImporter.CancelSession("invalid"))
@@ -61,6 +70,9 @@ func TestCancelImportSession(t *testing.T) {
 
 	checkSessionEquals(t, wrapper, session)
 
+	assert.Error(t, os.Remove(ProcessingPcapsBasePath + fileName))
+	assert.Error(t, os.Remove(PcapsBasePath + sessionID + ".pcap"))
+
 	wrapper.Destroy(t)
 }
 
@@ -68,7 +80,8 @@ func TestImportNoTcpPackets(t *testing.T) {
 	wrapper := NewTestStorageWrapper(t)
 	pcapImporter := newTestPcapImporter(wrapper, "172.17.0.4")
 
-	sessionID, err := pcapImporter.ImportPcap("test_data/icmp.pcap")
+	fileName := copyToProcessing(t, "icmp.pcap")
+	sessionID, err := pcapImporter.ImportPcap(fileName)
 	require.NoError(t, err)
 
 	session := waitSessionCompletion(t, pcapImporter, sessionID)
@@ -80,6 +93,9 @@ func TestImportNoTcpPackets(t *testing.T) {
 
 	checkSessionEquals(t, wrapper, session)
 
+	assert.Error(t, os.Remove(ProcessingPcapsBasePath + fileName))
+	assert.NoError(t, os.Remove(PcapsBasePath + sessionID + ".pcap"))
+
 	wrapper.Destroy(t)
 }
 
@@ -127,6 +143,12 @@ func checkSessionEquals(t *testing.T, wrapper *TestStorageWrapper, session Impor
 	assert.Equal(t, session, result)
 }
 
+func copyToProcessing(t *testing.T, fileName string) string {
+	newFile := fmt.Sprintf("test-%v-%s", time.Now().UnixNano(), fileName)
+	require.NoError(t, CopyFile(ProcessingPcapsBasePath + newFile, "test_data/" + fileName))
+	return newFile
+}
+
 type testStreamFactory struct {
 }
diff --git a/pcaps/.gitignore b/pcaps/.gitignore
new file mode 100644
index 0000000..5a47851
--- /dev/null
+++ b/pcaps/.gitignore
@@ -0,0 +1,2 @@
+*.pcap
+*.pcapng
\ No newline at end of file
diff --git a/pcaps/processing/.gitkeep b/pcaps/processing/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/pcaps/processing/.gitkeep
diff --git a/rules_manager.go b/rules_manager.go
index c256354..36379e6 100644
--- a/rules_manager.go
+++ b/rules_manager.go
@@ -19,7 +19,6 @@ type RegexFlags struct {
 	Caseless        bool `json:"caseless" bson:"caseless,omitempty"`                 // Set case-insensitive matching.
 	DotAll          bool `json:"dot_all" bson:"dot_all,omitempty"`                   // Matching a `.` will not exclude newlines.
 	MultiLine       bool `json:"multi_line" bson:"multi_line,omitempty"`             // Set multi-line anchoring.
-	SingleMatch     bool `json:"single_match" bson:"single_match,omitempty"`         // Set single-match only mode.
 	Utf8Mode        bool `json:"utf_8_mode" bson:"utf_8_mode,omitempty"`             // Enable UTF-8 mode for this expression.
 	UnicodeProperty bool `json:"unicode_property" bson:"unicode_property,omitempty"` // Enable Unicode property support for this expression
 }
@@ -339,6 +338,7 @@ func (p *Pattern) BuildPattern() (*hyperscan.Pattern, error) {
 		return nil, err
 	}
 
+	hp.Flags |= hyperscan.SomLeftMost
 	if p.Flags.Caseless {
 		hp.Flags |= hyperscan.Caseless
 	}
@@ -348,9 +348,6 @@ func (p *Pattern) BuildPattern() (*hyperscan.Pattern, error) {
 	if p.Flags.MultiLine {
 		hp.Flags |= hyperscan.MultiLine
 	}
-	if p.Flags.SingleMatch {
-		hp.Flags |= hyperscan.SingleMatch
-	}
 	if p.Flags.Utf8Mode {
 		hp.Flags |= hyperscan.Utf8Mode
 	}
diff --git a/rules_manager_test.go b/rules_manager_test.go
index f06362b..a2ec501 100644
--- a/rules_manager_test.go
+++ b/rules_manager_test.go
@@ -47,7 +47,6 @@ func TestAddAndGetAllRules(t *testing.T) {
 				Caseless:        true,
 				DotAll:          true,
 				MultiLine:       true,
-				SingleMatch:     true,
 				Utf8Mode:        true,
 				UnicodeProperty: true,
 			},
diff --git a/shared/.gitignore b/shared/.gitignore
new file mode 100644
index 0000000..c96a04f
--- /dev/null
+++ b/shared/.gitignore
@@ -0,0 +1,2 @@
+*
+!.gitignore
\ No newline at end of file
diff --git a/storage.go b/storage.go
--- a/storage.go
+++ b/storage.go
@@ -27,6 +27,7 @@ type Storage interface {
 }
 
 type MongoStorage struct {
+	client      *mongo.Client
 	collections map[string]*mongo.Collection
 }
 
@@ -66,6 +67,7 @@ func NewMongoStorage(uri string, port int, database string) (*MongoStorage, erro
 	}
 
 	return &MongoStorage{
+		client:      client,
 		collections: collections,
 	}, nil
 }
diff --git a/stream_handler_test.go b/stream_handler_test.go
index 0f610f3..199ae5b 100644
--- a/stream_handler_test.go
+++ b/stream_handler_test.go
@@ -311,7 +311,7 @@ func createTestStreamHandler(wrapper *TestStorageWrapper, patterns hyperscan.Str
 	dstPort := layers.NewTCPPortEndpoint(dstPort)
 	scanner := Scanner{scratch: scratch, version: ZeroRowID}
 
-	return NewStreamHandler(testConnectionHandler, StreamFlow{srcIP, dstIP, srcPort, dstPort}, scanner)
+	return NewStreamHandler(testConnectionHandler, StreamFlow{srcIP, dstIP, srcPort, dstPort}, scanner, true) // TODO: test isClient
 }
 
 type testConnectionHandler struct {
diff --git a/utils.go b/utils.go
--- a/utils.go
+++ b/utils.go
@@ -106,3 +106,24 @@ func DecodeBytes(buffer []byte, format string) string {
 		return string(buffer)
 	}
 }
+
+func CopyFile(dst, src string) error {
+	in, err := os.Open(src)
+	if err != nil {
+		return err
+	}
+
+	out, err := os.Create(dst)
+	if err != nil {
+		return err
+	}
+
+	if _, err = io.Copy(out, in); err != nil {
+		return err
+	}
+
+	if err := in.Close(); err != nil {
+		return err
+	}
+	return out.Close()
+}
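The commit above adds two import endpoints (POST /api/pcap/upload for multipart uploads and POST /api/pcap/file for pcaps already on the server's filesystem) plus a download route, and changes ImportPcap to take a file name relative to pcaps/processing/ instead of an arbitrary path. For reference, here is a minimal Go client sketch exercising the two import endpoints; the base URL (http://localhost:3333) and the pcap paths are assumptions for illustration, not part of the commit.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"mime/multipart"
	"net/http"
	"os"
)

func main() {
	base := "http://localhost:3333" // assumed address of a running instance

	// POST /api/pcap/upload: multipart upload; the form field is named "file",
	// matching c.FormFile("file") in the handler above.
	f, err := os.Open("capture.pcap") // hypothetical local capture
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	var body bytes.Buffer
	mw := multipart.NewWriter(&body)
	part, err := mw.CreateFormFile("file", "capture.pcap")
	if err != nil {
		log.Fatal(err)
	}
	if _, err := io.Copy(part, f); err != nil {
		log.Fatal(err)
	}
	mw.Close()

	resp, err := http.Post(base+"/api/pcap/upload", mw.FormDataContentType(), &body)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// On success the handler answers 202 Accepted with {"session": "<sha256 of the pcap>"}.
	var accepted struct {
		Session string `json:"session"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&accepted); err != nil {
		log.Fatal(err)
	}
	fmt.Println(resp.Status, accepted.Session)

	// POST /api/pcap/file: import a pcap reachable from the server's filesystem.
	// "file" and "delete_original_file" mirror the request struct bound in the handler.
	payload, _ := json.Marshal(map[string]interface{}{
		"file":                 "/path/on/server/capture.pcap", // hypothetical server-side path
		"delete_original_file": false,
	})
	resp2, err := http.Post(base+"/api/pcap/file", "application/json", bytes.NewReader(payload))
	if err != nil {
		log.Fatal(err)
	}
	defer resp2.Body.Close()
	fmt.Println(resp2.Status)
}
```

Responses other than 202 carry an error: 400 for a malformed request or a missing file, 422 for an invalid extension or an already-processed pcap, as exercised by TestPcapImporterApi in the diff.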