Average Aggregation Queries in Meteor

Ok, still in my toy app, I want to find out the average mileage on a group of car owners' odometers. This is pretty easy on the client but doesn't scale. Right? But on the server, I don't exactly see how to accomplish it.

Questions:

  1. How do you implement something on the server then use it on the client?
  2. How do you use the $avg aggregation function of mongo to leverage its optimized aggregation function?
  3. Or alternatively to (2) how do you do a map/reduce on the server and make it available to the client?

The suggestion by @HubertOG was to use Meteor.call, which makes sense and I did this:

# Client side
Template.mileage.average_miles = ->
  answer = null
  Meteor.call "average_mileage", (error, result) ->
    console.log "got average mileage result #{result}"
    answer = result
  console.log "but wait, answer = #{answer}"
  answer

# Server side
Meteor.methods average_mileage: ->
  console.log "server mileage called"
  total = count = 0
  r = Mileage.find({}).forEach (mileage) ->
    total += mileage.mileage
    count += 1
  console.log "server about to return #{total / count}"
  total / count

That would seem to work fine, but it doesn't because as near as I can tell Meteor.call is an asynchronous call and answer will always be a null return. Handling stuff on the server seems like a common enough use case that I must have just overlooked something. What would that be?

Thanks!


As of Meteor 0.6.5, the collection API doesn't support aggregation queries yet because there's no (straightforward) way to do live updates on them. However, you can still write them yourself, and make them available in a Meteor.publish, although the result will be static. In my opinion, doing it this way is still preferable because you can merge multiple aggregations and use the client-side collection API.

Meteor.publish("someAggregation", function (args) {
    var sub = this;
    // This works for Meteor 0.6.5
    var db = MongoInternals.defaultRemoteCollectionDriver().mongo.db;

    // Your arguments to Mongo's aggregation. Make these however you want.
    var pipeline = [
        { $match: doSomethingWith(args) },
        { $group: {
            _id: whatWeAreGroupingWith(args),
            count: { $sum: 1 }
        }}
    ];

    db.collection("server_collection_name").aggregate(        
        pipeline,
        // Need to wrap the callback so it gets called in a Fiber.
        Meteor.bindEnvironment(
            function(err, result) {
                // Add each of the results to the subscription.
                _.each(result, function(e) {
                    // Generate a random disposable id for aggregated documents
                    sub.added("client_collection_name", Random.id(), {
                        key: e._id.somethingOfInterest,                        
                        count: e.count
                    });
                });
                sub.ready();
            },
            function(error) {
                Meteor._debug( "Error doing aggregation: " + error);
            }
        )
    );
});

The above is an example grouping/count aggregation. Some things of note:

  • When you do this, you'll naturally be doing an aggregation on server_collection_name and pushing the results to a different collection called client_collection_name.
  • This subscription isn't going to be live, and will probably be updated whenever the arguments change, so we use a really simple loop that just pushes all the results out.
  • The results of the aggregation don't have Mongo ObjectIDs, so we generate some arbitrary ones of our own.
  • The callback to the aggregation needs to be wrapped in a Fiber. I use Meteor.bindEnvironment here but one can also use a Future for more low-level control.

If you start combining the results of publications like these, you'll need to carefully consider how the randomly generated ids impact the merge box. However, a straightforward implementation of this is just a standard database query, except it is more convenient to use with Meteor APIs client-side.

TL;DR version: Almost anytime you are pushing data out from the server, a publish is preferable to a method.

For more information about different ways to do aggregation, check out this post.


I did this with the 'aggregate' method. (ver 0.7.x)

if(Meteor.isServer){
Future = Npm.require('fibers/future');
Meteor.methods({
    'aggregate' : function(param){
        var fut = new Future();
        MongoInternals.defaultRemoteCollectionDriver().mongo._getCollection(param.collection).aggregate(param.pipe,function(err, result){
            fut.return(result);
        });
        return fut.wait();
    }
    ,'test':function(param){
        var _param = {
            pipe : [
            { $unwind:'$data' },
            { $match:{ 
                'data.y':"2031",
                'data.m':'01',
                'data.d':'01'
            }},
            { $project : {
                '_id':0
                ,'project_id'               : "$project_id"
                ,'idx'                      : "$data.idx"
                ,'y'                        : '$data.y'
                ,'m'                        : '$data.m'
                ,'d'                        : '$data.d'
            }}
        ],
            collection:"yourCollection"
        }
        Meteor.call('aggregate',_param);
    }
});

}


If you want reactivity, use Meteor.publish instead of Meteor.call. There's an example in the docs where they publish the number of messages in a given room (just above the documentation for this.userId), you should be able to do something similar.


You can use Meteor.methods for that.

// server
Meteor.methods({
  average: function() {
    ...
    return something;
  },

});

// client

var _avg = {                      /* Create an object to store value and dependency */
  dep: new Deps.Dependency();
};

Template.mileage.rendered = function() {
  _avg.init = true;
};

Template.mileage.averageMiles = function() {
  _avg.dep.depend();              /* Make the function rerun when _avg.dep is touched */
  if(_avg.init) {                 /* Fetch the value from the server if not yet done */
    _avg.init = false; 
    Meteor.call('average', function(error, result) {
      _avg.val = result;
      _avg.dep.changed();         /* Rerun the helper */
    });
  }
  return _avg.val;
});