aboutsummaryrefslogtreecommitdiff
path: root/connection_streams_controller.go
diff options
context:
space:
mode:
Diffstat (limited to 'connection_streams_controller.go')
-rw-r--r--connection_streams_controller.go59
1 files changed, 39 insertions, 20 deletions
diff --git a/connection_streams_controller.go b/connection_streams_controller.go
index 096210e..3ba30f8 100644
--- a/connection_streams_controller.go
+++ b/connection_streams_controller.go
@@ -1,7 +1,9 @@
package main
import (
+ "bytes"
"context"
+ "github.com/eciavatta/caronte/parsers"
log "github.com/sirupsen/logrus"
"time"
)
@@ -25,13 +27,13 @@ type ConnectionStream struct {
type PatternSlice [2]uint64
type Payload struct {
- FromClient bool `json:"from_client"`
- Content string `json:"content"`
- DecodedContent string `json:"decoded_content"`
- Index int `json:"index"`
- Timestamp time.Time `json:"timestamp"`
- IsRetransmitted bool `json:"is_retransmitted"`
- RegexMatches []RegexSlice `json:"regex_matches"`
+ FromClient bool `json:"from_client"`
+ Content string `json:"content"`
+ Metadata parsers.Metadata `json:"metadata"`
+ Index int `json:"index"`
+ Timestamp time.Time `json:"timestamp"`
+ IsRetransmitted bool `json:"is_retransmitted"`
+ RegexMatches []RegexSlice `json:"regex_matches"`
}
type RegexSlice struct {
@@ -56,8 +58,8 @@ func NewConnectionStreamsController(storage Storage) ConnectionStreamsController
}
func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, connectionID RowID,
- format QueryFormat) []Payload {
- payloads := make([]Payload, 0, InitialPayloadsSize)
+ format QueryFormat) []*Payload {
+ payloads := make([]*Payload, 0, InitialPayloadsSize)
var clientIndex, serverIndex, globalIndex uint64
if format.Limit <= 0 {
@@ -76,7 +78,11 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
return serverBlocksIndex < len(serverStream.BlocksIndexes)
}
- var payload Payload
+ var payload *Payload
+ payloadsBuffer := make([]*Payload, 0, 16)
+ contentChunkBuffer := new(bytes.Buffer)
+ var lastContentSlice []byte
+ var sideChanged, lastClient, lastServer bool
for !clientStream.ID.IsZero() || !serverStream.ID.IsZero() {
if hasClientBlocks() && (!hasServerBlocks() || // next payload is from client
clientStream.BlocksTimestamps[clientBlocksIndex].UnixNano() <=
@@ -90,10 +96,9 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
}
size := uint64(end - start)
- payload = Payload{
+ payload = &Payload{
FromClient: true,
Content: DecodeBytes(clientStream.Payload[start:end], format.Format),
- //Request: ReadRequest(content),
Index: start,
Timestamp: clientStream.BlocksTimestamps[clientBlocksIndex],
IsRetransmitted: clientStream.BlocksLoss[clientBlocksIndex],
@@ -102,6 +107,9 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
clientIndex += size
globalIndex += size
clientBlocksIndex++
+
+ lastContentSlice = clientStream.Payload[start:end]
+ sideChanged, lastClient, lastServer = lastServer, true, false
} else { // next payload is from server
start := serverStream.BlocksIndexes[serverBlocksIndex]
end := 0
@@ -112,15 +120,9 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
}
size := uint64(end - start)
- content := DecodeBytes(serverStream.Payload[start:end], format.Format)
-
- plainContent := DecodeBytes(serverStream.Payload[start:end], "default")
- decodedContent := DecodeBytes([]byte(DecodeHttpResponse(plainContent)), format.Format)
-
- payload = Payload{
+ payload = &Payload{
FromClient: false,
- Content: content,
- DecodedContent: decodedContent,
+ Content: DecodeBytes(serverStream.Payload[start:end], format.Format),
Index: start,
Timestamp: serverStream.BlocksTimestamps[serverBlocksIndex],
IsRetransmitted: serverStream.BlocksLoss[serverBlocksIndex],
@@ -129,12 +131,29 @@ func (csc ConnectionStreamsController) GetConnectionPayload(c context.Context, c
serverIndex += size
globalIndex += size
serverBlocksIndex++
+
+ lastContentSlice = serverStream.Payload[start:end]
+ sideChanged, lastClient, lastServer = lastClient, false, true
+ }
+
+ if sideChanged {
+ metadata := parsers.Parse(contentChunkBuffer.Bytes())
+ for _, elem := range payloadsBuffer {
+ elem.Metadata = metadata
+ }
+
+ payloadsBuffer = payloadsBuffer[:0]
+ contentChunkBuffer.Reset()
}
+ payloadsBuffer = append(payloadsBuffer, payload)
+ contentChunkBuffer.Write(lastContentSlice)
if globalIndex > format.Skip {
+ // problem: waste of time if the payload is discarded
payloads = append(payloads, payload)
}
if globalIndex > format.Skip+format.Limit {
+ // problem: the last chunk is not parsed, but can be ok because it is not finished
return payloads
}