diff options
Diffstat (limited to 'connection_streams_controller.go')
-rw-r--r-- | connection_streams_controller.go | 78 |
1 files changed, 59 insertions, 19 deletions
diff --git a/connection_streams_controller.go b/connection_streams_controller.go index 251e842..9d73b0e 100644 --- a/connection_streams_controller.go +++ b/connection_streams_controller.go @@ -1,7 +1,9 @@ package main import ( + "bytes" "context" + "github.com/eciavatta/caronte/parsers" log "github.com/sirupsen/logrus" "time" ) @@ -25,12 +27,14 @@ type ConnectionStream struct { type PatternSlice [2]uint64 type Payload struct { - FromClient bool `json:"from_client"` - Content string `json:"content"` - Index int `json:"index"` - Timestamp time.Time `json:"timestamp"` - IsRetransmitted bool `json:"is_retransmitted"` - RegexMatches []RegexSlice `json:"regex_matches"` + FromClient bool `json:"from_client"` + Content string `json:"content"` + Metadata parsers.Metadata `json:"metadata"` + IsMetadataContinuation bool `json:"is_metadata_continuation"` + Index int `json:"index"` + Timestamp time.Time `json:"timestamp"` + IsRetransmitted bool `json:"is_retransmitted"` + RegexMatches []RegexSlice `json:"regex_matches"` } type RegexSlice struct { @@ -55,8 +59,8 @@ func NewConnectionStreamsController(storage Storage) ConnectionStreamsController } func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, connectionID RowID, - format QueryFormat) []Payload { - payloads := make([]Payload, 0, InitialPayloadsSize) + format QueryFormat) []*Payload { + payloads := make([]*Payload, 0, InitialPayloadsSize) var clientIndex, serverIndex, globalIndex uint64 if format.Limit <= 0 { @@ -75,7 +79,11 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c return serverBlocksIndex < len(serverStream.BlocksIndexes) } - var payload Payload + var payload *Payload + payloadsBuffer := make([]*Payload, 0, 16) + contentChunkBuffer := new(bytes.Buffer) + var lastContentSlice []byte + var sideChanged, lastClient, lastServer bool for !clientStream.ID.IsZero() || !serverStream.ID.IsZero() { if hasClientBlocks() && (!hasServerBlocks() || // next payload is from client clientStream.BlocksTimestamps[clientBlocksIndex].UnixNano() <= @@ -85,11 +93,11 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c if clientBlocksIndex < len(clientStream.BlocksIndexes)-1 { end = clientStream.BlocksIndexes[clientBlocksIndex+1] } else { - end = len(clientStream.Payload) - 1 + end = len(clientStream.Payload) } size := uint64(end - start) - payload = Payload{ + payload = &Payload{ FromClient: true, Content: DecodeBytes(clientStream.Payload[start:end], format.Format), Index: start, @@ -100,17 +108,20 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c clientIndex += size globalIndex += size clientBlocksIndex++ + + lastContentSlice = clientStream.Payload[start:end] + sideChanged, lastClient, lastServer = lastServer, true, false } else { // next payload is from server start := serverStream.BlocksIndexes[serverBlocksIndex] end := 0 if serverBlocksIndex < len(serverStream.BlocksIndexes)-1 { end = serverStream.BlocksIndexes[serverBlocksIndex+1] } else { - end = len(serverStream.Payload) - 1 + end = len(serverStream.Payload) } size := uint64(end - start) - payload = Payload{ + payload = &Payload{ FromClient: false, Content: DecodeBytes(serverStream.Payload[start:end], format.Format), Index: start, @@ -121,13 +132,9 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c serverIndex += size globalIndex += size serverBlocksIndex++ - } - if globalIndex > format.Skip { - payloads = append(payloads, payload) - } - if globalIndex > format.Skip+format.Limit { - return payloads + lastContentSlice = serverStream.Payload[start:end] + sideChanged, lastClient, lastServer = lastClient, false, true } if !hasClientBlocks() { @@ -140,6 +147,39 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c serverBlocksIndex = 0 serverStream = csc.getConnectionStream(c, connectionID, false, serverDocumentIndex) } + + updateMetadata := func() { + metadata := parsers.Parse(contentChunkBuffer.Bytes()) + var isMetadataContinuation bool + for _, elem := range payloadsBuffer { + elem.Metadata = metadata + elem.IsMetadataContinuation = isMetadataContinuation + isMetadataContinuation = true + } + + payloadsBuffer = payloadsBuffer[:0] + contentChunkBuffer.Reset() + } + + if sideChanged { + updateMetadata() + } + payloadsBuffer = append(payloadsBuffer, payload) + contentChunkBuffer.Write(lastContentSlice) + + if clientStream.ID.IsZero() && serverStream.ID.IsZero() { + updateMetadata() + } + + if globalIndex > format.Skip { + // problem: waste of time if the payload is discarded + payloads = append(payloads, payload) + } + if globalIndex > format.Skip+format.Limit { + // problem: the last chunk is not parsed, but can be ok because it is not finished + updateMetadata() + return payloads + } } return payloads |