From 7662bf1bf5f487b7c54e3b260dee79012fbf2314 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Tue, 24 Jan 2017 14:59:25 -0500 Subject: [PATCH] Adjusted Address index. --- lib/services/address/encoding.js | 86 +++++++++++++++++----------- lib/services/address/index.js | 98 ++++++++------------------------ lib/services/transaction.js | 59 +++++++++++++++---- lib/services/wallet-api/utils.js | 80 +++++++++++++++++++++++--- 4 files changed, 197 insertions(+), 126 deletions(-) diff --git a/lib/services/address/encoding.js b/lib/services/address/encoding.js index 62a114df..93464f8f 100644 --- a/lib/services/address/encoding.js +++ b/lib/services/address/encoding.js @@ -6,11 +6,12 @@ var Address = bitcore.Address; var PublicKey = bitcore.PublicKey; var constants = require('./constants'); var $ = bitcore.util.preconditions; +var utils = require('../wallet-api/utils'); var exports = {}; -function Encoding(prefix) { - this.prefix = prefix; +function Encoding(servicePrefix) { + this.servicePrefix = servicePrefix; } Encoding.prototype.getTerminalKey = function(startKey) { @@ -19,9 +20,14 @@ Encoding.prototype.getTerminalKey = function(startKey) { return endKey; }; -Encoding.prototype.encodeAddressIndexKey = function(address, isSpent, height, txid, index, spending) { +Encoding.prototype.encodeAddressIndexKey = function(address, height, txid) { // TODO if later params are given but not earlier ones, throw an error - var buffers = [this.prefix]; + var args = Array.prototype.slice.call(arguments); + //if (!utils.hasRequiredArgsForEncoding(args)) { + // return null; + //} + var prefix = new Buffer('00', 'hex'); + var buffers = [this.servicePrefix, prefix]; var addressSizeBuffer = new Buffer(1); addressSizeBuffer.writeUInt8(address.length); @@ -30,12 +36,6 @@ Encoding.prototype.encodeAddressIndexKey = function(address, isSpent, height, tx buffers.push(addressSizeBuffer); buffers.push(addressBuffer); - if(isSpent !== undefined) { - var isSpentBuffer = new Buffer(1); - isSpentBuffer.writeUInt8(isSpent); - buffers.push(isSpentBuffer); - } - if(height !== undefined) { var heightBuffer = new Buffer(4); heightBuffer.writeUInt32BE(height); @@ -47,49 +47,72 @@ Encoding.prototype.encodeAddressIndexKey = function(address, isSpent, height, tx buffers.push(txidBuffer); } - if(index !== undefined) { - var indexBuffer = new Buffer(4); - indexBuffer.writeUInt32BE(index); - buffers.push(indexBuffer); - } - - if(spending !== undefined) { - var spendingBuffer = new Buffer(1); - spendingBuffer.writeUInt8(spending); - buffers.push(spendingBuffer); - } - return Buffer.concat(buffers); }; Encoding.prototype.decodeAddressIndexKey = function(buffer) { var reader = new BufferReader(buffer); - var prefix = reader.read(2); + var prefix = reader.read(3); var addressSize = reader.readUInt8(); var address = reader.read(addressSize).toString('utf8'); - var isSpent = reader.readUInt8(); var height = reader.readUInt32BE(); var txid = reader.read(32).toString('hex'); - var index = reader.readUInt32BE(); - var spending = reader.readUInt8(); return { address: address, - isSpent: isSpent ? true : false, height: height, txid: txid, - index: index, - spending: spending ? true : false }; }; -Encoding.prototype.encodeAddressIndexValue = function(satoshis, scriptBuffer) { +Encoding.prototype.encodeUtxoIndexKey = function(address, txid, outputIndex) { + var prefix = new Buffer('01', 'hex'); + var buffers = [this.servicePrefix, prefix]; + + var addressSizeBuffer = new Buffer(1); + addressSizeBuffer.writeUInt8(address.length); + var addressBuffer = new Buffer(address, 'utf8'); + + buffers.push(addressSizeBuffer); + buffers.push(addressBuffer); + + if(txid) { + var txidBuffer = new Buffer(txid, 'hex'); + buffers.push(txidBuffer); + } + + if(outputIndex !== undefined) { + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + buffers.push(outputIndexBuffer); + } + + return Buffer.concat(buffers); +}; + +Encoding.prototype.decodeUtxoIndexKey = function(buffer) { + var reader = new BufferReader(buffer); + var prefix = reader.read(3); + + var addressSize = reader.readUInt8(); + var address = reader.read(addressSize).toString('utf8'); + var txid = reader.read(32).toString('hex'); + var outputIndex = reader.readUInt32BE(4); + + return { + address: address, + txid: txid, + outputIndex: outputIndex + }; +}; + +Encoding.prototype.encodeUtxoIndexValue = function(satoshis, scriptBuffer) { var satoshisBuffer = new Buffer(8); satoshisBuffer.writeDoubleBE(satoshis); return Buffer.concat([satoshisBuffer, scriptBuffer]); }; -Encoding.prototype.decodeAddressIndexValue = function(buffer) { +Encoding.prototype.decodeUtxoIndexValue = function(buffer) { var satoshis = buffer.readDoubleBE(0); var scriptBuffer = buffer.slice(8, buffer.length); return { @@ -97,7 +120,6 @@ Encoding.prototype.decodeAddressIndexValue = function(buffer) { script: scriptBuffer }; }; - // exports.encodeUnspentIndexKey = function(hashTypeBuffer, hashBuffer, txid, index) { // var indexBuffer = new Buffer(4); // indexBuffer.writeUInt32BE(index); diff --git a/lib/services/address/index.js b/lib/services/address/index.js index c4d94193..c764bf25 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -150,8 +150,9 @@ AddressService.prototype.blockHandler = function(block, connectBlock, callback) var operations = []; var transactionLength = txs.length; - async.eachLimit(txs, self.concurrency, function(tx, next) { + for(var i = 0; i < txs.length; i++) { + var tx = txs[i]; var txid = tx.id; var inputs = tx.inputs; var outputs = tx.outputs; @@ -177,12 +178,10 @@ AddressService.prototype.blockHandler = function(block, connectBlock, callback) continue; } - var key = self.encoding.encodeAddressIndexKey(address, false, height, txid, outputIndex); - var value = self.encoding.encodeAddressIndexValue(output.satoshis, output._scriptBuffer); + var key = self.encoding.encodeAddressIndexKey(address, height, txid); operations.push({ type: action, - key: key, - value: value + key: key }); // Collect data for subscribers @@ -197,72 +196,39 @@ AddressService.prototype.blockHandler = function(block, connectBlock, callback) timestamp: block.header.timestamp }; } - - //this.balanceEventHandler(block, address); - - } - - // Publish events to any subscribers for this transaction - for (var addressKey in txmessages) { - //this.transactionEventHandler(txmessages[addressKey]); } if(tx.isCoinbase()) { - return next(); + continue; } - async.eachOfLimit(inputs, self.concurrency, function(input, inputIndex, next) { + //TODO deal with P2PK + for(var inputIndex = 0; inputIndex < inputs.length; inputIndex++) { var input = inputs[inputIndex]; - var inputHash; - var inputHashType; if(!input.script) { log.debug('Invalid script'); - return next(); + continue; } - self.node.services.transaction.getTransaction(input.prevTxId.toString('hex'), {}, function(err, tx) { - if(err) { - return next(err); - } + var inputAddress = self._getAddressString(input.script); - var output = tx.outputs[input.outputIndex]; + if(!inputAddress) { + log.warn('Unsupported script for input in transaction ' + txid); + continue; + } - var address = self._getAddressString(input.script, output); + var inputKey = self.encoding.encodeAddressIndexKey(inputAddress, height, txid); - if(!address) { - log.warn('Unsupported script for input in transaction ' + txid); - return next(); - } - - var inputKey = self.encoding.encodeAddressIndexKey(address, true, height, txid, inputIndex, true); - - var outputKey = self.encoding.encodeAddressIndexKey(address, true, tx.__height, tx.id, input.outputIndex, false); - var outputKeyToDelete = self.encoding.encodeAddressIndexKey(address, false, tx.__height, tx.id, input.outputIndex, false); - var outputValue = self.encoding.encodeAddressIndexValue(output.satoshis, output._scriptBuffer); - var inputValue = self.encoding.encodeAddressIndexValue(output.satoshis, input._scriptBuffer); - - operations = operations.concat([{ - type: action, - key: inputKey, - value: inputValue - }, - { - type: action, - key: outputKey, - value: outputValue - }, - { - type: reverseAction, - key: outputKeyToDelete, - value: outputValue - }]); - next(); + operations.push({ + type: action, + key: inputKey }); - }, next); - }, function(err) { - callback(err, operations); + } + } + setImmediate(function() { + callback(null, operations); }); }; @@ -1092,28 +1058,10 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba AddressService.prototype.getAddressTxids = function(address, options, callback) { var self = this; - self._getAddressTxidsByIndex(address, false, options.start, options.end, function(err, txids1) { - if(err) { - return callback(err); - } - - self._getAddressTxidsByIndex(address, true, options.start, options.end, function(err, txids2) { - if(err) { - return callback(err); - } - - callback(null, _.union(txids1, txids2)); - }); - }); -}; - -AddressService.prototype._getAddressTxidsByIndex = function(address, isSpent, startHeight, endHeight, callback) { - var self = this; - var txids = {}; - var start = self.encoding.encodeAddressIndexKey(address, isSpent, startHeight); - var end = self.encoding.encodeAddressIndexKey(address, isSpent, endHeight); + var start = self.encoding.encodeAddressIndexKey(address, options.start); + var end = self.encoding.encodeAddressIndexKey(address, options.end); var stream = self.store.createKeyStream({ gte: start, diff --git a/lib/services/transaction.js b/lib/services/transaction.js index f7ef48c8..a4d45798 100644 --- a/lib/services/transaction.js +++ b/lib/services/transaction.js @@ -4,7 +4,9 @@ var async = require('async'); var BaseService = require('../service'); var inherits = require('util').inherits; var bitcore = require('bitcore-lib'); - +var _ = require('lodash'); +var LRU = require('lru-cache'); +var utils = require('./wallet-api/utils'); /** * The Transaction Service builds upon the Database Service and the Bitcoin Service to add additional * functionality for getting information by bitcoin transaction hash/id. This includes the current @@ -16,7 +18,18 @@ var bitcore = require('bitcore-lib'); function TransactionService(options) { BaseService.call(this, options); this.concurrency = options.concurrency || 20; - this._mempool = {}; + /* upon initialization of the mempool, only txids are obtained from + * a trusted bitcoin node (the bitcoind service's rpchost or p2p option) + * Since, the mempool is very temporal, I see no reason to take the + * resources to gather the actual tx data. Any NEW txs that arrive + * after the mempool is initialized will have their full tx data + * If a tx is queried from the mempool and it does not have tx data, + * then call to the trusted bitcoind will take place and the result set. + */ + this._mempool = LRU({ + max: utils.parseByteCount(options.maxMemPoolSize) || 100 * 1024 * 1024, //100MB + length: function(tx) { if (tx) { return tx.toBuffer().length; } } + }); this.currentTransactions = {}; } @@ -32,22 +45,22 @@ TransactionService.prototype.start = function(callback) { self.store = this.node.services.db.store; - var bus = self.node.openBus(); + var bus = self.node.openBus({ remoteAddress: 'localhost' }); bus.subscribe('bitcoind/rawtransaction'); bus.on('bitcoind/rawtransaction', function(txHex) { var tx = new bitcore.Transaction(txHex); - self._mempool[tx.id] = tx; + self._updateMempool(tx); }); async.series([ function(next) { - self.node.services.bitcoind.getMempool(function(err, txs) { + self.node.services.bitcoind.getMempool(function(err, txidList) { if(err) { return next(err); } - for(var i = 0; i < txs.length; i++) { - self._mempool[txs[i]] = true; + for(var i = 0; i < txidList.length; i++) { + self._updateMempool(txidList[i]); } next(); }); @@ -71,6 +84,21 @@ TransactionService.prototype.stop = function(callback) { setImmediate(callback); }; +TransactionService.prototype._updateMempool = function(tx, action) { + if (action === 'del') { + return this._mempool.del(tx.id); + } + var val, key; + if (_.isString(tx)) { + val = false; + key = tx; + } else { + val = tx; + key = tx.id; + } + return this._mempool.set(key, val); +}; + TransactionService.prototype.blockHandler = function(block, connectBlock, callback) { var self = this; var action = 'put'; @@ -93,6 +121,7 @@ TransactionService.prototype.blockHandler = function(block, connectBlock, callba }); }, function(next) { async.eachSeries(block.transactions, function(tx, next) { + self._updateMempool(tx, action); tx.__timestamp = block.__timestamp; tx.__height = block.__height; @@ -155,9 +184,18 @@ TransactionService.prototype.getTransaction = function(txid, options, callback) }); } - if (options.queryMempool && self._mempool[txid]) { + var memPoolTx = self._mempool.get(txid); + if (options && options.queryMempool && memPoolTx) { return setImmediate(function() { - callback(null, self._mempool[txid]); + callback(null, memPoolTx); + }); + } else if (memPoolTx === false) { + self.node.services.bitcoind.getTransaction(txid, function(err, tx) { + if(err) { + return callback(err); + } + self._updateMempool(tx); + callback(null, tx); }); } @@ -197,7 +235,8 @@ TransactionService.prototype._encodeTransactionValue = function(transaction) { var inputValuesLengthBuffer = new Buffer(2); inputValuesLengthBuffer.writeUInt16BE(inputValues.length * 8); - return new Buffer.concat([heightBuffer, timestampBuffer, inputValuesLengthBuffer, inputValuesBuffer, transaction.toBuffer()]); + return new Buffer.concat([heightBuffer, timestampBuffer, + inputValuesLengthBuffer, inputValuesBuffer, transaction.toBuffer()]); }; TransactionService.prototype._decodeTransactionValue = function(buffer) { diff --git a/lib/services/wallet-api/utils.js b/lib/services/wallet-api/utils.js index 5eae4143..6248fa5d 100644 --- a/lib/services/wallet-api/utils.js +++ b/lib/services/wallet-api/utils.js @@ -444,7 +444,7 @@ exports.sha512KDF = function(passphrase, salt, derivationOptions, callback) { callback(null, derivation); }; -exports.hashPassphrase = function(opts) { +exports.hashPassphrase = function() { return exports.sha512KDF; }; @@ -661,6 +661,13 @@ exports.toIntIfNumberLike = function(a) { }; exports.delimitedStringParse = function(delim, str) { + function tryJSONparse(str) { + try { + return JSON.parse(str); + } catch(e) { + return false; + } + } var ret = []; if (delim === null) { @@ -674,18 +681,73 @@ exports.delimitedStringParse = function(delim, str) { ret = _.compact(ret); return ret.length === 0 ? false : ret; - function tryJSONparse(str) { - try { - return JSON.parse(str); - } catch(e) { - return false; - } - } }; exports.diffTime = function(time) { var diff = process.hrtime(time); return (diff[0] * 1E9 + diff[1])/(1E9 * 1.0); -} +}; + +/* +* input: string representing a number + multiple of bytes, e.g. 500MB, 200KB, 100B +* output: integer representing the byte count +*/ +exports.parseByteCount = function(byteCountString) { + + function finish(n, m) { + var num = parseInt(n); + if (num > 0) { + return num * m; + } + return null; + } + + if (!_.isString) { + return; + } + var str = byteCountString.replace(/\s+/g, ''); + var map = { 'MB': 1E6, 'kB': 1000, 'KB': 1000, 'MiB': (1024 * 1024), + 'KiB': 1024, 'GiB': Math.pow(1024, 3), 'GB': 1E9 }; + var keys = Object.keys(map); + for(var i = 0; i < keys.length; i++) { + var re = new RegExp(keys[i] + '$'); + var match = str.match(re); + if (match) { + var num = str.slice(0, match.index); + return finish(num, map[keys[i]]); + } + } + return finish(byteCountString, 1); +}; + +/* + * input: arguments passed into originating function (whoever called us) + * output: bool args are valid for encoding a key to the database +*/ +exports.hasRequiredArgsForEncoding = function(args) { + //for encoding a key to the database, the passed in args must: + // the first atg should exist + // the second arg is optional, but must exist if there is a third, fourth, nth arg + // the third arg is optional, but not if the second arg is missing, etc. + function exists(arg) { + return !(arg === null || arg === undefined); + } + + if (!exists(args[0])) { + return false; + } + + var pastArgMissing; + + for(var i = 1; i < args.length; i++) { + var argMissing = exists(args[i]); + if (argMissing && pastArgMissing) { + return false; + } + pastArgMissing = argMissing; + } + + return true; +}; module.exports = exports;