package ddp
// ----------------------------------------------------------------------
// Collection
// ----------------------------------------------------------------------
// Update is a generic document: a map from field name to an arbitrary value.
// It is used both for the messages handed to a Collection and for the
// documents a Collection stores and reports to listeners.
type Update map[string]interface{}
// UpdateListener is implemented by anything that wants to be told about
// changes to a collection.
type UpdateListener interface {
// CollectionUpdate reports one change. collection is the name of the
// collection, operation names the kind of change (KeyCache sends "create",
// "update", "remove" and "reset"), id is the affected item and doc is the
// document that goes with the change; KeyCache passes a nil doc for
// "remove" and "reset".
CollectionUpdate(collection, operation, id string, doc Update)
}
// Collection manages cached collection data sent from the server in a
// livedata subscription.
//
// It would be great to build an entire mongo compatible local store (minimongo)
type Collection interface {
// FindOne looks up a single item by its id.
FindOne(id string) Update
// FindAll returns a map of all items in the cache - this is a hack
// until we have time to build out a real minimongo interface.
FindAll() map[string]Update
// AddUpdateListener registers a listener that receives update messages.
AddUpdateListener(listener UpdateListener)
// Livedata updates. Each handler receives the message as an Update. The
// handlers are unexported, so only types in this package can implement
// Collection.
added(msg Update)
changed(msg Update)
removed(msg Update)
addedBefore(msg Update)
movedBefore(msg Update)
init() // init informs the collection that the connection to the server has begun/resumed
reset() // reset informs the collection that the connection to the server has been lost
}
// NewMockCollection returns a Collection backed by a MockCache, i.e. one that
// accepts every call and stores nothing.
func NewMockCollection() Collection {
	return new(MockCache)
}
// NewCollection returns a Collection with the given name. The concrete type is
// always *KeyCache, created with an empty item map and no listeners.
func NewCollection(name string) Collection {
	cache := &KeyCache{
		Name:  name,
		items: make(map[string]Update),
	}
	return cache
}
// KeyCache caches items keyed on unique ID.
//
// NOTE(review): nothing in this type synchronizes access to items or
// listeners; see the TODO below before sharing a KeyCache between goroutines.
type KeyCache struct {
// The name of the collection; it is passed to listeners as the collection
// argument of CollectionUpdate.
Name string
// items contains collection items by ID
items map[string]Update
// listeners contains all the listeners that should be notified of collection updates.
listeners []UpdateListener
// TODO(badslug): do we need to protect from multiple threads
}
// added stores the document described by msg under its id and sends a
// "create" notification carrying the stored document.
//
// msg is expected to hold a string "id" and, optionally, a "fields" document.
// A message with no "fields" key at all describes a document without any
// fields; it is cached as an empty document so that later changed messages
// for the same id have something to apply to. Messages without a usable id,
// and messages whose "fields" value is present but is not a document, are
// ignored.
func (c *KeyCache) added(msg Update) {
	id, fields := parseUpdate(msg)
	if id == "" {
		return
	}
	if fields == nil {
		if _, present := msg["fields"]; present {
			// "fields" was supplied but could not be parsed: treat the
			// message as malformed rather than inventing a document.
			return
		}
		fields = Update{}
	}
	c.items[id] = fields
	c.notify("create", id, fields)
}
// changed applies a change message to the cached document with the message's
// id and sends an "update" notification carrying the resulting document.
//
// Two parts of msg are applied: every entry of the "fields" document is
// written into the cached document, and every field named in the "cleared"
// list is deleted from it. "cleared" is accepted as []interface{} holding
// strings (non-string entries are skipped) or as []string.
//
// Nothing happens, and no notification is sent, when the id is not in the
// cache or when the message carries neither a "fields" document nor any
// cleared field.
func (c *KeyCache) changed(msg Update) {
	id, fields := parseUpdate(msg)

	var cleared []string
	switch keys := msg["cleared"].(type) {
	case []interface{}:
		for _, key := range keys {
			if name, ok := key.(string); ok {
				cleared = append(cleared, name)
			}
		}
	case []string:
		cleared = keys
	}

	if fields == nil && len(cleared) == 0 {
		return
	}
	item, ok := c.items[id]
	if !ok {
		return
	}
	if item == nil {
		// Writing into a nil map would panic; start from an empty document.
		item = Update{}
	}
	for key, value := range fields {
		item[key] = value
	}
	for _, key := range cleared {
		delete(item, key)
	}
	c.items[id] = item
	c.notify("update", id, item)
}
// removed drops the item named by msg's id from the cache and sends a
// "remove" notification with a nil document. The notification is sent whether
// or not the id was cached. Messages without a non-empty string id are
// ignored.
func (c *KeyCache) removed(msg Update) {
	id, _ := parseUpdate(msg)
	if id == "" {
		return
	}
	delete(c.items, id)
	c.notify("remove", id, nil)
}
// addedBefore is the handler for ordered insert messages. It ignores msg and
// changes nothing.
func (c *KeyCache) addedBefore(msg Update) {
// for keyed cache, ordered commands are a noop
}
// movedBefore is the handler for ordered move messages. It ignores msg and
// changes nothing.
func (c *KeyCache) movedBefore(msg Update) {
// for keyed cache, ordered commands are a noop
}
// init prepares the collection for data updates (called when a new connection is
// made or a connection/session is resumed).
//
// It is currently a no-op.
func (c *KeyCache) init() {
// TODO start to patch up the current data with fresh server state
}
// reset tells listeners that the collection has been reset: each one receives
// a "reset" operation with an empty id and a nil document. The cached items
// themselves are left untouched.
func (c *KeyCache) reset() {
// TODO we should mark the collection but maintain its contents and then
// patch up the current contents with the new contents when we receive them.
//c.items = nil
c.notify("reset", "", nil)
}
// notify delivers one update to every registered UpdateListener, in
// registration order, passing the cache's Name as the collection. The calls
// are made synchronously, so listeners should never block.
func (c *KeyCache) notify(operation, id string, doc Update) {
	registered := c.listeners
	for i := range registered {
		registered[i].CollectionUpdate(c.Name, operation, id, doc)
	}
}
// FindOne looks up the cached item stored under id. It returns nil when no
// item has that id.
func (c *KeyCache) FindOne(id string) Update {
	if doc, found := c.items[id]; found {
		return doc
	}
	return nil
}
// FindAll returns a dump of all items in the collection, keyed by ID.
//
// The returned map is the cache's own items map, not a copy, so changes made
// through it are changes to the cache.
func (c *KeyCache) FindAll() map[string]Update {
return c.items
}
// AddUpdateListener registers listener to be notified of every later change
// to the collection. Listeners are called in the order they were added, and a
// listener added more than once is called once per registration.
//
// A nil listener is ignored: storing it would make every subsequent
// notification panic when it is invoked.
func (c *KeyCache) AddUpdateListener(listener UpdateListener) {
	if listener == nil {
		return
	}
	c.listeners = append(c.listeners, listener)
}
// OrderedCache is meant to cache items based on list order.
//
// It is a placeholder and is not implemented, as the Meteor server does not
// transmit ordered collections over DDP yet: every method below either does
// nothing or reports an empty result.
type OrderedCache struct {
	// items is reserved for the ordered collection items; none of the
	// methods below read or write it.
	items []interface{}
}

// added ignores msg.
func (c *OrderedCache) added(msg Update) {}

// changed ignores msg.
func (c *OrderedCache) changed(msg Update) {}

// removed ignores msg.
func (c *OrderedCache) removed(msg Update) {}

// addedBefore ignores msg.
func (c *OrderedCache) addedBefore(msg Update) {}

// movedBefore ignores msg.
func (c *OrderedCache) movedBefore(msg Update) {}

// init does nothing.
func (c *OrderedCache) init() {}

// reset does nothing.
func (c *OrderedCache) reset() {}

// FindOne always returns nil; no items are stored.
func (c *OrderedCache) FindOne(id string) Update {
	return nil
}

// FindAll always returns a new, empty, non-nil map.
func (c *OrderedCache) FindAll() map[string]Update {
	return make(map[string]Update)
}

// AddUpdateListener does nothing; the listener is discarded.
func (c *OrderedCache) AddUpdateListener(ch UpdateListener) {}
// MockCache is a Collection that does nothing with the data it is given:
// updates are discarded, lookups find nothing and listeners are never called.
type MockCache struct{}

// added ignores msg.
func (c *MockCache) added(msg Update) {}

// changed ignores msg.
func (c *MockCache) changed(msg Update) {}

// removed ignores msg.
func (c *MockCache) removed(msg Update) {}

// addedBefore ignores msg.
func (c *MockCache) addedBefore(msg Update) {}

// movedBefore ignores msg.
func (c *MockCache) movedBefore(msg Update) {}

// init does nothing.
func (c *MockCache) init() {}

// reset does nothing.
func (c *MockCache) reset() {}

// FindOne always returns nil; no items are stored.
func (c *MockCache) FindOne(id string) Update {
	return nil
}

// FindAll always returns a new, empty, non-nil map.
func (c *MockCache) FindAll() map[string]Update {
	return make(map[string]Update)
}

// AddUpdateListener does nothing; the listener is discarded.
func (c *MockCache) AddUpdateListener(ch UpdateListener) {}
// parseUpdate extracts the ID and fields from a DDP Update document.
//
// The ID is the value stored under "id" and must be a string; otherwise the
// result is ("", nil). The fields are the value stored under "fields" and
// must be a map[string]interface{}; if that key is missing or holds anything
// else, the ID is still returned but the fields are nil.
func parseUpdate(up Update) (ID string, Fields Update) {
	id, isString := up["id"].(string)
	if !isString {
		return "", nil
	}
	doc, isDoc := up["fields"].(map[string]interface{})
	if !isDoc {
		// Missing or of an unexpected type: don't know what to do with it.
		return id, nil
	}
	return id, Update(doc)
}