diff options
author | Emiliano Ciavatta | 2020-04-03 10:47:23 +0000 |
---|---|---|
committer | Emiliano Ciavatta | 2020-04-03 10:47:23 +0000 |
commit | 9883cd346f694ad09aac839f9ddc4a25df0e0b0a (patch) | |
tree | 7a5bfdf282b273ffc410a9e8c758fb8db73072c7 /storage.go | |
parent | b02ee06a2dad56650f539f69df5660a88e442059 (diff) |
Add connection_handler and stream_handler
Diffstat (limited to 'storage.go')
-rw-r--r-- | storage.go | 57 |
1 files changed, 36 insertions, 21 deletions
@@ -14,7 +14,13 @@ import ( const defaultConnectionTimeout = 10*time.Second const defaultOperationTimeout = 3*time.Second -type Storage struct { +type Storage interface { + InsertOne(ctx context.Context, collectionName string, document interface{}) (interface{}, error) + UpdateOne(ctx context.Context, collectionName string, filter interface{}, update interface {}, upsert bool) (interface{}, error) + FindOne(ctx context.Context, collectionName string, filter interface{}) (UnorderedDocument, error) +} + +type MongoStorage struct { client *mongo.Client collections map[string]*mongo.Collection } @@ -22,7 +28,7 @@ type Storage struct { type OrderedDocument = bson.D type UnorderedDocument = bson.M -func NewStorage(uri string, port int, database string) Storage { +func NewMongoStorage(uri string, port int, database string) *MongoStorage { opt := options.Client() opt.ApplyURI(fmt.Sprintf("mongodb://%s:%v", uri, port)) client, err := mongo.NewClient(opt) @@ -36,13 +42,13 @@ func NewStorage(uri string, port int, database string) Storage { "connections": db.Collection("connections"), } - return Storage{ + return &MongoStorage{ client: client, collections: colls, } } -func (storage *Storage) Connect(ctx context.Context) error { +func (storage *MongoStorage) Connect(ctx context.Context) error { if ctx == nil { ctx, _ = context.WithTimeout(context.Background(), defaultConnectionTimeout) } @@ -50,7 +56,7 @@ func (storage *Storage) Connect(ctx context.Context) error { return storage.client.Connect(ctx) } -func (storage *Storage) InsertOne(ctx context.Context, collectionName string, +func (storage *MongoStorage) InsertOne(ctx context.Context, collectionName string, document interface{}) (interface{}, error) { collection, ok := storage.collections[collectionName] @@ -70,7 +76,7 @@ func (storage *Storage) InsertOne(ctx context.Context, collectionName string, return result.InsertedID, nil } -func (storage *Storage) UpdateOne(ctx context.Context, collectionName string, +func (storage *MongoStorage) UpdateOne(ctx context.Context, collectionName string, filter interface{}, update interface {}, upsert bool) (interface{}, error) { collection, ok := storage.collections[collectionName] @@ -97,7 +103,30 @@ func (storage *Storage) UpdateOne(ctx context.Context, collectionName string, return result.ModifiedCount == 1, nil } -func (storage *Storage) FindOne(ctx context.Context, collectionName string, +func (storage *MongoStorage) UpdateMany(ctx context.Context, collectionName string, + filter interface{}, update interface {}, upsert bool) (interface{}, error) { + + collection, ok := storage.collections[collectionName] + if !ok { + return nil, errors.New("invalid collection: " + collectionName) + } + + if ctx == nil { + ctx, _ = context.WithTimeout(context.Background(), defaultOperationTimeout) + } + + opts := options.Update().SetUpsert(upsert) + update = bson.D{{"$set", update}} + + result, err := collection.UpdateMany(ctx, filter, update, opts) + if err != nil { + return nil, err + } + + return result.ModifiedCount, nil +} + +func (storage *MongoStorage) FindOne(ctx context.Context, collectionName string, filter interface{}) (UnorderedDocument, error) { collection, ok := storage.collections[collectionName] @@ -121,17 +150,3 @@ func (storage *Storage) FindOne(ctx context.Context, collectionName string, return result, nil } - - -func testStorage() { - storage := NewStorage("localhost", 27017, "testing") - _ = storage.Connect(nil) - - id, err := storage.InsertOne(nil, "connections", bson.M{"_id": "provaaa"}) - if err != nil { - panic(err) - } else { - fmt.Println(id) - } - -} |