Error handling for tx stream.

This commit is contained in:
Chris Kleeschulte 2017-06-06 14:29:58 -04:00
parent fe6896ba98
commit ac26249c40
2 changed files with 44 additions and 25 deletions

View File

@ -644,13 +644,13 @@ WalletService.prototype._endpointResyncAddresses = function() {
if(!oldAddresses) {
return res.status(404).send('Not found');
}
self._removeWallet(walletId, function(err) {
if(err) {
return utils.sendError(err, res);
}
self._createWallet(walletId, function() {
var jobId = utils.generateJobId();
@ -713,48 +713,60 @@ WalletService.prototype._endpointGetTransactions = function() {
walletId: walletId
};
var missingTxidCount = 0;
var transform = new Transform({ objectMode: true, highWaterMark: 1000000 });
var missingTxidCount = 0;
var txStream = new Transform({ objectMode: true, highWaterMark: 1000000 });
//txids are sent in and the actual tx's are found here
transform._transform = function(chunk, enc, callback) {
txStream._transform = function(chunk, enc, callback) {
var txid = self._encoding.decodeWalletTransactionKey(chunk).txid.toString('hex');
if (txid.length !== 64 || txid === '0000000000000000000000000000000000000000000000000000000000000000') {
missingTxidCount++;
log.error('missingTxidCount: ', missingTxidCount);
txStream.emit('error', new Error('Chunk: ' + chunk.toString('hex') + ' did not contain a txid.'));
return callback();
}
self._getTransactionFromDb(options, txid, function(err, tx) {
err = new Error('this is a test error' + txid);
if(err) {
log.error(err);
transform.unpipe();
txStream.emit('error', err);
return callback();
}
var formattedTx = utils.toJSONL(self._formatTransaction(tx));
transform.push(formattedTx);
txStream.push(formattedTx);
callback();
});
};
transform._flush = function(callback) {
txStream.on('error', function(err) {
log.error(err);
utils.sendError(err, res);
txStream.unpipe();
});
txStream._flush = function(callback) {
self.db.resumeSync();
callback();
};
var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding);
var stream = self.db.createKeyStream(self._getSearchParams(encodingFn, options));
var dbStream = self.db.createKeyStream(self._getSearchParams(encodingFn, options));
stream.on('close', function() {
stream.unpipe();
dbStream.on('close', function() {
dbStream.unpipe();
});
stream.pipe(transform).pipe(res);
dbStream.pipe(txStream).pipe(res);
res.on('end', function() {
console.log('res has ended');
});
});
});
};
@ -1373,7 +1385,7 @@ WalletService.prototype._setupWriteRoutes = function(app) {
v.checkAddresses,
s._endpointPostAddresses()
);
app.put('/wallets/:walletId/addresses/resync',
app.put('/wallets/:walletId/addresses/resync',
s._endpointResyncAddresses()
);
};

View File

@ -76,16 +76,6 @@ utils.waitForBitcoreNode = function(opts, callback) {
var self = this;
opts.bitcore.process.stdout.on('data', function(data) {
if (opts.debug) {
console.log(data.toString());
}
});
opts.bitcore.process.stderr.on('data', function(data) {
console.log(data.toString());
});
var errorFilter = function(err, res) {
try {
var info = JSON.parse(res);
@ -153,7 +143,22 @@ utils.initializeAndStartService = function(opts, callback) {
};
utils.startBitcoreNode = function(opts, callback) {
this.initializeAndStartService(opts.bitcore, callback);
this.initializeAndStartService(opts.bitcore, function(err) {
if (err) {
return callback(err);
}
opts.bitcore.process.stdout.on('data', function(data) {
if (opts.debug) {
process.stdout.write(data.toString());
}
});
opts.bitcore.process.stderr.on('data', function(data) {
process.stderr.write(data.toString());
});
callback();
});
};
utils.startBitcoind = function(opts, callback) {
@ -341,6 +346,8 @@ utils.getListOfTxs = function(opts, callback) {
path: '/wallet-api/wallets/' + opts.walletId + '/transactions?start=0&end=' + end });
self.queryBitcoreNode(httpOpts, function(err, res) {
console.log(err, res);
if(err) {
return callback(err);
}