I have about a million records in Redis that I want to dump into Elasticsearch periodically. I just want to make sure that my script is reasonably fast and free of memory leaks.
'use strict';
const redis = require('redis');
const bluebird = require('bluebird');
const request = require('request');
const elasticsearch = require('elasticsearch');
const fs = require('fs');
const _ = require('lodash');
const async = require('async');
const sh = require('shorthash');
const sleep = require('sleep');
const config = require('../config');
// Promisify the Redis client so *Async variants of every command exist.
// NOTE(review): the code below only uses the callback API (client.mget) —
// confirm the *Async methods are needed elsewhere before keeping these.
bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);

// Redis connection — the source of the documents to export.
// `const`: neither binding is ever reassigned.
const client = redis.createClient({
  host: config.redis.url,
  port: config.redis.port
});

// Elasticsearch connection — the destination of the bulk import.
// `new` added for clarity; a very generous request timeout is kept because
// bulk imports of large batches can be slow.
const ES = new elasticsearch.Client({
  host: config.elasticsearch.url,
  requestTimeout: 30000000
});
// Load the list of Redis keys to export (one key per line) and drop blank
// lines (e.g. the trailing newline at end of file).
const keys = fs.readFileSync('no-keys', 'utf8')
  .split('\n')
  .filter((key) => key.length > 0);

// Batch the keys so each MGET stays a manageable size.
const chunkedKeys = _.chunk(keys, 1000);
// Original message said "keys" but the number is the CHUNK count — fixed.
console.log('We have ' + keys.length + ' keys in ' + chunkedKeys.length + ' chunks');
// For each batch: MGET the raw documents from Redis, transform them into
// Elasticsearch bulk-API action/source pairs, and issue one bulk request.
// NOTE(review): _.each dispatches ALL chunks' MGETs concurrently; for very
// large key sets this holds every batch in memory at once — consider
// async.eachLimit over the chunks to bound memory use.
_.each(chunkedKeys, (chunkedKey) => {
  client.mget(chunkedKey, (mgetError, replies) => {
    if (mgetError) {
      console.error(mgetError);
      // Bug fix: without this return, the code below crashed dereferencing
      // `replies.length` when Redis returned an error (replies is undefined).
      return;
    }
    console.log('MGET complete from Redis');
    console.log('We have ' + replies.length + ' documents');
    // Transform up to 5 replies at a time into [action, source] pairs.
    async.mapLimit(replies, 5, (reply, callback) => {
      try {
        const content = JSON.parse(reply);
        // Stable document id derived from the article URL, so re-running the
        // export overwrites documents instead of duplicating them.
        const k = sh.unique(content.url);
        // NOTE(review): `_timestamp` in the action metadata is deprecated in
        // Elasticsearch 2.x and removed in later versions — confirm the
        // target ES version still honors it.
        const body = [{index: {_index: config.elasticsearch.index, _type: 'article', _id: k, _timestamp: (new Date()).toISOString() }}];
        body.push(content);
        callback(null, body);
      } catch (e) {
        // Bug fix: one malformed document should not abort the whole batch.
        // The original passed the error to the callback, which makes
        // async.mapLimit bail out and discard the entire chunk; instead we
        // log it and map it to null, which the filter below removes.
        console.error(e);
        callback(null, null);
      }
    }, (err, results) => {
      if (err) {
        console.log(err);
        return;
      }
      // Drop skipped (null) entries, then flatten the [action, source]
      // pairs into the flat array the bulk API expects.
      const mergedResult = _.flatten(results.filter((e) => e));
      console.log('Export complete with ' + mergedResult.length);
      if (mergedResult.length === 0) {
        return; // nothing to import for this batch
      }
      ES.bulk({body: mergedResult}, (bulkError) => {
        if (bulkError) {
          // Bug fix: the original ignored the bulk error and always logged
          // success; surface failures instead.
          console.error(bulkError);
          return;
        }
        console.log('Import complete');
      });
    });
  });
});