From 87df236e35b6c199732916b17d41557a6b739907 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Tue, 30 May 2017 08:01:13 -0400 Subject: [PATCH] wip --- lib/constants.js | 11 + lib/services/block/index.js | 137 ++-- lib/services/p2p/index.js | 147 ++-- package.json | 2 +- regtest/db.js | 27 +- regtest/p2p.js | 1518 +++++------------------------------ regtest/test_bus.js | 47 ++ regtest/utils.js | 23 +- 8 files changed, 435 insertions(+), 1477 deletions(-) create mode 100644 lib/constants.js create mode 100644 regtest/test_bus.js diff --git a/lib/constants.js b/lib/constants.js new file mode 100644 index 00000000..3d1131be --- /dev/null +++ b/lib/constants.js @@ -0,0 +1,11 @@ +'use strict'; + + +module.exports = { + BITCOIN_GENESIS_HASH: { + livenet: '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f', + regtest: '0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206', + testnet: '000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943' //this is testnet3 + } +}; + diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 863fb6e1..6582ef38 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -14,6 +14,7 @@ var BlockHandler = require('./block_handler'); var LRU = require('lru-cache'); var utils = require('../../utils'); var _ = require('lodash'); +var constants = require('../../constants'); var BlockService = function(options) { BaseService.call(this, options); @@ -27,8 +28,8 @@ var BlockService = function(options) { keyEncoding: 'string', valueEncoding: 'binary' }; - this._blockQueue = LRU(50); //hash -> header, height -> header, - this._rawBlockQueue = LRU(10); // keep 10 blocks in the cache in case of reorg's + this._blockHeaderQueue = LRU(50); //hash -> header, height -> header, + this._blockQueue = LRU(10); // keep 10 blocks in the cache in case of reorg's }; inherits(BlockService, BaseService); @@ -177,20 +178,10 @@ BlockService.prototype.getBlocks = function(blockArgs, callback) { self._isGenesisBlock.bind(self, blockArg), - function(block, next) { - - if (block) { - return next(null, self.genesis); - } - - var cachedBlock = self._rawBlockQueue.get(blockArg); - next(null, cachedBlock); - }, - function(block, next) { if (block) { - return next(null, block); + return next(null, self.genesis); } self._getBlock(blockArg, next); @@ -209,13 +200,25 @@ BlockService.prototype.getBlocks = function(blockArgs, callback) { // index, but we want to mark it as "-REORG" var reorgOperations = self._getReorgOperations(reorgHash, reorgHeight); - var headers = blocks.map(function(block) { - return { + var headers = []; + var tipIndexHeight = self.tip.__height; + + for(var i = 0; i < blocks.length; i++) { + + var block = blocks[i]; + if (block.__height !== ++tipIndexHeight) { + block.height = tipIndexHeight; + block.__height = tipIndexHeight; + self._blockQueue.set(block.hash, block); + } + + headers.push({ hash: block.hash, prevHash: utils.reverseBufferToString(block.header.prevHash), - height: block.__height - }; - }); + height: tipIndexHeight + }); + + } var operations = self._getBlockOperations(headers); @@ -236,7 +239,8 @@ BlockService.prototype.getBlocks = function(blockArgs, callback) { BlockService.prototype.getBlockHeader = function(blockArg, callback) { var self = this; - var header = self._blockQueue.get(blockArg); + var header = self._blockHeaderQueue.get(blockArg); + if (header) { return setImmediate(function() { callback(null, header); @@ -249,7 +253,7 @@ BlockService.prototype.getBlockHeader = function(blockArg, callback) { return callback(err); } - self._setBlockQueue(header); + self._setBlockHeaderQueue(header); callback(null, header); }); @@ -282,10 +286,10 @@ BlockService.prototype._startSubscriptions = function() { log.info('New block received: ' + block.hash + ' at height: ' + block.height); self._cacheRawBlock(block); var header = self._getHeader(block); - self._setBlockQueue(header); + self._setBlockHeaderQueue(header); self._detectReorg([block], function() { - self._blockHandler.sync(block); + self._blockHandler.sync(block); }); }); @@ -297,7 +301,7 @@ BlockService.prototype._startSubscriptions = function() { BlockService.prototype._cacheRawBlock = function(block) { log.debug('Setting block: ' + block.hash + ' in the raw block cache.'); - this._rawBlockQueue.set(block.hash, block); + this._blockQueue.set(block.hash, block); }; BlockService.prototype._getBlocks = function(callback) { @@ -367,48 +371,46 @@ BlockService.prototype._getHeader = function(block) { }; }; -BlockService.prototype._setBlockQueue = function(header) { +BlockService.prototype._setBlockHeaderQueue = function(header) { - this._blockQueue.set(header.height, header); - this._blockQueue.set(header.hash, header); + this._blockHeaderQueue.set(header.height, header); + this._blockHeaderQueue.set(header.hash, header); }; BlockService.prototype._setHandlers = function() { var self = this; + self.node.once('ready', function() { - self.genesis = bitcore.Block.fromBuffer(self.bitcoind.genesisBuffer); - self.genesis.__height = 0; - self.genesis.height = 0; - - var genesisHeader = self._getHeader(self.genesis); - self._setBlockQueue(genesisHeader); - self.db.batch(self._getBlockOperations(genesisHeader), function(err) { - - if (err) { - log.error('Unable to store genesis block in block index.'); - return; - } - - self._sync(); - }); + self._sync(); }); self._blockHandler.on('synced', function() { + log.info('Synced: ' + self.tip.hash); self._startSubscriptions(); + }); }; +BlockService.prototype._getGenesisBlock = function() { + + this.genesis = {}; + this.genesis.height = 0; + this.genesis.hash = constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()]; + return this.genesis; + +}; + BlockService.prototype._detectStartupReorg = function(callback) { var self = this; - // does our block's header exist on the network. and if, so, is it the correct height? - var height = self.bitcoind.height; - var hash = self.bitcoind.tiphash; + if (self.tip.height === 0) { + return callback(); + } self.getBlockHeader(hash, function(err, header) { @@ -420,7 +422,8 @@ BlockService.prototype._detectStartupReorg = function(callback) { return callback(); } - self._handleReorg(header.hash, callback); + var opts = { preReorgTipHash: hash, preReorgTipHeight: header.height }; + self._handleReorg(header.hash, opts, callback); }); }; @@ -444,12 +447,21 @@ BlockService.prototype._handleReorg = function(hash, callback) { self.printTipInfo('Reorg successful!'); self.reorg = false; - callback(); + self.cleanupAfterReorg(callback); }); }; +BlockService.prototype._decodeTipData = function(tipDataBuf) { + var hash = tipDataBuf.slice(0, 32).toString('hex'); + var height = tipDataBuf.slice(32).toString('hex').readUInt32BE(); + return { + hash: hash, + height: height + }; +}; + BlockService.prototype._loadTips = function(callback) { var self = this; @@ -464,25 +476,15 @@ BlockService.prototype._loadTips = function(callback) { return next(err); } - var hash; if (!tipData) { - self[tip] = self.genesis; + self[tip] = self._getGenesisBlock(); return next(); } - hash = tipData.slice(0, 32).toString('hex'); + self[tip] = self._decodeTipData(tipData); + log.info('loaded ' + tip + ' hash: ' + self[tip].hash + ' height: ' + self[tip].height); + next(); - self._getBlock(hash, function(err, block) { - - if(err) { - return next(err); - } - - self[tip] = block; - log.info('loaded ' + tip + ' hash: ' + block.hash + ' height: ' + block.__height); - next(); - - }); }); }, function(err) { @@ -502,7 +504,6 @@ BlockService.prototype._detectReorg = function(blocks, callback) { var tipHash = this.reorgHash || this.tip.hash; var tipHeight = this.reorgHeight || this.tip.__height; - var forkedHash; for(var i = 0; i < blocks.length; i++) { @@ -510,8 +511,10 @@ BlockService.prototype._detectReorg = function(blocks, callback) { continue; } - if (utils.reverseBufferToString(blocks[i].header.prevHash) !== tipHash) { - return this._handleReorg(forkedHash, callback.bind(this, tipHash, tipHeight)); + var prevHash = utils.reverseBufferToString(blocks[i].header.prevHash); + if (prevHash !== tipHash) { + var opts = { preReorgTipHash: tipHash, preReorgTipHeight: tipHeight }; + return this._handleReorg(prevHash, opts, callback); } tipHash = blocks[i].hash; @@ -583,6 +586,14 @@ BlockService.prototype._isGenesisBlock = function(blockArg, callback) { BlockService.prototype._getBlock = function(blockArg, callback) { var self = this; + + var cachedBlock = self._blockQueue.get(blockArg); + if (cachedBlock) { + return setImmediate(function() { + callback(null, cachedBlock); + }); + } + self.bitcoind.getBlock(blockArg, function(err, block) { if(err) { diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index c6cb2e99..d4e0517d 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -1,14 +1,14 @@ 'use strict'; var p2p = require('bitcore-p2p'); -var messages = new p2p.Messages(); var LRU = require('lru-cache'); var util = require('util'); var _ = require('lodash'); var bitcore = require('bitcore-lib'); -var index = require('../'); +var index = require('../../'); var log = index.log; -var Service = require('../../service'); +var BaseService = require('../../service'); +var constants = require('../../constants'); /* Purpose: @@ -19,37 +19,42 @@ var Service = require('../../service'); 5. broadcast messages on behalf of subscribers (block headers, blocks, bloom filters) */ var P2P = function(options) { + if (!(this instanceof P2P)) { return new P2P(options); } - Service.call(this, options); + + BaseService.call(this, options); this.options = options; this._maxPeers = this.options.maxPeers || 60; this._peers = this.options.peers; this.subscriptions = {}; + this.subscriptions.block = []; + this.subscriptions.transaction = []; + this.messages = new p2p.Messages({ network: this.node.network }); + }; -util.inherits(P2P, Service); +util.inherits(P2P, BaseService); P2P.dependencies = []; P2P.prototype.start = function(callback) { + var self = this; - self.once('synced', function() { - self._initPool(); - self._initCache(); - self._pool.connect(); - self._synced = true; - }); + self._initCache(); + self._initPool(); + this._setupListeners(); callback(); + }; P2P.prototype.stop = function(callback) { + var self = this; - setImmediate(function() { - self._pool.disconnect(); - callback(); - }); + self._pool.disconnect(); + callback(); + }; P2P.prototype.subscribe = function(name, emitter) { @@ -67,71 +72,83 @@ P2P.prototype.unsubscribe = function(name, emitter) { P2P.prototype._initCache = function() { this._inv = LRU(2000); - this._cache = LRU({ - max: this._maxMempoolSize, - length: function(tx) { return tx.toBuffer().length; } - }); + this._cache = []; }; P2P.prototype._initPool = function() { var opts = {}; - var _addrs = []; - if (this.addrs && _.isArray(this.addrs)) { - for(var i = 0; i < this.addrs.length; i++) { - _addrs.push({ - ip: { - v4: this.addrs[i] - } - }); - } - opts.addrs = _addrs; + if (this._peers) { + opts.addrs = this._peers; opts.dnsSeed = false; } opts.maxPeers = this._maxPeers; opts.network = this.node.getNetworkName(); this._pool = new p2p.Pool(opts); - this._setupListeners(); -}; - -P2P.prototype.validTx = function(tx) { - return tx; }; P2P.prototype._setupListeners = function() { var self = this; self._pool.on('peerready', function(peer, addr) { - log.info('Connected to peer: ' + addr.ip.v4); - peer.sendMessage(messages.MemPool()); + + log.info('Connected to peer: ' + addr.ip.v4 + ', network: ' + + peer.network.alias + ', version: ' + peer.version + ', subversion: ' + + peer.subversion + ', status: ' + peer.status + ', port: ' + + peer.port + ', best height: ' + peer.bestHeight); + peer.sendMessage(self.messages.MemPool()); }); self._pool.on('peerdisconnect', function(peer, addr) { + log.info('Disconnected from peer: ' + addr.ip.v4); + }); self._pool.on('peerinv', function(peer, message) { - var invList = []; + + var newDataNeeded = []; + + message.inventory.forEach(function(inv) { - var hash = self._inv.get(inv.hash); - if (inv.type === 1 && !hash) { + + if (!self._inv.get(inv.hash)) { + self._inv.set(inv.hash, true); - invList.push(inv); + + //self._generateRetrievalMessage() + newDataNeeded.push(inv); + } + }); - peer.sendMessage(messages.GetData(invList)); + + if (newDataNeeded.length > 0) { + peer.sendMessage(self.messages.GetData(newDataNeeded)); + } + }); - self._pool.on('peertx', function(peer, message) { - var tx = new bitcore.Transaction(message.transaction); - if (self.validTx(tx)) { - return self._cache.set(tx.id, tx); - } - return self._operations.push({ - type: 'put', - key: new Buffer(tx.id), - value: tx.toBuffer() - }); + self._pool.on('peertx', self._broadcastTx.bind(self)); + self._pool.on('peerblock', self._broadcastBlock.bind(self)); + + self.node.on('ready', function() { + setTimeout(function() { + self._pool.connect(); + }, 1000); }); + +}; + +P2P.prototype._broadcastTx = function(peer, message) { + for (var i = 0; i < this.subscriptions.transaction.length; i++) { + this.subscriptions.transaction[i].emit('p2p/transaction', message.transaction); + } +}; + +P2P.prototype._broadcastBlock = function(peer, message) { + for (var i = 0; i < this.subscriptions.block.length; i++) { + this.subscriptions.block[i].emit('p2p/block', message.block); + } }; P2P.prototype.getAPIMethods = function() { @@ -152,35 +169,7 @@ P2P.prototype.getPublishEvents = function() { subscribe: this.subscribe.bind(this, 'block'), unsubscribe: this.unsubscribe.bind(this, 'block') }, - { - name: 'bitcoind/rawblock', - scope: this, - subscribe: this.subscribe.bind(this, 'rawblock'), - unsubscribe: this.unsubscribe.bind(this, 'rawblock') - } ]; - return []; -}; - -P2P.prototype.blockHandler = function(block, connected, callback) { - var self = this; - - var operations = []; - - if (!self._synced) { - return callback(operations); - } - - var action = 'put'; - var reverseAction = 'del'; - if (!connected) { - action = 'del'; - reverseAction = 'put'; - } - - block.transactions.forEach(function(tx) { - self._cache.del(tx.id); - }); }; module.exports = P2P; diff --git a/package.json b/package.json index 338f2e31..b1dda387 100644 --- a/package.json +++ b/package.json @@ -72,7 +72,7 @@ }, "devDependencies": { "benchmark": "1.0.0", - "bitcore-p2p": "^1.1.0", + "bitcore-p2p": "", "chai": "^3.5.0", "coveralls": "^2.11.9", "istanbul": "^0.4.3", diff --git a/regtest/db.js b/regtest/db.js index 57b16566..f10b3157 100644 --- a/regtest/db.js +++ b/regtest/db.js @@ -4,7 +4,7 @@ var chai = require('chai'); var expect = chai.expect; var async = require('async'); var path = require('path'); -var utils = require('./utils'); +var Utils = require('./utils'); var zmq = require('zmq'); var http = require('http'); var blocks = require('../test/data/blocks.json'); @@ -80,6 +80,7 @@ var opts = { bitcoreDataDir: bitcoreDataDir, blockHeight: 0 }; +var utils = new Utils(opts); var genesis = new Block(new Buffer(blocks.genesis, 'hex')); var block1 = new Block(new Buffer(blocks.block1a, 'hex')); @@ -109,7 +110,7 @@ function publishBlockHash(rawBlockHex, callback) { pubSocket.send([ 'rawblock', new Buffer(rawBlockHex, 'hex') ]); - var httpOpts = utils.getHttpOpts(opts, { path: '/info' }); + var httpOpts = utils.getHttpOpts({ path: '/info' }); // we don't know exactly when all the blockhandlers will complete after the "tip" event // so we must wait an indeterminate time to check on the current tip @@ -184,10 +185,8 @@ describe('DB Operations', function() { setupFakeZmq(); - self.opts = Object.assign({}, opts); - - utils.startBitcoreNode(self.opts, function() { - utils.waitForBitcoreNode(self.opts, done); + utils.startBitcoreNode(function() { + utils.waitForBitcoreNode(done); }); }); @@ -211,17 +210,15 @@ describe('DB Operations', function() { async.series([ publishBlockHash.bind(self, rawBlock1), - publishBlockHash.bind(self, rawBlock2) + publishBlockHash.bind(self, rawBlock2), + function(next) { + utils.opts.blockHeight++; + next(); + }, + utils.waitForBitcoreNode.bind(utils) - ], function(err) { + ], done); - if(err) { - return done(err); - } - - done(); - - }); }); }); diff --git a/regtest/p2p.js b/regtest/p2p.js index 54c1b79c..e570bce7 100644 --- a/regtest/p2p.js +++ b/regtest/p2p.js @@ -1,1378 +1,260 @@ 'use strict'; +var chai = require('chai'); +var expect = chai.expect; var async = require('async'); -var assert = require('assert'); -var BaseService = require('../../service'); -var inherits = require('util').inherits; -var index = require('../../'); -var log = index.log; -var multer = require('multer'); -var storage = multer.memoryStorage(); -var upload = multer({ storage: storage }); -var validators = require('./validators'); -var mainUtils = require('../../utils'); -var utils = require('./utils'); -var _ = require('lodash'); -var bodyParser = require('body-parser'); -var LRU = require('lru-cache'); -var Encoding = require('./encoding'); +var BitcoinRPC = require('bitcoind-rpc'); +var path = require('path'); +var Utils = require('./utils'); +var crypto = require('crypto'); +var zmq = require('zmq'); var bitcore = require('bitcore-lib'); -var Input = bitcore.Transaction.Input; +var Transaction = bitcore.Transaction; +var PrivateKey = bitcore.PrivateKey; var Unit = bitcore.Unit; -var Transform = require('stream').Transform; -var WalletService = function(options) { - BaseService.call(this, options); +var debug = false; +var extraDebug = true; +var bitcoreDataDir = '/tmp/bitcore'; +var bitcoinDataDir = '/tmp/bitcoin'; - this._MAX_QUEUE = 20; - this._jobs = LRU({ - max: this._MAX_QUEUE, - maxAge: 86400000 * 3 //3 days - }); - - this._addressMap = {}; - this.balances = {}; - - this.db = this.node.services.db; +var rpcConfig = { + protocol: 'http', + user: 'bitcoin', + pass: 'local321', + host: '127.0.0.1', + port: '58332', + rejectUnauthorized: false }; -inherits(WalletService, BaseService); - -WalletService.dependencies = [ - 'bitcoind', - 'web', - 'address', - 'transaction', - 'timestamp' -]; - -WalletService.prototype.getAPIMethods = function() { - return []; +var bitcoin = { + args: { + datadir: bitcoinDataDir, + listen: 1, + regtest: 1, + server: 1, + listenonion: 0, + whitelist: '127.0.0.1', + rpcuser: rpcConfig.user, + rpcpassword: rpcConfig.pass, + rpcport: rpcConfig.port + }, + datadir: bitcoinDataDir, + exec: 'bitcoind', //if this isn't on your PATH, then provide the absolute path, e.g. /usr/local/bin/bitcoind + process: null }; -WalletService.prototype.start = function(callback) { - var self = this; - - - self.node.services.db.getPrefix(self.name, function(err, servicePrefix) { - - if(err) { - return callback(err); - } - - self.servicePrefix = servicePrefix; - self._encoding = new Encoding(self.servicePrefix); - - self._loadAllAddresses(function(err) { - - if(err) { - return callback(err); - } - - self._loadAllBalances(callback); - }); - - }); -}; - -WalletService.prototype.stop = function(callback) { - setImmediate(callback); -}; - -WalletService.prototype.getPublishEvents = function() { - return []; -}; - - -WalletService.prototype.getAddressString = function(io) { - - var address = io.script.toAddress(this.node.network); - - if(address) { - return address.toString(); - } - - try { - var pubkey = io.script.getPublicKey(); - if(pubkey) { - return pubkey.toString('hex'); - } - } catch(e) {} - -}; - -WalletService.prototype._checkAddresses = function() { - return Object.keys(this._addressMap).length > 0; -}; - -WalletService.prototype.blockHandler = function(block, connectBlock, callback) { - - var opts = { - block: block, - connectBlock: connectBlock, - serial: true - }; - this._blockHandler(opts, callback); - -}; - -WalletService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) { - - var opts = { - block: block, - connectBlock: connectBlock - }; - this._blockHandler(opts, callback); - -}; - -WalletService.prototype._blockHandler = function(opts, callback) { - - var self = this; - - if (!self._checkAddresses()) { - return setImmediate(function() { - callback(null, []); - }); - } - - async.mapSeries(opts.block.transactions, function(tx, next) { - - self._processTransaction(opts, tx, next); - - }, function(err, operations) { - - if(err) { - return callback(err); - } - - var ret = _.compact(_.flattenDeep(operations)); - callback(null, ret); - - }); - -}; - -WalletService.prototype._processTransaction = function(opts, tx, callback) { - var self = this; - - tx.outputs.forEach(function(output, index) { - output.index = index; - }); - - var ioData = tx.inputs.concat(tx.outputs); - - async.mapSeries(ioData, function(io, next) { - if (opts.serial) { - self._processSerialIO(opts, tx, io, next); - } else { - self._processConcurrentIO(opts, tx, io, next); - } - }, function(err, operations) { - if(err) { - return callback(err); - } - callback(null, operations); - }); - -}; - -WalletService.prototype._processConcurrentIO = function(opts, tx, io, callback) { - - var self = this; - var walletIds = self._getWalletIdsFromScript(io); - - if (!walletIds) { - return callback(); - } - var actions = self._getActions(opts.connectBlock); - - var operations = walletIds.map(function(walletId) { - return { - type: actions[0], - key: self._encoding.encodeWalletTransactionKey(walletId, opts.block.__height, tx.id) - }; - }); - - setImmediate(function() { - callback(null, operations); - }); - -}; - -WalletService.prototype._processSerialIO = function(opts, tx, io, callback) { - var fn = this._processSerialOutput; - if (io instanceof Input) { - fn = this._processSerialInput; - } - fn.call(this, opts, tx, io, callback); -}; - -WalletService.prototype._getWalletIdsFromScript = function(io) { - - if(!io.script) { - log.debug('Invalid script'); - return; - } - - return this._addressMap[this.getAddressString(io)]; - -}; - -WalletService.prototype._getActions = function(connect) { - var action = 'put'; - var reverseAction = 'del'; - if (!connect) { - action = 'del'; - reverseAction = 'put'; - } - return [action, reverseAction]; -}; - -WalletService.prototype._processSerialOutput = function(opts, tx, output, callback) { - - var self = this; - var walletIds = self._getWalletIdsFromScript(output); - - if (!walletIds) { - return callback(); - } - - var actions = self._getActions(opts.connectBlock); - - async.mapSeries(walletIds, function(walletId, next) { - - self.balances[walletId] = self.balances[walletId] || 0; - self.balances[walletId] += opts.connectBlock ? output.satoshis : (-1 * output.satoshis); - - var operations = [ - { - type: actions[0], - key: self._encoding.encodeWalletUtxoKey(walletId, tx.id, output.index), - value: self._encoding.encodeWalletUtxoValue(opts.block.__height, output.satoshis, output._scriptBuffer) - }, - { - type: actions[0], - key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, output.satoshis, tx.id, output.index), - value: self._encoding.encodeWalletUtxoSatoshisValue(opts.block.__height, output._scriptBuffer) - }, - { - type: 'put', - key: self._encoding.encodeWalletBalanceKey(walletId), - value: self._encoding.encodeWalletBalanceValue(self.balances[walletId]) - } - ]; - - next(null, operations); - - }, function(err, operations) { - - if(err) { - return callback(err); - } - - callback(null, operations); - - }); - -}; - -WalletService.prototype._processSerialInput = function(opts, tx, input, callback) { - - var self = this; - - var walletIds = input.script && input.script.isPublicKeyIn() ? - ['p2pk'] : - self._getWalletIdsFromScript(input); - - if (!walletIds) { - return callback(); - } - - var actions = self._getActions(opts.connectBlock); - - async.mapSeries(walletIds, function(walletId, next) { - - self.node.services.transaction.getTransaction(input.prevTxId.toString('hex'), {}, function(err, tx) { - - if(err) { - return next(err); - } - - var utxo = tx.outputs[input.outputIndex]; - - if (walletId === 'p2pk') { - - var pubKey = utxo.script.getPublicKey().toString('hex'); - walletId = self._addressMap[pubKey]; - - if (!walletId) { - return next(null, []); - } - - } - - self.balances[walletId] = self.balances[walletId] || 0; - self.balances[walletId] += opts.connectBlock ? (-1 * utxo.satoshis) : utxo.satoshis; - - var operations = [ - { - type: actions[1], - key: self._encoding.encodeWalletUtxoKey(walletId, input.prevTxId, input.outputIndex), - value: self._encoding.encodeWalletUtxoValue(tx.__height, utxo.satoshis, utxo._scriptBuffer) - }, - { - type: actions[1], - key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, utxo.satoshis, tx.id, input.outputIndex), - value: self._encoding.encodeWalletUtxoSatoshisValue(tx.__height, utxo._scriptBuffer) - }, - { - type: 'put', - key: self._encoding.encodeWalletBalanceKey(walletId), - value: self._encoding.encodeWalletBalanceValue(self.balances[walletId]) - } - ]; - - next(null, operations); - - }); - }, function(err, operations) { - - if(err) { - return callback(err); - } - - callback(null, operations); - - }); - -}; - -WalletService.prototype._loadAllAddresses = function(callback) { - var self = this; - - self._addressMap = {}; - - var start = self._encoding.encodeWalletAddressesKey('00'); - var end = self._encoding.encodeWalletAddressesKey(Array(65).join('f')); - - var stream = self.db.createReadStream({ - gte: start, - lt: end - }); - - var streamErr = null; - - stream.on('data', function(data) { - var key = self._encoding.decodeWalletAddressesKey(data.key); - var value = self._encoding.decodeWalletAddressesValue(data.value); - value.forEach(function(address) { - if(!self._addressMap[address]) { - self._addressMap[address] = []; - } - - self._addressMap[address].push(key); - }); - }); - - stream.on('error', function(err) { - streamErr = err; - }); - - stream.on('end', function() { - callback(streamErr); - }); -}; - -WalletService.prototype._loadAllBalances = function(callback) { - var self = this; - - self._balances = {}; - - var start = self._encoding.encodeWalletBalanceKey('00'); - var end = self._encoding.encodeWalletBalanceKey(Array(65).join('f')); - - var stream = self.db.createReadStream({ - gte: start, - lt: end - }); - - var streamErr = null; - - stream.on('data', function(data) { - var walletId = self._encoding.decodeWalletBalanceKey(data.key); - var balance = self._encoding.decodeWalletBalanceValue(data.value); - - self._balances[walletId] = balance; - }); - - stream.on('error', function(err) { - streamErr = err; - }); - - stream.on('end', function() { - callback(streamErr); - }); -}; - - -WalletService.prototype._endpointUTXOs = function() { - var self = this; - return function(req, res) { - req.setTimeout(600000); - var walletId = req.params.walletId; - var queryMempool = req.query.queryMempool !== false; - var height = self.node.services.db.tip.__height; - var options = { - queryMempool: queryMempool - }; - self.db.pauseSync(function() { - self._getUtxos(walletId, options, function(err, utxos) { - if(err) { - return utils.sendError(err, res); - } - self.db.resumeSync(); - res.status(200).jsonp({ - utxos: utxos, - height: height - }); - }); - }); - }; -}; - -WalletService.prototype._endpointGetBalance= function() { - var self = this; - return function(req, res) { - req.setTimeout(600000); - var walletId = req.params.walletId; - var queryMempool = req.query.queryMempool !== false; - var byAddress = req.query.byAddress; - - var options = { - queryMempool: queryMempool, - byAddress: byAddress - }; - - self.db.pauseSync(function() { - self._getBalance(walletId, options, function(err, result) { - if(err) { - return utils.sendError(err, res); - } - self.db.resumeSync(); - res.status(200).jsonp({ - satoshis: result, - height: self.node.services.db.tip.__height, - hash: self.node.services.db.tip.hash - }); - }); - }); - }; -}; - -WalletService.prototype._endpointRemoveWallet = function() { - var self = this; - return function(req, res) { - var walletId = req.params.walletId; - - self.db.pauseSync(function() { - self._removeWallet(walletId, function(err, numRecords) { - if(err) { - return utils.sendError(err, res); - } - self.db.resumeSync(); - res.status(200).jsonp({ - walletId: walletId, - numberRemoved: numRecords - }); - }); - }); - }; -}; - -WalletService.prototype._endpointRemoveAllWallets = function() { - var self = this; - return function(req, res) { - - self.db.pauseSync(function() { - self._removeAllWallets(function(err, numRecords) { - if(err) { - return utils.sendError(err, res); - } - self.db.resumeSync(); - res.status(200).jsonp({ - numberRemoved: numRecords - }); - }); - }); - }; -}; - -WalletService.prototype._endpointGetAddresses = function() { - var self = this; - return function(req, res) { - var walletId = req.params.walletId; - - self.db.pauseSync(function() { - self._getAddresses(walletId, function(err, addresses) { - self.db.resumeSync(); - if(err) { - return utils.sendError(err, res); - } - - if(!addresses) { - return res.status(404).send('Not found'); - } - - res.status(200).jsonp({ - addresses: addresses.length - }); - }); - }); - }; -}; - -WalletService.prototype._endpointDumpAllWallets = function() { - var self = this; - return function(req, res) { - var keys = []; - - var start = new Buffer(self.servicePrefix); - var end = new Buffer.concat([start, new Buffer('ff', 'hex')]); - - var stream = self.db.createKeyStream({ - gte: start, - lt: end - }); - - var streamErr = null; - stream.on('error', function(err) { - streamErr = err; - }); - - stream.on('data', function(data) { - keys.push(data); - }); - - stream.on('end', function() { - if(streamErr) { - return utils.sendError(streamErr, res); - } - var resultsMap = keys.map(function(key) { - return key.toString('hex'); - }); - res.status(200).jsonp({ - result: resultsMap - }); - }); - }; -}; - -WalletService.prototype._endpointGetWalletIds = function() { - var self = this; - return function(req, res) { - var start = new Buffer.concat([self.servicePrefix, new Buffer(self._encoding.subKeyMap.addresses.buffer)]); - var end = new Buffer.concat([start, new Buffer('ff', 'hex')]); - var stream = self.db.createKeyStream({ - gte: start, - lt: end - }); - var walletIds = []; - - var streamErr; - stream.on('error', function(err) { - streamErr = err; - }); - - stream.on('data', function(data) { - walletIds.push(self._encoding.decodeWalletAddressesKey(data)); - }); - - stream.on('end', function() { - if(streamErr) { - return utils.sendError(streamErr, res); - } - res.status(200).jsonp({ - walletIds: walletIds - }); - }); - }; -}; - -WalletService.prototype._endpointRegisterWallet = function() { - var self = this; - return function(req, res) { - var walletId = req.params.walletId; - if (!walletId) { - walletId = utils.getWalletId(); - } - self._createWallet(walletId, function(err) { - if(err) { - return utils.sendError(err, res); - } - res.status(201).jsonp({ - walletId: walletId - }); - }); - }; -}; - -WalletService.prototype._endpointResyncAddresses = function() { - - var self = this; - - return function(req, res) { - var walletId = req.params.walletId; - - - if (!walletId) { - return utils.sendError(new Error('WalletId must be given.'), res); - } - - if (!self._isJobQueueReady()) { - return utils.sendError(new Error('Job queue is currently overloaded, please try again later.'), res); - } - - self.db.pauseSync(function() { - - self._getAddresses(walletId, function(err, oldAddresses) { - - if(err) { - return utils.sendError(err, res); - } - - if(!oldAddresses) { - return res.status(404).send('Not found'); - } - - self._removeWallet(walletId, function(err) { - - if(err) { - return utils.sendError(err, res); - } - - self._createWallet(walletId, function() { - - var jobId = utils.generateJobId(); - self._importAddresses(walletId, oldAddresses, jobId, self._jobCompletionCallback.bind(self)); - res.status(200).jsonp({jobId: jobId}); - - }); - }); - }); - }); - }; -}; - -WalletService.prototype._endpointPostAddresses = function() { - var self = this; - return function(req, res) { - - var addresses = req.addresses; - if (!addresses || !addresses.length) { - return utils.sendError(new Error('addresses are required when creating a wallet.'), res); - } - var walletId = req.params.walletId; - if (!walletId) { - return utils.sendError(new Error('WalletId must be given.'), res); - } - if (!self._isJobQueueReady()) { - return utils.sendError(new Error('Job queue is currently overloaded, please try again later.'), res); - } - - var jobId = utils.generateJobId(); - - self.db.pauseSync(function() { - - self._importAddresses(walletId, addresses, jobId, self._jobCompletionCallback.bind(self)); - res.status(200).jsonp({jobId: jobId}); - - }); - }; -}; - -WalletService.prototype._endpointGetTransactions = function() { - - var self = this; - - return function(req, res) { - - var walletId = req.params.walletId; - - self.db.pauseSync(function() { - self._processStartEndOptions(req, function(err, heights) { - - if(err) { - return utils.sendError(err, res); - } - - var options = { - start: heights[0] || 0, - end : heights[1] || 0xffffffff, - self: self, - walletId: walletId - }; - - var missingTxidCount = 0; - var transform = new Transform({ objectMode: true, highWaterMark: 1000000 }); - //txids are sent in and the actual tx's are found here - transform._transform = function(chunk, enc, callback) { - - var txid = self._encoding.decodeWalletTransactionKey(chunk).txid.toString('hex'); - - if (txid.length !== 64 || txid === '0000000000000000000000000000000000000000000000000000000000000000') { - missingTxidCount++; - log.error('missingTxidCount: ', missingTxidCount); - return callback(); - } - - self._getTransactionFromDb(options, txid, function(err, tx) { - - if(err) { - log.error(err); - transform.unpipe(); - return callback(); +var bitcore = { + configFile: { + file: bitcoreDataDir + '/bitcore-node.json', + conf: { + network: 'regtest', + port: 53001, + datadir: bitcoreDataDir, + services: ['p2p', 'test-p2p'], + servicesConfig: { + p2p: { + peers: [ + { + ip: { v4: '127.0.0.1' } } - - var formattedTx = utils.toJSONL(self._formatTransaction(tx)); - transform.push(formattedTx); - callback(); - - }); - - }; - - transform._flush = function(callback) { - self.db.resumeSync(); - callback(); - }; - - var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding); - var stream = self.db.createKeyStream(self._getSearchParams(encodingFn, options)); - - stream.on('close', function() { - stream.unpipe(); - }); - - stream.pipe(transform).pipe(res); - }); - }); - }; + ] + }, + 'test-p2p': { + requirePath: path.resolve(__dirname + '/test_bus.js') + } + } + } + }, + httpOpts: { + protocol: 'http:', + hostname: 'localhost', + port: 53001, + }, + opts: { cwd: bitcoreDataDir }, + datadir: bitcoreDataDir, + exec: path.resolve(__dirname, '../bin/bitcore-node'), + args: ['start'], + process: null }; -WalletService.prototype._formatTransactions = function(txs) { - return txs.forEach(this._formatTransaction); +if (debug && extraDebug) { + bitcoin.args.printtoconsole = 1, + bitcoin.args.debug = 1, + bitcoin.args.logips = 1 +} + +var opts = { + debug: debug, + bitcore: bitcore, + bitcoin: bitcoin, + bitcoinDataDir: bitcoinDataDir, + bitcoreDataDir: bitcoreDataDir, + rpc: new BitcoinRPC(rpcConfig), + walletPassphrase: 'test', + txCount: 0, + blockHeight: 0, + walletPrivKeys: [], + initialTxs: [], + fee: 100000, + feesReceived: 0, + satoshisSent: 0, + walletId: crypto.createHash('sha256').update('test').digest('hex'), + satoshisReceived: 0, + initialHeight: 150 }; -WalletService.prototype._formatTransaction = function(tx) { - var obj = tx.toObject(); +var utils = new Utils(opts); - for(var i = 0; i < tx.inputs.length; i++) { - obj.inputs[i].inputSatoshis = tx.__inputValues[i]; +var subSocket; +var txs = []; +var blocks = []; + +function processMessages(topic, message) { + var topicStr = topic.toString(); + if (topicStr === 'transaction') { + return txs.push(message); + } else if (topicStr === 'block') { + return blocks.push(message); } - obj.height = tx.__height; - obj.timestamp = tx.__timestamp; - return obj; -}; +} -WalletService.prototype._endpointPutAddresses = function() { - var self = this; - return function(req, res) { - if (!self._isJobQueueReady()) { - return utils.sendError(new Error('Job Queue is full, current job limit: ' + self._MAX_QUEUE), res); +function setupZmqSubscriber(callback) { + + subSocket = zmq.socket('sub'); + subSocket.on('connect', function(fd, endPoint) { + if (debug) { + console.log('ZMQ connected to:', endPoint); } + }); - var newAddresses = req.body; - - if(!Array.isArray(req.body)) { - return utils.sendError(new Error('Must PUT an array'), res); + subSocket.on('disconnect', function(fd, endPoint) { + if (debug) { + console.log('ZMQ disconnect:', endPoint); } + }); - var walletId = req.params.walletId; - if (!walletId) { - return utils.sendError(new Error('WalletId must be given.'), res); - } + subSocket.monitor(100, 0); + subSocket.connect('tcp://127.0.0.1:38332'); + subSocket.subscribe('transaction'); + subSocket.subscribe('block'); + subSocket.on('message', processMessages); + callback(); +} - self.db.pauseSync(function() { +function waitForZmqConnection(callback) { + async.retry({ interval: 500, times: 50 }, function(next) { + return next(txs.length < utils.opts.initialTxs.length); + }, callback); +} - self._getAddresses(walletId, function(err, oldAddresses) { +describe('P2P Operations', function() { - if(err) { - return utils.sendError(err, res); - } + this.timeout(60000); - if(!oldAddresses) { - return res.status(404).send('Not found'); - } + after(function(done) { + utils.cleanup(done); + }); - var addAddresses = _.without(newAddresses, oldAddresses); + before(function(done) { + async.series([ + utils.startBitcoind.bind(utils), + utils.waitForBitcoinReady.bind(utils) + ], done); + }); - var jobId = utils.generateJobId(); - self._importAddresses(walletId, addAddresses, jobId, self._jobCompletionCallback.bind(self)); - res.status(200).jsonp({jobId: jobId}); + it('should connect to the p2p network and stream the mempool to clients', function(done) { + async.series([ + utils.unlockWallet.bind(utils), + utils.setupInitialTxs.bind(utils), + function(next) { + async.eachSeries(utils.opts.initialTxs, function(tx, next) { + utils.opts.rpc.sendRawTransaction(tx.serialize(), next); + }, next); + }, + + utils.startBitcoreNode.bind(utils), + setupZmqSubscriber, + waitForZmqConnection + + ], function(err) { + + if(err) { + return done(err); + } + + var initialTxs = {}; + + utils.opts.initialTxs.map(function(tx) { + initialTxs[tx.hash] = true; + return; }); - }); - }; -}; -WalletService.prototype._getUtxos = function(walletId, options, callback) { - var self = this; + var i = 0; + for(; i < utils.opts.initialTxs.length; i++) { + var tx = new Transaction(txs[i]); + expect(initialTxs[tx.hash]).to.equal(true); + } - var stream = self.db.createReadStream({ - gte: self._encoding.encodeWalletUtxoKey(walletId), - lt: self._encoding.encodeWalletUtxoKey(mainUtils.getTerminalKey(new Buffer(walletId))) - }); - - var utxos = []; - var streamErr = null; - - stream.on('data', function(data) { - var key = self._encoding.decodeWalletUtxoKey(data.key); - var value = self._encoding.decodeWalletUtxoValue(data.value); - utxos.push({ - txid: key.txid, - vout: key.outputIndex, - height: value.height, - satoshis: value.satoshis, - scriptPubKey: value.script.toString('hex') + expect(utils.opts.initialTxs.length).to.equal(i); + done(); }); }); - stream.on('error', function(err) { - streamErr = err; - }); + it('should send new transactions as they are broadcasted by our trusted peer', function(done) { + var tx; + async.series([ - stream.on('end', function() { - callback(streamErr, utxos); - }); -}; + function(next) { + opts.rpc.generate(10, next); + }, -WalletService.prototype._getBalance = function(walletId, options, callback) { - - var self = this; - - var key = self._encoding.encodeWalletBalanceKey(walletId); - - self.db.get(key, function(err, buffer) { - - if(err) { - return callback(err); - } - - callback(null, self._encoding.decodeWalletBalanceValue(buffer)); - }); - -}; - -WalletService.prototype._getSearchParams = function(fn, options) { - return { - gte: fn.call(this, options.walletId, options.start), - lt: Buffer.concat([ fn.call(this, options.walletId, options.end).slice(0, -32), new Buffer('ff', 'hex') ]) - }; -}; - -WalletService.prototype._getTransactionFromDb = function(options, txid, callback) { - - var self = options.self; - - self.node.services.transaction.getTransaction(txid.toString('hex'), options, function(err, tx) { - - if(err) { - return callback(err); - } - - if (tx.__inputValues) { - return callback(null, tx); - } - - async.mapLimit(tx.inputs, 8, function(input, next) { - - self.node.services.transaction.getTransaction(input.prevTxId.toString('hex'), options, function(err, tx) { + function(next) { + utils.getPrivateKeysWithABalance(function(err, spendables) { if(err) { return next(err); } - next(null, tx.outputs[input.outputIndex].satoshis); - }); + tx = new Transaction() + .from(spendables[0].utxo) + .to(new PrivateKey('testnet').toAddress().toString(), + Unit.fromBTC(spendables[0].utxo.amount).satoshis - 100000) + .fee(100000) + .sign(spendables[0].privKey); - }, function(err, inputValues) { - - if(err) { - return callback(err); - } - - tx.__inputValues = inputValues; - callback(null, tx); - - }); - }); - -}; - -WalletService.prototype._removeWallet = function(walletId, callback) { - - var self = this; - async.map(Object.keys(self._encoding.subKeyMap), function(prefix, next) { - - var keys = []; - - var start = self._encoding.subKeyMap[prefix].fn.call(self._encoding, walletId); - var end = new Buffer.concat([ - self._encoding.subKeyMap[prefix] - .fn.call(self._encoding, walletId), - new Buffer('ff', 'hex')]); - - var stream = self.db.createKeyStream({ - gte: start, - lt: end - }); - - var streamErr = null; - stream.on('error', function(err) { - streamErr = err; - }); - - stream.on('data', function(data) { - keys.push(data); - }); - - stream.on('end', function() { - next(streamErr, keys); - }); - - }, function(err, results) { - if(err) { - return callback(err); - } - results = _.flatten(results); - var operations = []; - for(var i = 0; i < results.length; i++) { - operations.push({ - type: 'del', - key: results[i] + utils.sendTx(tx, 0, next); }); } - self.db.batch(operations, function(err) { + ], function(err) { + if(err) { - return callback(err); - } - callback(null, operations.length); - }); - }); -}; - -WalletService.prototype._removeAllWallets = function(callback) { - var self = this; - var operations = []; - - var start = self._encoding.servicePrefix; - var end = new Buffer.concat([ start, new Buffer('ff', 'hex') ]); - - var stream = self.db.createKeyStream({ - gte: start, - lte: end - }); - - var streamErr = null; - stream.on('error', function(err) { - streamErr = err; - }); - - stream.on('data', function(data) { - operations.push({ type: 'del', key: data }); - }); - - stream.on('end', function() { - self.db.batch(operations, function(err) { - if(err) { - return callback(err); - } - callback(null, operations.length); - }); - }); -}; - -WalletService.prototype._getAddresses = function(walletId, callback) { - var self = this; - var key = self._encoding.encodeWalletAddressesKey(walletId); - self.db.get(key, function(err, value) { - if(err) { - return callback(err); - } - if (!value) { - return callback(null, []); - } - callback(null, self._encoding.decodeWalletAddressesValue(value)); - }); -}; - -WalletService.prototype._createWallet = function(walletId, callback) { - var self = this; - var key = self._encoding.encodeWalletAddressesKey(walletId); - self.db.get(key, function(err) { - if (err && ((/notfound/i).test(err) || err.notFound)) { - var value = self._encoding.encodeWalletAddressesValue([]); - return self.db.put(key, value, callback); - } - callback(); - }); -}; - -WalletService.prototype._isJobQueueReady = function() { - - var self = this; - - self._jobs.rforEach(function(value, key) { - if ((value.status === 'complete' || value.status === 'error') && value.reported) { - self._jobs.del(key); - } - }); - - return self._jobs.length < self._MAX_QUEUE; - -}; - -WalletService.prototype._jobCompletionCallback = function(err, results) { - - this.db.resumeSync(); - - log.info('Completed job: ', results.jobId); - - var jobId = results.jobId; - var job = this._jobs.get(jobId); - - if (!job) { - log.error('ERROR: Could not locate job id: ' + jobId + - ' in the list of jobs. It may have been purged already although it should not have.'); - return; - } - - job.progress = 1.0; - job.endtime = Date.now(); - - if (err) { - job.status = 'error'; - job.message = err.message; - return; - } - - job.status = 'complete'; - job.message = results; - job.reported = false; -}; - -WalletService.prototype._importAddresses = function(walletId, addresses, jobId, callback) { - var self = this; - - var jobResults = { jobId: jobId }; - - var job = { - starttime: Date.now(), - fn: 'importAddresses', - progress: 0, - projectedendtime: null - }; - - this._jobs.set(jobId, job); - - - self._getAddresses(walletId, function(err, oldAddresses) { - if(err) { - return callback(err, jobResults); - } - - log.info('loaded existing addresses, count: ', oldAddresses.length); - async.parallel( - [ - self._getUTXOIndexOperations.bind(self, walletId, addresses, jobId), - self._getTxidIndexOperations.bind(self, walletId, addresses, jobId) - ], - function(err, results) { - if(err) { - return callback(err, jobResults); - } - - var now = Date.now(); - job.progress = 0.50; - job.projectedendtime = now + (now - job.starttime); - - var operations = results[0].concat(results[1]); - - operations.push({ - type: 'put', - key: self._encoding.encodeWalletAddressesKey(walletId), - value: self._encoding.encodeWalletAddressesValue(oldAddresses.concat(addresses)) - }); - - self.db.batch(operations, function(err) { - if(err) { - return callback(err, jobResults); - } - - self._loadAllAddresses(function(err) { - if(err) { - return callback(err, jobResults); - } - - self._loadAllBalances(function(err) { - if(err) { - return callback(err, jobResults); - } - callback(null, jobResults); - }); - }); - }); - } - ); - }); -}; - -WalletService.prototype._getUTXOIndexOperations = function(walletId, addresses, jobId, callback) { - var self = this; - - var balance = 0; - - self._getBalance(walletId, {}, function(err, initialBalance) { - if(err && !err.notFound) { - return callback(err); - } - - if(initialBalance) { - balance = initialBalance; - } - - log.info('Initial balance of walletId: ' + walletId + ' is: ' + Unit.fromSatoshis(balance).toBTC() + ' BTC.'); - log.info('Starting to gather utxos for walletId: ' + walletId); - self.node.services.address.getUtxos(addresses, false, function(err, utxos) { - if(err) { - return callback(err); + return done(err); } - log.info('completed gathering utxos: ', utxos.length); - var operations = []; - - for(var i = 0; i < utxos.length; i++) { - var utxo = utxos[i]; - - balance += utxo.satoshis; - - operations.push({ - type: 'put', - key: self._encoding.encodeWalletUtxoKey(walletId, utxo.txid, utxo.outputIndex), - value: self._encoding.encodeWalletUtxoValue(utxo.height, utxo.satoshis, utxo.script) - }); - - operations.push({ - type: 'put', - key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, utxo.satoshis, utxo.txid, utxo.outputIndex), - value: self._encoding.encodeWalletUtxoSatoshisValue(utxo.height, utxo.script) - }); - } - - operations.push({ - type: 'put', - key: self._encoding.encodeWalletBalanceKey(walletId), - value: self._encoding.encodeWalletBalanceValue(balance) - }); - - log.info('Final balance for walletId: ' + walletId + ' is: ' + Unit.fromSatoshis(balance).toBTC() + ' BTC.'); - callback(null, operations); - }); - }); -}; - -WalletService.prototype._getTxidIndexOperations = function(walletId, addresses, jobId, callback) { - var self = this; - var txids = {}; - - var logCount = 0; - async.eachLimit(addresses, 10, function(address, next) { - self.node.services.address.getAddressTxidsWithHeights(address, null, function(err, tmpTxids) { - if(err) { - return next(err); - } - if (logCount++ % 1000 === 0) { - log.info('loaded address txids, total count: ', Object.keys(txids).length); - } - txids = _.merge(txids, tmpTxids); - return next(); - }); - }, function(err) { - if(err) { - return callback(err); - } - - var operations = Object.keys(txids).map(function(txid) { - assert(txid.length === 64, 'WalletService, Txid: ' + txid + ' with length: ' + txid.length + ' does not resemble a txid.'); - return { - type: 'put', - key: self._encoding.encodeWalletTransactionKey(walletId, txids[txid], txid) - }; + setTimeout(function() { + var broadcastedTx = new Transaction(txs[txs.length - 1]); + expect(tx.hash).to.equal(broadcastedTx.hash); + expect(txs.length).to.equal(utils.opts.initialTxs.length + 1); + done(); + }, 1000); }); - callback(null, operations); }); -}; -WalletService.prototype._storeAddresses = function(walletId, addresses, callback) { - var key = this._encoding.encodeWalletAddressesKey(walletId); - var value = this._encoding.encodeWalletValue(addresses); - this.db.put(key, value, callback); -}; + it('should get blocks from peer that we do not have on startup', function(done) { -WalletService.prototype._storeBalance = function(walletId, balance, callback) { - var key = this._encoding.encodeWalletBalanceKey(walletId); - var value = this._encoding.encodeWalletBalanceValue(balance); - this.db.put(key, value, callback); -}; + done(); + }); -WalletService.prototype._processStartEndOptions = function(req, callback) { - var self = this; - - if (req.query.start >= 0 && req.query.end >= 0) { - - var heights = []; - self.node.services.timestamp.getBlockHeights([ - utils.normalizeTimeStamp(req.query.start), - utils.normalizeTimeStamp(req.query.end) - ], - - function(err, hashTuple) { - if(err) { - return callback(err); - } - - hashTuple.forEach(function(hash) { - self.node.services.bitcoind._tryAllClients(function(client, done) { - client.getBlock(hash, function(err, response) { - if (err) { - return callback(err); - } - done(null, heights.push(response.result.height)); - }); - }, function(err) { - if(err) { - return callback(err); - } - if (heights.length > 1) { - callback(null, heights); - } - }); - }); - }); - } else { - - setImmediate(function() { - callback(null, [req.query.start, req.query.end]); - }); - - } -}; - -WalletService.prototype._endpointJobs = function() { - - var self = this; - - return function(req, res) { - - var count = 0; - self._jobs.rforEach(function(value) { - if ((value.state === 'complete' || value.state === 'error') && value.reported) { - count++; - } - }); - - res.status(200).jsonp({ jobCount: self._jobs.length - count }); - }; - -}; - -WalletService.prototype._endpointJobStatus = function() { - - var self = this; - - return function(req, res) { - var jobId = req.params.jobId; - var job = self._jobs.get(jobId); - if (!jobId || !job) { - return utils.sendError(new Error('Job not found. ' + - 'The job results may have been purged to make room for new jobs.'), res); - } - job.reported = true; - return res.status(201).jsonp(job); - }; - -}; - -WalletService.prototype._setupReadOnlyRoutes = function(app) { - var s = this; - - app.get('/wallets/:walletId/utxos', - s._endpointUTXOs() - ); - app.get('/wallets/:walletId/balance', - s._endpointGetBalance() - ); - app.get('/wallets/dump', - s._endpointDumpAllWallets() - ); - app.get('/wallets/:walletId', - s._endpointGetAddresses() - ); - app.get('/wallets/:walletId/transactions', - s._endpointGetTransactions() - ); - app.get('/wallets', - s._endpointGetWalletIds() - ); - app.get('/jobs/:jobId', - s._endpointJobStatus() - ); - app.get('/jobs', - s._endpointJobs() - ); -}; - -WalletService.prototype._setupWriteRoutes = function(app) { - var s = this; - var v = validators; - - app.post('/wallets/:walletId', - s._endpointRegisterWallet() - ); - app.delete('/wallets/:walletId', - s._endpointRemoveWallet() - ); - app.delete('/wallets/', - s._endpointRemoveAllWallets() - ); - app.put('/wallets/:walletId/addresses', - s._endpointPutAddresses() - ); - app.post('/wallets/:walletId/addresses', - upload.single('addresses'), - v.checkAddresses, - s._endpointPostAddresses() - ); - app.put('/wallets/:walletId/addresses/resync', - s._endpointResyncAddresses() - ); -}; + it('should send new blocks as they are broadcasted by our trusted peer', function(done) { + expect(blocks.length).to.equal(1); + done(); + }); -WalletService.prototype.setupRoutes = function(app) { - app.use(bodyParser.json()); - this._setupReadOnlyRoutes(app); - this._setupWriteRoutes(app); +}); -}; - -WalletService.prototype.getRoutePrefix = function() { - return 'wallet-api'; -}; - -module.exports = WalletService; diff --git a/regtest/test_bus.js b/regtest/test_bus.js new file mode 100644 index 00000000..2a9d3472 --- /dev/null +++ b/regtest/test_bus.js @@ -0,0 +1,47 @@ +'use strict'; + +var BaseService = require('../lib/service'); +var inherits = require('util').inherits; +var zmq = require('zmq'); +var index = require('../lib'); +var log = index.log; + +var TestBusService = function(options) { + BaseService.call(this, options); +}; + +inherits(TestBusService, BaseService); + +TestBusService.dependencies = ['p2p']; + +TestBusService.prototype.start = function(callback) { + + var self = this; + self.pubSocket = zmq.socket('pub'); + + log.info('zmq bound to port: 38332'); + + self.pubSocket.bind('tcp://127.0.0.1:38332'); + + self.bus = self.node.openBus({ remoteAddress: 'localhost' }); + + self.bus.on('p2p/transaction', function(tx) { + self.pubSocket.send([ 'transaction', new Buffer(tx.uncheckedSerialize(), 'hex') ]); + }); + + self.bus.on('p2p/block', function(block) { + self.pubSocket.send([ 'block', block.toBuffer() ]); + }); + + self.bus.subscribe('p2p/transaction'); + self.bus.subscribe('p2p/block'); + + callback(); +}; + +TestBusService.prototype.stop = function(callback) { + callback(); +}; + +module.exports = TestBusService; + diff --git a/regtest/utils.js b/regtest/utils.js index a84f6398..fcc6d337 100644 --- a/regtest/utils.js +++ b/regtest/utils.js @@ -127,19 +127,25 @@ Utils.prototype.initializeAndStartService = function(opts, callback) { var self = this; rimraf(opts.datadir, function(err) { + if(err) { return callback(err); } + mkdirp(opts.datadir, function(err) { + if(err) { return callback(err); } + if (opts.configFile) { self.writeConfigFile(opts.configFile.file, opts.configFile.conf); } + var args = _.isArray(opts.args) ? opts.args : self.toArgs(opts.args); opts.process = spawn(opts.exec, args, opts.opts); callback(); + }); }); @@ -170,7 +176,22 @@ Utils.prototype.startBitcoreNode = function(callback) { }; Utils.prototype.startBitcoind = function(callback) { - this.initializeAndStartService(this.opts.bitcoin, callback); + var self = this; + self.initializeAndStartService(self.opts.bitcoin, function() { + + // in case you choose to -printtoconsole + self.opts.bitcoin.process.stdout.on('data', function(data) { + if (self.opts.debug) { + process.stdout.write(data.toString()); + } + }); + + self.opts.bitcoin.process.stderr.on('data', function(data) { + process.stdout.write(data.toString()); + }); + + callback(); + }); }; Utils.prototype.unlockWallet = function(callback) {