From 0bbd12cd1f5223626ee1df4872e8337153093f2f Mon Sep 17 00:00:00 2001
From: Emiliano Ciavatta
Date: Fri, 4 Sep 2020 23:32:45 +0200
Subject: Add flushAll param to /api/pcaps/upload and /api/pcaps/file
---
application_router.go | 8 ++++++--
frontend/src/components/ConnectionContent.js | 10 ++--------
pcap_importer.go | 29 +++++++++++++++-------------
stream_handler.go | 4 +++-
4 files changed, 27 insertions(+), 24 deletions(-)
diff --git a/application_router.go b/application_router.go
index 9f497e8..852e68b 100644
--- a/application_router.go
+++ b/application_router.go
@@ -9,6 +9,7 @@ import (
"net/http"
"os"
"path/filepath"
+ "strings"
"time"
)
@@ -112,12 +113,14 @@ func CreateApplicationRouter(applicationContext *ApplicationContext) *gin.Engine
badRequest(c, err)
return
}
+ flushAllValue, isPresent := c.GetPostForm("flush_all")
+ flushAll := isPresent && strings.ToLower(flushAllValue) == "true"
fileName := fmt.Sprintf("%v-%s", time.Now().UnixNano(), fileHeader.Filename)
if err := c.SaveUploadedFile(fileHeader, ProcessingPcapsBasePath + fileName); err != nil {
log.WithError(err).Panic("failed to save uploaded file")
}
- if sessionID, err := applicationContext.PcapImporter.ImportPcap(fileName); err != nil {
+ if sessionID, err := applicationContext.PcapImporter.ImportPcap(fileName, flushAll); err != nil {
unprocessableEntity(c, err)
} else {
c.JSON(http.StatusAccepted, gin.H{"session": sessionID})
@@ -127,6 +130,7 @@ func CreateApplicationRouter(applicationContext *ApplicationContext) *gin.Engine
api.POST("/pcap/file", func(c *gin.Context) {
var request struct {
File string `json:"file"`
+ FlushAll bool `json:"flush_all"`
DeleteOriginalFile bool `json:"delete_original_file"`
}
@@ -143,7 +147,7 @@ func CreateApplicationRouter(applicationContext *ApplicationContext) *gin.Engine
if err := CopyFile(ProcessingPcapsBasePath + fileName, request.File); err != nil {
log.WithError(err).Panic("failed to copy pcap file")
}
- if sessionID, err := applicationContext.PcapImporter.ImportPcap(fileName); err != nil {
+ if sessionID, err := applicationContext.PcapImporter.ImportPcap(fileName, request.FlushAll); err != nil {
if request.DeleteOriginalFile {
if err := os.Remove(request.File); err != nil {
log.WithError(err).Panic("failed to remove processed file")
diff --git a/frontend/src/components/ConnectionContent.js b/frontend/src/components/ConnectionContent.js
index 40fdcad..905a56d 100644
--- a/frontend/src/components/ConnectionContent.js
+++ b/frontend/src/components/ConnectionContent.js
@@ -21,7 +21,8 @@ class ConnectionContent extends Component {
if (this.props.connection !== null && (
this.props.connection !== prevProps.connection || this.state.format !== prevState.format)) {
this.setState({loading: true});
- axios.get(`/api/streams/${this.props.connection.id}?format=${this.state.format}`).then(res => {
+ // TODO: limit workaround.
+ axios.get(`/api/streams/${this.props.connection.id}?format=${this.state.format}&limit=999999`).then(res => {
this.setState({
connectionContent: res.data,
loading: false
@@ -52,13 +53,6 @@ class ConnectionContent extends Component {
return (
- {/**/}
- {/* */}
- {/* ciao*/}
- {/*
*/}
- {/**/}
-
-
format
diff --git a/pcap_importer.go b/pcap_importer.go
index cd6fdfa..1739b3f 100644
--- a/pcap_importer.go
+++ b/pcap_importer.go
@@ -74,7 +74,7 @@ func NewPcapImporter(storage Storage, serverNet net.IPNet, rulesManager RulesMan
// going to be imported or if it has been already imported in the past the function returns an error. Otherwise it
// create a new session and starts to import the pcap, and returns immediately the session name (that is the sha256
// of the pcap).
-func (pi *PcapImporter) ImportPcap(fileName string) (string, error) {
+func (pi *PcapImporter) ImportPcap(fileName string, flushAll bool) (string, error) {
switch filepath.Ext(fileName) {
case ".pcap":
case ".pcapng":
@@ -109,7 +109,7 @@ func (pi *PcapImporter) ImportPcap(fileName string) (string, error) {
pi.sessions[hash] = session
pi.mSessions.Unlock()
- go pi.parsePcap(session, fileName, ctx)
+ go pi.parsePcap(session, fileName, flushAll, ctx)
return hash, nil
}
@@ -141,8 +141,18 @@ func (pi *PcapImporter) CancelSession(sessionID string) bool {
return isPresent
}
+func (pi *PcapImporter) FlushConnections(olderThen time.Time, closeAll bool) (flushed, closed int) {
+ assembler := pi.takeAssembler()
+ flushed, closed = assembler.FlushWithOptions(tcpassembly.FlushOptions{
+ T: olderThen,
+ CloseAll: closeAll,
+ })
+ pi.releaseAssembler(assembler)
+ return
+}
+
// Read the pcap and save the tcp stream flow to the database
-func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx context.Context) {
+func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flushAll bool, ctx context.Context) {
handle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName)
if err != nil {
pi.progressUpdate(session, fileName, false, "failed to process pcap")
@@ -155,7 +165,6 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
packetSource.NoCopy = true
assembler := pi.takeAssembler()
packets := packetSource.Packets()
- firstPacketTime := time.Time{}
updateProgressInterval := time.Tick(importUpdateProgressInterval)
for {
@@ -171,9 +180,8 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
select {
case packet := <-packets:
if packet == nil { // completed
- if !firstPacketTime.IsZero() {
- // TODO:
- //assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan))
+ if flushAll {
+ assembler.FlushAll()
}
handle.Close()
pi.releaseAssembler(assembler)
@@ -181,11 +189,6 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
return
}
- timestamp := packet.Metadata().Timestamp
- if firstPacketTime.IsZero() {
- firstPacketTime = timestamp
- }
-
session.ProcessedPackets++
if packet.NetworkLayer() == nil || packet.TransportLayer() == nil ||
@@ -217,7 +220,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx
fCount[index]++
session.PacketsPerService[servicePort] = fCount
- assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, timestamp)
+ assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp)
case <-updateProgressInterval:
pi.progressUpdate(session, fileName, false, "")
}
diff --git a/stream_handler.go b/stream_handler.go
index 4d33b01..bccdeee 100644
--- a/stream_handler.go
+++ b/stream_handler.go
@@ -63,12 +63,14 @@ func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) {
for _, r := range reassembly {
skip := r.Skip
if r.Start {
- skip = 0
sh.firstPacketSeen = r.Seen
}
if r.End {
sh.lastPacketSeen = r.Seen
}
+ if skip < 0 { // start or flush ~ workaround
+ skip = 0
+ }
reassemblyLen := len(r.Bytes)
if reassemblyLen == 0 {
--
cgit v1.2.3-70-g09d2