diff --git a/lib/services/wallet-api/index.js b/lib/services/wallet-api/index.js index 95c5b229..a30a1e74 100644 --- a/lib/services/wallet-api/index.js +++ b/lib/services/wallet-api/index.js @@ -16,6 +16,7 @@ var bodyParser = require('body-parser'); var LRU = require('lru-cache'); var Encoding = require('./encoding'); var Readable = require('stream').Readable; +var Input = require('bitcore-lib').Transaction.Input; var WalletService = function(options) { BaseService.call(this, options); @@ -83,135 +84,6 @@ WalletService.prototype.getPublishEvents = function() { return []; }; -WalletService.prototype.blockHandler = function(block, connectBlock, callback) { - var self = this; - - var txs = block.transactions; - - var action = 'put'; - var reverseAction = 'del'; - if (!connectBlock) { - action = 'del'; - reverseAction = 'put'; - } - - var operations = []; - var walletIdsNeedingUpdate = {}; - - async.eachSeries(txs, function(tx, next) { - var inputs = tx.inputs; - var outputs = tx.outputs; - - for (var outputIndex = 0; outputIndex < outputs.length; outputIndex++) { - var output = outputs[outputIndex]; - - var script = output.script; - - if(!script) { - log.debug('Invalid script'); - continue; - } - - var address = self.getAddressString(script); - - if(!address || !self._addressMap[address]) { - continue; - } - - var walletIds = self._addressMap[address]; - - for(var i = 0; i < walletIds.length; i++) { - var walletId = walletIds[i]; - walletIdsNeedingUpdate[walletId] = true; - - operations.push({ - type: action, - key: self._encoding.encodeWalletUtxoKey(walletId, tx.id, outputIndex), - value: self._encoding.encodeWalletUtxoValue(block.__height, output.satoshis, output._scriptBuffer) - }); - - operations.push({ - type: action, - key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, output.satoshis, tx.id, outputIndex), - value: self._encoding.encodeWalletUtxoSatoshisValue(block.__height, output._scriptBuffer) - }); - - if(connectBlock) { - self.balances[walletId] += output.satoshis; - } else { - self.balances[walletId] -= output.satoshis; - } - } - } - - if(tx.isCoinbase()) { - return next(); - } - - //TODO deal with P2PK - async.each(inputs, function(input, next) { - if(!input.script) { - log.debug('Invalid script'); - return next(); - } - - var inputAddress = self.getAddressString(input.script); - - if(!inputAddress || !self._addressMap[inputAddress]) { - return next(); - } - - var walletIds = self._addressMap[inputAddress]; - - async.each(walletIds, function(walletId, next) { - walletIdsNeedingUpdate[walletId] = true; - - self.node.services.transaction.getTransaction(input.prevTxId, {}, function(err, tx) { - if(err) { - return next(err); - } - - var utxo = tx.outputs[input.outputIndex]; - - operations.push({ - type: reverseAction, - key: self._encoding.encodeWalletUtxoKey(walletId, input.prevTxId, input.outputIndex), - value: self._encoding.encodeWalletUtxoValue(tx.__height, utxo.satoshis, utxo._scriptBuffer) - }); - - operations.push({ - type: reverseAction, - key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, utxo.satoshis, tx.id, input.outputIndex), - value: self._encoding.encodeWalletUtxoSatoshisValue(tx.__height, utxo._scriptBuffer) - }); - - if(connectBlock) { - self.balances[walletId] -= output.satoshis; - } else { - self.balances[walletId] += output.satoshis; - } - - next(); - }); - }, next); - }, next); - }, function(err) { - if(err) { - return callback(err); - } - - // update balances - for(var walletId in walletIdsNeedingUpdate) { - operations.push({ - type: 'put', - key: self._encoding.encodeWalletBalanceKey(walletId), - value: self._encoding.encodeWalletBalanceValue(self.balances[walletId]) - }); - } - - callback(null, operations); - }); -}; WalletService.prototype.getAddressString = function(script, output) { var address = script.toAddress(); @@ -239,11 +111,22 @@ WalletService.prototype.getAddressString = function(script, output) { return null; }; +WalletService.prototype.blockHandler = function(block, connectBlock, callback) { + var opts = { + block: block, + connectBlock: connectBlock, + fnProcessIO: this._processSerialIO, + serial: true + }; +console.log(block.__height); + this._blockHandler(opts, callback); +}; + WalletService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) { var opts = { block: block, connectBlock: connectBlock, - fnEncodingKey: this._encoding.encodeWalletTransactionKey + fnProcessIO: this._processConcurrentIO }; this._blockHandler(opts, callback); }; @@ -253,62 +136,201 @@ WalletService.prototype._blockHandler = function(opts, callback) { var txs = opts.block.transactions; - var operations = []; - txs.forEach(function(tx) { - var ops = self._processTransaction(opts, tx); - if (ops) { - operations.concat(ops); + async.mapSeries(txs, function(tx, next) { + self._processTransaction(opts, tx, next); + }, function(err, operations) { + if(err) { + return callback(err); } + callback(null, _.compact(operations)); }); - //function has a callback, but is synchronuous nonetheless - callback(null, operations); }; -WalletService.prototype._processTransaction = function(opts, tx) { +WalletService.prototype._processTransaction = function(opts, tx, callback) { var self = this; if(tx.isCoinbase()) { - return; + return callback(); } + tx.outputs.forEach(function(output, index) { + output.index = index; + }); + var ioData = tx.inputs.concat(tx.outputs); - var operations = []; - ioData.forEach(function(io) { - var ops = self._processIO(opts, tx, io); - if (ops) { - operations.concat(ops); + async.mapSeries(ioData, function(io, next) { + if (opts.serial) { + self._processSerialIO(opts, tx, io, next); + } else { + self._processConcurrentIO(opts, tx, io, next); } + }, function(err, operations) { + if(err) { + return callback(err); + } + callback(null, _.compact(operations)); }); - return operations; - }; -WalletService.prototype._processIO = function(opts, tx, io) { +WalletService.prototype._processConcurrentIO = function(opts, tx, io, callback) { - if(!io.script) { + var walletIds = this._getWalletIdsFromScript(io.script); + if (!walletIds) { + return callback(); + } + var actions = this._getActions(opts.connectBlock); + + var operations = walletIds.forEach(function(walletId) { + return { + type: actions[0], + key: this._encoding.encodeWalletTransactionKey(walletId, opts.block.__height, tx.id) + }; + }); + + setImmediate(function() { + callback(null, operations); + }); + +}; + + +WalletService.prototype._processSerialIO = function(opts, tx, io, callback) { + var fn = this._processSerialOutput; + if (io instanceof Input) { + fn = this._processSerialInput; + } + fn(opts, tx. io, callback); +}; + +WalletService.prototype._getWalletIdsFromScript= function(script) { + + if(!script) { log.debug('Invalid script'); return; } - var address = this.getAddressString(io.script); + var address = this.getAddressString(script); if(!address || !this._addressMap[address]) { return; } - var walletIds = this._addressMap[address]; + return this._addressMap[address]; +}; - var action = opts.connectBlock ? 'put' : 'del'; +WalletService.prototype._getActions = function(connect) { + var action = 'put'; + var reverseAction = 'del'; + if (!connect) { + action = 'del'; + reverseAction = 'put'; + } + return [action, reverseAction]; +}; + +WalletService.prototype._processSerialOutput = function(opts, tx, output, callback) { + + var self = this; + var walletIds = self._getWalletIdsFromScript(output.script); + + if (!walletIds) { + return callback(); + } + + var actions = self._getActions(opts.connectBlock); + var walletIdsNeedingUpdate = {}; + + + async.mapSeries(walletIds, function(walletId, next) { + + walletIdsNeedingUpdate[walletId] = true; + + var operations = [{ + type: actions[0], + key: self._encoding.encodeWalletUtxoKey(walletId, tx.id, output.index), + value: self._encoding.encodeWalletUtxoValue(opts.block.__height, output.satoshis, output._scriptBuffer) + }, + { + type: actions[0], + key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, output.satoshis, tx.id, output.index), + value: self._encoding.encodeWalletUtxoSatoshisValue(opts.block.__height, output._scriptBuffer) + }]; + + if(opts.connectBlock) { + self.balances[walletId] += output.satoshis; + } else { + self.balances[walletId] -= output.satoshis; + } + + next(null, operations); + }, function(err, operations) { + + if(err) { + return callback(err); + } + callback(null, operations); - return walletIds.map(function(walletId) { - return { - type: action, - key: opts.fnEncoding.call(walletId, opts.block.__height, tx.id) - }; }); + +}; + +WalletService.prototype._processSerialInput = function(opts, tx, input, callback) { + + var self = this; + + var actions = self._getActions(opts.connectBlock); + + var walletIds = self._getWalletIdsFromScript(input.script); + + if (walletIds) { + return callback(); + } + var walletIdsNeedingUpdate = []; + + async.mapSeries(walletIds, function(walletId, next) { + + walletIdsNeedingUpdate[walletId] = true; + + self.node.services.transaction.getTransaction(input.prevTxId, {}, function(err, tx) { + + if(err) { + return next(err); + } + + var utxo = tx.outputs[input.outputIndex]; + + var operations = [{ + type: actions[1], + key: self._encoding.encodeWalletUtxoKey(walletId, input.prevTxId, input.outputIndex), + value: self._encoding.encodeWalletUtxoValue(tx.__height, utxo.satoshis, utxo._scriptBuffer) + }, + { + type: actions[1], + key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, utxo.satoshis, tx.id, input.outputIndex), + value: self._encoding.encodeWalletUtxoSatoshisValue(tx.__height, utxo._scriptBuffer) + }]; + + if(self.connectBlock) { + self.balances[walletId] -= utxo.satoshis; + } else { + self.balances[walletId] += utxo.satoshis; + } + + next(null, operations); + + }); + }, function(err, operations) { + + if(err) { + return callback(err); + } + callback(null, operations); + + }); + }; WalletService.prototype._loadAllAddresses = function(callback) {