From a30815021e61023f996b1450ddcd9164a6e18bef Mon Sep 17 00:00:00 2001 From: Emiliano Ciavatta Date: Thu, 8 Oct 2020 17:07:07 +0200 Subject: Add header license to all files --- stream_handler.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) (limited to 'stream_handler.go') diff --git a/stream_handler.go b/stream_handler.go index bccdeee..48dba34 100644 --- a/stream_handler.go +++ b/stream_handler.go @@ -1,3 +1,20 @@ +/* + * This file is part of caronte (https://github.com/eciavatta/caronte). + * Copyright (c) 2020 Emiliano Ciavatta. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + package main import ( -- cgit v1.2.3-70-g09d2 From bc989facca1f3381afed2f7c982da7784fad2327 Mon Sep 17 00:00:00 2001 From: Emiliano Ciavatta Date: Sun, 11 Oct 2020 21:49:40 +0200 Subject: [inconsistent] SearchOption validation checkpoint --- application_context.go | 2 + application_router.go | 70 +++++++++++++++ connection_streams_controller.go | 1 + search_controller.go | 189 +++++++++++++++++++++++++++++++++++++++ statistics_controller.go | 8 +- storage.go | 41 ++++++--- stream_handler.go | 2 + 7 files changed, 299 insertions(+), 14 deletions(-) create mode 100644 search_controller.go (limited to 'stream_handler.go') diff --git a/application_context.go b/application_context.go index 9897bb6..fc76d00 100644 --- a/application_context.go +++ b/application_context.go @@ -37,6 +37,7 @@ type ApplicationContext struct { ConnectionsController ConnectionsController ServicesController *ServicesController ConnectionStreamsController ConnectionStreamsController + SearchController *SearchController StatisticsController StatisticsController IsConfigured bool Version string @@ -113,6 +114,7 @@ func (sm *ApplicationContext) configure() { sm.ServicesController = NewServicesController(sm.Storage) sm.ConnectionsController = NewConnectionsController(sm.Storage, sm.ServicesController) sm.ConnectionStreamsController = NewConnectionStreamsController(sm.Storage) + sm.SearchController = NewSearchController(sm.Storage) sm.StatisticsController = NewStatisticsController(sm.Storage) sm.IsConfigured = true } diff --git a/application_router.go b/application_router.go index 89b471b..dc9f9d4 100644 --- a/application_router.go +++ b/application_router.go @@ -22,6 +22,8 @@ import ( "fmt" "github.com/gin-gonic/contrib/static" "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" + "github.com/go-playground/validator/v10" log "github.com/sirupsen/logrus" "net/http" "os" @@ -294,6 +296,74 @@ func CreateApplicationRouter(applicationContext *ApplicationContext, } }) + api.GET("/searches", func(c *gin.Context) { + success(c, applicationContext.SearchController.PerformedSearches()) + }) + + api.POST("/searches/perform", func(c *gin.Context) { + var options SearchOptions + + + parentIsZero := func (fl validator.FieldLevel) bool { + log.Println(fl.FieldName()) + log.Println("noooooo") + return fl.Parent().IsZero() + } + eitherWith := func (fl validator.FieldLevel) bool { + otherField := fl.Parent().FieldByName(fl.Param()) + log.Println(fl.Param()) + log.Println("bbbbbbbbbb") + return (fl.Field().IsZero() && !otherField.IsZero()) || (!fl.Field().IsZero() && otherField.IsZero()) + } + aaa := func (fl validator.FieldLevel) bool { + + log.Println("awww") + return fl.Field().IsZero() + } + + bbb := func (fl validator.FieldLevel) bool { + + log.Println("iiiii") + return true + } + + if validate, ok := binding.Validator.Engine().(*validator.Validate); ok { + if err := validate.RegisterValidation("parent_is_zero", parentIsZero); err != nil { + log.WithError(err).Panic("cannot register 'topzero' validator") + } + if err := validate.RegisterValidation("either_with", eitherWith); err != nil { + log.WithError(err).Panic("cannot register 'either_with' validator") + } + if err := validate.RegisterValidation("aaa", aaa); err != nil { + log.WithError(err).Panic("cannot register 'either_with' validator") + } + if err := validate.RegisterValidation("bbb", bbb); err != nil { + log.WithError(err).Panic("cannot register 'either_with' validator") + } + } else { + log.Panic("cannot ") + } + + + if err := c.ShouldBindJSON(&options); err != nil { + badRequest(c, err) + return + } + + log.Println(options) + + + success(c, "ok") + + + + + + + + //success(c, applicationContext.SearchController.PerformSearch(c, options)) + }) + api.GET("/streams/:id", func(c *gin.Context) { id, err := RowIDFromHex(c.Param("id")) if err != nil { diff --git a/connection_streams_controller.go b/connection_streams_controller.go index 89e484d..eef1a2a 100644 --- a/connection_streams_controller.go +++ b/connection_streams_controller.go @@ -39,6 +39,7 @@ type ConnectionStream struct { FromClient bool `bson:"from_client"` DocumentIndex int `bson:"document_index"` Payload []byte `bson:"payload"` + PayloadString string `bson:"payload_string"` BlocksIndexes []int `bson:"blocks_indexes"` BlocksTimestamps []time.Time `bson:"blocks_timestamps"` BlocksLoss []bool `bson:"blocks_loss"` diff --git a/search_controller.go b/search_controller.go new file mode 100644 index 0000000..ad47dbc --- /dev/null +++ b/search_controller.go @@ -0,0 +1,189 @@ +/* + * This file is part of caronte (https://github.com/eciavatta/caronte). + * Copyright (c) 2020 Emiliano Ciavatta. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package main + +import ( + "context" + log "github.com/sirupsen/logrus" + "strings" + "sync" + "time" +) + +const ( + secondsToNano = 1000 * 1000 * 1000 + maxSearchTimeout = 60 * secondsToNano +) + +type PerformedSearch struct { + ID RowID `bson:"_id" json:"id"` + SearchOptions SearchOptions `bson:"search_options" json:"search_options"` + AffectedConnections []RowID `bson:"affected_connections" json:"affected_connections,omitempty"` + AffectedConnectionsCount int `bson:"affected_connections_count" json:"affected_connections_count"` + StartedAt time.Time `bson:"started_at" json:"started_at"` + FinishedAt time.Time `bson:"finished_at" json:"finished_at"` + UpdatedAt time.Time `bson:"updated_at" json:"updated_at"` + Timeout time.Duration `bson:"timeout" json:"timeout"` +} + +type SearchOptions struct { + TextSearch TextSearch `bson:"text_search" json:"text_search" validate:"either_with=RegexSearch"` + RegexSearch RegexSearch `bson:"regex_search" json:"regex_search" validate:"either_with=TextSearch"` + Timeout time.Duration `bson:"timeout" json:"timeout" binding:"max=60"` +} + +type TextSearch struct { + Terms []string `bson:"terms" json:"terms" binding:"parent_is_zero|either_with=ExactPhrase,isdefault|min=1,dive,min=3"` + ExcludedTerms []string `bson:"excluded_terms" json:"excluded_terms" binding:"required_with=Terms,dive,isdefault|min=1"` + ExactPhrase string `bson:"exact_phrase" json:"exact_phrase" binding:"isdefault|min=3,parent_is_zero|either_with=Terms"` + CaseSensitive bool `bson:"case_sensitive" json:"case_sensitive"` +} + +type RegexSearch struct { + Pattern string `bson:"pattern" json:"pattern" binding:"parent_is_zero|either_with=NotPattern,isdefault|min=3"` + NotPattern string `bson:"not_pattern" json:"not_pattern" binding:"parent_is_zero|either_with=Pattern,isdefault|min=3"` + CaseInsensitive bool `bson:"case_insensitive" json:"case_insensitive"` + MultiLine bool `bson:"multi_line" json:"multi_line"` + IgnoreWhitespaces bool `bson:"ignore_whitespaces" json:"ignore_whitespaces"` + DotCharacter bool `bson:"dot_character" json:"dot_character"` +} + +type SearchController struct { + storage Storage + performedSearches []PerformedSearch + mutex sync.Mutex +} + +func NewSearchController(storage Storage) *SearchController { + var searches []PerformedSearch + if err := storage.Find(Searches).All(&searches); err != nil { + // log.WithError(err).Panic("failed to retrieve performed searches") + } + + return &SearchController{ + storage: storage, + performedSearches: searches, + } +} + +func (sc *SearchController) PerformedSearches() []PerformedSearch { + sc.mutex.Lock() + defer sc.mutex.Unlock() + + return sc.performedSearches +} + +func (sc *SearchController) PerformSearch(c context.Context, options SearchOptions) PerformedSearch { + findQuery := sc.storage.Find(ConnectionStreams).Projection(OrderedDocument{{"connection_id", 1}}).Context(c) + timeout := options.Timeout * secondsToNano + if timeout <= 0 || timeout > maxSearchTimeout { + timeout = maxSearchTimeout + } + findQuery = findQuery.MaxTime(timeout) + + if !options.TextSearch.isZero() { + var text string + if options.TextSearch.ExactPhrase != "" { + text = "\"" + options.TextSearch.ExactPhrase + "\"" + } else { + text = strings.Join(options.TextSearch.Terms, " ") + if options.TextSearch.ExcludedTerms != nil { + text += " -" + strings.Join(options.TextSearch.ExcludedTerms, " -") + } + } + + findQuery = findQuery.Filter(OrderedDocument{{"$text", UnorderedDocument{ + "$search": text, + "$language": "none", + "$caseSensitive": options.TextSearch.CaseSensitive, + "$diacriticSensitive": false, + }}}) + } else { + var regexOptions string + if options.RegexSearch.CaseInsensitive { + regexOptions += "i" + } + if options.RegexSearch.MultiLine { + regexOptions += "m" + } + if options.RegexSearch.IgnoreWhitespaces { + regexOptions += "x" + } + if options.RegexSearch.DotCharacter { + regexOptions += "s" + } + + var regex UnorderedDocument + if options.RegexSearch.Pattern != "" { + regex = UnorderedDocument{"$regex": options.RegexSearch.Pattern, "$options": regexOptions} + } else { + regex = UnorderedDocument{"$not": + UnorderedDocument{"$regex": options.RegexSearch.NotPattern, "$options": regexOptions}} + } + + findQuery = findQuery.Filter(OrderedDocument{{"payload_string", regex}}) + } + + var connections []ConnectionStream + startedAt := time.Now() + if err := findQuery.All(&connections); err != nil { + log.WithError(err).Error("oh no") + } + affectedConnections := uniqueConnectionIds(connections) + + finishedAt := time.Now() + performedSearch := PerformedSearch{ + ID: NewRowID(), + SearchOptions: options, + AffectedConnections: affectedConnections, + AffectedConnectionsCount: len(affectedConnections), + StartedAt: startedAt, + FinishedAt: finishedAt, + UpdatedAt: finishedAt, + Timeout: options.Timeout, + } + if _, err := sc.storage.Insert(Searches).Context(c).One(performedSearch); err != nil { + log.WithError(err).Panic("failed to insert a new performed search") + } + + sc.mutex.Lock() + sc.performedSearches = append([]PerformedSearch{performedSearch}, sc.performedSearches...) + sc.mutex.Unlock() + + return performedSearch +} + +func (sc TextSearch) isZero() bool { + return sc.Terms == nil && sc.ExcludedTerms == nil && sc.ExactPhrase == "" +} + +func (sc RegexSearch) isZero() bool { + return RegexSearch{} == sc +} + +func uniqueConnectionIds(connections []ConnectionStream) []RowID { + keys := make(map[RowID]bool) + var out []RowID + for _, entry := range connections { + if _, value := keys[entry.ConnectionID]; !value { + keys[entry.ConnectionID] = true + out = append(out, entry.ConnectionID) + } + } + return out +} diff --git a/statistics_controller.go b/statistics_controller.go index 006b230..1714c0b 100644 --- a/statistics_controller.go +++ b/statistics_controller.go @@ -26,10 +26,10 @@ import ( type StatisticRecord struct { RangeStart time.Time `json:"range_start" bson:"_id"` - ConnectionsPerService map[uint16]int `json:"connections_per_service" bson:"connections_per_service"` - ClientBytesPerService map[uint16]int `json:"client_bytes_per_service" bson:"client_bytes_per_service"` - ServerBytesPerService map[uint16]int `json:"server_bytes_per_service" bson:"server_bytes_per_service"` - DurationPerService map[uint16]int64 `json:"duration_per_service" bson:"duration_per_service"` + ConnectionsPerService map[uint16]int `json:"connections_per_service,omitempty" bson:"connections_per_service"` + ClientBytesPerService map[uint16]int `json:"client_bytes_per_service,omitempty" bson:"client_bytes_per_service"` + ServerBytesPerService map[uint16]int `json:"server_bytes_per_service,omitempty" bson:"server_bytes_per_service"` + DurationPerService map[uint16]int64 `json:"duration_per_service,omitempty" bson:"duration_per_service"` } type StatisticsFilter struct { diff --git a/storage.go b/storage.go index f8b7f9c..304e88c 100644 --- a/storage.go +++ b/storage.go @@ -25,16 +25,20 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "time" ) // Collections names -const Connections = "connections" -const ConnectionStreams = "connection_streams" -const ImportingSessions = "importing_sessions" -const Rules = "rules" -const Settings = "settings" -const Services = "services" -const Statistics = "statistics" +const ( + Connections = "connections" + ConnectionStreams = "connection_streams" + ImportingSessions = "importing_sessions" + Rules = "rules" + Searches = "searches" + Settings = "settings" + Services = "services" + Statistics = "statistics" +) var ZeroRowID [12]byte @@ -73,6 +77,7 @@ func NewMongoStorage(uri string, port int, database string) (*MongoStorage, erro ConnectionStreams: db.Collection(ConnectionStreams), ImportingSessions: db.Collection(ImportingSessions), Rules: db.Collection(Rules), + Searches: db.Collection(Services), Settings: db.Collection(Settings), Services: db.Collection(Services), Statistics: db.Collection(Statistics), @@ -85,9 +90,13 @@ func NewMongoStorage(uri string, port int, database string) (*MongoStorage, erro return nil, err } - if _, err := collections[ConnectionStreams].Indexes().CreateOne(ctx, mongo.IndexModel{ - Keys: bson.D{{"connection_id", -1}}, // descending - Options: options.Index(), + if _, err := collections[ConnectionStreams].Indexes().CreateMany(ctx, []mongo.IndexModel{ + { + Keys: bson.D{{"connection_id", -1}}, // descending + }, + { + Keys: bson.D{{"payload_string", "text"}}, + }, }); err != nil { return nil, err } @@ -277,6 +286,8 @@ type FindOperation interface { Projection(filter OrderedDocument) FindOperation Sort(field string, ascending bool) FindOperation Limit(n int64) FindOperation + Skip(n int64) FindOperation + MaxTime(duration time.Duration) FindOperation First(result interface{}) error All(results interface{}) error } @@ -318,6 +329,16 @@ func (fo MongoFindOperation) Limit(n int64) FindOperation { return fo } +func (fo MongoFindOperation) Skip(n int64) FindOperation { + fo.optFind.SetSkip(n) + return fo +} + +func (fo MongoFindOperation) MaxTime(duration time.Duration) FindOperation { + fo.optFind.SetMaxTime(duration) + return fo +} + func (fo MongoFindOperation) Sort(field string, ascending bool) FindOperation { var sort int if ascending { diff --git a/stream_handler.go b/stream_handler.go index 48dba34..f08bd70 100644 --- a/stream_handler.go +++ b/stream_handler.go @@ -22,6 +22,7 @@ import ( "github.com/flier/gohs/hyperscan" "github.com/google/gopacket/tcpassembly" log "github.com/sirupsen/logrus" + "strings" "time" ) @@ -176,6 +177,7 @@ func (sh *StreamHandler) storageCurrentDocument() { ConnectionID: ZeroRowID, DocumentIndex: len(sh.documentsIDs), Payload: sh.buffer.Bytes(), + PayloadString: strings.ToValidUTF8(string(sh.buffer.Bytes()), ""), BlocksIndexes: sh.indexes, BlocksTimestamps: sh.timestamps, BlocksLoss: sh.lossBlocks, -- cgit v1.2.3-70-g09d2