aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmiliano Ciavatta2020-04-25 17:25:55 +0000
committerEmiliano Ciavatta2020-04-25 17:26:05 +0000
commit8bd21ad9873690c52485e3581a8108c6f351e3a6 (patch)
tree3998d401e3963f05b15a68a47c84beaeb3c466fa
parent4f82402b345af658eee6485801426857e16f49f8 (diff)
Add connection_streams_controller
-rw-r--r--application_context.go19
-rw-r--r--application_router.go14
-rw-r--r--connection_handler.go2
-rw-r--r--connection_streams.go16
-rw-r--r--connection_streams_controller.go186
-rw-r--r--stream_handler.go5
-rw-r--r--utils.go29
7 files changed, 244 insertions, 27 deletions
diff --git a/application_context.go b/application_context.go
index 3ac3031..6960c7d 100644
--- a/application_context.go
+++ b/application_context.go
@@ -13,14 +13,15 @@ type Config struct {
}
type ApplicationContext struct {
- Storage Storage
- Config Config
- Accounts gin.Accounts
- RulesManager RulesManager
- PcapImporter *PcapImporter
- ConnectionsController ConnectionsController
- ServicesController *ServicesController
- IsConfigured bool
+ Storage Storage
+ Config Config
+ Accounts gin.Accounts
+ RulesManager RulesManager
+ PcapImporter *PcapImporter
+ ConnectionsController ConnectionsController
+ ServicesController *ServicesController
+ ConnectionStreamsController ConnectionStreamsController
+ IsConfigured bool
}
func CreateApplicationContext(storage Storage) (*ApplicationContext, error) {
@@ -92,6 +93,6 @@ func (sm *ApplicationContext) configure() {
sm.PcapImporter = NewPcapImporter(sm.Storage, serverIP, sm.RulesManager)
sm.ServicesController = NewServicesController(sm.Storage)
sm.ConnectionsController = NewConnectionsController(sm.Storage, sm.ServicesController)
+ sm.ConnectionStreamsController = NewConnectionStreamsController(sm.Storage)
sm.IsConfigured = true
-
}
diff --git a/application_router.go b/application_router.go
index be01e11..128b1ec 100644
--- a/application_router.go
+++ b/application_router.go
@@ -200,6 +200,20 @@ func CreateApplicationRouter(applicationContext *ApplicationContext) *gin.Engine
}
})
+ api.GET("/streams/:id", func(c *gin.Context) {
+ id, err := RowIDFromHex(c.Param("id"))
+ if err != nil {
+ badRequest(c, err)
+ return
+ }
+ var format QueryFormat
+ if err := c.ShouldBindQuery(&format); err != nil {
+ badRequest(c, err)
+ return
+ }
+ success(c, applicationContext.ConnectionStreamsController.GetConnectionPayload(c, id, format))
+ })
+
api.GET("/services", func(c *gin.Context) {
success(c, applicationContext.ServicesController.GetServices())
})
diff --git a/connection_handler.go b/connection_handler.go
index 53e594f..ddf5e55 100644
--- a/connection_handler.go
+++ b/connection_handler.go
@@ -152,7 +152,7 @@ func (factory *BiDirectionalStreamFactory) New(net, transport gopacket.Flow) tcp
}
factory.mConnections.Unlock()
- streamHandler := NewStreamHandler(connection, flow, factory.takeScanner())
+ streamHandler := NewStreamHandler(connection, flow, factory.takeScanner(), net.Src() != factory.serverIP)
return &streamHandler
}
diff --git a/connection_streams.go b/connection_streams.go
deleted file mode 100644
index bede526..0000000
--- a/connection_streams.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package main
-
-import "time"
-
-type ConnectionStream struct {
- ID RowID `json:"id" bson:"_id"`
- ConnectionID RowID `json:"connection_id" bson:"connection_id"`
- DocumentIndex int `json:"document_index" bson:"document_index"`
- Payload []byte `json:"payload" bson:"payload"`
- BlocksIndexes []int `json:"blocks_indexes" bson:"blocks_indexes"`
- BlocksTimestamps []time.Time `json:"blocks_timestamps" bson:"blocks_timestamps"`
- BlocksLoss []bool `json:"blocks_loss" bson:"blocks_loss"`
- PatternMatches map[uint][]PatternSlice `json:"pattern_matches" bson:"pattern_matches"`
-}
-
-type PatternSlice [2]uint64
diff --git a/connection_streams_controller.go b/connection_streams_controller.go
new file mode 100644
index 0000000..6c6c962
--- /dev/null
+++ b/connection_streams_controller.go
@@ -0,0 +1,186 @@
+package main
+
+import (
+ "context"
+ log "github.com/sirupsen/logrus"
+ "time"
+)
+
// Tuning constants for connection stream queries.
const (
	// InitialPayloadsSize is the initial capacity of the payloads slice
	// returned by GetConnectionPayload.
	InitialPayloadsSize = 1024
	// DefaultQueryFormatLimit is the payload byte budget applied when the
	// client does not provide an explicit limit.
	DefaultQueryFormatLimit = 8024
	// InitialRegexSlicesCount is the initial capacity of the regex match
	// slice built by findMatchesBetween.
	InitialRegexSlicesCount = 8
)
+
+type ConnectionStream struct {
+ ID RowID `bson:"_id"`
+ ConnectionID RowID `bson:"connection_id"`
+ FromClient bool `bson:"from_client"`
+ DocumentIndex int `bson:"document_index"`
+ Payload []byte `bson:"payload"`
+ BlocksIndexes []int `bson:"blocks_indexes"`
+ BlocksTimestamps []time.Time `bson:"blocks_timestamps"`
+ BlocksLoss []bool `bson:"blocks_loss"`
+ PatternMatches map[uint][]PatternSlice `bson:"pattern_matches"`
+}
+
+type PatternSlice [2]uint64
+
// Payload is the API representation of a single application-data block
// exchanged on a connection, serialized to JSON by the streams endpoint.
type Payload struct {
	FromClient      bool         `json:"from_client"`
	Content         string       `json:"content"`
	Index           int          `json:"index"`
	Timestamp       time.Time    `json:"timestamp"`
	IsRetransmitted bool         `json:"is_retransmitted"`
	RegexMatches    []RegexSlice `json:"regex_matches"`
}

// RegexSlice delimits a regex match inside a Payload's content,
// expressed as byte offsets relative to the start of that payload.
type RegexSlice struct {
	From uint64 `json:"from"`
	To   uint64 `json:"to"`
}
+
// QueryFormat carries the query-string parameters of the streams
// endpoint: the payload encoding to apply (see DecodeBytes) and a
// skip/limit window expressed in payload bytes.
type QueryFormat struct {
	Format string `form:"format"`
	Skip   uint64 `form:"skip"`
	Limit  uint64 `form:"limit"`
}
+
+type ConnectionStreamsController struct {
+ storage Storage
+}
+
+func NewConnectionStreamsController(storage Storage) ConnectionStreamsController {
+ return ConnectionStreamsController{
+ storage: storage,
+ }
+}
+
+func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, connectionID RowID,
+ format QueryFormat) []Payload {
+ payloads := make([]Payload, 0, InitialPayloadsSize)
+ var clientIndex, serverIndex, globalIndex uint64
+
+ if format.Limit <= 0 {
+ format.Limit = DefaultQueryFormatLimit
+ }
+
+ var clientBlocksIndex, serverBlocksIndex int
+ var clientDocumentIndex, serverDocumentIndex int
+ clientStream := csc.getConnectionStream(c, connectionID, true, clientDocumentIndex)
+ serverStream := csc.getConnectionStream(c, connectionID, false, serverDocumentIndex)
+
+ hasClientBlocks := func() bool {
+ return clientBlocksIndex < len(clientStream.BlocksIndexes)
+ }
+ hasServerBlocks := func() bool {
+ return serverBlocksIndex < len(serverStream.BlocksIndexes)
+ }
+
+ var payload Payload
+ for !clientStream.ID.IsZero() || !serverStream.ID.IsZero() {
+ if hasClientBlocks() && !(hasServerBlocks() && // next payload is from client
+ clientStream.BlocksTimestamps[0].UnixNano() > serverStream.BlocksTimestamps[0].UnixNano()) {
+ start := clientStream.BlocksIndexes[clientBlocksIndex]
+ end := 0
+ if clientBlocksIndex < len(clientStream.BlocksIndexes)-1 {
+ end = clientStream.BlocksIndexes[clientBlocksIndex+1]
+ } else {
+ end = len(clientStream.Payload) - 1
+ }
+ size := uint64(end - start)
+
+ payload = Payload{
+ FromClient: true,
+ Content: DecodeBytes(clientStream.Payload[start:end], format.Format),
+ Index: start,
+ Timestamp: clientStream.BlocksTimestamps[clientBlocksIndex],
+ IsRetransmitted: clientStream.BlocksLoss[clientBlocksIndex],
+ RegexMatches: findMatchesBetween(clientStream.PatternMatches, clientIndex, clientIndex+size),
+ }
+ clientIndex += size
+ globalIndex += size
+ clientBlocksIndex++
+ } else { // next payload is from server
+ start := serverStream.BlocksIndexes[serverBlocksIndex]
+ end := 0
+ if serverBlocksIndex < len(serverStream.BlocksIndexes)-1 {
+ end = serverStream.BlocksIndexes[serverBlocksIndex+1]
+ } else {
+ end = len(serverStream.Payload) - 1
+ }
+ size := uint64(end - start)
+
+ payload = Payload{
+ FromClient: false,
+ Content: DecodeBytes(serverStream.Payload[start:end], format.Format),
+ Index: start,
+ Timestamp: serverStream.BlocksTimestamps[serverBlocksIndex],
+ IsRetransmitted: serverStream.BlocksLoss[serverBlocksIndex],
+ RegexMatches: findMatchesBetween(serverStream.PatternMatches, serverIndex, serverIndex+size),
+ }
+ serverIndex += size
+ globalIndex += size
+ serverBlocksIndex++
+ }
+
+ if globalIndex > format.Skip {
+ payloads = append(payloads, payload)
+ }
+ if globalIndex > format.Skip+format.Limit {
+ return payloads
+ }
+
+ if !hasClientBlocks() {
+ clientDocumentIndex++
+ clientBlocksIndex = 0
+ clientStream = csc.getConnectionStream(c, connectionID, true, clientDocumentIndex)
+ }
+ if !hasServerBlocks() {
+ serverDocumentIndex++
+ serverBlocksIndex = 0
+ serverStream = csc.getConnectionStream(c, connectionID, false, serverDocumentIndex)
+ }
+ }
+
+ return payloads
+}
+
+func (csc ConnectionStreamsController) getConnectionStream(c context.Context, connectionID RowID, fromClient bool,
+ documentIndex int) ConnectionStream {
+ var result ConnectionStream
+ if err := csc.storage.Find(ConnectionStreams).Filter(OrderedDocument{
+ {"connection_id", connectionID},
+ {"from_client", fromClient},
+ {"document_index", documentIndex},
+ }).Context(c).First(&result); err != nil {
+ log.WithError(err).WithField("connection_id", connectionID).Panic("failed to get a ConnectionStream")
+ }
+ return result
+}
+
+func findMatchesBetween(patternMatches map[uint][]PatternSlice, from, to uint64) []RegexSlice {
+ regexSlices := make([]RegexSlice, 0, InitialRegexSlicesCount)
+ for _, slices := range patternMatches {
+ for _, slice := range slices {
+ if from > slice[1] || to <= slice[0] {
+ continue
+ }
+
+ log.Info(slice[0], slice[1], from, to)
+ var start, end uint64
+ if from > slice[0] {
+ start = 0
+ } else {
+ start = slice[0] - from
+ }
+
+ if to <= slice[1] {
+ end = to - from
+ } else {
+ end = slice[1] - from
+ }
+
+ regexSlices = append(regexSlices, RegexSlice{From: start, To: end})
+ }
+ }
+ return regexSlices
+}
diff --git a/stream_handler.go b/stream_handler.go
index 97975fa..4d33b01 100644
--- a/stream_handler.go
+++ b/stream_handler.go
@@ -31,10 +31,11 @@ type StreamHandler struct {
patternStream hyperscan.Stream
patternMatches map[uint][]PatternSlice
scanner Scanner
+ isClient bool
}
// NewReaderStream returns a new StreamHandler object.
-func NewStreamHandler(connection ConnectionHandler, streamFlow StreamFlow, scanner Scanner) StreamHandler {
+func NewStreamHandler(connection ConnectionHandler, streamFlow StreamFlow, scanner Scanner, isClient bool) StreamHandler {
handler := StreamHandler{
connection: connection,
streamFlow: streamFlow,
@@ -45,6 +46,7 @@ func NewStreamHandler(connection ConnectionHandler, streamFlow StreamFlow, scann
documentsIDs: make([]RowID, 0, 1), // most of the time the stream fit in one document
patternMatches: make(map[uint][]PatternSlice, connection.PatternsDatabaseSize()),
scanner: scanner,
+ isClient: isClient,
}
stream, err := connection.PatternsDatabase().Open(0, scanner.scratch, handler.onMatch, nil)
@@ -159,6 +161,7 @@ func (sh *StreamHandler) storageCurrentDocument() {
BlocksTimestamps: sh.timestamps,
BlocksLoss: sh.lossBlocks,
PatternMatches: sh.patternMatches,
+ FromClient: sh.isClient,
}); err != nil {
log.WithError(err).Error("failed to insert connection stream")
} else {
diff --git a/utils.go b/utils.go
index 5028616..3b66822 100644
--- a/utils.go
+++ b/utils.go
@@ -2,6 +2,8 @@ package main
import (
"crypto/sha256"
+ "encoding/base32"
+ "encoding/base64"
"encoding/binary"
"encoding/hex"
"fmt"
@@ -77,3 +79,30 @@ func FileSize(filename string) int64 {
func byID(id RowID) OrderedDocument {
return OrderedDocument{{"_id", id}}
}
+
// DecodeBytes renders a binary buffer as text in the requested format.
// Supported formats: "hex", "hexdump", "base32", "base64", "ascii",
// "binary", "decimal" and "octal"; any other value returns the raw
// bytes as a string.
func DecodeBytes(buffer []byte, format string) string {
	// fmt renders a []byte as "[a b c]" (or `"..."` for %+q); strip the
	// first and last character to keep only the inner text.
	trimEnds := func(s string) string {
		return s[1 : len(s)-1]
	}

	switch format {
	case "hex":
		return hex.EncodeToString(buffer)
	case "hexdump":
		return hex.Dump(buffer)
	case "base32":
		return base32.StdEncoding.EncodeToString(buffer)
	case "base64":
		return base64.StdEncoding.EncodeToString(buffer)
	case "ascii":
		return trimEnds(fmt.Sprintf("%+q", buffer))
	case "binary":
		return trimEnds(fmt.Sprintf("%b", buffer))
	case "decimal":
		return trimEnds(fmt.Sprintf("%d", buffer))
	case "octal":
		return trimEnds(fmt.Sprintf("%o", buffer))
	default:
		return string(buffer)
	}
}