diff options
author | Emiliano Ciavatta | 2020-04-09 16:49:42 +0000 |
---|---|---|
committer | Emiliano Ciavatta | 2020-04-09 16:49:42 +0000 |
commit | 0ecaeab8474317c4589d40a43026f11788d87868 (patch) | |
tree | e336ffbb3d7525d1bb74f5b3f3bed1a0a22d89f9 /connection_handler.go | |
parent | f35049b2cc3a38475cf2a7967c92a81f2f296855 (diff) |
Add connection_handler tests
Diffstat (limited to 'connection_handler.go')
-rw-r--r-- | connection_handler.go | 23 |
1 files changed, 13 insertions, 10 deletions
diff --git a/connection_handler.go b/connection_handler.go index de30634..dc23315 100644 --- a/connection_handler.go +++ b/connection_handler.go @@ -18,7 +18,7 @@ type BiDirectionalStreamFactory struct { serverIP gopacket.Endpoint connections map[StreamFlow]ConnectionHandler mConnections sync.Mutex - rulesManager *RulesManager + rulesManager RulesManager rulesDatabase RulesDatabase mRulesDatabase sync.Mutex scanners []Scanner @@ -46,7 +46,7 @@ type connectionHandlerImpl struct { } func NewBiDirectionalStreamFactory(storage Storage, serverIP gopacket.Endpoint, - rulesManager *RulesManager) *BiDirectionalStreamFactory { + rulesManager RulesManager) *BiDirectionalStreamFactory { factory := &BiDirectionalStreamFactory{ storage: storage, @@ -65,7 +65,7 @@ func NewBiDirectionalStreamFactory(storage Storage, serverIP gopacket.Endpoint, func (factory *BiDirectionalStreamFactory) updateRulesDatabaseService() { for { select { - case rulesDatabase, ok := <-factory.rulesManager.databaseUpdated: + case rulesDatabase, ok := <-factory.rulesManager.DatabaseUpdateChannel(): if !ok { return } @@ -122,6 +122,7 @@ func (factory *BiDirectionalStreamFactory) releaseScanner(scanner Scanner) { log.WithError(err).Error("failed to realloc an existing scanner") return } + scanner.version = factory.rulesDatabase.version } factory.scanners = append(factory.scanners, scanner) } @@ -213,13 +214,15 @@ func (ch *connectionHandlerImpl) Complete(handler *StreamHandler) { } streamsIDs := append(client.documentsIDs, server.documentsIDs...) - n, err := ch.Storage().Update(ConnectionStreams). - Filter(OrderedDocument{{"_id", UnorderedDocument{"$in": streamsIDs}}}). - Many(UnorderedDocument{"connection_id": connectionID}) - if err != nil { - log.WithError(err).WithField("connection", connection).Error("failed to update connection streams") - } else if int(n) != len(streamsIDs) { - log.WithError(err).WithField("connection", connection).Error("failed to update all connections streams") + if len(streamsIDs) > 0 { + n, err := ch.Storage().Update(ConnectionStreams). + Filter(OrderedDocument{{"_id", UnorderedDocument{"$in": streamsIDs}}}). + Many(UnorderedDocument{"connection_id": connectionID}) + if err != nil { + log.WithError(err).WithField("connection", connection).Error("failed to update connection streams") + } else if int(n) != len(streamsIDs) { + log.WithError(err).WithField("connection", connection).Error("failed to update all connections streams") + } } } |