From 8bd21ad9873690c52485e3581a8108c6f351e3a6 Mon Sep 17 00:00:00 2001 From: Emiliano Ciavatta Date: Sat, 25 Apr 2020 19:25:55 +0200 Subject: Add connection_streams_controller --- application_context.go | 19 ++-- application_router.go | 14 +++ connection_handler.go | 2 +- connection_streams.go | 16 ---- connection_streams_controller.go | 186 +++++++++++++++++++++++++++++++++++++++ stream_handler.go | 5 +- utils.go | 29 ++++++ 7 files changed, 244 insertions(+), 27 deletions(-) delete mode 100644 connection_streams.go create mode 100644 connection_streams_controller.go diff --git a/application_context.go b/application_context.go index 3ac3031..6960c7d 100644 --- a/application_context.go +++ b/application_context.go @@ -13,14 +13,15 @@ type Config struct { } type ApplicationContext struct { - Storage Storage - Config Config - Accounts gin.Accounts - RulesManager RulesManager - PcapImporter *PcapImporter - ConnectionsController ConnectionsController - ServicesController *ServicesController - IsConfigured bool + Storage Storage + Config Config + Accounts gin.Accounts + RulesManager RulesManager + PcapImporter *PcapImporter + ConnectionsController ConnectionsController + ServicesController *ServicesController + ConnectionStreamsController ConnectionStreamsController + IsConfigured bool } func CreateApplicationContext(storage Storage) (*ApplicationContext, error) { @@ -92,6 +93,6 @@ func (sm *ApplicationContext) configure() { sm.PcapImporter = NewPcapImporter(sm.Storage, serverIP, sm.RulesManager) sm.ServicesController = NewServicesController(sm.Storage) sm.ConnectionsController = NewConnectionsController(sm.Storage, sm.ServicesController) + sm.ConnectionStreamsController = NewConnectionStreamsController(sm.Storage) sm.IsConfigured = true - } diff --git a/application_router.go b/application_router.go index be01e11..128b1ec 100644 --- a/application_router.go +++ b/application_router.go @@ -200,6 +200,20 @@ func CreateApplicationRouter(applicationContext *ApplicationContext) *gin.Engine } }) + api.GET("/streams/:id", func(c *gin.Context) { + id, err := RowIDFromHex(c.Param("id")) + if err != nil { + badRequest(c, err) + return + } + var format QueryFormat + if err := c.ShouldBindQuery(&format); err != nil { + badRequest(c, err) + return + } + success(c, applicationContext.ConnectionStreamsController.GetConnectionPayload(c, id, format)) + }) + api.GET("/services", func(c *gin.Context) { success(c, applicationContext.ServicesController.GetServices()) }) diff --git a/connection_handler.go b/connection_handler.go index 53e594f..ddf5e55 100644 --- a/connection_handler.go +++ b/connection_handler.go @@ -152,7 +152,7 @@ func (factory *BiDirectionalStreamFactory) New(net, transport gopacket.Flow) tcp } factory.mConnections.Unlock() - streamHandler := NewStreamHandler(connection, flow, factory.takeScanner()) + streamHandler := NewStreamHandler(connection, flow, factory.takeScanner(), net.Src() != factory.serverIP) return &streamHandler } diff --git a/connection_streams.go b/connection_streams.go deleted file mode 100644 index bede526..0000000 --- a/connection_streams.go +++ /dev/null @@ -1,16 +0,0 @@ -package main - -import "time" - -type ConnectionStream struct { - ID RowID `json:"id" bson:"_id"` - ConnectionID RowID `json:"connection_id" bson:"connection_id"` - DocumentIndex int `json:"document_index" bson:"document_index"` - Payload []byte `json:"payload" bson:"payload"` - BlocksIndexes []int `json:"blocks_indexes" bson:"blocks_indexes"` - BlocksTimestamps []time.Time `json:"blocks_timestamps" bson:"blocks_timestamps"` - BlocksLoss []bool `json:"blocks_loss" bson:"blocks_loss"` - PatternMatches map[uint][]PatternSlice `json:"pattern_matches" bson:"pattern_matches"` -} - -type PatternSlice [2]uint64 diff --git a/connection_streams_controller.go b/connection_streams_controller.go new file mode 100644 index 0000000..6c6c962 --- /dev/null +++ b/connection_streams_controller.go @@ -0,0 +1,186 @@ +package main + +import ( + "context" + log "github.com/sirupsen/logrus" + "time" +) + +const InitialPayloadsSize = 1024 +const DefaultQueryFormatLimit = 8024 +const InitialRegexSlicesCount = 8 + +type ConnectionStream struct { + ID RowID `bson:"_id"` + ConnectionID RowID `bson:"connection_id"` + FromClient bool `bson:"from_client"` + DocumentIndex int `bson:"document_index"` + Payload []byte `bson:"payload"` + BlocksIndexes []int `bson:"blocks_indexes"` + BlocksTimestamps []time.Time `bson:"blocks_timestamps"` + BlocksLoss []bool `bson:"blocks_loss"` + PatternMatches map[uint][]PatternSlice `bson:"pattern_matches"` +} + +type PatternSlice [2]uint64 + +type Payload struct { + FromClient bool `json:"from_client"` + Content string `json:"content"` + Index int `json:"index"` + Timestamp time.Time `json:"timestamp"` + IsRetransmitted bool `json:"is_retransmitted"` + RegexMatches []RegexSlice `json:"regex_matches"` +} + +type RegexSlice struct { + From uint64 `json:"from"` + To uint64 `json:"to"` +} + +type QueryFormat struct { + Format string `form:"format"` + Skip uint64 `form:"skip"` + Limit uint64 `form:"limit"` +} + +type ConnectionStreamsController struct { + storage Storage +} + +func NewConnectionStreamsController(storage Storage) ConnectionStreamsController { + return ConnectionStreamsController{ + storage: storage, + } +} + +func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, connectionID RowID, + format QueryFormat) []Payload { + payloads := make([]Payload, 0, InitialPayloadsSize) + var clientIndex, serverIndex, globalIndex uint64 + + if format.Limit <= 0 { + format.Limit = DefaultQueryFormatLimit + } + + var clientBlocksIndex, serverBlocksIndex int + var clientDocumentIndex, serverDocumentIndex int + clientStream := csc.getConnectionStream(c, connectionID, true, clientDocumentIndex) + serverStream := csc.getConnectionStream(c, connectionID, false, serverDocumentIndex) + + hasClientBlocks := func() bool { + return clientBlocksIndex < len(clientStream.BlocksIndexes) + } + hasServerBlocks := func() bool { + return serverBlocksIndex < len(serverStream.BlocksIndexes) + } + + var payload Payload + for !clientStream.ID.IsZero() || !serverStream.ID.IsZero() { + if hasClientBlocks() && !(hasServerBlocks() && // next payload is from client + clientStream.BlocksTimestamps[0].UnixNano() > serverStream.BlocksTimestamps[0].UnixNano()) { + start := clientStream.BlocksIndexes[clientBlocksIndex] + end := 0 + if clientBlocksIndex < len(clientStream.BlocksIndexes)-1 { + end = clientStream.BlocksIndexes[clientBlocksIndex+1] + } else { + end = len(clientStream.Payload) - 1 + } + size := uint64(end - start) + + payload = Payload{ + FromClient: true, + Content: DecodeBytes(clientStream.Payload[start:end], format.Format), + Index: start, + Timestamp: clientStream.BlocksTimestamps[clientBlocksIndex], + IsRetransmitted: clientStream.BlocksLoss[clientBlocksIndex], + RegexMatches: findMatchesBetween(clientStream.PatternMatches, clientIndex, clientIndex+size), + } + clientIndex += size + globalIndex += size + clientBlocksIndex++ + } else { // next payload is from server + start := serverStream.BlocksIndexes[serverBlocksIndex] + end := 0 + if serverBlocksIndex < len(serverStream.BlocksIndexes)-1 { + end = serverStream.BlocksIndexes[serverBlocksIndex+1] + } else { + end = len(serverStream.Payload) - 1 + } + size := uint64(end - start) + + payload = Payload{ + FromClient: false, + Content: DecodeBytes(serverStream.Payload[start:end], format.Format), + Index: start, + Timestamp: serverStream.BlocksTimestamps[serverBlocksIndex], + IsRetransmitted: serverStream.BlocksLoss[serverBlocksIndex], + RegexMatches: findMatchesBetween(serverStream.PatternMatches, serverIndex, serverIndex+size), + } + serverIndex += size + globalIndex += size + serverBlocksIndex++ + } + + if globalIndex > format.Skip { + payloads = append(payloads, payload) + } + if globalIndex > format.Skip+format.Limit { + return payloads + } + + if !hasClientBlocks() { + clientDocumentIndex++ + clientBlocksIndex = 0 + clientStream = csc.getConnectionStream(c, connectionID, true, clientDocumentIndex) + } + if !hasServerBlocks() { + serverDocumentIndex++ + serverBlocksIndex = 0 + serverStream = csc.getConnectionStream(c, connectionID, false, serverDocumentIndex) + } + } + + return payloads +} + +func (csc ConnectionStreamsController) getConnectionStream(c context.Context, connectionID RowID, fromClient bool, + documentIndex int) ConnectionStream { + var result ConnectionStream + if err := csc.storage.Find(ConnectionStreams).Filter(OrderedDocument{ + {"connection_id", connectionID}, + {"from_client", fromClient}, + {"document_index", documentIndex}, + }).Context(c).First(&result); err != nil { + log.WithError(err).WithField("connection_id", connectionID).Panic("failed to get a ConnectionStream") + } + return result +} + +func findMatchesBetween(patternMatches map[uint][]PatternSlice, from, to uint64) []RegexSlice { + regexSlices := make([]RegexSlice, 0, InitialRegexSlicesCount) + for _, slices := range patternMatches { + for _, slice := range slices { + if from > slice[1] || to <= slice[0] { + continue + } + + log.Info(slice[0], slice[1], from, to) + var start, end uint64 + if from > slice[0] { + start = 0 + } else { + start = slice[0] - from + } + + if to <= slice[1] { + end = to - from + } else { + end = slice[1] - from + } + + regexSlices = append(regexSlices, RegexSlice{From: start, To: end}) + } + } + return regexSlices +} diff --git a/stream_handler.go b/stream_handler.go index 97975fa..4d33b01 100644 --- a/stream_handler.go +++ b/stream_handler.go @@ -31,10 +31,11 @@ type StreamHandler struct { patternStream hyperscan.Stream patternMatches map[uint][]PatternSlice scanner Scanner + isClient bool } // NewReaderStream returns a new StreamHandler object. -func NewStreamHandler(connection ConnectionHandler, streamFlow StreamFlow, scanner Scanner) StreamHandler { +func NewStreamHandler(connection ConnectionHandler, streamFlow StreamFlow, scanner Scanner, isClient bool) StreamHandler { handler := StreamHandler{ connection: connection, streamFlow: streamFlow, @@ -45,6 +46,7 @@ func NewStreamHandler(connection ConnectionHandler, streamFlow StreamFlow, scann documentsIDs: make([]RowID, 0, 1), // most of the time the stream fit in one document patternMatches: make(map[uint][]PatternSlice, connection.PatternsDatabaseSize()), scanner: scanner, + isClient: isClient, } stream, err := connection.PatternsDatabase().Open(0, scanner.scratch, handler.onMatch, nil) @@ -159,6 +161,7 @@ func (sh *StreamHandler) storageCurrentDocument() { BlocksTimestamps: sh.timestamps, BlocksLoss: sh.lossBlocks, PatternMatches: sh.patternMatches, + FromClient: sh.isClient, }); err != nil { log.WithError(err).Error("failed to insert connection stream") } else { diff --git a/utils.go b/utils.go index 5028616..3b66822 100644 --- a/utils.go +++ b/utils.go @@ -2,6 +2,8 @@ package main import ( "crypto/sha256" + "encoding/base32" + "encoding/base64" "encoding/binary" "encoding/hex" "fmt" @@ -77,3 +79,30 @@ func FileSize(filename string) int64 { func byID(id RowID) OrderedDocument { return OrderedDocument{{"_id", id}} } + +func DecodeBytes(buffer []byte, format string) string { + switch format { + case "hex": + return hex.EncodeToString(buffer) + case "hexdump": + return hex.Dump(buffer) + case "base32": + return base32.StdEncoding.EncodeToString(buffer) + case "base64": + return base64.StdEncoding.EncodeToString(buffer) + case "ascii": + str := fmt.Sprintf("%+q", buffer) + return str[1 : len(str)-1] + case "binary": + str := fmt.Sprintf("%b", buffer) + return str[1 : len(str)-1] + case "decimal": + str := fmt.Sprintf("%d", buffer) + return str[1 : len(str)-1] + case "octal": + str := fmt.Sprintf("%o", buffer) + return str[1 : len(str)-1] + default: + return string(buffer) + } +} -- cgit v1.2.3-70-g09d2