diff --git a/lib/encoding.js b/lib/encoding.js index 32e178ba..64a51d6d 100644 --- a/lib/encoding.js +++ b/lib/encoding.js @@ -95,16 +95,21 @@ Encoding.prototype.decodeUtxoIndexKey = function(buffer) { }; }; -Encoding.prototype.encodeUtxoIndexValue = function(satoshis, scriptBuffer) { +Encoding.prototype.encodeUtxoIndexValue = function(height, satoshis, scriptBuffer) { + var heightBuffer = new Buffer(4); + heightBuffer.writeUInt32BE(height); var satoshisBuffer = new Buffer(8); satoshisBuffer.writeDoubleBE(satoshis); - return Buffer.concat([satoshisBuffer, scriptBuffer]); + return Buffer.concat([height, satoshisBuffer, scriptBuffer]); }; Encoding.prototype.decodeUtxoIndexValue = function(buffer) { - var satoshis = buffer.readDoubleBE(0); - var scriptBuffer = buffer.slice(8, buffer.length); + var reader = new BufferReader(buffer); + var height = reader.readUInt32BE(); + var satoshis = reader.readDoubleBE(); + var scriptBuffer = reader.read(buffer.length - 12); return { + height: height, satoshis: satoshis, script: scriptBuffer }; @@ -191,7 +196,7 @@ Encoding.prototype.decodeTimestampBlockValue = function(buffer) { return buffer.toString('hex'); }; -Encoding.prototype.encodeWalletTransactionKey = function(walletId, height, blockIndex) { +Encoding.prototype.encodeWalletTransactionKey = function(walletId, height) { var buffers = [this.servicePrefix]; var walletIdSizeBuffer = new Buffer(1); @@ -207,12 +212,6 @@ Encoding.prototype.encodeWalletTransactionKey = function(walletId, height, block buffers.push(heightBuffer); } - if(blockIndex !== undefined) { - var blockIndexBuffer = new Buffer(4); - blockIndexBuffer.writeUInt32BE(blockIndex); - buffers.push(blockIndexBuffer); - } - return Buffer.concat(buffers); }; @@ -279,27 +278,23 @@ Encoding.prototype.decodeWalletUtxoKey = function(buffer) { }; }; -Encoding.prototype.encodeWalletUtxoValue = function(height, satoshis, address) { +Encoding.prototype.encodeWalletUtxoValue = function(height, satoshis, scriptBuffer) { var heightBuffer = new Buffer(4); heightBuffer.writeUInt32BE(height); var satoshisBuffer = new Buffer(8); satoshisBuffer.writeDoubleBE(satoshis); - var addressSizeBuffer = new Buffer(1); - addressSizeBuffer.writeUInt8(address.length); - var addressBuffer = new Buffer(address, 'utf8'); - return Buffer.concat([height, satoshisBuffer, addressBuffer]); + return Buffer.concat([height, satoshisBuffer, scriptBuffer]); }; Encoding.prototype.decodeWalletUtxoValue = function(buffer) { var reader = new BufferReader(buffer); var height = reader.readUInt32BE(); var satoshis = reader.readDoubleBE(); - var addressSize = reader.readUInt8(); - var address = reader.read(addressSize).toString('utf8'); + var scriptBuffer = reader.read(buffer.length - 12); return { height: height, satoshis: satoshis, - address: address + script: scriptBuffer }; }; @@ -350,37 +345,33 @@ Encoding.prototype.decodeWalletUtxoSatoshisKey = function(buffer) { }; }; -Encoding.prototype.encodeWalletUtxoSatoshisValue = function(height, address) { +Encoding.prototype.encodeWalletUtxoSatoshisValue = function(height, scriptBuffer) { var heightBuffer = new Buffer(4); heightBuffer.writeUInt32BE(height); - var addressSizeBuffer = new Buffer(1); - addressSizeBuffer.writeUInt8(address.length); - var addressBuffer = new Buffer(address, 'utf8'); - return Buffer.concat([height, addressBuffer]); + return Buffer.concat([height, scriptBuffer]); }; Encoding.prototype.decodeWalletUtxoSatoshisValue = function(buffer) { var reader = new BufferReader(buffer); var height = reader.readUInt32BE(); - var addressSize = reader.readUInt8(); - var address = reader.read(addressSize).toString('utf8'); + var scriptBuffer = reader.read(buffer.length - 4); return { height: height, - address: address + script: scriptBuffer }; }; -Encoding.prototype.encodeWalletKey = function(walletId) { +Encoding.prototype.encodeWalletAddressesKey = function(walletId) { var prefix = new Buffer('00', 'hex'); var walletIdBuffer = new Buffer(walletId, 'hex'); return Buffer.concat([this.servicePrefix, prefix, walletIdBuffer]); }; -Encoding.prototype.decodeWalletKey = function(buffer) { +Encoding.prototype.decodeWalletAddressesKey = function(buffer) { return buffer.slice(3).toString('hex'); }; -Encoding.prototype.encodeWalletValue = function(addresses, balance) { +Encoding.prototype.encodeWalletAddressesValue = function(addresses) { var bufferList = []; var addressesLengthBuffer = new Buffer(4); addressesLengthBuffer.writeUInt32BE(addresses.length); @@ -391,13 +382,11 @@ Encoding.prototype.encodeWalletValue = function(addresses, balance) { bufferList.push(addressSizeBuffer); bufferList.push(new Buffer(addresses[i], 'utf8')); } - var balanceBuffer = new Buffer(8); - balanceBuffer.writeUInt32BE(balance); - bufferList.push(balanceBuffer); + return Buffer.concat(bufferList); }; -Encoding.prototype.decodeWalletValue = function(buffer) { +Encoding.prototype.decodeWalletAddressesValue = function(buffer) { var reader = new BufferReader(buffer); var addressesLength = reader.readUInt32BE(); var addresses = []; @@ -406,13 +395,31 @@ Encoding.prototype.decodeWalletValue = function(buffer) { addressSize = reader.readUInt8(addressSize); addresses.push(reader.read(addressSize).toString('utf8')); } + + return addresses; +}; + +Encoding.prototype.encodeWalletBalanceKey = function(walletId) { + var prefix = new Buffer('01', 'hex'); + var walletIdBuffer = new Buffer(walletId, 'hex'); + return Buffer.concat([this.servicePrefix, prefix, walletIdBuffer]); +}; + +Encoding.prototype.decodeWalletBalanceKey = function(buffer) { + return buffer.slice(3).toString('hex'); +}; + +Encoding.prototype.encodeWalletBalanceValue = function(balance) { + var balanceBuffer = new Buffer(8); + balanceBuffer.writeUInt32BE(balance); + return balanceBuffer; +}; + +Encoding.prototype.decodeWalletBalanceValue = function(buffer) { + var reader = new BufferReader(buffer); var balance = reader.readDoubleBE(); - return { - addresses: addresses, - balance: balance - }; - + return balance; }; module.exports = Encoding; diff --git a/lib/services/address/index.js b/lib/services/address/index.js index d2cd39e0..c18b02b3 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -271,7 +271,7 @@ AddressService.prototype.blockHandler = function(block, connectBlock, callback) } var key = self.encoding.encodeUtxoIndexKey(address, txid, outputIndex); - var value = self.encoding.encodeUtxoIndexValue(output.satoshis, output._scriptBuffer); + var value = self.encoding.encodeUtxoIndexValue(block.__height, output.satoshis, output._scriptBuffer); operations.push({ type: action, key: key, @@ -307,7 +307,7 @@ AddressService.prototype.blockHandler = function(block, connectBlock, callback) } else { // uncommon and slower, this happens during a reorg self.node.services.transaction.getTransaction(input.prevTxId, {}, function(err, tx) { var utxo = tx.outputs[input.outputIndex]; - var inputValue = self.encoding.encodeUtxoIndexValue(utxo.satoshis, utxo._scriptBuffer); + var inputValue = self.encoding.encodeUtxoIndexValue(tx.__height, utxo.satoshis, utxo._scriptBuffer); operations.push({ type: 'put', key: inputKey, @@ -1186,6 +1186,35 @@ AddressService.prototype.getAddressTxids = function(address, options, callback) }); }; +AddressService.prototype.getAddressTxidsWithHeights = function(address, options, callback) { + var self = this; + + var txids = {}; + + var start = self.encoding.encodeAddressIndexKey(address, options.start); + var end = self.encoding.encodeAddressIndexKey(address, options.end); + + var stream = self.store.createKeyStream({ + gte: start, + lt: end + }); + + var streamErr = null; + + stream.on('data', function(buffer) { + var key = self.encoding.decodeAddressIndexKey(buffer); + txids[key.txid] = key.height; + }); + + stream.on('end', function() { + callback(streamErr, txids); + }); + + stream.on('error', function(err) { + streamErr = err; + }); +}; + /** * This will give an object with: * balance - confirmed balance diff --git a/lib/services/wallet-api/index.js b/lib/services/wallet-api/index.js index 3e729d31..23071b23 100644 --- a/lib/services/wallet-api/index.js +++ b/lib/services/wallet-api/index.js @@ -33,6 +33,7 @@ var WalletService = function(options) { }); this._addressMap = {}; + this.balances = {}; }; inherits(WalletService, BaseService); @@ -48,6 +49,8 @@ WalletService.prototype.getAPIMethods = function() { WalletService.prototype.start = function(callback) { var self = this; + self.store = self.node.services.db.store; + self.node.services.db.getPrefix(self.name, function(err, servicePrefix) { if(err) { return callback(err); @@ -56,7 +59,13 @@ WalletService.prototype.start = function(callback) { self.servicePrefix = servicePrefix; self._encoding = new Encoding(self.servicePrefix); - self._loadAllAddresses(callback); + self._loadAllAddresses(function(err) { + if(err) { + return callback(err); + } + + self._loadAllBalances(callback); + }); }); }; @@ -81,6 +90,7 @@ WalletService.prototype.blockHandler = function(block, connectBlock, callback) { } var operations = []; + var walletIdsNeedingUpdate = {}; async.eachSeries(txs, function(tx, next) { var inputs = tx.inputs; @@ -105,17 +115,25 @@ WalletService.prototype.blockHandler = function(block, connectBlock, callback) { var walletIds = self._addressMap[address]; walletIds.forEach(function(walletId) { + walletIdsNeedingUpdate[walletId] = true; + operations.push({ type: action, key: self._encoding.encodeWalletUtxoKey(walletId, tx.id, outputIndex), - value: self._encoding.encodeWalletUtxoValue(block.__height, output.satoshis, address) + value: self._encoding.encodeWalletUtxoValue(block.__height, output.satoshis, output._scriptBuffer) }); operations.push({ type: action, key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, output.satoshis, tx.id, outputIndex), - value: self._encoding.encodeWalletUtxoValue(block.__height, address) + value: self._encoding.encodeWalletUtxoValue(block.__height, output._scriptBuffer) }); + + if(connectBlock) { + self.balances[walletId] += output.satoshis; + } else { + self.balances[walletId] -= output.satoshis; + } }); } @@ -139,6 +157,8 @@ WalletService.prototype.blockHandler = function(block, connectBlock, callback) { var walletIds = self._addressMap[inputAddress]; async.each(walletIds, function(walletId, next) { + walletIdsNeedingUpdate[walletId] = true; + self.node.services.transaction.getTransaction(input.prevTxId, {}, function(err, tx) { if(err) { return next(err); @@ -149,15 +169,21 @@ WalletService.prototype.blockHandler = function(block, connectBlock, callback) { operations.push({ type: reverseAction, key: self._encoding.encodeWalletUtxoKey(walletId, input.prevTxId, input.outputIndex), - value: self._encoding.encodeWalletUtxoValue(tx.__height, utxo.satoshis, inputAddress) + value: self._encoding.encodeWalletUtxoValue(tx.__height, utxo.satoshis, utxo._scriptBuffer) }); operations.push({ type: reverseAction, key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, utxo.satoshis, tx.id, input.outputIndex), - value: self._encoding.encodeWalletUtxoSatoshisValue(tx.__height, inputAddress) + value: self._encoding.encodeWalletUtxoSatoshisValue(tx.__height, utxo._scriptBuffer) }); + if(connectBlock) { + self.balances[walletId] -= output.satoshis; + } else { + self.balances[walletId] += output.satoshis; + } + next(); }); }, next); @@ -167,6 +193,15 @@ WalletService.prototype.blockHandler = function(block, connectBlock, callback) { return callback(err); } + // update balances + for(var walletId in walletIdsNeedingUpdate) { + operations.push({ + type: 'put', + key: self._encoding.encodeWalletBalanceKey(walletId), + value: self._encoding.encodeWalletBalanceValue(self.balances[walletId]) + }); + } + callback(null, operations); }); }; @@ -210,7 +245,7 @@ WalletService.prototype.concurrentBlockHandler = function(block, connectBlock, c walletIds.forEach(function(walletId) { operations.push({ type: action, - key: self._encoding.encodeWalletTransactionKey(walletId, block.__height, i), + key: self._encoding.encodeWalletTransactionKey(walletId, block.__height), value: self._encoding.encodeWalletTransactionValue(tx.id) }) }); @@ -240,7 +275,7 @@ WalletService.prototype.concurrentBlockHandler = function(block, connectBlock, c walletIds.forEach(function(walletId) { operations.push({ type: action, - key: self._encoding.encodeWalletTransactionKey(walletId, block.__height, i), + key: self._encoding.encodeWalletTransactionKey(walletId, block.__height), value: self._encoding.encodeWalletTransactionValue(tx.id) }); }); @@ -254,6 +289,8 @@ WalletService.prototype.concurrentBlockHandler = function(block, connectBlock, c WalletService.prototype._loadAllAddresses = function(callback) { var self = this; + self._addressMap = {}; + var start = self._encoding.encodeWalletKey('00'); var end = self._encoding.encodeWalletKey(Array(65).join('f')); @@ -265,8 +302,8 @@ WalletService.prototype._loadAllAddresses = function(callback) { var streamErr = null; stream.on('data', function(data) { - var key = self._encoding.decodeWalletKey(data.key); - var value = self._encoding.decodeWalletValue(data.value); + var key = self._encoding.decodeWalletAddressesKey(data.key); + var value = self._encoding.decodeWalletAddressesValue(data.value); value.addresses.forEach(function(address) { if(!self._addressMap[address]) { @@ -286,6 +323,38 @@ WalletService.prototype._loadAllAddresses = function(callback) { }); }; +WalletService.prototype._loadAllBalances = function(callback) { + var self = this; + + self._balances = {}; + + var start = self._encoding.encodeWalletBalanceKey('00'); + var end = self._encoding.encodeWalletBalanceKey(Array(65).join('f')); + + var stream = self.db.createReadStream({ + gte: start, + lt: end + }); + + var streamErr = null; + + stream.on('data', function(data) { + var walletId = self._encoding.decodeWalletBalanceKey(data.key); + var balance = self._encoding.decodeWalletBalanceValue(data.value); + + self._balances[walletId] = balance; + }); + + stream.on('error', function(err) { + streamErr = err; + }); + + stream.on('end', function() { + callback(streamErr); + }); +}; + + WalletService.prototype._endpointUTXOs = function() { var self = this; return function(req, res) { @@ -365,12 +434,13 @@ WalletService.prototype._endpointPostAddresses = function() { return function(req, res) { var addresses = req.addresses; var walletId = utils.getWalletId(); - self._storeAddresses(walletId, addresses, function(err, hash) { + + self._importAddresses(walletId, addresses, function(err) { if(err) { return utils.sendError(err, res); } res.status(201).jsonp({ - walletId: walletId, + walletId: walletId }); }); }; @@ -419,11 +489,10 @@ WalletService.prototype._endpointPutAddresses = function() { return res.status(404).send('Not found'); } - var allAddresses = _.union(oldAddresses, newAddresses); + var addAddresses = _.without(newAddresses, oldAddresses); + var amountAdded = addAddresses.length; - var amountAdded = allAddresses.length - oldAddresses.length; - - self._storeAddresses(walletId, allAddresses, function(err) { + self._importAddresses(walletId, addAddresses, function(err) { if(err) { return utils.sendError(err, res); } @@ -437,62 +506,51 @@ WalletService.prototype._endpointPutAddresses = function() { }; }; -WalletService.prototype._getUtxos = function(walletId, height, options, callback) { - // TODO get the balance only to this height +WalletService.prototype._getUtxos = function(walletId, callback) { var self = this; - self._getAddresses(walletId, function(err, addresses) { - if(err) { - return callback(err); - } - self.node.services.bitcoind.getAddressUnspentOutputs(addresses, options, callback); + var stream = self.store.createReadStream({ + gte: self._encoding.encodeWalletUtxoKey(walletId), + lt: self._encoding.encodeWalletUtxoKey(walletId, Array(33).join('f')) // come up with better terminal key }); -}; -WalletService.prototype._getBalance = function(walletId, height, options, callback) { - // TODO get the balance only to this height - var self = this; - self._getAddresses(walletId, function(err, addresses) { - if(err) { - return callback(err); - } + var utxos = []; + var streamErr = null; - self.node.services.bitcoind.getAddressUnspentOutputs(addresses, options, function(err, utxos) { - if(err) { - return callback(err); - } - if (options.byAddress) { - var result = self._getBalanceByAddress(addresses, utxos); - return callback(null, { - addresses: result, - height: null - }); - } - var balance = 0; - utxos.forEach(function(utxo) { - balance += utxo.satoshis; - }); - callback(null, { - balance: balance, - height: null - }); + stream.on('data', function(data) { + var key = self._encoding.decodeWalletUtxoKey(data.key); + var value = self._encoding.decodeWalletUtxoValue(data.value); + + utxos.push({ + txid: key.txid, + outputIndex: key.outputIndex, + height: value.height, + satoshis: value.satoshis, + script: value.script }); }); + + stream.on('error', function(err) { + streamErr = err; + }); + + stream.on('end', function() { + callback(streamErr, utxos); + }); }; -WalletService.prototype._getBalanceByAddress = function(addresses, utxos) { - var res = {}; - utxos.forEach(function(utxo) { - if (res[utxo.address]) { - res[utxo.address] += utxo.satoshis; - } else { - res[utxo.address] = utxo.satoshis; +WalletService.prototype._getBalance = function(walletId, callback) { + var self = this; + + var key = self._encoding.encodeWalletBalanceKey(walletId); + + self.store.get(key, function(err, buffer) { + if(err) { + return callback(err); } + + callback(null, self._encoding.decodeWalletBalanceValue(buffer)); }); - addresses.forEach(function(address) { - res[address] = res[address] || 0; - }); - return res; }; WalletService.prototype._chunkAdresses = function(addresses) { @@ -558,11 +616,150 @@ WalletService.prototype._getTransactions = function(walletId, options, callback) }; WalletService.prototype._getAddresses = function(walletId, callback) { - this._db.get(walletId, callback); + var key = this._encoding.encodeWalletAddressKey(walletId); + this.store.get(key, callback); +}; + +WalletService.prototype._importAddresses = function(walletId, addresses, callback) { + var self = this; + + self._getAddresses(walletId, function(err, oldAddresses) { + if(err) { + return callback(err); + } + + async.parallel( + [ + self._getUTXOIndexOperations.bind(self, walletId, addresses), + self._getTxidOperations.bind(self, walletId, addresses) + ], + function(err, results) { + if(err) { + return callback(err); + } + + var operations = results[0].concat(results[1]); + operations.push({ + type: 'put', + key: self._encoding.encodeWalletAddressKey(walletId), + value: self._encoding.encodeWalletAddressValue(oldAddresses.concat(addresses)) + }); + + self.store.batch(operations, function(err) { + if(err) { + return callback(err); + } + + // TODO check if height has changed since we first entered the function + // if it has, we need to get operations for the new blocks + + // Update addressMap and wallet balances + self._loadAllAddresses(function(err) { + if(err) { + return callback(err); + } + + self._loadAllBalances(callback); + }); + }); + } + ); + }); +}; + +WalletService.prototype._getUTXOIndexOperations = function(walletId, addresses, callback) { + var self = this; + + // TODO what if initialBalance changes while we are getting unspent outputs on new addresses? + + var balance = 0; + + self._getBalance(walletId, function(err, initialBalance) { + if(err && !err.notFound) { + return callback(err); + } + + if(initialBalance) { + balance = initialBalance; + } + + self.services.address.getUnspentOutputs(addresses, function(err, utxos) { + if(err) { + return callback(err); + } + + var operations = []; + + for(var i = 0; i < utxos.length; i++) { + var utxo = utxos[i]; + + balance += utxo.satoshis; + + operations.push({ + type: 'put', + key: self._encoding.encodeWalletUtxoKey(walletId, utxo.txid, utxo.outputIndex), + value: self._encoding.encodeWalletUtxoValue(utxo.height, utxo.satoshis, utxo.script) + }); + + operations.push({ + type: 'put', + key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, utxo.satoshis, utxo.txid, utxo.outputIndex), + value: self._encoding.encodeWalletUtxoSatoshisValue(utxo.height, utxo.script) + }); + } + + operations.push({ + type: 'put', + key: self._encoding.encodeWalletBalanceKey(walletId), + value: self._encoding.encodeWalletBalanceValue(balance) + }); + + callback(null, operations); + }); + }); +}; + +WalletService.prototype._getTxidIndexOperations = function(walletId, addresses, callback) { + var self = this; + + var txids = {}; + + async.eachLimit(addresses, 10, function(address, next) { + self.services.address.getAddressTxidsWithHeights(address, {}, function(err, tmpTxids) { + if(err) { + return next(err); + } + + txids = _.merge(txids, tmpTxids); + return next(); + }); + }, function(err) { + if(err) { + return callback(err); + } + + operations = Object.keys(txids).map(function(txid) { + return { + type: 'put', + key: self._encoding.encodeWalletTransactionKey(walletId, txids[txid]), + value: self._encoding.encodeWalletTransactionValue(txid) + }; + }); + + callback(null, operations); + }); }; WalletService.prototype._storeAddresses = function(walletId, addresses, callback) { - this._db.put(walletId, addresses, callback); + var key = this._encoding.encodeWalletAddressKey(walletId); + var value = this._encoding.encodeWalletValue(addresses); + this.store.put(key, value, callback); +}; + +WalletService.prototype._storeBalance = function(walletId, balance, callback) { + var key = this._encoding.encodeWalletBalanceKey(walletId); + var value = this._encoding.encodeWalletBalanceValue(balance); + this.store.put(key, value, callback); }; WalletService.prototype._endpointGetInfo = function() {