aboutsummaryrefslogtreecommitdiff
path: root/connection_streams_controller.go
diff options
context:
space:
mode:
Diffstat (limited to 'connection_streams_controller.go')
-rw-r--r--connection_streams_controller.go88
1 files changed, 60 insertions, 28 deletions
diff --git a/connection_streams_controller.go b/connection_streams_controller.go
index 096210e..9d73b0e 100644
--- a/connection_streams_controller.go
+++ b/connection_streams_controller.go
@@ -1,7 +1,9 @@
package main
import (
+ "bytes"
"context"
+ "github.com/eciavatta/caronte/parsers"
log "github.com/sirupsen/logrus"
"time"
)
@@ -25,13 +27,14 @@ type ConnectionStream struct {
type PatternSlice [2]uint64
type Payload struct {
- FromClient bool `json:"from_client"`
- Content string `json:"content"`
- DecodedContent string `json:"decoded_content"`
- Index int `json:"index"`
- Timestamp time.Time `json:"timestamp"`
- IsRetransmitted bool `json:"is_retransmitted"`
- RegexMatches []RegexSlice `json:"regex_matches"`
+ FromClient bool `json:"from_client"`
+ Content string `json:"content"`
+ Metadata parsers.Metadata `json:"metadata"`
+ IsMetadataContinuation bool `json:"is_metadata_continuation"`
+ Index int `json:"index"`
+ Timestamp time.Time `json:"timestamp"`
+ IsRetransmitted bool `json:"is_retransmitted"`
+ RegexMatches []RegexSlice `json:"regex_matches"`
}
type RegexSlice struct {
@@ -56,8 +59,8 @@ func NewConnectionStreamsController(storage Storage) ConnectionStreamsController
}
func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, connectionID RowID,
- format QueryFormat) []Payload {
- payloads := make([]Payload, 0, InitialPayloadsSize)
+ format QueryFormat) []*Payload {
+ payloads := make([]*Payload, 0, InitialPayloadsSize)
var clientIndex, serverIndex, globalIndex uint64
if format.Limit <= 0 {
@@ -76,7 +79,11 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
return serverBlocksIndex < len(serverStream.BlocksIndexes)
}
- var payload Payload
+ var payload *Payload
+ payloadsBuffer := make([]*Payload, 0, 16)
+ contentChunkBuffer := new(bytes.Buffer)
+ var lastContentSlice []byte
+ var sideChanged, lastClient, lastServer bool
for !clientStream.ID.IsZero() || !serverStream.ID.IsZero() {
if hasClientBlocks() && (!hasServerBlocks() || // next payload is from client
clientStream.BlocksTimestamps[clientBlocksIndex].UnixNano() <=
@@ -86,14 +93,13 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
if clientBlocksIndex < len(clientStream.BlocksIndexes)-1 {
end = clientStream.BlocksIndexes[clientBlocksIndex+1]
} else {
- end = len(clientStream.Payload) - 1
+ end = len(clientStream.Payload)
}
size := uint64(end - start)
- payload = Payload{
+ payload = &Payload{
FromClient: true,
Content: DecodeBytes(clientStream.Payload[start:end], format.Format),
- //Request: ReadRequest(content),
Index: start,
Timestamp: clientStream.BlocksTimestamps[clientBlocksIndex],
IsRetransmitted: clientStream.BlocksLoss[clientBlocksIndex],
@@ -102,25 +108,22 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
clientIndex += size
globalIndex += size
clientBlocksIndex++
+
+ lastContentSlice = clientStream.Payload[start:end]
+ sideChanged, lastClient, lastServer = lastServer, true, false
} else { // next payload is from server
start := serverStream.BlocksIndexes[serverBlocksIndex]
end := 0
if serverBlocksIndex < len(serverStream.BlocksIndexes)-1 {
end = serverStream.BlocksIndexes[serverBlocksIndex+1]
} else {
- end = len(serverStream.Payload) - 1
+ end = len(serverStream.Payload)
}
size := uint64(end - start)
- content := DecodeBytes(serverStream.Payload[start:end], format.Format)
-
- plainContent := DecodeBytes(serverStream.Payload[start:end], "default")
- decodedContent := DecodeBytes([]byte(DecodeHttpResponse(plainContent)), format.Format)
-
- payload = Payload{
+ payload = &Payload{
FromClient: false,
- Content: content,
- DecodedContent: decodedContent,
+ Content: DecodeBytes(serverStream.Payload[start:end], format.Format),
Index: start,
Timestamp: serverStream.BlocksTimestamps[serverBlocksIndex],
IsRetransmitted: serverStream.BlocksLoss[serverBlocksIndex],
@@ -129,13 +132,9 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
serverIndex += size
globalIndex += size
serverBlocksIndex++
- }
- if globalIndex > format.Skip {
- payloads = append(payloads, payload)
- }
- if globalIndex > format.Skip+format.Limit {
- return payloads
+ lastContentSlice = serverStream.Payload[start:end]
+ sideChanged, lastClient, lastServer = lastClient, false, true
}
if !hasClientBlocks() {
@@ -148,6 +147,39 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
serverBlocksIndex = 0
serverStream = csc.getConnectionStream(c, connectionID, false, serverDocumentIndex)
}
+
+ updateMetadata := func() {
+ metadata := parsers.Parse(contentChunkBuffer.Bytes())
+ var isMetadataContinuation bool
+ for _, elem := range payloadsBuffer {
+ elem.Metadata = metadata
+ elem.IsMetadataContinuation = isMetadataContinuation
+ isMetadataContinuation = true
+ }
+
+ payloadsBuffer = payloadsBuffer[:0]
+ contentChunkBuffer.Reset()
+ }
+
+ if sideChanged {
+ updateMetadata()
+ }
+ payloadsBuffer = append(payloadsBuffer, payload)
+ contentChunkBuffer.Write(lastContentSlice)
+
+ if clientStream.ID.IsZero() && serverStream.ID.IsZero() {
+ updateMetadata()
+ }
+
+ if globalIndex > format.Skip {
+ // problem: waste of time if the payload is discarded
+ payloads = append(payloads, payload)
+ }
+ if globalIndex > format.Skip+format.Limit {
+ // problem: the last chunk is not parsed, but can be ok because it is not finished
+ updateMetadata()
+ return payloads
+ }
}
return payloads