aboutsummaryrefslogtreecommitdiff
path: root/connection_handler.go
diff options
context:
space:
mode:
authorEmiliano Ciavatta2020-04-09 16:49:42 +0000
committerEmiliano Ciavatta2020-04-09 16:49:42 +0000
commit0ecaeab8474317c4589d40a43026f11788d87868 (patch)
treee336ffbb3d7525d1bb74f5b3f3bed1a0a22d89f9 /connection_handler.go
parentf35049b2cc3a38475cf2a7967c92a81f2f296855 (diff)
Add connection_handler tests
Diffstat (limited to 'connection_handler.go')
-rw-r--r--connection_handler.go23
1 files changed, 13 insertions, 10 deletions
diff --git a/connection_handler.go b/connection_handler.go
index de30634..dc23315 100644
--- a/connection_handler.go
+++ b/connection_handler.go
@@ -18,7 +18,7 @@ type BiDirectionalStreamFactory struct {
serverIP gopacket.Endpoint
connections map[StreamFlow]ConnectionHandler
mConnections sync.Mutex
- rulesManager *RulesManager
+ rulesManager RulesManager
rulesDatabase RulesDatabase
mRulesDatabase sync.Mutex
scanners []Scanner
@@ -46,7 +46,7 @@ type connectionHandlerImpl struct {
}
func NewBiDirectionalStreamFactory(storage Storage, serverIP gopacket.Endpoint,
- rulesManager *RulesManager) *BiDirectionalStreamFactory {
+ rulesManager RulesManager) *BiDirectionalStreamFactory {
factory := &BiDirectionalStreamFactory{
storage: storage,
@@ -65,7 +65,7 @@ func NewBiDirectionalStreamFactory(storage Storage, serverIP gopacket.Endpoint,
func (factory *BiDirectionalStreamFactory) updateRulesDatabaseService() {
for {
select {
- case rulesDatabase, ok := <-factory.rulesManager.databaseUpdated:
+ case rulesDatabase, ok := <-factory.rulesManager.DatabaseUpdateChannel():
if !ok {
return
}
@@ -122,6 +122,7 @@ func (factory *BiDirectionalStreamFactory) releaseScanner(scanner Scanner) {
log.WithError(err).Error("failed to realloc an existing scanner")
return
}
+ scanner.version = factory.rulesDatabase.version
}
factory.scanners = append(factory.scanners, scanner)
}
@@ -213,13 +214,15 @@ func (ch *connectionHandlerImpl) Complete(handler *StreamHandler) {
}
streamsIDs := append(client.documentsIDs, server.documentsIDs...)
- n, err := ch.Storage().Update(ConnectionStreams).
- Filter(OrderedDocument{{"_id", UnorderedDocument{"$in": streamsIDs}}}).
- Many(UnorderedDocument{"connection_id": connectionID})
- if err != nil {
- log.WithError(err).WithField("connection", connection).Error("failed to update connection streams")
- } else if int(n) != len(streamsIDs) {
- log.WithError(err).WithField("connection", connection).Error("failed to update all connections streams")
+ if len(streamsIDs) > 0 {
+ n, err := ch.Storage().Update(ConnectionStreams).
+ Filter(OrderedDocument{{"_id", UnorderedDocument{"$in": streamsIDs}}}).
+ Many(UnorderedDocument{"connection_id": connectionID})
+ if err != nil {
+ log.WithError(err).WithField("connection", connection).Error("failed to update connection streams")
+ } else if int(n) != len(streamsIDs) {
+ log.WithError(err).WithField("connection", connection).Error("failed to update all connections streams")
+ }
}
}