From a30815021e61023f996b1450ddcd9164a6e18bef Mon Sep 17 00:00:00 2001
From: Emiliano Ciavatta
Date: Thu, 8 Oct 2020 17:07:07 +0200
Subject: Add header license to all files
---
stream_handler.go | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
(limited to 'stream_handler.go')
diff --git a/stream_handler.go b/stream_handler.go
index bccdeee..48dba34 100644
--- a/stream_handler.go
+++ b/stream_handler.go
@@ -1,3 +1,20 @@
+/*
+ * This file is part of caronte (https://github.com/eciavatta/caronte).
+ * Copyright (c) 2020 Emiliano Ciavatta.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
package main
import (
--
cgit v1.2.3-70-g09d2
From bc989facca1f3381afed2f7c982da7784fad2327 Mon Sep 17 00:00:00 2001
From: Emiliano Ciavatta
Date: Sun, 11 Oct 2020 21:49:40 +0200
Subject: [inconsistent] SearchOption validation checkpoint
---
application_context.go | 2 +
application_router.go | 70 +++++++++++++++
connection_streams_controller.go | 1 +
search_controller.go | 189 +++++++++++++++++++++++++++++++++++++++
statistics_controller.go | 8 +-
storage.go | 41 ++++++---
stream_handler.go | 2 +
7 files changed, 299 insertions(+), 14 deletions(-)
create mode 100644 search_controller.go
(limited to 'stream_handler.go')
diff --git a/application_context.go b/application_context.go
index 9897bb6..fc76d00 100644
--- a/application_context.go
+++ b/application_context.go
@@ -37,6 +37,7 @@ type ApplicationContext struct {
ConnectionsController ConnectionsController
ServicesController *ServicesController
ConnectionStreamsController ConnectionStreamsController
+ SearchController *SearchController
StatisticsController StatisticsController
IsConfigured bool
Version string
@@ -113,6 +114,7 @@ func (sm *ApplicationContext) configure() {
sm.ServicesController = NewServicesController(sm.Storage)
sm.ConnectionsController = NewConnectionsController(sm.Storage, sm.ServicesController)
sm.ConnectionStreamsController = NewConnectionStreamsController(sm.Storage)
+ sm.SearchController = NewSearchController(sm.Storage)
sm.StatisticsController = NewStatisticsController(sm.Storage)
sm.IsConfigured = true
}
diff --git a/application_router.go b/application_router.go
index 89b471b..dc9f9d4 100644
--- a/application_router.go
+++ b/application_router.go
@@ -22,6 +22,8 @@ import (
"fmt"
"github.com/gin-gonic/contrib/static"
"github.com/gin-gonic/gin"
+ "github.com/gin-gonic/gin/binding"
+ "github.com/go-playground/validator/v10"
log "github.com/sirupsen/logrus"
"net/http"
"os"
@@ -294,6 +296,74 @@ func CreateApplicationRouter(applicationContext *ApplicationContext,
}
})
+ api.GET("/searches", func(c *gin.Context) {
+ success(c, applicationContext.SearchController.PerformedSearches())
+ })
+
+ api.POST("/searches/perform", func(c *gin.Context) {
+ var options SearchOptions
+
+
+ parentIsZero := func (fl validator.FieldLevel) bool {
+ log.Println(fl.FieldName())
+ log.Println("noooooo")
+ return fl.Parent().IsZero()
+ }
+ eitherWith := func (fl validator.FieldLevel) bool {
+ otherField := fl.Parent().FieldByName(fl.Param())
+ log.Println(fl.Param())
+ log.Println("bbbbbbbbbb")
+ return (fl.Field().IsZero() && !otherField.IsZero()) || (!fl.Field().IsZero() && otherField.IsZero())
+ }
+ aaa := func (fl validator.FieldLevel) bool {
+
+ log.Println("awww")
+ return fl.Field().IsZero()
+ }
+
+ bbb := func (fl validator.FieldLevel) bool {
+
+ log.Println("iiiii")
+ return true
+ }
+
+ if validate, ok := binding.Validator.Engine().(*validator.Validate); ok {
+ if err := validate.RegisterValidation("parent_is_zero", parentIsZero); err != nil {
+ log.WithError(err).Panic("cannot register 'topzero' validator")
+ }
+ if err := validate.RegisterValidation("either_with", eitherWith); err != nil {
+ log.WithError(err).Panic("cannot register 'either_with' validator")
+ }
+ if err := validate.RegisterValidation("aaa", aaa); err != nil {
+ log.WithError(err).Panic("cannot register 'either_with' validator")
+ }
+ if err := validate.RegisterValidation("bbb", bbb); err != nil {
+ log.WithError(err).Panic("cannot register 'either_with' validator")
+ }
+ } else {
+ log.Panic("cannot ")
+ }
+
+
+ if err := c.ShouldBindJSON(&options); err != nil {
+ badRequest(c, err)
+ return
+ }
+
+ log.Println(options)
+
+
+ success(c, "ok")
+
+
+
+
+
+
+
+ //success(c, applicationContext.SearchController.PerformSearch(c, options))
+ })
+
api.GET("/streams/:id", func(c *gin.Context) {
id, err := RowIDFromHex(c.Param("id"))
if err != nil {
diff --git a/connection_streams_controller.go b/connection_streams_controller.go
index 89e484d..eef1a2a 100644
--- a/connection_streams_controller.go
+++ b/connection_streams_controller.go
@@ -39,6 +39,7 @@ type ConnectionStream struct {
FromClient bool `bson:"from_client"`
DocumentIndex int `bson:"document_index"`
Payload []byte `bson:"payload"`
+ PayloadString string `bson:"payload_string"`
BlocksIndexes []int `bson:"blocks_indexes"`
BlocksTimestamps []time.Time `bson:"blocks_timestamps"`
BlocksLoss []bool `bson:"blocks_loss"`
diff --git a/search_controller.go b/search_controller.go
new file mode 100644
index 0000000..ad47dbc
--- /dev/null
+++ b/search_controller.go
@@ -0,0 +1,189 @@
+/*
+ * This file is part of caronte (https://github.com/eciavatta/caronte).
+ * Copyright (c) 2020 Emiliano Ciavatta.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package main
+
+import (
+ "context"
+ log "github.com/sirupsen/logrus"
+ "strings"
+ "sync"
+ "time"
+)
+
+const (
+ secondsToNano = 1000 * 1000 * 1000
+ maxSearchTimeout = 60 * secondsToNano
+)
+
+type PerformedSearch struct {
+ ID RowID `bson:"_id" json:"id"`
+ SearchOptions SearchOptions `bson:"search_options" json:"search_options"`
+ AffectedConnections []RowID `bson:"affected_connections" json:"affected_connections,omitempty"`
+ AffectedConnectionsCount int `bson:"affected_connections_count" json:"affected_connections_count"`
+ StartedAt time.Time `bson:"started_at" json:"started_at"`
+ FinishedAt time.Time `bson:"finished_at" json:"finished_at"`
+ UpdatedAt time.Time `bson:"updated_at" json:"updated_at"`
+ Timeout time.Duration `bson:"timeout" json:"timeout"`
+}
+
+type SearchOptions struct {
+ TextSearch TextSearch `bson:"text_search" json:"text_search" validate:"either_with=RegexSearch"`
+ RegexSearch RegexSearch `bson:"regex_search" json:"regex_search" validate:"either_with=TextSearch"`
+ Timeout time.Duration `bson:"timeout" json:"timeout" binding:"max=60"`
+}
+
+type TextSearch struct {
+ Terms []string `bson:"terms" json:"terms" binding:"parent_is_zero|either_with=ExactPhrase,isdefault|min=1,dive,min=3"`
+ ExcludedTerms []string `bson:"excluded_terms" json:"excluded_terms" binding:"required_with=Terms,dive,isdefault|min=1"`
+ ExactPhrase string `bson:"exact_phrase" json:"exact_phrase" binding:"isdefault|min=3,parent_is_zero|either_with=Terms"`
+ CaseSensitive bool `bson:"case_sensitive" json:"case_sensitive"`
+}
+
+type RegexSearch struct {
+ Pattern string `bson:"pattern" json:"pattern" binding:"parent_is_zero|either_with=NotPattern,isdefault|min=3"`
+ NotPattern string `bson:"not_pattern" json:"not_pattern" binding:"parent_is_zero|either_with=Pattern,isdefault|min=3"`
+ CaseInsensitive bool `bson:"case_insensitive" json:"case_insensitive"`
+ MultiLine bool `bson:"multi_line" json:"multi_line"`
+ IgnoreWhitespaces bool `bson:"ignore_whitespaces" json:"ignore_whitespaces"`
+ DotCharacter bool `bson:"dot_character" json:"dot_character"`
+}
+
+type SearchController struct {
+ storage Storage
+ performedSearches []PerformedSearch
+ mutex sync.Mutex
+}
+
+func NewSearchController(storage Storage) *SearchController {
+ var searches []PerformedSearch
+ if err := storage.Find(Searches).All(&searches); err != nil {
+ // log.WithError(err).Panic("failed to retrieve performed searches")
+ }
+
+ return &SearchController{
+ storage: storage,
+ performedSearches: searches,
+ }
+}
+
+func (sc *SearchController) PerformedSearches() []PerformedSearch {
+ sc.mutex.Lock()
+ defer sc.mutex.Unlock()
+
+ return sc.performedSearches
+}
+
+func (sc *SearchController) PerformSearch(c context.Context, options SearchOptions) PerformedSearch {
+ findQuery := sc.storage.Find(ConnectionStreams).Projection(OrderedDocument{{"connection_id", 1}}).Context(c)
+ timeout := options.Timeout * secondsToNano
+ if timeout <= 0 || timeout > maxSearchTimeout {
+ timeout = maxSearchTimeout
+ }
+ findQuery = findQuery.MaxTime(timeout)
+
+ if !options.TextSearch.isZero() {
+ var text string
+ if options.TextSearch.ExactPhrase != "" {
+ text = "\"" + options.TextSearch.ExactPhrase + "\""
+ } else {
+ text = strings.Join(options.TextSearch.Terms, " ")
+ if options.TextSearch.ExcludedTerms != nil {
+ text += " -" + strings.Join(options.TextSearch.ExcludedTerms, " -")
+ }
+ }
+
+ findQuery = findQuery.Filter(OrderedDocument{{"$text", UnorderedDocument{
+ "$search": text,
+ "$language": "none",
+ "$caseSensitive": options.TextSearch.CaseSensitive,
+ "$diacriticSensitive": false,
+ }}})
+ } else {
+ var regexOptions string
+ if options.RegexSearch.CaseInsensitive {
+ regexOptions += "i"
+ }
+ if options.RegexSearch.MultiLine {
+ regexOptions += "m"
+ }
+ if options.RegexSearch.IgnoreWhitespaces {
+ regexOptions += "x"
+ }
+ if options.RegexSearch.DotCharacter {
+ regexOptions += "s"
+ }
+
+ var regex UnorderedDocument
+ if options.RegexSearch.Pattern != "" {
+ regex = UnorderedDocument{"$regex": options.RegexSearch.Pattern, "$options": regexOptions}
+ } else {
+ regex = UnorderedDocument{"$not":
+ UnorderedDocument{"$regex": options.RegexSearch.NotPattern, "$options": regexOptions}}
+ }
+
+ findQuery = findQuery.Filter(OrderedDocument{{"payload_string", regex}})
+ }
+
+ var connections []ConnectionStream
+ startedAt := time.Now()
+ if err := findQuery.All(&connections); err != nil {
+ log.WithError(err).Error("oh no")
+ }
+ affectedConnections := uniqueConnectionIds(connections)
+
+ finishedAt := time.Now()
+ performedSearch := PerformedSearch{
+ ID: NewRowID(),
+ SearchOptions: options,
+ AffectedConnections: affectedConnections,
+ AffectedConnectionsCount: len(affectedConnections),
+ StartedAt: startedAt,
+ FinishedAt: finishedAt,
+ UpdatedAt: finishedAt,
+ Timeout: options.Timeout,
+ }
+ if _, err := sc.storage.Insert(Searches).Context(c).One(performedSearch); err != nil {
+ log.WithError(err).Panic("failed to insert a new performed search")
+ }
+
+ sc.mutex.Lock()
+ sc.performedSearches = append([]PerformedSearch{performedSearch}, sc.performedSearches...)
+ sc.mutex.Unlock()
+
+ return performedSearch
+}
+
+func (sc TextSearch) isZero() bool {
+ return sc.Terms == nil && sc.ExcludedTerms == nil && sc.ExactPhrase == ""
+}
+
+func (sc RegexSearch) isZero() bool {
+ return RegexSearch{} == sc
+}
+
+func uniqueConnectionIds(connections []ConnectionStream) []RowID {
+ keys := make(map[RowID]bool)
+ var out []RowID
+ for _, entry := range connections {
+ if _, value := keys[entry.ConnectionID]; !value {
+ keys[entry.ConnectionID] = true
+ out = append(out, entry.ConnectionID)
+ }
+ }
+ return out
+}
diff --git a/statistics_controller.go b/statistics_controller.go
index 006b230..1714c0b 100644
--- a/statistics_controller.go
+++ b/statistics_controller.go
@@ -26,10 +26,10 @@ import (
type StatisticRecord struct {
RangeStart time.Time `json:"range_start" bson:"_id"`
- ConnectionsPerService map[uint16]int `json:"connections_per_service" bson:"connections_per_service"`
- ClientBytesPerService map[uint16]int `json:"client_bytes_per_service" bson:"client_bytes_per_service"`
- ServerBytesPerService map[uint16]int `json:"server_bytes_per_service" bson:"server_bytes_per_service"`
- DurationPerService map[uint16]int64 `json:"duration_per_service" bson:"duration_per_service"`
+ ConnectionsPerService map[uint16]int `json:"connections_per_service,omitempty" bson:"connections_per_service"`
+ ClientBytesPerService map[uint16]int `json:"client_bytes_per_service,omitempty" bson:"client_bytes_per_service"`
+ ServerBytesPerService map[uint16]int `json:"server_bytes_per_service,omitempty" bson:"server_bytes_per_service"`
+ DurationPerService map[uint16]int64 `json:"duration_per_service,omitempty" bson:"duration_per_service"`
}
type StatisticsFilter struct {
diff --git a/storage.go b/storage.go
index f8b7f9c..304e88c 100644
--- a/storage.go
+++ b/storage.go
@@ -25,16 +25,20 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
+ "time"
)
// Collections names
-const Connections = "connections"
-const ConnectionStreams = "connection_streams"
-const ImportingSessions = "importing_sessions"
-const Rules = "rules"
-const Settings = "settings"
-const Services = "services"
-const Statistics = "statistics"
+const (
+ Connections = "connections"
+ ConnectionStreams = "connection_streams"
+ ImportingSessions = "importing_sessions"
+ Rules = "rules"
+ Searches = "searches"
+ Settings = "settings"
+ Services = "services"
+ Statistics = "statistics"
+)
var ZeroRowID [12]byte
@@ -73,6 +77,7 @@ func NewMongoStorage(uri string, port int, database string) (*MongoStorage, erro
ConnectionStreams: db.Collection(ConnectionStreams),
ImportingSessions: db.Collection(ImportingSessions),
Rules: db.Collection(Rules),
+ Searches: db.Collection(Services),
Settings: db.Collection(Settings),
Services: db.Collection(Services),
Statistics: db.Collection(Statistics),
@@ -85,9 +90,13 @@ func NewMongoStorage(uri string, port int, database string) (*MongoStorage, erro
return nil, err
}
- if _, err := collections[ConnectionStreams].Indexes().CreateOne(ctx, mongo.IndexModel{
- Keys: bson.D{{"connection_id", -1}}, // descending
- Options: options.Index(),
+ if _, err := collections[ConnectionStreams].Indexes().CreateMany(ctx, []mongo.IndexModel{
+ {
+ Keys: bson.D{{"connection_id", -1}}, // descending
+ },
+ {
+ Keys: bson.D{{"payload_string", "text"}},
+ },
}); err != nil {
return nil, err
}
@@ -277,6 +286,8 @@ type FindOperation interface {
Projection(filter OrderedDocument) FindOperation
Sort(field string, ascending bool) FindOperation
Limit(n int64) FindOperation
+ Skip(n int64) FindOperation
+ MaxTime(duration time.Duration) FindOperation
First(result interface{}) error
All(results interface{}) error
}
@@ -318,6 +329,16 @@ func (fo MongoFindOperation) Limit(n int64) FindOperation {
return fo
}
+func (fo MongoFindOperation) Skip(n int64) FindOperation {
+ fo.optFind.SetSkip(n)
+ return fo
+}
+
+func (fo MongoFindOperation) MaxTime(duration time.Duration) FindOperation {
+ fo.optFind.SetMaxTime(duration)
+ return fo
+}
+
func (fo MongoFindOperation) Sort(field string, ascending bool) FindOperation {
var sort int
if ascending {
diff --git a/stream_handler.go b/stream_handler.go
index 48dba34..f08bd70 100644
--- a/stream_handler.go
+++ b/stream_handler.go
@@ -22,6 +22,7 @@ import (
"github.com/flier/gohs/hyperscan"
"github.com/google/gopacket/tcpassembly"
log "github.com/sirupsen/logrus"
+ "strings"
"time"
)
@@ -176,6 +177,7 @@ func (sh *StreamHandler) storageCurrentDocument() {
ConnectionID: ZeroRowID,
DocumentIndex: len(sh.documentsIDs),
Payload: sh.buffer.Bytes(),
+ PayloadString: strings.ToValidUTF8(string(sh.buffer.Bytes()), ""),
BlocksIndexes: sh.indexes,
BlocksTimestamps: sh.timestamps,
BlocksLoss: sh.lossBlocks,
--
cgit v1.2.3-70-g09d2