I have a MongoDB collection with thousands of documents. Whether a document should be processed depends on some logic; to keep things simple, assume there is a field `requiresProcessing`
indicating whether or not the document must be processed.
To do the processing, I created a readable object stream that emits, from MongoDB, only the documents that must be processed. The code is as follows:
// MongoDB repository module
var config = require('./configurations.js');
var db = require('monk')(config.mongodb.connectionUrl);
var collection = db.get('collection');
var numForProcessing = 0;
var getNextForProcessing = function() {
numForProcessing++;
return collection.find({requiresProcessing: true}, {skip: numForProcessing - 1, limit: 1}).then(function(data) {
return data[0];
});
}
module.exports = {
getNextForProcessing: getNextForProcessing
};
// Stream Definition Module
"use strict";
const Readable = require('stream').Readable;
const repository = require('./repository.js');
/**
 * Object-mode Readable that emits, one at a time, each document returned by
 * `repository.getNextForProcessing()`. It ends when that call resolves to
 * `undefined`, and it is destroyed with the error if the call rejects.
 */
class StreamObjectsForProcessing extends Readable {
  /**
   * @param {object} [options] - Readable options. `objectMode` is always
   *   forced to true. The caller's object is copied, never mutated.
   */
  constructor(options) {
    super(Object.assign({}, options, { objectMode: true }));
  }

  /**
   * Fetch the next document and push it, or push `null` (end of stream) when
   * none remain. Node does not call `_read` again until `push` has been
   * called, so fetches run one after another.
   */
  _read() {
    repository.getNextForProcessing()
      .then((doc) => {
        if (doc === undefined) {
          this.push(null);
          return;
        }
        this.push(doc);
      })
      .catch((err) => {
        // Errors raised while reading must be propagated through destroy(err).
        // Only logging them would leave the stream stalled forever, with no
        // 'end' and no 'error' for consumers.
        this.destroy(err);
      });
  }
}
Then, to use it, I simply do the usual:
// Consume every document that currently requires processing.
const stream = new StreamObjectsForProcessing();
stream.on('data', (doc) => {
  // processing here
  // NOTE(review): 'data' does not wait for asynchronous work. If processing
  // is async, use `for await (const doc of stream)` or stream.pause()/resume()
  // so reading does not run ahead of processing.
});
// Without an 'error' listener, a failed query would be an unhandled error.
stream.on('error', (err) => {
  console.error('Streaming documents for processing failed:', err);
});
Although it works, I would like a review. Is this a good approach, or can it be improved?