Merge pull request #51 from maraoz/memory/leak

Memory leak fix
This commit is contained in:
Manuel Aráoz 2015-04-27 11:00:14 -03:00
commit 1d67ce4310
6 changed files with 176 additions and 138 deletions

16
config/livenet.yml Normal file
View File

@ -0,0 +1,16 @@
BitcoreNode:
LevelUp: ./db
network: livenet
NetworkMonitor:
host: localhost
port: 8333
Reporter: none # none, simple, matrix
BitcoreHTTP:
host: localhost
port: 8080
RPC:
user: user
pass: password
protocol: http
host: 127.0.0.1
port: 8332

View File

@ -3,6 +3,7 @@
var bitcore = require('bitcore'); var bitcore = require('bitcore');
var $ = bitcore.util.preconditions; var $ = bitcore.util.preconditions;
var _ = bitcore.deps._; var _ = bitcore.deps._;
var BufferUtil = bitcore.util.buffer;
var NULL = '0000000000000000000000000000000000000000000000000000000000000000'; var NULL = '0000000000000000000000000000000000000000000000000000000000000000';
@ -12,7 +13,9 @@ function BlockChain() {
this.work[NULL] = 0; this.work[NULL] = 0;
this.height = {}; this.height = {};
this.height[NULL] = -1; this.height[NULL] = -1;
this.hashByHeight = { '-1': NULL }; this.hashByHeight = {
'-1': NULL
};
this.next = {}; this.next = {};
this.prev = {}; this.prev = {};
} }
@ -35,66 +38,74 @@ var getWork = function(bits) {
return ((bits & 0xffffff) << (8 * (bytes - 3))) >>> 0; return ((bits & 0xffffff) << (8 * (bytes - 3))) >>> 0;
}; };
BlockChain.prototype.addData = function(block) { BlockChain.prototype.addData = function(header) {
$.checkArgument(block instanceof bitcore.Block, 'Argument is not a Block instance'); $.checkArgument(header instanceof bitcore.Block.BlockHeader, 'Argument is not a BlockHeader instance');
var prevHash = bitcore.util.buffer.reverse(block.header.prevHash).toString('hex'); var prevHash = BufferUtil.reverse(header.prevHash).toString('hex');
var hash = header.hash;
this.work[block.hash] = this.work[prevHash] + getWork(block.header.bits); this.work[hash] = this.work[prevHash] + getWork(header.bits);
this.prev[block.hash] = prevHash; this.prev[hash] = prevHash;
}; };
BlockChain.prototype.proposeNewBlock = function(block) { BlockChain.prototype._appendNewBlock = function(hash) {
$.checkArgument(block instanceof bitcore.Block, 'Argument is not a Block instance'); var toUnconfirm = [];
var prevHash = bitcore.util.buffer.reverse(block.header.prevHash).toString('hex'); var toConfirm = [];
var self = this;
if (_.isUndefined(this.work[prevHash])) { var pointer = hash;
throw new Error('No previous data to estimate work'); while (_.isUndefined(this.height[pointer])) {
toConfirm.push(pointer);
pointer = this.prev[pointer];
} }
this.addData(block); var commonAncestor = pointer;
if (this.work[block.hash] > this.work[this.tip]) { pointer = this.tip;
while (pointer !== commonAncestor) {
toUnconfirm.push(pointer);
pointer = this.prev[pointer];
}
var toUnconfirm = []; toConfirm.reverse();
var toConfirm = []; toUnconfirm.map(function(hash) {
var commonAncestor; self.unconfirm(hash);
});
toConfirm.map(function(hash) {
self.confirm(hash);
});
return {
unconfirmed: toUnconfirm,
confirmed: toConfirm
};
};
var pointer = block.hash; BlockChain.prototype.proposeNewHeader = function(header) {
while (_.isUndefined(this.height[pointer])) { $.checkArgument(header instanceof bitcore.Block.BlockHeader, 'Argument is not a BlockHeader instance');
toConfirm.push(pointer); var prevHash = BufferUtil.reverse(header.prevHash).toString('hex');
pointer = this.prev[pointer]; var hash = header.hash;
}
commonAncestor = pointer;
pointer = this.tip; $.checkState(this.hasData(prevHash), 'No previous data to estimate work');
while (pointer !== commonAncestor) { this.addData(header);
toUnconfirm.push(pointer); var work = this.work[hash];
pointer = this.prev[pointer]; var tipWork = this.work[this.tip];
} $.checkState(!_.isUndefined(work), 'No work found for ' + hash);
$.checkState(!_.isUndefined(tipWork), 'No work found for tip ' + this.tip);
toConfirm.reverse(); if (work > tipWork) {
return this._appendNewBlock(hash);
var self = this;
toUnconfirm.map(function(hash) {
self.unconfirm(hash);
});
toConfirm.map(function(hash) {
self.confirm(hash);
});
return {
unconfirmed: toUnconfirm,
confirmed: toConfirm
};
} }
return { return {
unconfirmed: [], unconfirmed: [],
confirmed: [] confirmed: []
}; };
}; };
BlockChain.prototype.proposeNewBlock = function(block) {
$.checkArgument(block instanceof bitcore.Block, 'Argument is not a Block instance');
return this.proposeNewHeader(block.header);
};
BlockChain.prototype.confirm = function(hash) { BlockChain.prototype.confirm = function(hash) {
var prevHash = this.prev[hash]; var prevHash = this.prev[hash];
$.checkState(prevHash === this.tip); $.checkState(prevHash === this.tip, 'Attempting to confirm a non-contiguous block.');
this.tip = hash; this.tip = hash;
var height = this.height[prevHash] + 1; var height = this.height[prevHash] + 1;
@ -105,7 +116,7 @@ BlockChain.prototype.confirm = function(hash) {
BlockChain.prototype.unconfirm = function(hash) { BlockChain.prototype.unconfirm = function(hash) {
var prevHash = this.prev[hash]; var prevHash = this.prev[hash];
$.checkState(hash === this.tip); $.checkState(hash === this.tip, 'Attempting to unconfirm a non-tip block');
this.tip = prevHash; this.tip = prevHash;
var height = this.height[hash]; var height = this.height[hash];
@ -114,6 +125,35 @@ BlockChain.prototype.unconfirm = function(hash) {
delete this.height[hash]; delete this.height[hash];
}; };
BlockChain.prototype.hasData = function(hash) {
return !_.isUndefined(this.work[hash]);
};
BlockChain.prototype.prune = function() {
var self = this;
_.each(this.prev, function(key) {
if (!self.height[key]) {
delete self.prev[key];
delete self.work[key];
}
});
};
BlockChain.prototype.toObject = function() {
return {
tip: this.tip,
work: this.work,
next: this.next,
hashByHeight: this.hashByHeight,
height: this.height,
prev: this.prev
};
};
BlockChain.prototype.toJSON = function() {
return JSON.stringify(this.toObject());
};
BlockChain.prototype.getBlockLocator = function() { BlockChain.prototype.getBlockLocator = function() {
$.checkState(this.tip); $.checkState(this.tip);
$.checkState(!_.isUndefined(this.height[this.tip])); $.checkState(!_.isUndefined(this.height[this.tip]));
@ -134,33 +174,4 @@ BlockChain.prototype.getBlockLocator = function() {
return result; return result;
}; };
BlockChain.prototype.hasData = function(hash) {
return !_.isUndefined(this.work[hash]);
};
BlockChain.prototype.prune = function() {
var self = this;
_.each(this.prev, function(key, value) {
if (!self.height[key]) {
delete this.prev[key];
delete this.work[key];
}
});
};
BlockChain.prototype.toObject = function() {
return {
tip: this.tip,
work: this.work,
next: this.next,
hashByHeight: this.hashByHeight,
height: this.height,
prev: this.prev
};
};
BlockChain.prototype.toJSON = function() {
return JSON.stringify(this.toObject());
};
module.exports = BlockChain; module.exports = BlockChain;

View File

@ -78,6 +78,7 @@ NetworkMonitor.prototype.requestBlocks = function(locator) {
}; };
NetworkMonitor.prototype.start = function() { NetworkMonitor.prototype.start = function() {
console.log('starting network monitor');
this.peer.connect(); this.peer.connect();
}; };

View File

@ -79,12 +79,18 @@ BitcoreNode.prototype.initialize = function() {
var prevHeight = 0; var prevHeight = 0;
var statTimer = 5 * 1000; var statTimer = 5 * 1000;
setInterval(function() { setInterval(function() {
console.log(process.memoryUsage().heapTotal / 1024 / 1024, 'mb used');
if (!self.blockchain) { if (!self.blockchain) {
// not ready yet // not ready yet
console.log('No blockchain yet');
return; return;
} }
var tipHash = self.blockchain.tip; var tipHash = self.blockchain.tip;
var block = self.blockCache[tipHash]; var block = self.blockCache[tipHash];
if (_.isUndefined(block)) {
console.log('No blocks yet');
return;
}
var delta = block.height - prevHeight; var delta = block.height - prevHeight;
prevHeight = block.height; prevHeight = block.height;
console.log(block.id, block.height, 'vel', delta * 1000 / statTimer, 'b/s'); console.log(block.id, block.height, 'vel', delta * 1000 / statTimer, 'b/s');
@ -149,7 +155,9 @@ BitcoreNode.prototype.start = function() {
var self = this; var self = this;
var genesis = bitcore.Block.fromBuffer(genesisBlocks[bitcore.Networks.defaultNetwork.name]); var genesis = bitcore.Block.fromBuffer(genesisBlocks[bitcore.Networks.defaultNetwork.name]);
console.log('getting blockchain');
this.blockService.getBlockchain().then(function(blockchain) { this.blockService.getBlockchain().then(function(blockchain) {
console.log('got blockchain', !!blockchain);
if (!blockchain) { if (!blockchain) {
self.blockchain = new BlockChain(); self.blockchain = new BlockChain();
self.bus.process(genesis); self.bus.process(genesis);
@ -159,9 +167,6 @@ BitcoreNode.prototype.start = function() {
self.sync(); self.sync();
self.networkMonitor.start(); self.networkMonitor.start();
}); });
this.networkMonitor.on('stop', function() {
self.blockService.saveBlockchain(self.blockchain);
});
}; };
BitcoreNode.prototype.stop = function(reason) { BitcoreNode.prototype.stop = function(reason) {
@ -170,7 +175,7 @@ BitcoreNode.prototype.stop = function(reason) {
BitcoreNode.prototype.requestFromTip = function() { BitcoreNode.prototype.requestFromTip = function() {
var locator = this.blockchain.getBlockLocator(); var locator = this.blockchain.getBlockLocator();
console.log('requesting blocks, locator size:', locator.length); //console.log('requesting blocks, locator size:', locator.length);
this.networkMonitor.requestBlocks(locator); this.networkMonitor.requestBlocks(locator);
}; };

View File

@ -1,12 +1,13 @@
'use strict'; 'use strict';
var config = require('config');
var LevelUp = require('levelup'); var LevelUp = require('levelup');
var Promise = require('bluebird'); var Promise = require('bluebird');
var RPC = require('bitcoind-rpc'); var RPC = require('bitcoind-rpc');
var TransactionService = require('./transaction'); var TransactionService = require('./transaction');
var bitcore = require('bitcore'); var bitcore = require('bitcore');
var Transaction = bitcore.Transaction; var Transaction = bitcore.Transaction;
var config = require('config'); var BufferUtil = bitcore.util.buffer;
var errors = require('../errors'); var errors = require('../errors');
var BlockChain = require('../blockchain'); var BlockChain = require('../blockchain');
@ -41,7 +42,8 @@ var Index = {
next: 'nxt-', // nxt-<hash> -> hash for the next block in the main chain that is a child next: 'nxt-', // nxt-<hash> -> hash for the next block in the main chain that is a child
height: 'bh-', // bh-<hash> -> height (-1 means disconnected) height: 'bh-', // bh-<hash> -> height (-1 means disconnected)
tip: 'tip', // tip -> { hash: hex, height: int }, the latest tip tip: 'tip', // tip -> { hash: hex, height: int }, the latest tip
work: 'wk-' // wk-<hash> -> amount of work for block work: 'wk-', // wk-<hash> -> amount of work for block
header: 'header-' // header-<hash> -> JSON for block header
}; };
_.extend(Index, { _.extend(Index, {
getNextBlock: helper(Index.next), getNextBlock: helper(Index.next),
@ -50,7 +52,8 @@ _.extend(Index, {
getBlockWork: helper(Index.work), getBlockWork: helper(Index.work),
getBlockByTs: function(block) { getBlockByTs: function(block) {
return Index.timestamp + block.header.time; return Index.timestamp + block.header.time;
} },
getBlockHeader: helper(Index.header),
}); });
function BlockService(opts) { function BlockService(opts) {
@ -182,27 +185,26 @@ BlockService.prototype.getBlockByHeight = function(height) {
}); });
}; };
BlockService.prototype._getLatestHash = function() {
var self = this;
return Promise.try(function() {
return self.database.getAsync(Index.tip);
}).catch(LevelUp.errors.NotFoundError, function() {
return null;
});
};
/** /**
* Fetch the block that is currently the tip of the blockchain * Fetch the block that is currently the tip of the blockchain
* *
* @return {Promise<Block>} * @return {Promise<Block>}
*/ */
BlockService.prototype.getLatest = function() { BlockService.prototype.getLatest = function() {
var self = this; var self = this;
return this._getLatestHash().then(function(blockHash) {
return Promise.try(function() { if (!blockHash) {
return null;
return self.database.getAsync(Index.tip); }
}).then(function(blockHash) {
return self.getBlock(blockHash); return self.getBlock(blockHash);
}).catch(LevelUp.errors.NotFoundError, function() {
return null;
}); });
}; };
@ -241,6 +243,8 @@ BlockService.prototype.confirm = function(block, ops) {
//console.log(0); //console.log(0);
return Promise.try(function() { return Promise.try(function() {
//console.log(0.5);
self._setHeader(ops, block);
//console.log(1); //console.log(1);
self._setNextBlock(ops, block.header.prevHash, block); self._setNextBlock(ops, block.header.prevHash, block);
@ -272,6 +276,14 @@ BlockService.prototype.confirm = function(block, ops) {
}); });
}; };
BlockService.prototype._setHeader = function(ops, block) {
ops.push({
type: 'put',
key: Index.getBlockHeader(block.hash),
value: block.header.toJSON(),
});
};
BlockService.prototype._setNextBlock = function(ops, prevBlockHash, block) { BlockService.prototype._setNextBlock = function(ops, prevBlockHash, block) {
if (bitcore.util.buffer.isBuffer(prevBlockHash)) { if (bitcore.util.buffer.isBuffer(prevBlockHash)) {
prevBlockHash = bitcore.util.buffer.reverse(prevBlockHash).toString('hex'); prevBlockHash = bitcore.util.buffer.reverse(prevBlockHash).toString('hex');
@ -358,15 +370,13 @@ BlockService.prototype._setBlockByTs = function(ops, block) {
* @return {Promise<Block>} a promise of the same block, for chaining * @return {Promise<Block>} a promise of the same block, for chaining
*/ */
BlockService.prototype.unconfirm = function(block, ops) { BlockService.prototype.unconfirm = function(block, ops) {
ops = ops || []; ops = ops || [];
var self = this;
return Promise.try(function() { return Promise.try(function() {
self._removeNextBlock(ops, block.header.prevHash, block); self._removeNextBlock(ops, block.header.prevHash, block);
self._unsetBlockHeight(ops, block, block.height); self._unsetBlockHeight(ops, block, block.height);
self._dropBlockByTs(ops, block); self._dropBlockByTs(ops, block);
return Promise.all(block.transactions.map(function(transaction) { return Promise.all(block.transactions.map(function(transaction) {
@ -374,21 +384,19 @@ BlockService.prototype.unconfirm = function(block, ops) {
})); }));
}).then(function() { }).then(function() {
return self.database.batchAsync(ops); return self.database.batchAsync(ops);
}); });
}; };
BlockService.prototype._removeNextBlock = function(ops, prevHash, block) { BlockService.prototype._removeNextBlock = function(ops, prevHash, block) {
if (bitcore.util.buffer.isBuffer(prevBlockHash)) { if (bitcore.util.buffer.isBuffer(prevHash)) {
prevBlockHash = bitcore.util.buffer.reverse(prevBlockHash).toString('hex'); prevHash = bitcore.util.buffer.reverse(prevHash).toString('hex');
} }
ops.push({ ops.push({
type: 'del', type: 'del',
key: Index.getNextBlock(prevBlockHash) key: Index.getNextBlock(prevHash)
}); });
ops.push({ ops.push({
type: 'del', type: 'del',
@ -396,7 +404,7 @@ BlockService.prototype._removeNextBlock = function(ops, prevHash, block) {
}); });
}; };
BlockService.prototype._unsetBlockHeight = function(ops, block, height) { BlockService.prototype._unsetBlockHeight = function(ops, block) {
ops.push({ ops.push({
type: 'del', type: 'del',
key: Index.getBlockHeight(block) key: Index.getBlockHeight(block)
@ -444,52 +452,41 @@ BlockService.prototype.getBlockchain = function() {
var self = this; var self = this;
var blockchain = new BlockChain(); var blockchain = new BlockChain();
var headers = [];
var fetchBlock = function(blockHash) { console.log('Fetching headers from db...');
return Promise.all([ var fetchHeader = function(blockHash) {
self.database.getAsync(Index.getPreviousBlock(blockHash)).then(function(prevHash) { if (blockHash === BlockChain.NULL) {
blockchain.prev[blockHash] = prevHash; console.log('All headers fetched, total =', headers.length);
blockchain.next[prevHash] = blockHash; console.log(process.memoryUsage().heapTotal / 1024 / 1024, 'mb used');
}), return;
self.database.getAsync(Index.getBlockHeight(blockHash)).then(function(height) { }
blockchain.height[blockHash] = +height; var headerKey = Index.getBlockHeader(blockHash);
blockchain.hashByHeight[height] = blockHash; return self.database.getAsync(headerKey)
}), .then(function(json) {
self.database.getAsync(Index.getBlockWork(blockHash)).then(function(work) { return bitcore.Block.BlockHeader.fromJSON(json);
blockchain.work[blockHash] = work;
}) })
]).then(function() { .then(function(header) {
return blockHash; headers.push(header);
}); return fetchHeader(BufferUtil.reverse(header.prevHash).toString('hex'));
});
}; };
var fetchUnlessGenesis = function(blockHash) { return self._getLatestHash()
return fetchBlock(blockHash).then(function() {
if (blockchain.prev[blockHash] === BlockChain.NULL) {
return;
} else {
return fetchUnlessGenesis(blockchain.prev[blockHash]);
}
});
};
return self.database.getAsync(Index.tip)
.catch(function(err) {
if (err.notFound) {
return undefined;
}
throw err;
})
.then(function(tip) { .then(function(tip) {
if (!tip) { if (!tip) {
console.log('No tip found'); console.log('No tip found');
return; return null;
} }
console.log('Tip is', tip); console.log('Tip is', tip);
blockchain.tip = tip; return fetchHeader(tip)
return fetchUnlessGenesis(tip).then(function() { .then(function() {
return blockchain; while (headers.length !== 0) {
}); var header = headers.pop();
blockchain.proposeNewHeader(header);
}
return blockchain;
});
}); });
}; };

View File

@ -116,6 +116,10 @@ describe('BlockService', function() {
it('makes the expected calls when confirming the genesis block', function(callback) { it('makes the expected calls when confirming the genesis block', function(callback) {
database.batchAsync = function(ops) { database.batchAsync = function(ops) {
var expectedOps = [{ var expectedOps = [{
type: 'put',
key: 'header-000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f',
value: '{"version":1,"prevHash":"0000000000000000000000000000000000000000000000000000000000000000","merkleRoot":"3ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a","time":1231006505,"bits":486604799,"nonce":2083236893}'
}, {
type: 'put', type: 'put',
key: 'nxt-0000000000000000000000000000000000000000000000000000000000000000', key: 'nxt-0000000000000000000000000000000000000000000000000000000000000000',
value: '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' value: '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
@ -146,6 +150,10 @@ describe('BlockService', function() {
it('makes the expected calls when confirming the block #170', function(callback) { it('makes the expected calls when confirming the block #170', function(callback) {
database.batchAsync = function(ops) { database.batchAsync = function(ops) {
ops.should.deep.equal([{ ops.should.deep.equal([{
type: 'put',
key: 'header-00000000d1145790a8694403d4063f323d499e655c83426834d4ce2f8dd4a2ee',
value: '{"version":1,"prevHash":"55bd840a78798ad0da853f68974f3d183e2bd1db6a842c1feecf222a00000000","merkleRoot":"ff104ccb05421ab93e63f8c3ce5c2c2e9dbb37de2764b3a3175c8166562cac7d","time":1231731025,"bits":486604799,"nonce":1889418792}'
}, {
type: 'put', type: 'put',
key: 'nxt-000000002a22cfee1f2c846adbd12b3e183d4f97683f85dad08a79780a84bd55', key: 'nxt-000000002a22cfee1f2c846adbd12b3e183d4f97683f85dad08a79780a84bd55',
value: '00000000d1145790a8694403d4063f323d499e655c83426834d4ce2f8dd4a2ee' value: '00000000d1145790a8694403d4063f323d499e655c83426834d4ce2f8dd4a2ee'