From 0bbd12cd1f5223626ee1df4872e8337153093f2f Mon Sep 17 00:00:00 2001 From: Emiliano Ciavatta Date: Fri, 4 Sep 2020 23:32:45 +0200 Subject: Add flushAll param to /api/pcaps/upload and /api/pcaps/file --- application_router.go | 8 ++++++-- frontend/src/components/ConnectionContent.js | 10 ++-------- pcap_importer.go | 29 +++++++++++++++------------- stream_handler.go | 4 +++- 4 files changed, 27 insertions(+), 24 deletions(-) diff --git a/application_router.go b/application_router.go index 9f497e8..852e68b 100644 --- a/application_router.go +++ b/application_router.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "path/filepath" + "strings" "time" ) @@ -112,12 +113,14 @@ func CreateApplicationRouter(applicationContext *ApplicationContext) *gin.Engine badRequest(c, err) return } + flushAllValue, isPresent := c.GetPostForm("flush_all") + flushAll := isPresent && strings.ToLower(flushAllValue) == "true" fileName := fmt.Sprintf("%v-%s", time.Now().UnixNano(), fileHeader.Filename) if err := c.SaveUploadedFile(fileHeader, ProcessingPcapsBasePath + fileName); err != nil { log.WithError(err).Panic("failed to save uploaded file") } - if sessionID, err := applicationContext.PcapImporter.ImportPcap(fileName); err != nil { + if sessionID, err := applicationContext.PcapImporter.ImportPcap(fileName, flushAll); err != nil { unprocessableEntity(c, err) } else { c.JSON(http.StatusAccepted, gin.H{"session": sessionID}) @@ -127,6 +130,7 @@ func CreateApplicationRouter(applicationContext *ApplicationContext) *gin.Engine api.POST("/pcap/file", func(c *gin.Context) { var request struct { File string `json:"file"` + FlushAll bool `json:"flush_all"` DeleteOriginalFile bool `json:"delete_original_file"` } @@ -143,7 +147,7 @@ func CreateApplicationRouter(applicationContext *ApplicationContext) *gin.Engine if err := CopyFile(ProcessingPcapsBasePath + fileName, request.File); err != nil { log.WithError(err).Panic("failed to copy pcap file") } - if sessionID, err := applicationContext.PcapImporter.ImportPcap(fileName); err != nil { + if sessionID, err := applicationContext.PcapImporter.ImportPcap(fileName, request.FlushAll); err != nil { if request.DeleteOriginalFile { if err := os.Remove(request.File); err != nil { log.WithError(err).Panic("failed to remove processed file") diff --git a/frontend/src/components/ConnectionContent.js b/frontend/src/components/ConnectionContent.js index 40fdcad..905a56d 100644 --- a/frontend/src/components/ConnectionContent.js +++ b/frontend/src/components/ConnectionContent.js @@ -21,7 +21,8 @@ class ConnectionContent extends Component { if (this.props.connection !== null && ( this.props.connection !== prevProps.connection || this.state.format !== prevState.format)) { this.setState({loading: true}); - axios.get(`/api/streams/${this.props.connection.id}?format=${this.state.format}`).then(res => { + // TODO: limit workaround. + axios.get(`/api/streams/${this.props.connection.id}?format=${this.state.format}&limit=999999`).then(res => { this.setState({ connectionContent: res.data, loading: false @@ -52,13 +53,6 @@ class ConnectionContent extends Component { return (
- {/**/} - {/* */} - {/* ciao*/} - {/* */} - {/**/} - - format diff --git a/pcap_importer.go b/pcap_importer.go index cd6fdfa..1739b3f 100644 --- a/pcap_importer.go +++ b/pcap_importer.go @@ -74,7 +74,7 @@ func NewPcapImporter(storage Storage, serverNet net.IPNet, rulesManager RulesMan // going to be imported or if it has been already imported in the past the function returns an error. Otherwise it // create a new session and starts to import the pcap, and returns immediately the session name (that is the sha256 // of the pcap). -func (pi *PcapImporter) ImportPcap(fileName string) (string, error) { +func (pi *PcapImporter) ImportPcap(fileName string, flushAll bool) (string, error) { switch filepath.Ext(fileName) { case ".pcap": case ".pcapng": @@ -109,7 +109,7 @@ func (pi *PcapImporter) ImportPcap(fileName string) (string, error) { pi.sessions[hash] = session pi.mSessions.Unlock() - go pi.parsePcap(session, fileName, ctx) + go pi.parsePcap(session, fileName, flushAll, ctx) return hash, nil } @@ -141,8 +141,18 @@ func (pi *PcapImporter) CancelSession(sessionID string) bool { return isPresent } +func (pi *PcapImporter) FlushConnections(olderThen time.Time, closeAll bool) (flushed, closed int) { + assembler := pi.takeAssembler() + flushed, closed = assembler.FlushWithOptions(tcpassembly.FlushOptions{ + T: olderThen, + CloseAll: closeAll, + }) + pi.releaseAssembler(assembler) + return +} + // Read the pcap and save the tcp stream flow to the database -func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx context.Context) { +func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, flushAll bool, ctx context.Context) { handle, err := pcap.OpenOffline(ProcessingPcapsBasePath + fileName) if err != nil { pi.progressUpdate(session, fileName, false, "failed to process pcap") @@ -155,7 +165,6 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx packetSource.NoCopy = true assembler := pi.takeAssembler() packets := packetSource.Packets() - firstPacketTime := time.Time{} updateProgressInterval := time.Tick(importUpdateProgressInterval) for { @@ -171,9 +180,8 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx select { case packet := <-packets: if packet == nil { // completed - if !firstPacketTime.IsZero() { - // TODO: - //assembler.FlushOlderThan(firstPacketTime.Add(-flushOlderThan)) + if flushAll { + assembler.FlushAll() } handle.Close() pi.releaseAssembler(assembler) @@ -181,11 +189,6 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx return } - timestamp := packet.Metadata().Timestamp - if firstPacketTime.IsZero() { - firstPacketTime = timestamp - } - session.ProcessedPackets++ if packet.NetworkLayer() == nil || packet.TransportLayer() == nil || @@ -217,7 +220,7 @@ func (pi *PcapImporter) parsePcap(session ImportingSession, fileName string, ctx fCount[index]++ session.PacketsPerService[servicePort] = fCount - assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, timestamp) + assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp) case <-updateProgressInterval: pi.progressUpdate(session, fileName, false, "") } diff --git a/stream_handler.go b/stream_handler.go index 4d33b01..bccdeee 100644 --- a/stream_handler.go +++ b/stream_handler.go @@ -63,12 +63,14 @@ func (sh *StreamHandler) Reassembled(reassembly []tcpassembly.Reassembly) { for _, r := range reassembly { skip := r.Skip if r.Start { - skip = 0 sh.firstPacketSeen = r.Seen } if r.End { sh.lastPacketSeen = r.Seen } + if skip < 0 { // start or flush ~ workaround + skip = 0 + } reassemblyLen := len(r.Bytes) if reassemblyLen == 0 { -- cgit v1.2.3-70-g09d2