From 5a372f268cc77c8e5ec87cced24354b721b0b99c Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Fri, 20 Jan 2017 17:28:25 -0500 Subject: [PATCH] wip --- contrib/printKeys.js | 36 ++++++++---- lib/services/address/index.js | 80 ++++++++++++++----------- lib/services/timestamp.js | 14 ++++- lib/services/transaction.js | 107 ++++++++++++++++++++++++++++------ livetest/index.js | 93 +++++++++++++++++++++++++++++ 5 files changed, 266 insertions(+), 64 deletions(-) create mode 100644 livetest/index.js diff --git a/contrib/printKeys.js b/contrib/printKeys.js index 295b222b..368acc91 100644 --- a/contrib/printKeys.js +++ b/contrib/printKeys.js @@ -3,32 +3,46 @@ var levelup = require('levelup'); var leveldown = require('leveldown'); var Encoding = require('../lib/services/address/encoding'); -var dbPath = '/Users/patrick/.bitcore/bitcore-node.db'; - +var dbPath = '/Users/chrisk/.bwdb/bitcore-node.db'; +var bitcore = require('bitcore-lib'); var db = levelup(dbPath, {keyEncoding: 'binary', valueEncoding: 'binary'}); var prefix = new Buffer('0002', 'hex'); var encoding = new Encoding(prefix); -var address = '19k8nToWwMGuF4HkNpzgoVAYk4viBnEs5D'; +var address = '1MfDRRVVKXUe5KNVZzu8CBzUZDHTTYZM94'; var addressLength = new Buffer(1); addressLength.writeUInt8(address.length); //var startBuffer = prefix; //var endBuffer = Buffer.concat([prefix, new Buffer('ff', 'hex')]); -var startBuffer = Buffer.concat([prefix, addressLength, new Buffer(address, 'utf8'), new Buffer('00', 'hex')]); -var endBuffer = Buffer.concat([prefix, addressLength, new Buffer(address, 'utf8'), new Buffer('ff', 'hex')]); - +//var startBuffer = Buffer.concat([prefix, addressLength, new Buffer(address, 'utf8'), new Buffer('00', 'hex')]); +//var endBuffer = Buffer.concat([prefix, addressLength, new Buffer(address, 'utf8'), new Buffer('01', 'hex')]); +var start = Buffer.concat([prefix, new Buffer('0437cd7f8525ceed2324359c2d0ba26006d92d856a9c20fa0241106ee5a597c9', 'hex')]); +var end = Buffer.concat([prefix, new Buffer('0437cd7f8525ceed2324359c2d0ba26006d92d856a9c20fa0241106ee5a597c9', 'hex'), new Buffer('01', 'hex')]); var stream = db.createReadStream({ - gte: startBuffer, - lt: endBuffer + gte: start, + lt: end }); - stream.on('data', function(data) { - console.log(encoding.decodeAddressIndexKey(data.key), encoding.decodeAddressIndexValue(data.value)); + var txkey = data.key.slice(2).toString('hex'); + var height = data.value.readUInt32BE(); + var timestamp = data.value.readDoubleBE(4); + var inputValues = []; + var inputValuesLength = data.value.readUInt16BE(12); + for(var i = 0; i < inputValuesLength / 8; i++) { + inputValues.push(buffer.readDoubleBE(i * 8 + 14)); + } + var transaction = new bitcore.Transaction(data.value.slice(inputValues.length * 8 + 14)); + transaction.__height = height; + transaction.__inputValues = inputValues; + transaction.__timestamp = timestamp; + //console.log(txkey, transaction.toObject()); + console.log(data.value); + console.log(transaction.__height, transaction.__inputValues, transaction.__timestamp); //console.log(data.key.toString('hex'), data.value.toString('hex')); }); stream.on('end', function() { console.log('end'); -}); \ No newline at end of file +}); diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 20bf28f1..2094e079 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -466,12 +466,12 @@ AddressService.prototype.getInputForOutput = function(txid, outputIndex, options txidBuffer = new Buffer(txid, 'hex'); } if (options.queryMempool) { - var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey(txidBuffer, outputIndex); + var spentIndexSyncKey = self.encoding.encodeSpentIndexSyncKey(txidBuffer, outputIndex); if (this.mempoolSpentIndex[spentIndexSyncKey]) { return this._getSpentMempool(txidBuffer, outputIndex, callback); } } - var key = encoding.encodeInputKeyMap(txidBuffer, outputIndex); + var key = self.encoding.encodeInputKeyMap(txidBuffer, outputIndex); var dbOptions = { valueEncoding: 'binary', keyEncoding: 'binary' @@ -482,7 +482,7 @@ AddressService.prototype.getInputForOutput = function(txid, outputIndex, options } else if (err) { return callback(err); } - var value = encoding.decodeInputValueMap(buffer); + var value = self.encoding.decodeInputValueMap(buffer); callback(null, { inputTxId: value.inputTxId.toString('hex'), inputIndex: value.inputIndex @@ -519,7 +519,7 @@ AddressService.prototype.createInputsStream = function(addressStr, options) { AddressService.prototype.createInputsDBStream = function(addressStr, options) { var stream; - var addrObj = encoding.getAddressInfo(addressStr); + var addrObj = this.encoding.getAddressInfo(addressStr); var hashBuffer = addrObj.hashBuffer; var hashTypeBuffer = addrObj.hashTypeBuffer; @@ -588,7 +588,7 @@ AddressService.prototype.getInputs = function(addressStr, options, callback) { var inputs = []; - var addrObj = encoding.getAddressInfo(addressStr); + var addrObj = self.encoding.getAddressInfo(addressStr); var hashBuffer = addrObj.hashBuffer; var hashTypeBuffer = addrObj.hashTypeBuffer; @@ -733,7 +733,7 @@ AddressService.prototype.createOutputsStream = function(addressStr, options) { AddressService.prototype.createOutputsDBStream = function(addressStr, options) { - var addrObj = encoding.getAddressInfo(addressStr); + var addrObj = this.encoding.getAddressInfo(addressStr); var hashBuffer = addrObj.hashBuffer; var hashTypeBuffer = addrObj.hashTypeBuffer; var stream; @@ -805,7 +805,7 @@ AddressService.prototype.getOutputs = function(addressStr, options, callback) { $.checkArgument(_.isObject(options), 'Second argument is expected to be an options object.'); $.checkArgument(_.isFunction(callback), 'Third argument is expected to be a callback function.'); - var addrObj = encoding.getAddressInfo(addressStr); + var addrObj = self.encoding.getAddressInfo(addressStr); var hashBuffer = addrObj.hashBuffer; var hashTypeBuffer = addrObj.hashTypeBuffer; if (!hashTypeBuffer) { @@ -880,7 +880,7 @@ AddressService.prototype._getOutputsMempool = function(addressStr, hashBuffer, h // prefix: 1, hashBuffer: 20, hashTypeBuffer: 1, txid: 32, outputIndex: 4 var txid = data.key.slice(22, 54); var outputIndex = data.key.readUInt32BE(54); - var value = encoding.decodeOutputMempoolValue(data.value); + var value = self.encoding.decodeOutputMempoolValue(data.value); var output = { address: addressStr, hashType: constants.HASH_TYPES_READABLE[hashTypeBuffer.toString('hex')], @@ -953,25 +953,36 @@ AddressService.prototype.getUnspentOutputsForAddress = function(address, queryMe var self = this; - this.getOutputs(address, {queryMempool: queryMempool}, function(err, outputs) { - if (err) { - return callback(err); - } else if(!outputs.length) { - return callback(new errors.NoOutputs('Address ' + address + ' has no outputs'), []); - } + var addressLengthBuffer = new Buffer(1); + addressLengthBuffer.writeUInt8(address.length); + var start = Buffer.concat([ self.prefix, addressLengthBuffer, new Buffer(address, 'utf8'), new Buffer('00', 'hex') ]); + var end = Buffer.concat([ self.prefix, addressLengthBuffer, new Buffer(address, 'utf8'), new Buffer('01', 'hex') ]); + var stream = self.store.createReadStream({ + gte: start, + lt: end + }); - var opts = { - queryMempool: queryMempool - }; - - var isUnspent = function(output, callback) { - self.isUnspent(output, opts, callback); - }; - - async.filter(outputs, isUnspent, function(results) { - callback(null, results); + var utxos = []; + stream.on('data', function(data) { + var key = self.encoding.decodeAddressIndexKey(data.key); + var value = self.encoding.decodeAddressIndexValue(data.value); + utxos.push({ + address: key.address, + txid: key.txid, + outputIndex: key.index, + satoshis: value.satoshis, + height: key.height }); }); + + stream.on('end', function() { + return callback(null, utxos); + }); + stream.on('error', function(err) { + if(err) { + return callback(err); + } + }); }; /** @@ -1003,7 +1014,7 @@ AddressService.prototype.isSpent = function(output, options, callback) { var spent = self.node.services.bitcoind.isSpent(txid, output.outputIndex); if (!spent && queryMempool) { var txidBuffer = new Buffer(txid, 'hex'); - var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey(txidBuffer, output.outputIndex); + var spentIndexSyncKey = self.encoding.encodeSpentIndexSyncKey(txidBuffer, output.outputIndex); spent = self.mempoolSpentIndex[spentIndexSyncKey] ? true : false; } setImmediate(function() { @@ -1047,12 +1058,13 @@ AddressService.prototype.isSpent = function(output, options, callback) { * @param {Function} callback */ AddressService.prototype.getAddressHistory = function(addresses, options, callback) { - var history = new AddressHistory({ - node: this.node, - options: options, - addresses: addresses - }); - history.get(callback); + + //var history = new AddressHistory({ + // node: this.node, + // options: options, + // addresses: addresses + //}); + //history.get(callback); }; /** @@ -1193,7 +1205,7 @@ AddressService.prototype._getAddressConfirmedOutputsSummary = function(address, if(options.queryMempool) { // Check to see if this output is spent in the mempool and if so // we will subtract it from the unconfirmedBalance (a.k.a unconfirmedDelta) - var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey( + var spentIndexSyncKey = self.encoding.encodeSpentIndexSyncKey( new Buffer(txid, 'hex'), // TODO: get buffer directly outputIndex ); @@ -1252,7 +1264,7 @@ AddressService.prototype._getAddressMempoolSummary = function(address, options, var addressStr = address.toString(); var hashBuffer = address.hashBuffer; var hashTypeBuffer = constants.HASH_TYPES_MAP[address.type]; - var addressIndexKey = encoding.encodeMempoolAddressIndexKey(hashBuffer, hashTypeBuffer); + var addressIndexKey = self.encoding.encodeMempoolAddressIndexKey(hashBuffer, hashTypeBuffer); if(!this.mempoolAddressIndex[addressIndexKey]) { return callback(null, result); @@ -1282,7 +1294,7 @@ AddressService.prototype._getAddressMempoolSummary = function(address, options, result.unconfirmedAppearanceIds[output.txid] = output.timestamp; if(!options.noBalance) { - var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey( + var spentIndexSyncKey = self.encoding.encodeSpentIndexSyncKey( new Buffer(output.txid, 'hex'), // TODO: get buffer directly output.outputIndex ); diff --git a/lib/services/timestamp.js b/lib/services/timestamp.js index 9fa530c7..e69b5b61 100644 --- a/lib/services/timestamp.js +++ b/lib/services/timestamp.js @@ -4,7 +4,8 @@ var inherits = require('util').inherits; function TimestampService(options) { BaseService.call(this, options); - + this.currentBlock = null; + this.currentTimestamp = null; } inherits(TimestampService, BaseService); @@ -65,6 +66,9 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback timestamp = lastTimestamp + 1; } + self.currentBlock = block.hash; + self.currentTimestamp = timestamp; + operations = operations.concat( [ { @@ -87,6 +91,12 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback TimestampService.prototype.getTimestamp = function(hash, callback) { var self = this; + if (hash === self.currentBlock) { + return setImmediate(function() { + callback(null, self.currentTimestamp); + }); + } + var key = self._encodeBlockTimestampKey(hash); self.store.get(key, function(err, buffer) { if(err) { @@ -133,4 +143,4 @@ TimestampService.prototype._decodeTimestampBlockValue = function(buffer) { return buffer.toString('hex'); }; -module.exports = TimestampService; \ No newline at end of file +module.exports = TimestampService; diff --git a/lib/services/transaction.js b/lib/services/transaction.js index b75879a8..3f4c290e 100644 --- a/lib/services/transaction.js +++ b/lib/services/transaction.js @@ -1,18 +1,21 @@ 'use strict'; + +var async = require('async'); var BaseService = require('../service'); var inherits = require('util').inherits; var bitcore = require('bitcore-lib'); function TransactionService(options) { BaseService.call(this, options); - + this.concurrency = options.concurrency || 20; this.currentTransactions = {}; } inherits(TransactionService, BaseService); TransactionService.dependencies = [ - 'db' + 'db', + 'timestamp' ]; TransactionService.prototype.start = function(callback) { @@ -36,6 +39,7 @@ TransactionService.prototype.stop = function(callback) { }; TransactionService.prototype.blockHandler = function(block, connectBlock, callback) { + var self = this; var action = 'put'; if (!connectBlock) { action = 'del'; @@ -45,22 +49,68 @@ TransactionService.prototype.blockHandler = function(block, connectBlock, callba this.currentTransactions = {}; - for(var i = 0; i < block.transactions.length; i++) { - var tx = block.transactions[i]; - tx.__height = block.__height; + async.series([ + function(next) { + self.node.services.timestamp.getTimestamp(block.hash, function(err, timestamp) { + if(err) { + return next(err); + } + block.__timestamp = timestamp; + next(); + }); + }, function(next) { + async.eachSeries(block.transactions, function(tx, next) { + tx.__timestamp = block.__timestamp; + tx.__height = block.__height; - this.currentTransactions[tx.id] = tx; + self._getInputValues(tx, function(err, inputValues) { + if(err) { + return next(err); + } + tx.__inputValues = inputValues; + self.currentTransactions[tx.id] = tx; - operations.push({ - type: action, - key: this._encodeTransactionKey(tx.id), - value: this._encodeTransactionValue(tx) + operations.push({ + type: action, + key: self._encodeTransactionKey(tx.id), + value: self._encodeTransactionValue(tx) + }); + next(); + }); + }, function(err) { + if(err) { + return next(err); + } + next(); + }); + }], function(err) { + if(err) { + return callback(err); + } + callback(null, operations); }); + +}; + +TransactionService.prototype._getInputValues = function(tx, callback) { + var self = this; + + if (tx.isCoinbase()) { + return callback(null, []); } - setImmediate(function() { - callback(null, operations); - }); + async.mapLimit(tx.inputs, this.concurrency, function(input, next) { + self.getTransaction(input.prevTxId.toString('hex'), function(err, prevTx) { + if(err) { + return next(err); + } + if (!prevTx.outputs[input.outputIndex]) { + return next(new Error('Input did not have utxo.')); + } + var satoshis = prevTx.outputs[input.outputIndex].satoshis; + next(null, satoshis); + }); + }, callback); }; TransactionService.prototype.getTransaction = function(txid, callback) { @@ -92,16 +142,39 @@ TransactionService.prototype._decodeTransactionKey = function(buffer) { return buffer.slice(2).toString('hex'); }; -TransactionService.prototype._encodeTransactionValue = function(transaction, height) { +TransactionService.prototype._encodeTransactionValue = function(transaction) { var heightBuffer = new Buffer(4); - heightBuffer.writeUInt32BE(); - return new Buffer.concat([heightBuffer, transaction.toBuffer()]); + heightBuffer.writeUInt32BE(transaction.__height); + + var timestampBuffer = new Buffer(8); + timestampBuffer.writeDoubleBE(transaction.__timestamp); + + var inputValues = transaction.__inputValues; + var inputValuesBuffer = new Buffer(8 * inputValues.length); + for(var i = 0; i < inputValues.length; i++) { + inputValuesBuffer.writeDoubleBE(inputValues[i], i * 8); + } + + var inputValuesLengthBuffer = new Buffer(2); + inputValuesLengthBuffer.writeUInt16BE(inputValues.length * 8); + + return new Buffer.concat([heightBuffer, timestampBuffer, inputValuesLengthBuffer, inputValuesBuffer, transaction.toBuffer()]); }; TransactionService.prototype._decodeTransactionValue = function(buffer) { var height = buffer.readUInt32BE(); - var transaction = new bitcore.Transaction(buffer.slice(4)); + + var timestamp = buffer.readDoubleBE(4); + + var inputValues = []; + var inputValuesLength = buffer.readUInt16BE(12); + for(var i = 0; i < inputValuesLength / 8; i++) { + inputValues.push(buffer.readDoubleBE(i * 8 + 14)); + } + var transaction = new bitcore.Transaction(buffer.slice(inputValues.length * 8 + 14)); transaction.__height = height; + transaction.__inputValues = inputValues; + transaction.__timestamp = timestamp; return transaction; }; diff --git a/livetest/index.js b/livetest/index.js new file mode 100644 index 00000000..b5af5c75 --- /dev/null +++ b/livetest/index.js @@ -0,0 +1,93 @@ +'use strict'; + +var assert = require('assert'); +var async = require('async'); +var BitcoreNode = require('../'); +var db = require('../lib/services/db'); +var config = { + "network": "livenet", + "port": 3001, + "datadir": "/Users/chrisk/.bwdb/", + "services": [ + { + "name": "bitcoind", + "config": { + "connect": [ + { + "rpcport": 8332, + "rpcuser": "bitcoin", + "rpcpassword": "local321", + "zmqpubrawtx": "tcp://127.0.0.1:28332" + } + ] + }, + "module": require('../lib/services/bitcoind') + }, + { + "name": "db", + "config": {}, + "module": db + }, + { + "name": "transaction", + "config": {}, + "module": require('../lib/services/transaction') + }, + { + "name": "address", + "config": {}, + "module": require('../lib/services/address') + }, + { + "name": "timestamp", + "config": {}, + "module": require('../lib/services/timestamp') + } + ], + "servicesConfig": { + "bitcoind": { + "connect": [ + { + "rpcport": 8332, + "rpcuser": "bitcoin", + "rpcpassword": "local321", + "zmqpubrawtx": "tcp://127.0.0.1:28332" + } + ] + } + }, + "path": "/Users/chrisk/source/zzbitcore_node/bitcore-node.json" +} +db.prototype.sync = function(){}; +var node = new BitcoreNode.Node(config); +node.start(function(err) { + if(err) { + throw err; + } + var addresses = [ '1MfDRRVVKXUe5KNVZzu8CBzUZDHTTYZM94' ]; + async.series([function(next) { + node.services.address.getUnspentOutputs(addresses, false, function(err, results) { + if(err) { + throw err; + } + console.log(results); + next(); + }); + }, function(next) { + node.services.address.getAddressHistory(addresses, false, function(err, results) { + if(err) { + return callback(err); + } + console.log(results); + next(); + }); + }], function(err) { + node.stop(function(err) { + if(err) { + return callback(err); + } + process.exit(0); + }); + }); + +});