aboutsummaryrefslogtreecommitdiff
path: root/connection_streams_controller.go
diff options
context:
space:
mode:
authorEmiliano Ciavatta2020-09-16 15:56:19 +0000
committerEmiliano Ciavatta2020-09-16 15:56:19 +0000
commita77f2f97f1df204c663119fe8ccafb6f274cb634 (patch)
tree294d839017dbf67d85a2501e9a0570e87602e30a /connection_streams_controller.go
parent991d3b6c91d9fe3046ec94a3716a7dd21f496feb (diff)
parentdfd6d543074b4a30c2fc990063ca69ebf8a734e1 (diff)
Merge branch 'feature/decode-gzip' into develop
Diffstat (limited to 'connection_streams_controller.go')
-rw-r--r--connection_streams_controller.go78
1 files changed, 59 insertions, 19 deletions
diff --git a/connection_streams_controller.go b/connection_streams_controller.go
index 251e842..9d73b0e 100644
--- a/connection_streams_controller.go
+++ b/connection_streams_controller.go
@@ -1,7 +1,9 @@
package main
import (
+ "bytes"
"context"
+ "github.com/eciavatta/caronte/parsers"
log "github.com/sirupsen/logrus"
"time"
)
@@ -25,12 +27,14 @@ type ConnectionStream struct {
type PatternSlice [2]uint64
type Payload struct {
- FromClient bool `json:"from_client"`
- Content string `json:"content"`
- Index int `json:"index"`
- Timestamp time.Time `json:"timestamp"`
- IsRetransmitted bool `json:"is_retransmitted"`
- RegexMatches []RegexSlice `json:"regex_matches"`
+ FromClient bool `json:"from_client"`
+ Content string `json:"content"`
+ Metadata parsers.Metadata `json:"metadata"`
+ IsMetadataContinuation bool `json:"is_metadata_continuation"`
+ Index int `json:"index"`
+ Timestamp time.Time `json:"timestamp"`
+ IsRetransmitted bool `json:"is_retransmitted"`
+ RegexMatches []RegexSlice `json:"regex_matches"`
}
type RegexSlice struct {
@@ -55,8 +59,8 @@ func NewConnectionStreamsController(storage Storage) ConnectionStreamsController
}
func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, connectionID RowID,
- format QueryFormat) []Payload {
- payloads := make([]Payload, 0, InitialPayloadsSize)
+ format QueryFormat) []*Payload {
+ payloads := make([]*Payload, 0, InitialPayloadsSize)
var clientIndex, serverIndex, globalIndex uint64
if format.Limit <= 0 {
@@ -75,7 +79,11 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
return serverBlocksIndex < len(serverStream.BlocksIndexes)
}
- var payload Payload
+ var payload *Payload
+ payloadsBuffer := make([]*Payload, 0, 16)
+ contentChunkBuffer := new(bytes.Buffer)
+ var lastContentSlice []byte
+ var sideChanged, lastClient, lastServer bool
for !clientStream.ID.IsZero() || !serverStream.ID.IsZero() {
if hasClientBlocks() && (!hasServerBlocks() || // next payload is from client
clientStream.BlocksTimestamps[clientBlocksIndex].UnixNano() <=
@@ -85,11 +93,11 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
if clientBlocksIndex < len(clientStream.BlocksIndexes)-1 {
end = clientStream.BlocksIndexes[clientBlocksIndex+1]
} else {
- end = len(clientStream.Payload) - 1
+ end = len(clientStream.Payload)
}
size := uint64(end - start)
- payload = Payload{
+ payload = &Payload{
FromClient: true,
Content: DecodeBytes(clientStream.Payload[start:end], format.Format),
Index: start,
@@ -100,17 +108,20 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
clientIndex += size
globalIndex += size
clientBlocksIndex++
+
+ lastContentSlice = clientStream.Payload[start:end]
+ sideChanged, lastClient, lastServer = lastServer, true, false
} else { // next payload is from server
start := serverStream.BlocksIndexes[serverBlocksIndex]
end := 0
if serverBlocksIndex < len(serverStream.BlocksIndexes)-1 {
end = serverStream.BlocksIndexes[serverBlocksIndex+1]
} else {
- end = len(serverStream.Payload) - 1
+ end = len(serverStream.Payload)
}
size := uint64(end - start)
- payload = Payload{
+ payload = &Payload{
FromClient: false,
Content: DecodeBytes(serverStream.Payload[start:end], format.Format),
Index: start,
@@ -121,13 +132,9 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
serverIndex += size
globalIndex += size
serverBlocksIndex++
- }
- if globalIndex > format.Skip {
- payloads = append(payloads, payload)
- }
- if globalIndex > format.Skip+format.Limit {
- return payloads
+ lastContentSlice = serverStream.Payload[start:end]
+ sideChanged, lastClient, lastServer = lastClient, false, true
}
if !hasClientBlocks() {
@@ -140,6 +147,39 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
serverBlocksIndex = 0
serverStream = csc.getConnectionStream(c, connectionID, false, serverDocumentIndex)
}
+
+ updateMetadata := func() {
+ metadata := parsers.Parse(contentChunkBuffer.Bytes())
+ var isMetadataContinuation bool
+ for _, elem := range payloadsBuffer {
+ elem.Metadata = metadata
+ elem.IsMetadataContinuation = isMetadataContinuation
+ isMetadataContinuation = true
+ }
+
+ payloadsBuffer = payloadsBuffer[:0]
+ contentChunkBuffer.Reset()
+ }
+
+ if sideChanged {
+ updateMetadata()
+ }
+ payloadsBuffer = append(payloadsBuffer, payload)
+ contentChunkBuffer.Write(lastContentSlice)
+
+ if clientStream.ID.IsZero() && serverStream.ID.IsZero() {
+ updateMetadata()
+ }
+
+ if globalIndex > format.Skip {
+ // problem: waste of time if the payload is discarded
+ payloads = append(payloads, payload)
+ }
+ if globalIndex > format.Skip+format.Limit {
+ // problem: the last chunk is not parsed, but can be ok because it is not finished
+ updateMetadata()
+ return payloads
+ }
}
return payloads