aboutsummaryrefslogtreecommitdiff
path: root/storage.go
diff options
context:
space:
mode:
authorEmiliano Ciavatta2020-04-07 18:46:36 +0000
committerEmiliano Ciavatta2020-04-07 18:46:56 +0000
commit468690c60ee2e57ed2ccb4375e9ada5d2fed9473 (patch)
tree138b9f4c69731abef66e789b0e044417824f5c49 /storage.go
parent590405d948530aecdf7399833c3d0b8585f5601b (diff)
Before storage refactor
Diffstat (limited to 'storage.go')
-rw-r--r--storage.go96
1 files changed, 94 insertions, 2 deletions
diff --git a/storage.go b/storage.go
index ea24780..3be56d7 100644
--- a/storage.go
+++ b/storage.go
@@ -2,6 +2,8 @@ package main
import (
"context"
+ "encoding/binary"
+ "encoding/hex"
"errors"
"fmt"
"time"
@@ -11,13 +13,22 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)
+const Connections = "connections"
+const ImportedPcaps = "imported_pcaps"
+const Rules = "rules"
+
+var NoFilters = UnorderedDocument{}
+
const defaultConnectionTimeout = 10*time.Second
const defaultOperationTimeout = 3*time.Second
type Storage interface {
InsertOne(ctx context.Context, collectionName string, document interface{}) (interface{}, error)
+ InsertMany(ctx context.Context, collectionName string, documents []interface{}) ([]interface{}, error)
UpdateOne(ctx context.Context, collectionName string, filter interface{}, update interface {}, upsert bool) (interface{}, error)
+ UpdateMany(ctx context.Context, collectionName string, filter interface{}, update interface {}, upsert bool) (interface{}, error)
FindOne(ctx context.Context, collectionName string, filter interface{}) (UnorderedDocument, error)
+ Find(ctx context.Context, collectionName string, filter interface{}, results interface{}) error
}
type MongoStorage struct {
@@ -28,6 +39,14 @@ type MongoStorage struct {
type OrderedDocument = bson.D
type UnorderedDocument = bson.M
+func UniqueKey(timestamp time.Time, payload uint32) string {
+ var key [8]byte
+ binary.BigEndian.PutUint32(key[0:4], uint32(timestamp.Unix()))
+ binary.BigEndian.PutUint32(key[4:8], payload)
+
+ return hex.EncodeToString(key[:])
+}
+
func NewMongoStorage(uri string, port int, database string) *MongoStorage {
opt := options.Client()
opt.ApplyURI(fmt.Sprintf("mongodb://%s:%v", uri, port))
@@ -38,8 +57,9 @@ func NewMongoStorage(uri string, port int, database string) *MongoStorage {
db := client.Database(database)
colls := map[string]*mongo.Collection{
- "imported_pcaps": db.Collection("imported_pcaps"),
- "connections": db.Collection("connections"),
+ Connections: db.Collection(Connections),
+ ImportedPcaps: db.Collection(ImportedPcaps),
+ Rules: db.Collection(Rules),
}
return &MongoStorage{
@@ -76,6 +96,26 @@ func (storage *MongoStorage) InsertOne(ctx context.Context, collectionName strin
return result.InsertedID, nil
}
+func (storage *MongoStorage) InsertMany(ctx context.Context, collectionName string,
+ documents []interface{}) ([]interface{}, error) {
+
+ collection, ok := storage.collections[collectionName]
+ if !ok {
+ return nil, errors.New("invalid collection: " + collectionName)
+ }
+
+ if ctx == nil {
+ ctx, _ = context.WithTimeout(context.Background(), defaultOperationTimeout)
+ }
+
+ result, err := collection.InsertMany(ctx, documents)
+ if err != nil {
+ return nil, err
+ }
+
+ return result.InsertedIDs, nil
+}
+
func (storage *MongoStorage) UpdateOne(ctx context.Context, collectionName string,
filter interface{}, update interface {}, upsert bool) (interface{}, error) {
@@ -150,3 +190,55 @@ func (storage *MongoStorage) FindOne(ctx context.Context, collectionName string,
return result, nil
}
+
+type FindOperation struct {
+ options options.FindOptions
+}
+
+
+
+func (storage *MongoStorage) Find(ctx context.Context, collectionName string,
+ filter interface{}, results interface{}) error {
+
+ collection, ok := storage.collections[collectionName]
+ if !ok {
+ return errors.New("invalid collection: " + collectionName)
+ }
+
+ if ctx == nil {
+ ctx, _ = context.WithTimeout(context.Background(), defaultOperationTimeout)
+ }
+
+ options.FindOptions{
+ AllowDiskUse: nil,
+ AllowPartialResults: nil,
+ BatchSize: nil,
+ Collation: nil,
+ Comment: nil,
+ CursorType: nil,
+ Hint: nil,
+ Limit: nil,
+ Max: nil,
+ MaxAwaitTime: nil,
+ MaxTime: nil,
+ Min: nil,
+ NoCursorTimeout: nil,
+ OplogReplay: nil,
+ Projection: nil,
+ ReturnKey: nil,
+ ShowRecordID: nil,
+ Skip: nil,
+ Snapshot: nil,
+ Sort: nil,
+ }
+ cursor, err := collection.Find(ctx, filter)
+ if err != nil {
+ return err
+ }
+ err = cursor.All(ctx, results)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}