aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--application_router.go62
-rw-r--r--application_router_test.go8
-rw-r--r--caronte_test.go6
-rw-r--r--pcap_importer.go67
-rw-r--r--pcap_importer_test.go30
-rw-r--r--pcaps/.gitignore2
-rw-r--r--pcaps/processing/.gitkeep0
-rw-r--r--rules_manager.go5
-rw-r--r--rules_manager_test.go1
-rw-r--r--shared/.gitignore2
-rw-r--r--storage.go2
-rw-r--r--stream_handler_test.go2
-rw-r--r--utils.go21
13 files changed, 170 insertions, 38 deletions
diff --git a/application_router.go b/application_router.go
index 128b1ec..ea68e9d 100644
--- a/application_router.go
+++ b/application_router.go
@@ -2,14 +2,20 @@ package main
import (
"errors"
+ "fmt"
"github.com/gin-gonic/gin"
+ log "github.com/sirupsen/logrus"
"net/http"
+ "os"
+ "path/filepath"
+ "time"
)
func CreateApplicationRouter(applicationContext *ApplicationContext) *gin.Engine {
router := gin.New()
router.Use(gin.Logger())
router.Use(gin.Recovery())
+ router.MaxMultipartMemory = 8 << 30
// engine.Static("/", "./frontend/build")
@@ -96,22 +102,49 @@ func CreateApplicationRouter(applicationContext *ApplicationContext) *gin.Engine
}
})
+ api.POST("/pcap/upload", func(c *gin.Context) {
+ fileHeader, err := c.FormFile("file")
+ if err != nil {
+ badRequest(c, err)
+ return
+ }
+ fileName := fmt.Sprintf("%v-%s", time.Now().UnixNano(), fileHeader.Filename)
+ if err := c.SaveUploadedFile(fileHeader, ProcessingPcapsBasePath + fileName); err != nil {
+ log.WithError(err).Panic("failed to save uploaded file")
+ }
+
+ if sessionID, err := applicationContext.PcapImporter.ImportPcap(fileName); err != nil {
+ unprocessableEntity(c, err)
+ } else {
+ c.JSON(http.StatusAccepted, gin.H{"session": sessionID})
+ }
+ })
+
api.POST("/pcap/file", func(c *gin.Context) {
- var body struct {
- Path string
+ var request struct {
+ File string `json:"file"`
+ DeleteOriginalFile bool `json:"delete_original_file"`
}
- if err := c.ShouldBindJSON(&body); err != nil {
+ if err := c.ShouldBindJSON(&request); err != nil {
badRequest(c, err)
return
}
-
- if !FileExists(body.Path) {
- unprocessableEntity(c, errors.New("invalid path"))
+ if !FileExists(request.File) {
+ badRequest(c, errors.New("file not exists"))
return
}
- if sessionID, err := applicationContext.PcapImporter.ImportPcap(body.Path); err != nil {
+ fileName := fmt.Sprintf("%v-%s", time.Now().UnixNano(), filepath.Base(request.File))
+ if err := CopyFile(ProcessingPcapsBasePath + fileName, request.File); err != nil {
+ log.WithError(err).Panic("failed to copy pcap file")
+ }
+ if sessionID, err := applicationContext.PcapImporter.ImportPcap(fileName); err != nil {
+ if request.DeleteOriginalFile {
+ if err := os.Remove(request.File); err != nil {
+ log.WithError(err).Panic("failed to remove processed file")
+ }
+ }
unprocessableEntity(c, err)
} else {
c.JSON(http.StatusAccepted, gin.H{"session": sessionID})
@@ -131,6 +164,21 @@ func CreateApplicationRouter(applicationContext *ApplicationContext) *gin.Engine
}
})
+ api.GET("/pcap/sessions/:id/download", func(c *gin.Context) {
+ sessionID := c.Param("id")
+ if _, isPresent := applicationContext.PcapImporter.GetSession(sessionID); isPresent {
+ if FileExists(PcapsBasePath + sessionID + ".pcap") {
+ c.FileAttachment(PcapsBasePath + sessionID + ".pcap", sessionID[:16] + ".pcap")
+ } else if FileExists(PcapsBasePath + sessionID + ".pcapng") {
+ c.FileAttachment(PcapsBasePath + sessionID + ".pcapng", sessionID[:16] + ".pcapng")
+ } else {
+ log.WithField("sessionID", sessionID).Panic("pcap file not exists")
+ }
+ } else {
+ notFound(c, gin.H{"session": sessionID})
+ }
+ })
+
api.DELETE("/pcap/sessions/:id", func(c *gin.Context) {
sessionID := c.Param("id")
session := gin.H{"session": sessionID}
diff --git a/application_router_test.go b/application_router_test.go
index c6f902c..c8e4474 100644
--- a/application_router_test.go
+++ b/application_router_test.go
@@ -102,15 +102,15 @@ func TestPcapImporterApi(t *testing.T) {
// Import pcap
assert.Equal(t, http.StatusBadRequest, toolkit.MakeRequest("POST", "/api/pcap/file", nil).Code)
- assert.Equal(t, http.StatusUnprocessableEntity, toolkit.MakeRequest("POST", "/api/pcap/file",
- gin.H{"path": "invalidPath"}).Code)
- w := toolkit.MakeRequest("POST", "/api/pcap/file", gin.H{"path": "test_data/ping_pong_10000.pcap"})
+ assert.Equal(t, http.StatusBadRequest, toolkit.MakeRequest("POST", "/api/pcap/file",
+ gin.H{"file": "invalidPath"}).Code)
+ w := toolkit.MakeRequest("POST", "/api/pcap/file", gin.H{"file": "test_data/ping_pong_10000.pcap"})
var sessionID struct{ Session string }
assert.Equal(t, http.StatusAccepted, w.Code)
assert.NoError(t, json.Unmarshal(w.Body.Bytes(), &sessionID))
assert.Equal(t, "369ef4b6abb6214b4ee2e0c81ecb93c49e275c26c85e30493b37727d408cf280", sessionID.Session)
assert.Equal(t, http.StatusUnprocessableEntity, toolkit.MakeRequest("POST", "/api/pcap/file",
- gin.H{"path": "test_data/ping_pong_10000.pcap"}).Code) // duplicate
+ gin.H{"file": "test_data/ping_pong_10000.pcap"}).Code) // duplicate
// Get sessions
var sessions []ImportingSession
diff --git a/caronte_test.go b/caronte_test.go
index b248ce3..12ec50f 100644
--- a/caronte_test.go
+++ b/caronte_test.go
@@ -32,12 +32,10 @@ func NewTestStorageWrapper(t *testing.T) *TestStorageWrapper {
dbName := fmt.Sprintf("%x", time.Now().UnixNano())
log.WithField("dbName", dbName).Info("creating new storage")
- storage := NewMongoStorage(mongoHost, port, dbName)
+ storage, err := NewMongoStorage(mongoHost, port, dbName)
+ require.NoError(t, err)
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
- err = storage.Connect(ctx)
- require.NoError(t, err, "failed to connect to database")
-
return &TestStorageWrapper{
DbName: dbName,
Storage: storage,
diff --git a/pcap_importer.go b/pcap_importer.go
index 2bdbd1a..129e60b 100644
--- a/pcap_importer.go
+++ b/pcap_importer.go
@@ -9,10 +9,15 @@ import (
"github.com/google/gopacket/tcpassembly"
log "github.com/sirupsen/logrus"
"net"
+ "os"
+ "path"
+ "path/filepath"
"sync"
"time"
)
+const PcapsBasePath = "pcaps/"
+const ProcessingPcapsBasePath = PcapsBasePath + "processing/"
const initialAssemblerPoolSize = 16
const flushOlderThan = 5 * time.Minute
const importUpdateProgressInterval = 100 * time.Millisecond
@@ -46,11 +51,20 @@ func NewPcapImporter(storage Storage, serverIP net.IP, rulesManager RulesManager
serverEndpoint := layers.NewIPEndpoint(serverIP)
streamPool := tcpassembly.NewStreamPool(NewBiDirectionalStreamFactory(storage, serverEndpoint, rulesManager))
+ var result []ImportingSession
+ if err := storage.Find(ImportingSessions).All(&result); err != nil {
+ log.WithError(err).Panic("failed to retrieve importing sessions")
+ }
+ sessions := make(map[string]ImportingSession)
+ for _, session := range result {
+ sessions[session.ID] = session
+ }
+
return &PcapImporter{
storage: storage,
streamPool: streamPool,
assemblers: make([]*tcpassembly.Assembler, 0, initialAssemblerPoolSize),
- sessions: make(map[string]ImportingSession),
+ sessions: sessions,
mAssemblers: sync.Mutex{},
mSessions: sync.Mutex{},
serverIP: serverEndpoint,
@@ -62,15 +76,24 @@ func NewPcapImporter(storage Storage, serverIP net.IP, rulesManager RulesManager
// create a new session and starts to import the pcap, and returns immediately the session name (that is the sha256
// of the pcap).
func (pi *PcapImporter) ImportPcap(fileName string) (string, error) {
- hash, err := Sha256Sum(fileName)
+ switch filepath.Ext(fileName) {
+ case ".pcap":
+ case ".pcapng":
+ default:
+ deleteProcessingFile(fileName)
+ return "", errors.New("invalid file extension")
+ }
+
+ hash, err := Sha256Sum(ProcessingPcapsBasePath + fileName)
if err != nil {
- return "", err
+ log.WithError(err).Panic("failed to calculate pcap sha256")
+ deleteProcessingFile(fileName)
}
pi.mSessions.Lock()
- _, isPresent := pi.sessions[hash]
- if isPresent {
+ if _, isPresent := pi.sessions[hash]; isPresent {
pi.mSessions.Unlock()
+ deleteProcessingFile(fileName)
return hash, errors.New("pcap already processed")
}
@@ -78,7 +101,7 @@ func (pi *PcapImporter) ImportPcap(fileName string) (string, error) {
session := ImportingSession{
ID: hash,
StartedAt: time.Now(),
- Size: FileSize(fileName),
+ Size: FileSize(ProcessingPcapsBasePath + fileName),
PacketsPerService: make(map[uint16]flowCount),
cancelFunc: cancelFunc,
completed: make(chan string),
@@ -121,9 +144,9 @@ func (pi *PcapImporter) CancelSession(sessionID string) bool {
// Read the pcap and save the tcp stream flow to the database
func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx context.Context) {
- handle, err := pcap.OpenOffline(fileName)
+ handle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName)
if err != nil {
- pi.progressUpdate(session, false, "failed to process pcap")
+ pi.progressUpdate(session, fileName, false, "failed to process pcap")
log.WithError(err).WithFields(log.Fields{"session": session, "fileName": fileName}).
Error("failed to open pcap")
return
@@ -141,7 +164,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
case <-ctx.Done():
handle.Close()
pi.releaseAssembler(assembler)
- pi.progressUpdate(session, false, "import process cancelled")
+ pi.progressUpdate(session, fileName, false, "import process cancelled")
return
default:
}
@@ -150,11 +173,12 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
case packet := <-packets:
if packet == nil { // completed
if !firstPacketTime.IsZero() {
- assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan))
+ // TODO:
+ //assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan))
}
handle.Close()
pi.releaseAssembler(assembler)
- pi.progressUpdate(session, true, "")
+ pi.progressUpdate(session, fileName, true, "")
return
}
@@ -195,12 +219,12 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, timestamp)
case <-updateProgressInterval:
- pi.progressUpdate(session, false, "")
+ pi.progressUpdate(session, fileName, false, "")
}
}
}
-func (pi *PcapImporter) progressUpdate(session ImportingSession, completed bool, err string) {
+func (pi *PcapImporter) progressUpdate(session ImportingSession, fileName string, completed bool, err string) {
if completed {
session.CompletedAt = time.Now()
}
@@ -220,6 +244,11 @@ func (pi *PcapImporter) progressUpdate(session ImportingSession, completed bool,
if _, _err := pi.storage.Insert(ImportingSessions).One(session); _err != nil {
log.WithError(_err).WithField("session", session).Error("failed to insert importing stats")
}
+ if completed {
+ moveProcessingFile(session.ID, fileName)
+ } else {
+ deleteProcessingFile(fileName)
+ }
close(session.completed)
}
}
@@ -244,3 +273,15 @@ func (pi *PcapImporter) releaseAssembler(assembler *tcpassembly.Assembler) {
pi.assemblers = append(pi.assemblers, assembler)
pi.mAssemblers.Unlock()
}
+
+func deleteProcessingFile(fileName string) {
+ if err := os.Remove(ProcessingPcapsBasePath + fileName); err != nil {
+ log.WithError(err).Error("failed to delete processing file")
+ }
+}
+
+func moveProcessingFile(sessionID string, fileName string) {
+ if err := os.Rename(ProcessingPcapsBasePath + fileName, PcapsBasePath + sessionID + path.Ext(fileName)); err != nil {
+ log.WithError(err).Error("failed to move processed file")
+ }
+}
diff --git a/pcap_importer_test.go b/pcap_importer_test.go
index c79556a..6f9d4a5 100644
--- a/pcap_importer_test.go
+++ b/pcap_importer_test.go
@@ -2,6 +2,7 @@ package main
import (
"bufio"
+ "fmt"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/tcpassembly"
@@ -9,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net"
+ "os"
"sync"
"testing"
"time"
@@ -20,12 +22,15 @@ func TestImportPcap(t *testing.T) {
pcapImporter.releaseAssembler(pcapImporter.takeAssembler())
- sessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
+ fileName := copyToProcessing(t, "ping_pong_10000.pcap")
+ sessionID, err := pcapImporter.ImportPcap(fileName)
require.NoError(t, err)
- duplicateSessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
+ duplicatePcapFileName := copyToProcessing(t, "ping_pong_10000.pcap")
+ duplicateSessionID, err := pcapImporter.ImportPcap(duplicatePcapFileName)
require.Error(t, err)
assert.Equal(t, sessionID, duplicateSessionID)
+ assert.Error(t, os.Remove(ProcessingPcapsBasePath + duplicatePcapFileName))
_, isPresent := pcapImporter.GetSession("invalid")
assert.False(t, isPresent)
@@ -38,6 +43,9 @@ func TestImportPcap(t *testing.T) {
checkSessionEquals(t, wrapper, session)
+ assert.Error(t, os.Remove(ProcessingPcapsBasePath + fileName))
+ assert.NoError(t, os.Remove(PcapsBasePath + session.ID + ".pcap"))
+
wrapper.Destroy(t)
}
@@ -45,7 +53,8 @@ func TestCancelImportSession(t *testing.T) {
wrapper := NewTestStorageWrapper(t)
pcapImporter := newTestPcapImporter(wrapper, "172.17.0.3")
- sessionID, err := pcapImporter.ImportPcap("test_data/ping_pong_10000.pcap")
+ fileName := copyToProcessing(t, "ping_pong_10000.pcap")
+ sessionID, err := pcapImporter.ImportPcap(fileName)
require.NoError(t, err)
assert.False(t, pcapImporter.CancelSession("invalid"))
@@ -61,6 +70,9 @@ func TestCancelImportSession(t *testing.T) {
checkSessionEquals(t, wrapper, session)
+ assert.Error(t, os.Remove(ProcessingPcapsBasePath + fileName))
+ assert.Error(t, os.Remove(PcapsBasePath + sessionID + ".pcap"))
+
wrapper.Destroy(t)
}
@@ -68,7 +80,8 @@ func TestImportNoTcpPackets(t *testing.T) {
wrapper := NewTestStorageWrapper(t)
pcapImporter := newTestPcapImporter(wrapper, "172.17.0.4")
- sessionID, err := pcapImporter.ImportPcap("test_data/icmp.pcap")
+ fileName := copyToProcessing(t, "icmp.pcap")
+ sessionID, err := pcapImporter.ImportPcap(fileName)
require.NoError(t, err)
session := waitSessionCompletion(t, pcapImporter, sessionID)
@@ -80,6 +93,9 @@ func TestImportNoTcpPackets(t *testing.T) {
checkSessionEquals(t, wrapper, session)
+ assert.Error(t, os.Remove(ProcessingPcapsBasePath + fileName))
+ assert.NoError(t, os.Remove(PcapsBasePath + sessionID + ".pcap"))
+
wrapper.Destroy(t)
}
@@ -127,6 +143,12 @@ func checkSessionEquals(t *testing.T, wrapper *TestStorageWrapper, session Impor
assert.Equal(t, session, result)
}
+func copyToProcessing(t *testing.T, fileName string) string {
+ newFile := fmt.Sprintf("test-%v-%s", time.Now().UnixNano(), fileName)
+ require.NoError(t, CopyFile(ProcessingPcapsBasePath + newFile, "test_data/" + fileName))
+ return newFile
+}
+
type testStreamFactory struct {
}
diff --git a/pcaps/.gitignore b/pcaps/.gitignore
new file mode 100644
index 0000000..5a47851
--- /dev/null
+++ b/pcaps/.gitignore
@@ -0,0 +1,2 @@
+*.pcap
+*.pcapng \ No newline at end of file
diff --git a/pcaps/processing/.gitkeep b/pcaps/processing/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/pcaps/processing/.gitkeep
diff --git a/rules_manager.go b/rules_manager.go
index c256354..36379e6 100644
--- a/rules_manager.go
+++ b/rules_manager.go
@@ -19,7 +19,6 @@ type RegexFlags struct {
Caseless bool `json:"caseless" bson:"caseless,omitempty"` // Set case-insensitive matching.
DotAll bool `json:"dot_all" bson:"dot_all,omitempty"` // Matching a `.` will not exclude newlines.
MultiLine bool `json:"multi_line" bson:"multi_line,omitempty"` // Set multi-line anchoring.
- SingleMatch bool `json:"single_match" bson:"single_match,omitempty"` // Set single-match only mode.
Utf8Mode bool `json:"utf_8_mode" bson:"utf_8_mode,omitempty"` // Enable UTF-8 mode for this expression.
UnicodeProperty bool `json:"unicode_property" bson:"unicode_property,omitempty"` // Enable Unicode property support for this expression
}
@@ -339,6 +338,7 @@ func (p *Pattern) BuildPattern() (*hyperscan.Pattern, error) {
return nil, err
}
+ hp.Flags |= hyperscan.SomLeftMost
if p.Flags.Caseless {
hp.Flags |= hyperscan.Caseless
}
@@ -348,9 +348,6 @@ func (p *Pattern) BuildPattern() (*hyperscan.Pattern, error) {
if p.Flags.MultiLine {
hp.Flags |= hyperscan.MultiLine
}
- if p.Flags.SingleMatch {
- hp.Flags |= hyperscan.SingleMatch
- }
if p.Flags.Utf8Mode {
hp.Flags |= hyperscan.Utf8Mode
}
diff --git a/rules_manager_test.go b/rules_manager_test.go
index f06362b..a2ec501 100644
--- a/rules_manager_test.go
+++ b/rules_manager_test.go
@@ -47,7 +47,6 @@ func TestAddAndGetAllRules(t *testing.T) {
Caseless: true,
DotAll: true,
MultiLine: true,
- SingleMatch: true,
Utf8Mode: true,
UnicodeProperty: true,
},
diff --git a/shared/.gitignore b/shared/.gitignore
new file mode 100644
index 0000000..c96a04f
--- /dev/null
+++ b/shared/.gitignore
@@ -0,0 +1,2 @@
+*
+!.gitignore \ No newline at end of file
diff --git a/storage.go b/storage.go
index 502ff14..b1d2a7d 100644
--- a/storage.go
+++ b/storage.go
@@ -27,6 +27,7 @@ type Storage interface {
}
type MongoStorage struct {
+ client *mongo.Client
collections map[string]*mongo.Collection
}
@@ -66,6 +67,7 @@ func NewMongoStorage(uri string, port int, database string) (*MongoStorage, erro
}
return &MongoStorage{
+ client: client,
collections: collections,
}, nil
}
diff --git a/stream_handler_test.go b/stream_handler_test.go
index 0f610f3..199ae5b 100644
--- a/stream_handler_test.go
+++ b/stream_handler_test.go
@@ -311,7 +311,7 @@ func createTestStreamHandler(wrapper *TestStorageWrapper, patterns hyperscan.Str
dstPort := layers.NewTCPPortEndpoint(dstPort)
scanner := Scanner{scratch: scratch, version: ZeroRowID}
- return NewStreamHandler(testConnectionHandler, StreamFlow{srcIP, dstIP, srcPort, dstPort}, scanner)
+ return NewStreamHandler(testConnectionHandler, StreamFlow{srcIP, dstIP, srcPort, dstPort}, scanner, true) // TODO: test isClient
}
type testConnectionHandler struct {
diff --git a/utils.go b/utils.go
index 3b66822..a015b75 100644
--- a/utils.go
+++ b/utils.go
@@ -106,3 +106,24 @@ func DecodeBytes(buffer []byte, format string) string {
return string(buffer)
}
}
+
+func CopyFile(dst, src string) error {
+ in, err := os.Open(src)
+ if err != nil {
+ return err
+ }
+
+ out, err := os.Create(dst)
+ if err != nil {
+ return err
+ }
+
+ if _, err = io.Copy(out, in); err != nil {
+ return err
+ }
+
+ if err := in.Close(); err != nil {
+ return err
+ }
+ return out.Close()
+}