From 28e29ff8db68bf6a18b1c9a0763e4083a764aab8 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Thu, 22 Jun 2017 09:00:29 -0400 Subject: [PATCH] wip --- lib/services/address/index.js | 353 ++---------------------------- lib/services/block/index.js | 11 +- lib/services/fee/index.js | 53 +++++ lib/services/mempool/index.js | 64 ++---- lib/services/transaction/index.js | 74 ++----- lib/utils.js | 23 ++ 6 files changed, 141 insertions(+), 437 deletions(-) create mode 100644 lib/services/fee/index.js diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 4d1c2097..6ee6613b 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -7,45 +7,39 @@ var async = require('async'); var index = require('../../'); var log = index.log; var errors = index.errors; -var bitcore = require('bitcore-lib'); -var $ = bitcore.util.preconditions; -var _ = bitcore.deps._; -var EventEmitter = require('events').EventEmitter; -var Address = bitcore.Address; +var _ = require('lodash'); var Encoding = require('./encoding'); var utils = require('../../utils'); var AddressService = function(options) { BaseService.call(this, options); - this.concurrency = options.concurrency || 20; }; inherits(AddressService, BaseService); -AddressService.dependencies = [ - 'bitcoind', - 'db', - 'transaction' -]; +AddressService.dependencies = ['transaction']; AddressService.prototype.start = function(callback) { var self = this; this.db = this.node.services.db; this.db.getPrefix(this.name, function(err, prefix) { + if(err) { return callback(err); } + self.prefix = prefix; self._encoding = new Encoding(self.prefix); - + self._setListeners(); callback(); + }); }; AddressService.prototype.stop = function(callback) { - setImmediate(callback); + callback(); }; AddressService.prototype.getAPIMethods = function() { @@ -53,8 +47,6 @@ AddressService.prototype.getAPIMethods = function() { ['getBalance', this, this.getBalance, 2], ['getOutputs', this, this.getOutputs, 2], ['getUtxos', this, this.getUtxos, 2], - ['getInputForOutput', this, this.getInputForOutput, 2], - ['isSpent', this, this.isSpent, 2], ['getAddressHistory', this, this.getAddressHistory, 2], ['getAddressSummary', this, this.getAddressSummary, 1] ]; @@ -64,296 +56,36 @@ AddressService.prototype.getPublishEvents = function() { return []; }; -AddressService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) { +AddressService.prototype._setupListeners = function() { + this._startSubscriptions(); +}; + +AddressService.prototype._startSubscriptions = function() { + var self = this; - var txs = block.transactions; - var height = block.__height; - - var action = 'put'; - var reverseAction = 'del'; - if (!connectBlock) { - action = 'del'; - reverseAction = 'put'; + if (self._subscribed) { + return; } - var operations = []; + self._subscribed = true; + self.bus = self.node.openBus({remoteAddress: 'localhost'}); - for(var i = 0; i < txs.length; i++) { + self.bus.subscribe('block/reorg'); + self.bus.on('block/block', self._onBlock.bind(self)); + self.bus.on('block/reorg', self._onReorg.bind(self)); - var tx = txs[i]; - var txid = tx.id; - var inputs = tx.inputs; - var outputs = tx.outputs; - - // Subscription messages - var txmessages = {}; - - var outputLength = outputs.length; - for (var outputIndex = 0; outputIndex < outputLength; outputIndex++) { - var output = outputs[outputIndex]; - - var script = output.script; - - if(!script) { - log.debug('Invalid script'); - continue; - } - - var address = self.getAddressString(script); - if(!address) { - continue; - } - - var key = self._encoding.encodeAddressIndexKey(address, height, txid); - operations.push({ - type: action, - key: key - }); - - // Collect data for subscribers - if (txmessages[address]) { - txmessages[address].outputIndexes.push(outputIndex); - } else { - txmessages[address] = { - tx: tx, - height: height, - outputIndexes: [outputIndex], - address: address, - timestamp: block.header.timestamp - }; - } - } - - if(tx.isCoinbase()) { - continue; - } - - //TODO deal with P2PK - for(var inputIndex = 0; inputIndex < inputs.length; inputIndex++) { - var input = inputs[inputIndex]; - - if(!input.script) { - log.debug('Invalid script'); - continue; - } - - var inputAddress = self.getAddressString(input.script); - - if(!inputAddress) { - continue; - } - - var inputKey = self._encoding.encodeAddressIndexKey(inputAddress, height, txid); - - operations.push({ - type: action, - key: inputKey - }); - - } - } - setImmediate(function() { - callback(null, operations); - }); }; -AddressService.prototype.blockHandler = function(block, connectBlock, callback) { - var self = this; +AddressService.prototype._onBlock = function(block) { - var txs = block.transactions; - - var action = 'put'; - var reverseAction = 'del'; - if (!connectBlock) { - action = 'del'; - reverseAction = 'put'; - } - - var operations = []; - - async.eachSeries(txs, function(tx, next) { - var txid = tx.id; - var inputs = tx.inputs; - var outputs = tx.outputs; - - var outputLength = outputs.length; - for (var outputIndex = 0; outputIndex < outputLength; outputIndex++) { - var output = outputs[outputIndex]; - - var script = output.script; - - if(!script) { - log.debug('Invalid script'); - continue; - } - - var address = self.getAddressString(script); - - if(!address) { - continue; - } - - var key = self._encoding.encodeUtxoIndexKey(address, txid, outputIndex); - var value = self._encoding.encodeUtxoIndexValue(block.__height, output.satoshis, output._scriptBuffer); - operations.push({ - type: action, - key: key, - value: value - }); - - } - - if(tx.isCoinbase()) { - return next(); - } - - //TODO deal with P2PK - async.each(inputs, function(input, next) { - if(!input.script) { - log.debug('Invalid script'); - return next(); - } - - var inputAddress = self.getAddressString(input.script); - - if(!inputAddress) { - return next(); - } - - var inputKey = self._encoding.encodeUtxoIndexKey(inputAddress, input.prevTxId, input.outputIndex); - //common case is connecting blocks and deleting outputs spent by these inputs - if (connectBlock) { - operations.push({ - type: 'del', - key: inputKey - }); - next(); - } else { // uncommon and slower, this happens during a reorg - self.node.services.transaction.getTransaction(input.prevTxId.toString('hex'), {}, function(err, tx) { - var utxo = tx.outputs[input.outputIndex]; - var inputValue = self._encoding.encodeUtxoIndexValue(tx.__height, utxo.satoshis, utxo._scriptBuffer); - operations.push({ - type: 'put', - key: inputKey, - value: inputValue - }); - next(); - }); - } - }, function(err) { - if(err) { - return next(err); - } - next(); - }); - }, function(err) { - //we are aync predicated on reorg sitch - if(err) { - return callback(err); - } - callback(null, operations); - }); }; -AddressService.prototype.getAddressString = function(script, output) { - var address = script.toAddress(this.node.network.name); - if(address) { - return address.toString(); - } - - try { - var pubkey = script.getPublicKey(); - if(pubkey) { - return pubkey.toString('hex'); - } - } catch(e) { - } - - //TODO add back in P2PK, but for this we need to look up the utxo for this script - if(output && output.script && output.script.isPublicKeyOut()) { - return output.script.getPublicKey().toString('hex'); - } - - return null; +AddressService.prototype._onReorg = function(commonAncestorBlock) { }; -/** - * This function is responsible for emitting events to any subscribers to the - * `address/transaction` event. - * @param {Object} obj - * @param {Transaction} obj.tx - The transaction - * @param {Object} obj.addressInfo - * @param {String} obj.addressInfo.hashHex - The hex string of address hash for the subscription - * @param {String} obj.addressInfo.hashBuffer - The address hash buffer - * @param {String} obj.addressInfo.addressType - The address type - * @param {Array} obj.outputIndexes - Indexes of the inputs that includes the address - * @param {Array} obj.inputIndexes - Indexes of the outputs that includes the address - * @param {Date} obj.timestamp - The time of the block the transaction was included - * @param {Number} obj.height - The height of the block the transaction was included - * @param {Boolean} obj.rejected - If the transaction was not accepted in the mempool - */ -AddressService.prototype.transactionEventHandler = function(obj) { - if(this.subscriptions['address/transaction'][obj.addressInfo.hashHex]) { - var emitters = this.subscriptions['address/transaction'][obj.addressInfo.hashHex]; - var address = new Address({ - hashBuffer: obj.addressInfo.hashBuffer, - network: this.node.network, - type: obj.addressInfo.addressType - }); - for(var i = 0; i < emitters.length; i++) { - emitters[i].emit('address/transaction', { - rejected: obj.rejected, - height: obj.height, - timestamp: obj.timestamp, - inputIndexes: obj.inputIndexes, - outputIndexes: obj.outputIndexes, - address: address, - tx: obj.tx - }); - } - } -}; -/** - * The function is responsible for emitting events to any subscribers for the - * `address/balance` event. - * @param {Block} block - * @param {Object} obj - * @param {String} obj.hashHex - * @param {Buffer} obj.hashBuffer - * @param {String} obj.addressType - */ -AddressService.prototype.balanceEventHandler = function(block, obj) { - if(this.subscriptions['address/balance'][obj.hashHex]) { - var emitters = this.subscriptions['address/balance'][obj.hashHex]; - var address = new Address({ - hashBuffer: obj.hashBuffer, - network: this.node.network, - type: obj.addressType - }); - this.getBalance(address, true, function(err, balance) { - if(err) { - return this.emit(err); - } - for(var i = 0; i < emitters.length; i++) { - emitters[i].emit('address/balance', address, balance, block); - } - }); - } -}; - -/** - * The Bus will use this function to subscribe to the available - * events for this service. For information about the available events - * please see `getPublishEvents`. - * @param {String} name - The name of the event - * @param {EventEmitter} emitter - An event emitter instance - * @param {Array} addresses - An array of addresses to subscribe - */ AddressService.prototype.subscribe = function(name, emitter, addresses) { - $.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter'); - $.checkArgument(Array.isArray(addresses), 'Second argument is expected to be an Array of addresses'); for(var i = 0; i < addresses.length; i++) { var hashHex = bitcore.Address(addresses[i]).hashBuffer.toString('hex'); @@ -364,16 +96,7 @@ AddressService.prototype.subscribe = function(name, emitter, addresses) { } }; -/** - * The Bus will use this function to unsubscribe to the available - * events for this service. - * @param {String} name - The name of the event - * @param {EventEmitter} emitter - An event emitter instance - * @param {Array} addresses - An array of addresses to subscribe - */ AddressService.prototype.unsubscribe = function(name, emitter, addresses) { - $.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter'); - $.checkArgument(Array.isArray(addresses) || _.isUndefined(addresses), 'Second argument is expected to be an Array of addresses or undefined'); if(!addresses) { return this.unsubscribeAll(name, emitter); @@ -391,13 +114,7 @@ AddressService.prototype.unsubscribe = function(name, emitter, addresses) { } }; -/** - * A helper function for the `unsubscribe` method to unsubscribe from all addresses. - * @param {String} name - The name of the event - * @param {EventEmitter} emitter - An instance of an event emitter - */ AddressService.prototype.unsubscribeAll = function(name, emitter) { - $.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter'); for(var hashHex in this.subscriptions[name]) { var emitters = this.subscriptions[name][hashHex]; @@ -408,13 +125,6 @@ AddressService.prototype.unsubscribeAll = function(name, emitter) { } }; -/** - * Will sum the total of all unspent outputs to calculate the balance - * for an address. - * @param {String} address - The base58check encoded address - * @param {Boolean} queryMempool - Include mempool in the results - * @param {Function} callback - */ AddressService.prototype.getBalance = function(address, queryMempool, callback) { this.getUtxos(address, queryMempool, function(err, outputs) { if(err) { @@ -493,10 +203,6 @@ AddressService.prototype.getUtxosForAddress = function(address, queryMempool, ca }; AddressService.prototype.isUnspent = function(output, options, callback) { - $.checkArgument(_.isFunction(callback)); - this.isSpent(output, options, function(spent) { - callback(!spent); - }); }; AddressService.prototype.getAddressHistory = function(addresses, options, callback) { @@ -594,21 +300,6 @@ AddressService.prototype.getAddressTxidsWithHeights = function(address, options, }); }; -/** - * This will give an object with: - * balance - confirmed balance - * unconfirmedBalance - unconfirmed balance - * totalReceived - satoshis received - * totalSpent - satoshis spent - * appearances - number of transactions - * unconfirmedAppearances - number of unconfirmed transactions - * txids - list of txids (unless noTxList is set) - * - * @param {String} address - * @param {Object} options - * @param {Boolean} [options.noTxList] - if set, txid array will not be included - * @param {Function} callback - */ AddressService.prototype.getAddressSummary = function(addressArg, options, callback) { var self = this; diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 9a00d004..d2ed548f 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -20,6 +20,7 @@ var BlockService = function(options) { this.db = this.node.services.db; this.subscriptions = {}; this.subscriptions.block = []; + this.subscriptions.reorg = []; this.tip = null; this._blockHeaderQueue = LRU(50); //hash -> header, height -> header, this._blockQueue = LRU(10); // keep 10 blocks in the cache in case of reorg's @@ -46,7 +47,7 @@ BlockService.prototype.start = function(callback) { self.prefix = prefix; self.encoding = new Encoding(self.prefix); - self._setHandlers(); + self._setListeners(); callback(); }); }; @@ -84,6 +85,12 @@ BlockService.prototype.getPublishEvents = function() { scope: this, subscribe: this.subscribe.bind(this, 'block'), unsubscribe: this.unsubscribe.bind(this, 'block') + }, + { + name: 'block/reorg', + scope: this, + subscribe: this.subscribe.bind(this, 'reorg'), + unsubscribe: this.unsubscribe.bind(this, 'reorg') } ]; }; @@ -352,7 +359,7 @@ BlockService.prototype._setBlockHeaderQueue = function(header) { }; -BlockService.prototype._setHandlers = function() { +BlockService.prototype._setListeners = function() { var self = this; self.p2p.once('bestHeight', function(height) { diff --git a/lib/services/fee/index.js b/lib/services/fee/index.js new file mode 100644 index 00000000..9c853732 --- /dev/null +++ b/lib/services/fee/index.js @@ -0,0 +1,53 @@ +'use strict'; + +var assert = require('assert'); +var BaseService = require('../../service'); +var inherits = require('util').inherits; +var index = require('../../'); +var log = index.log; +var BitcoreRPC = require('bitcoind-rpc'); + +var FeeService = function(options) { + this._config = options.rpc || { + user: 'bitcoin', + pass: 'local321', + host: 'localhost', + protocol: 'http', + port: this._getDefaultPort() + }; + BaseService.call(this, options); + +}; + +inherits(FeeService, BaseService); + +FeeService.dependencies = []; + +FeeService.prototype.start = function() { + return this.node.network.port - 1; +}; + +FeeService.prototype.start = function(callback) { + callback(); +}; + +FeeService.prototype.stop = function(callback) { + callback(); +}; + +FeeService.prototype.getAPIMethods = function() { + return [ + ['estimateFee', this, this.estimateFee, 1] + ]; +}; + +FeeService.prototype.estimateFee = function(blocks, callback) { + var client = new BitcoreRPC(this._config); + client.estimateFee(blocks || 4, callback); +}; + + + + +module.exports = FeeService; + diff --git a/lib/services/mempool/index.js b/lib/services/mempool/index.js index 007b16ef..4088efc9 100644 --- a/lib/services/mempool/index.js +++ b/lib/services/mempool/index.js @@ -19,29 +19,8 @@ var MempoolService = function(options) { util.inherits(MempoolService, BaseService); -MempoolService.dependencies = [ 'bitcoind', 'db' ]; +MempoolService.dependencies = ['db']; -MempoolService.prototype.blockHandler = function(block, connectBlock, callback) { - var self = this; - - if (!self._handleBlocks) { - return setImmediate(callback); - } - - var txs = block.transactions; - - var action = 'del'; - if (!connectBlock) { - action = 'put'; - } - - for(var i = 0; i < txs.length; i++) { - - var tx = txs[i]; - self._updateMempool(tx, action); - } - setImmediate(callback); -}; MempoolService.prototype.getPublishEvents = function() { return []; @@ -53,39 +32,37 @@ MempoolService.prototype.getAPIMethods = function() { MempoolService.prototype.start = function(callback) { var self = this; - self.node.services.db.on('synced', function() { - var bus = self.node.openBus({ remoteAddress: 'localhost' }); - bus.subscribe('bitcoind/rawtransaction'); - bus.on('bitcoind/rawtransaction', function(txHex) { - var tx = new bitcore.Transaction(txHex); - self._updateMempool(tx, 'put'); - }); - self._handleBlocks = true; - }); self.node.services.db.getPrefix(self.name, function(err, servicePrefix) { if(err) { return callback(err); } self.servicePrefix = servicePrefix; self._encoding = new Encoding(self.servicePrefix); + self._setListeners(); callback(); }); }; -/** - * Function to be called when bitcore-node is stopped - */ -MempoolService.prototype.stop = function(done) { - setImmediate(done); +MempoolService.prototype._setListeners = function() { + this._startSubscriptions(); +}; + +MempoolService.prototype._startSubscriptions = function() { + var bus = self.node.openBus({ remoteAddress: 'localhost' }); + bus.on('transaction/transaction', this._onTransaction.bind(self)); + bus.subscribe('tranaction/transaction'); +}; + +MempoolService.prototype._onTransaction = function(tx) { + +}; + +MempoolService.prototype.stop = function(callback) { + callback(); }; -/** - * Setup express routes - * @param {Express} app - */ MempoolService.prototype.setupRoutes = function() { - // Setup express routes here }; MempoolService.prototype.getRoutePrefix = function() { @@ -165,17 +142,12 @@ MempoolService.prototype.getAddressString = function(script, output) { return pubkey.toString('hex'); } } catch(e) { - //log.warn('Error getting public key from: ', script.toASM(), script.toHex()); - // if there is an error, it's because a pubkey can not be extracted from the script - // continue on and return null } - //TODO add back in P2PK, but for this we need to look up the utxo for this script if(output && output.script && output.script.isPublicKeyOut()) { return output.script.getPublicKey().toString('hex'); } - //log.warn('No utxo given for script spending a P2PK: ', script.toASM(), script.toHex()); return null; }; diff --git a/lib/services/transaction/index.js b/lib/services/transaction/index.js index aaa9cb33..de4fa198 100644 --- a/lib/services/transaction/index.js +++ b/lib/services/transaction/index.js @@ -7,14 +7,6 @@ var inherits = require('util').inherits; var Encoding = require('./encoding'); var levelup = require('levelup'); -/** - * The Transaction Service builds upon the Database Service and the Bitcoin Service to add additional - * functionality for getting information by bitcoin transaction hash/id. This includes the current - * bitcoin memory pool as validated by a trusted bitcoind instance. - * @param {Object} options - * @param {Node} options.node - An instance of the node - * @param {String} options.name - An optional name of the service - */ function TransactionService(options) { BaseService.call(this, options); this.concurrency = options.concurrency || 20; @@ -40,6 +32,7 @@ TransactionService.prototype.start = function(callback) { } self.prefix = prefix; self.encoding = new Encoding(self.prefix); + self._setListeners(); callback(); }); }; @@ -48,62 +41,27 @@ TransactionService.prototype.stop = function(callback) { setImmediate(callback); }; -TransactionService.prototype.blockHandler = function(block, connectBlock, callback) { + +TransactionService.prototype._setListeners = function() { + this._startSubscriptions(); +}; + +TransactionService.prototype._startSubscriptions = function() { var self = this; - var action = 'put'; - var reverseAction = 'del'; - if (!connectBlock) { - action = 'del'; - reverseAction = 'put'; + + if (self._subscribed) { + return; } - var operations = []; + self._subscribed = true; + self.bus = self.node.openBus({remoteAddress: 'localhost'}); - this.currentTransactions = {}; + self.bus.on('block/block', self._onBlock.bind(self)); + self.bus.subscribe('block/block'); - async.series([ - function(next) { - // if the timestamp service does not have our data yet, but might in the future, we don't want to let this hold us up - // - self.node.services.timestamp.getTimestamp(block.hash, function(err, timestamp) { - if(err) { - return next(err); - } - block.__timestamp = timestamp; - next(); - }); - }, function(next) { - async.eachSeries(block.transactions, function(tx, next) { - tx.__timestamp = block.__timestamp; - tx.__height = block.__height; - - self._getInputValues(tx, function(err, inputValues) { - if(err) { - return next(err); - } - tx.__inputValues = inputValues; - self.currentTransactions[tx.id] = tx; - - operations.push({ - type: action, - key: self.encoding.encodeTransactionKey(tx.id), - value: self.encoding.encodeTransactionValue(tx) - }); - next(); - }); - }, function(err) { - if(err) { - return next(err); - } - next(); - }); - }], function(err) { - if(err) { - return callback(err); - } - callback(null, operations); - }); +}; +TransactionService.prototype._onBlock = function(block) { }; TransactionService.prototype._getMissingInputValues = function(tx, callback) { diff --git a/lib/utils.js b/lib/utils.js index 3a32aaa4..5062faa3 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -132,4 +132,27 @@ utils.toIntIfNumberLike = function(a) { return a; }; +utils.getAddressString = function(script, output) { + var address = script.toAddress(this.node.network.name); + if(address) { + return address.toString(); + } + + try { + var pubkey = script.getPublicKey(); + if(pubkey) { + return pubkey.toString('hex'); + } + } catch(e) { + } + + //TODO add back in P2PK, but for this we need to look up the utxo for this script + if(output && output.script && output.script.isPublicKeyOut()) { + return output.script.getPublicKey().toString('hex'); + } + + return null; +}; + + module.exports = utils;