This commit is contained in:
Chris Kleeschulte 2017-06-22 14:10:23 -04:00
parent 28e29ff8db
commit e8efdafce0
6 changed files with 135 additions and 767 deletions

View File

@ -6,7 +6,6 @@ module.exports.Service = require('./lib/service');
module.exports.errors = require('./lib/errors');
module.exports.services = {};
module.exports.services.Bitcoin = require('./lib/services/bitcoind');
module.exports.services.Web = require('./lib/services/web');
module.exports.scaffold = {};

View File

@ -77,8 +77,9 @@ function checkService(service) {
!service.module.prototype.start ||
!service.module.prototype.stop) {
throw new Error(
'Could not load service "' + service.name + '" as it does not support necessary methods and properties.'
);
'Could not load service "' +
service.name +
'" as it does not support necessary methods and properties.');
}
}
@ -94,7 +95,9 @@ function lookInRequirePathConfig(req, service) {
var serviceFile = service.config.requirePath.replace(/.js$/, '');
return req(serviceFile);
} catch(e) {
log.info('Checked the service\'s requirePath value, but could not find the service, checking elsewhere.');
log.info('Checked the service\'s requirePath value, ' +
'but could not find the service, checking elsewhere. ' +
'Error caught: ' + e.message);
}
}
@ -102,7 +105,8 @@ function lookInCwd(req, service) {
try {
return req(process.cwd + '/' + service);
} catch(e) {
log.info('Checked the current working directory, but did not find the service.');
log.info('Checked the current working directory, but did not find the service. ' +
'Error caught: ' + e.message);
}
}
@ -111,7 +115,8 @@ function lookInBuiltInPath(req, service) {
var serviceFile = path.resolve(__dirname, '../services/' + service.name);
return req(serviceFile);
} catch(e) {
log.info('Checked the built-in path: lib/services, but did not find the service.');
log.info('Checked the built-in path: lib/services, but did not find the service. ' +
'Error caught: ' + e.message);
}
}
@ -124,7 +129,8 @@ function lookInModuleManifest(req, service) {
return req(serviceModule);
}
} catch(e) {
log.info('Checked the module\'s package.json for \'bitcoreNode\' field, but found no service.');
log.info('Checked the module\'s package.json for \'bitcoreNode\' field, but found no service. ' +
'Error caught: ' + e.message);
}
}

View File

@ -1,499 +0,0 @@
'use strict';
var util = require('util');
var bitcore = require('bitcore-lib');
var zmq = require('zmq');
var async = require('async');
var BitcoinRPC = require('bitcoind-rpc');
var _ = bitcore.deps._;
var index = require('../../');
var errors = index.errors;
var log = index.log;
var Service = require('../../service');
var LRU = require('lru-cache');
function Bitcoin(options) {
if (!(this instanceof Bitcoin)) {
return new Bitcoin(options);
}
Service.call(this, options);
this.options = options;
this.subscriptions = {};
this.subscriptions.rawtransaction = [];
this.subscriptions.hashblock = [];
this.subscriptions.rawblock = [];
this.startRetryTimes = this.options.startRetryTimes || 120;
this.startRetryInterval = this.options.startRetryInterval || 1000;
this._initClients();
this._process = options.process || process;
this.on('error', function(err) {
log.error(err.stack);
});
this._hashBlockCache = LRU(100);
}
util.inherits(Bitcoin, Service);
Bitcoin.dependencies = [];
Bitcoin.prototype._initClients = function() {
var self = this;
this.nodes = [];
this.nodesIndex = 0;
Object.defineProperty(this, 'client', {
get: function() {
var client = self.nodes[self.nodesIndex].client;
self.nodesIndex = (self.nodesIndex + 1) % self.nodes.length;
return client;
},
enumerable: true,
configurable: false
});
};
Bitcoin.prototype.getAPIMethods = function() {
var methods = [
['getBlock', this, this.getBlock, 1]
];
return methods;
};
Bitcoin.prototype.getPublishEvents = function() {
return [
{
name: 'bitcoind/rawtransaction',
scope: this,
subscribe: this.subscribe.bind(this, 'rawtransaction'),
unsubscribe: this.unsubscribe.bind(this, 'rawtransaction')
},
{
name: 'bitcoind/hashblock',
scope: this,
subscribe: this.subscribe.bind(this, 'hashblock'),
unsubscribe: this.unsubscribe.bind(this, 'hashblock')
},
{
name: 'bitcoind/rawblock',
scope: this,
subscribe: this.subscribe.bind(this, 'rawblock'),
unsubscribe: this.unsubscribe.bind(this, 'rawblock')
}
];
};
Bitcoin.prototype.subscribe = function(name, emitter) {
this.subscriptions[name].push(emitter);
log.info(emitter.remoteAddress, 'subscribe:', 'bitcoind/' + name, 'total:', this.subscriptions[name].length);
};
Bitcoin.prototype.unsubscribe = function(name, emitter) {
var index = this.subscriptions[name].indexOf(emitter);
if (index > -1) {
this.subscriptions[name].splice(index, 1);
}
log.info(emitter.remoteAddress, 'unsubscribe:', 'bitcoind/' + name, 'total:', this.subscriptions[name].length);
};
Bitcoin.prototype._tryAllClients = function(func, callback) {
var self = this;
var nodesIndex = this.nodesIndex;
var retry = function(done) {
var client = self.nodes[nodesIndex].client;
nodesIndex = (nodesIndex + 1) % self.nodes.length;
func(client, done);
};
async.retry({times: this.nodes.length, interval: this.tryAllInterval || 1000}, retry, callback);
};
Bitcoin.prototype._wrapRPCError = function(errObj) {
var err = new errors.RPCError(errObj.message);
err.code = errObj.code;
return err;
};
Bitcoin.prototype._getGenesisBlock = function(callback) {
var self = this;
if (self.height === 0) {
return self.getRawBlock(self.tiphash, function(err, blockBuffer) {
if(err) {
return callback(err);
}
self.genesisBuffer = blockBuffer;
callback();
});
}
self.client.getBlockHash(0, function(err, response) {
if (err) {
return callback(self._wrapRPCError(err));
}
var blockhash = response.result;
self.getRawBlock(blockhash, function(err, blockBuffer) {
if (err) {
return callback(err);
}
self.genesisBuffer = blockBuffer;
callback();
});
});
};
Bitcoin.prototype._getNetworkTip = function(callback) {
var self = this;
self.client.getBestBlockHash(function(err, response) {
if (err) {
return callback(self._wrapRPCError(err));
}
self.tiphash = response.result;
self.client.getBlock(response.result, function(err, response) {
if (err) {
return callback(self._wrapRPCError(err));
}
self.height = response.result.height;
callback();
});
});
};
Bitcoin.prototype._initChain = function(callback) {
var self = this;
async.series([
self._getNetworkTip.bind(self),
self._getGenesisBlock.bind(self),
], function(err) {
if(err) {
return callback(err);
}
self.emit('ready');
callback();
});
};
Bitcoin.prototype._zmqRawBlockHandler = function(message) {
var block = new bitcore.Block(message);
this.tiphash = block.hash;
this.height++;
block.__height = this.height;
block.height = this.height;
for (var i = 0; i < this.subscriptions.rawblock.length; i++) {
this.subscriptions.rawblock[i].emit('bitcoind/rawblock', block);
}
};
Bitcoin.prototype._zmqBlockHandler = function(message) {
var self = this;
var hashBlockHex = message.toString('hex');
if (!self._isSendableHashBlock(hashBlockHex)) {
return;
}
self._hashBlockCache.set(hashBlockHex);
self.tiphash = hashBlockHex;
self.height++;
self.emit('block', message);
for (var i = 0; i < this.subscriptions.hashblock.length; i++) {
this.subscriptions.hashblock[i].emit('bitcoind/hashblock', hashBlockHex);
}
};
Bitcoin.prototype._isSendableHashBlock = function(hashBlockHex) {
return hashBlockHex.length === 64 && !this._hashBlockCache.get(hashBlockHex);
};
Bitcoin.prototype._zmqTransactionHandler = function(node, message) {
var self = this;
self.emit('tx', message);
for (var i = 0; i < this.subscriptions.rawtransaction.length; i++) {
this.subscriptions.rawtransaction[i].emit('bitcoind/rawtransaction', message.toString('hex'));
}
};
Bitcoin.prototype._subscribeZmqEvents = function(node) {
var self = this;
node.zmqSubSocket.subscribe('hashblock');
node.zmqSubSocket.subscribe('rawtx');
node.zmqSubSocket.subscribe('rawblock');
node.zmqSubSocket.on('message', function(topic, message) {
var topicString = topic.toString('utf8');
if (topicString === 'rawtx') {
self._zmqTransactionHandler(node, message);
} else if (topicString === 'hashblock') {
self._zmqBlockHandler(message);
} else if (topicString === 'rawblock') {
self._zmqRawBlockHandler(message);
}
});
};
Bitcoin.prototype._initZmqSubSocket = function(node, zmqUrl) {
node.zmqSubSocket = zmq.socket('sub');
node.zmqSubSocket.on('connect', function(fd, endPoint) {
log.info('ZMQ connected to:', endPoint);
});
node.zmqSubSocket.on('connect_delay', function(fd, endPoint) {
if (this.zmqDelayWarningMultiplierCouunt++ >= this.zmqDelayWarningMultiplier) {
log.warn('ZMQ connection delay:', endPoint);
this.zmqDelayWarningMultiplierCouunt = 0;
}
});
node.zmqSubSocket.on('disconnect', function(fd, endPoint) {
log.warn('ZMQ disconnect:', endPoint);
});
node.zmqSubSocket.on('monitor_error', function(err) {
log.error('Error in monitoring: %s, will restart monitoring in 5 seconds', err);
setTimeout(function() {
node.zmqSubSocket.monitor(500, 0);
}, 5000);
});
node.zmqSubSocket.monitor(100, 0);
if (_.isString(zmqUrl)) {
node.zmqSubSocket.connect(zmqUrl);
}
};
Bitcoin.prototype._connectProcess = function(config) {
var self = this;
var node = {};
node.client = new BitcoinRPC({
protocol: config.rpcprotocol || 'http',
host: config.rpchost || '127.0.0.1',
port: config.rpcport,
user: config.rpcuser,
pass: config.rpcpassword,
rejectUnauthorized: _.isUndefined(config.rpcstrict) ? true : config.rpcstrict
});
self._initZmqSubSocket(node, config.zmqpubrawtx);
self._subscribeZmqEvents(node);
return node;
};
Bitcoin.prototype.start = function(callback) {
var self = this;
if (!self.options.connect) {
log.error('A "connect" array is required in the bitcoind service configuration.');
process.exit(-1);
}
self.nodes = self.options.connect.map(self._connectProcess.bind(self));
if (self.nodes.length === 0) {
log.error('Could not connect to any servers in connect array.');
process.exit(-1);
}
async.retry({ interval: 2000, times: 30 }, self._initChain.bind(this), function(err) {
if(err) {
log.error(err.message);
process.exit(-1);
}
log.info('Bitcoin Daemon Ready');
callback();
});
};
Bitcoin.prototype.stop = function(callback) {
callback();
};
Bitcoin.prototype._maybeGetBlockHash = function(blockArg, callback) {
var self = this;
if (_.isNumber(blockArg) || (blockArg.length < 40 && /^[0-9]+$/.test(blockArg))) {
self._tryAllClients(function(client, done) {
client.getBlockHash(blockArg, function(err, response) {
if (err) {
return done(self._wrapRPCError(err));
}
done(null, response.result);
});
}, callback);
} else {
callback(null, blockArg);
}
};
Bitcoin.prototype.getRawBlock = function(blockArg, callback) {
var self = this;
function queryBlock(err, blockhash) {
if (err) {
return callback(err);
}
self._tryAllClients(function(client, done) {
self.client.getBlock(blockhash, false, function(err, response) {
if (err) {
return done(self._wrapRPCError(err));
}
var buffer = new Buffer(response.result, 'hex');
done(null, buffer);
});
}, callback);
}
self._maybeGetBlockHash(blockArg, queryBlock);
};
Bitcoin.prototype.getBlockHeader = function(blockArg, callback) {
var self = this;
function queryHeader(err, blockhash) {
if (err) {
return callback(err);
}
self._tryAllClients(function(client, done) {
client.getBlockHeader(blockhash, function(err, response) {
if (err) {
return done(self._wrapRPCError(err));
}
var result = response.result;
var header = {
hash: result.hash,
version: result.version,
confirmations: result.confirmations,
height: result.height,
chainWork: result.chainwork,
prevHash: result.previousblockhash,
nextHash: result.nextblockhash,
merkleRoot: result.merkleroot,
time: result.time,
medianTime: result.mediantime,
nonce: result.nonce,
bits: result.bits,
difficulty: result.difficulty
};
done(null, header);
});
}, callback);
}
self._maybeGetBlockHash(blockArg, queryHeader);
};
Bitcoin.prototype.getTransaction = function(txid, callback) {
var self = this;
self._tryAllClients(function(client, done) {
//this won't work without a bitcoin node that has a tx index
log.error('Txid: ' + txid + ' not found in index! Calling getRawTransaction to retrieve.');
self.client.getRawTransaction(txid.toString('hex'), 0, function(err, response) {
if (err) {
return done(self._wrapRPCError(err));
}
done(null, response.result);
});
}, callback);
};
Bitcoin.prototype.getBlock = function(blockArg, callback) {
var self = this;
function queryBlock(err, blockhash) {
if (err) {
return callback(err);
}
self._tryAllClients(function(client, done) {
client.getBlock(blockhash, false, function(err, response) {
if (err) {
return done(self._wrapRPCError(err));
}
var blockObj = bitcore.Block.fromString(response.result);
done(null, blockObj);
});
}, callback);
}
self._maybeGetBlockHash(blockArg, queryBlock);
};
Bitcoin.prototype.isSynced = function(callback) {
this.syncPercentage(function(err, percentage) {
if (err) {
return callback(err);
}
if (Math.round(percentage) >= 100) {
callback(null, true);
} else {
callback(null, false);
}
});
};
Bitcoin.prototype.syncPercentage = function(callback) {
var self = this;
self.client.getBlockchainInfo(function(err, response) {
if (err) {
return callback(self._wrapRPCError(err));
}
var percentSynced = response.result.verificationprogress * 100;
callback(null, percentSynced);
});
};
module.exports = Bitcoin;

48
lib/services/p2p/bcoin.js Normal file
View File

@ -0,0 +1,48 @@
'use strict';
var index = require('../../');
var log = index.log;
var bcoin = require('bcoin');
var EE = require('events').EventEmitter;
var Bcoin = function(options) {
this._config = this._getConfig(options);
this.emitter = new EE();
};
Bcoin.prototype._getConfig = function(options) {
var config = {
checkpoints: true,
network: options.network || 'main',
listen: true
};
if (options.prefix) {
config.prefix = options.prefix;
}
if (options.logLevel) {
config.logLevel = options.logLevel;
}
return config;
};
Bcoin.prototype.start = function() {
var self = this;
self._bcoin = bcoin.fullnode(self._config);
log.info('Starting Bcoin full node...');
self._bcoin.open().then(function() {
self._bcoin.connect().then(function() {
self.emitter.emit('connect');
self._bcoin.startSync();
});
});
};
Bcoin.prototype.stop = function() {
this._bcoin.stopSync();
this._bcoin.disconnect();
this._bcoin.close();
};
module.exports = Bcoin;

View File

@ -7,6 +7,7 @@ var index = require('../../');
var log = index.log;
var BaseService = require('../../service');
var assert = require('assert');
var Bcoin = require('./bcoin');
var P2P = function(options) {
@ -15,10 +16,12 @@ var P2P = function(options) {
}
BaseService.call(this, options);
this.options = options;
this._options = options;
this._initPubSub();
this._startBcoinIfNecessary();
this._initP2P();
this._initPubSub();
this._bcoin;
};
@ -26,10 +29,25 @@ util.inherits(P2P, BaseService);
P2P.dependencies = [];
P2P.prototype._hasPeers = function() {
return this._options &&
this._options.peers &&
this._options.peers.host;
};
P2P.prototype._startBcoin = function() {
this._bcoin = new Bcoin({
network: this.node.getNetworkName(),
prefix: this.node.datadir
});
this._bcoin.start();
};
P2P.prototype._initP2P = function() {
this._maxPeers = this.options.maxPeers || 60;
this._minPeers = this.options.minPeers || 1;
this._configPeers = this.options.peers;
this._maxPeers = this._options.maxPeers || 60;
this._minPeers = this._options.minPeers || 1;
this._configPeers = this._options.peers;
this.messages = new p2p.Messages({ network: this.node.network });
this._peerHeights = [];
this._peers = [];
@ -44,12 +62,21 @@ P2P.prototype._initPubSub = function() {
this.subscriptions.transaction = [];
};
P2P.prototype._startBcoinIfNecessary = function() {
if (!this._hasPeers()) {
log.info('Peers not explicitly configured, starting a local bcoin node.');
this._startBcoin();
this._options.peers = [{ ip: { v4: '127.0.0.1' }, port: 48444}];
}
};
P2P.prototype.start = function(callback) {
var self = this;
self._initCache();
self._initPool();
this._setupListeners();
this._setListeners();
callback();
};
@ -107,8 +134,8 @@ P2P.prototype._initPool = function() {
var opts = {};
if (this._configPeers) {
opts.addrs = this._configPeers;
opts.dnsSeed = false;
}
opts.dnsSeed = false;
opts.maxPeers = this._maxPeers;
opts.network = this.node.getNetworkName();
this._pool = new p2p.Pool(opts);
@ -200,7 +227,7 @@ P2P.prototype._onPeerHeaders = function(peer, message) {
this._broadcast(this.subscriptions.headers, 'p2p/headers', message.headers);
};
P2P.prototype._setupListeners = function() {
P2P.prototype._setListeners = function() {
var self = this;
self._pool.on('peerready', self._onPeerReady.bind(self));
@ -209,12 +236,21 @@ P2P.prototype._setupListeners = function() {
self._pool.on('peertx', self._onPeerTx.bind(self));
self._pool.on('peerblock', self._onPeerBlock.bind(self));
self._pool.on('peerheaders', self._onPeerHeaders.bind(self));
self.node.on('ready', function() {
self._pool.connect();
});
self.node.once('ready', self._connect.bind(self));
if (self._bcoin) {
self._bcoin.emitter.once('connect', self._connect.bind(self));
}
};
P2P.prototype._connect = function() {
this._connectCalled = this._connectCalled > 0 ? 2 : 1;
if (this._connectCalled > 1) {
log.info('Connecting to p2p network.');
this._pool.connect();
}
};
P2P.prototype._broadcast = function(subscribers, name, entity) {
for (var i = 0; i < subscribers.length; i++) {
subscribers[i].emit(name, entity);

View File

@ -39,7 +39,6 @@ var WalletService = function(options) {
inherits(WalletService, BaseService);
WalletService.dependencies = [
'bitcoind',
'web',
'address',
'transaction',
@ -69,12 +68,35 @@ WalletService.prototype.start = function(callback) {
return callback(err);
}
self.setListeners();
self._loadAllBalances(callback);
});
});
};
WalletService.prototype._setListeners = function() {
this._startSubscriptions();
};
WalletService.prototype._startSubscriptions = function() {
var self = this;
if (self._subscribed) {
return;
}
self._subscribed = true;
self.bus = self.node.openBus({remoteAddress: 'localhost'});
self.bus.on('block/block', self._onBlock.bind(self));
self.bus.subscribe('block/block');
};
WalletService.prototype._onBlock = function(block) {
};
WalletService.prototype.stop = function(callback) {
setImmediate(callback);
};
@ -105,250 +127,6 @@ WalletService.prototype._checkAddresses = function() {
return Object.keys(this._addressMap).length > 0;
};
WalletService.prototype.blockHandler = function(block, connectBlock, callback) {
var opts = {
block: block,
connectBlock: connectBlock,
serial: true
};
this._blockHandler(opts, callback);
};
WalletService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) {
var opts = {
block: block,
connectBlock: connectBlock
};
this._blockHandler(opts, callback);
};
WalletService.prototype._blockHandler = function(opts, callback) {
var self = this;
if (!self._checkAddresses()) {
return setImmediate(function() {
callback(null, []);
});
}
async.mapSeries(opts.block.transactions, function(tx, next) {
self._processTransaction(opts, tx, next);
}, function(err, operations) {
if(err) {
return callback(err);
}
var ret = _.compact(_.flattenDeep(operations));
callback(null, ret);
});
};
WalletService.prototype._processTransaction = function(opts, tx, callback) {
var self = this;
tx.outputs.forEach(function(output, index) {
output.index = index;
});
var ioData = tx.inputs.concat(tx.outputs);
async.mapSeries(ioData, function(io, next) {
if (opts.serial) {
self._processSerialIO(opts, tx, io, next);
} else {
self._processConcurrentIO(opts, tx, io, next);
}
}, function(err, operations) {
if(err) {
return callback(err);
}
callback(null, operations);
});
};
WalletService.prototype._processConcurrentIO = function(opts, tx, io, callback) {
var self = this;
var walletIds = self._getWalletIdsFromScript(io);
if (!walletIds) {
return callback();
}
var actions = self._getActions(opts.connectBlock);
var operations = walletIds.map(function(walletId) {
return {
type: actions[0],
key: self._encoding.encodeWalletTransactionKey(walletId, opts.block.__height, tx.id)
};
});
setImmediate(function() {
callback(null, operations);
});
};
WalletService.prototype._processSerialIO = function(opts, tx, io, callback) {
var fn = this._processSerialOutput;
if (io instanceof Input) {
fn = this._processSerialInput;
}
fn.call(this, opts, tx, io, callback);
};
WalletService.prototype._getWalletIdsFromScript = function(io) {
if(!io.script) {
log.debug('Invalid script');
return;
}
return this._addressMap[this.getAddressString(io)];
};
WalletService.prototype._getActions = function(connect) {
var action = 'put';
var reverseAction = 'del';
if (!connect) {
action = 'del';
reverseAction = 'put';
}
return [action, reverseAction];
};
WalletService.prototype._processSerialOutput = function(opts, tx, output, callback) {
var self = this;
var walletIds = self._getWalletIdsFromScript(output);
if (!walletIds) {
return callback();
}
var actions = self._getActions(opts.connectBlock);
async.mapSeries(walletIds, function(walletId, next) {
self.balances[walletId] = self.balances[walletId] || 0;
self.balances[walletId] += opts.connectBlock ? output.satoshis : (-1 * output.satoshis);
var operations = [
{
type: actions[0],
key: self._encoding.encodeWalletUtxoKey(walletId, tx.id, output.index),
value: self._encoding.encodeWalletUtxoValue(opts.block.__height, output.satoshis, output._scriptBuffer)
},
{
type: actions[0],
key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, output.satoshis, tx.id, output.index),
value: self._encoding.encodeWalletUtxoSatoshisValue(opts.block.__height, output._scriptBuffer)
},
{
type: 'put',
key: self._encoding.encodeWalletBalanceKey(walletId),
value: self._encoding.encodeWalletBalanceValue(self.balances[walletId])
}
];
next(null, operations);
}, function(err, operations) {
if(err) {
return callback(err);
}
callback(null, operations);
});
};
WalletService.prototype._processSerialInput = function(opts, tx, input, callback) {
var self = this;
var walletIds = input.script && input.script.isPublicKeyIn() ?
['p2pk'] :
self._getWalletIdsFromScript(input);
if (!walletIds) {
return callback();
}
var actions = self._getActions(opts.connectBlock);
async.mapSeries(walletIds, function(walletId, next) {
self.node.services.transaction.getTransaction(input.prevTxId.toString('hex'), {}, function(err, tx) {
if(err) {
return next(err);
}
var utxo = tx.outputs[input.outputIndex];
if (walletId === 'p2pk') {
var pubKey = utxo.script.getPublicKey().toString('hex');
walletId = self._addressMap[pubKey];
if (!walletId) {
return next(null, []);
}
}
self.balances[walletId] = self.balances[walletId] || 0;
self.balances[walletId] += opts.connectBlock ? (-1 * utxo.satoshis) : utxo.satoshis;
var operations = [
{
type: actions[1],
key: self._encoding.encodeWalletUtxoKey(walletId, input.prevTxId, input.outputIndex),
value: self._encoding.encodeWalletUtxoValue(tx.__height, utxo.satoshis, utxo._scriptBuffer)
},
{
type: actions[1],
key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, utxo.satoshis, tx.id, input.outputIndex),
value: self._encoding.encodeWalletUtxoSatoshisValue(tx.__height, utxo._scriptBuffer)
},
{
type: 'put',
key: self._encoding.encodeWalletBalanceKey(walletId),
value: self._encoding.encodeWalletBalanceValue(self.balances[walletId])
}
];
next(null, operations);
});
}, function(err, operations) {
if(err) {
return callback(err);
}
callback(null, operations);
});
};
WalletService.prototype._loadAllAddresses = function(callback) {
var self = this;
self._addressMap = {};
@ -644,13 +422,13 @@ WalletService.prototype._endpointResyncAddresses = function() {
if(!oldAddresses) {
return res.status(404).send('Not found');
}
self._removeWallet(walletId, function(err) {
if(err) {
return utils.sendError(err, res);
}
self._createWallet(walletId, function() {
var jobId = utils.generateJobId();
@ -713,7 +491,7 @@ WalletService.prototype._endpointGetTransactions = function() {
walletId: walletId
};
var missingTxidCount = 0;
var missingTxidCount = 0;
var transform = new Transform({ objectMode: true, highWaterMark: 1000000 });
//txids are sent in and the actual tx's are found here
transform._transform = function(chunk, enc, callback) {
@ -751,7 +529,7 @@ WalletService.prototype._endpointGetTransactions = function() {
var stream = self.db.createKeyStream(self._getSearchParams(encodingFn, options));
stream.on('close', function() {
stream.unpipe();
stream.unpipe();
});
stream.pipe(transform).pipe(res);
@ -1357,7 +1135,7 @@ WalletService.prototype._setupWriteRoutes = function(app) {
v.checkAddresses,
s._endpointPostAddresses()
);
app.put('/wallets/:walletId/addresses/resync',
app.put('/wallets/:walletId/addresses/resync',
s._endpointResyncAddresses()
);
};