db wrapper. fix memleak. consistency.

This commit is contained in:
Christopher Jeffrey 2016-03-19 08:24:22 -07:00
parent 131e546fb3
commit 800f5a5448
12 changed files with 391 additions and 86 deletions

View File

@ -30,7 +30,7 @@ function Chain(node, options) {
this.options = options;
this.node = node;
this.loading = false;
this.loaded = false;
this.mempool = node.mempool;
this.db = new bcoin.chaindb(this, options);
this.busy = false;
@ -145,8 +145,6 @@ Chain.prototype._init = function _init() {
self.emit('remove block', block);
});
this.loading = true;
utils.debug('Chain is loading.');
self._preload(function(err, start) {
@ -155,26 +153,33 @@ Chain.prototype._init = function _init() {
utils.debug('Reason: %s', err.message);
}
self.db.load(function(err) {
self.db.open(function(err) {
if (err)
throw err;
return self.emit('error', err);
self.db.getTip(function(err, tip) {
if (err)
throw err;
return self.emit('error', err);
assert(tip);
self.tip = tip;
self.height = tip.height;
self.loading = false;
self.loaded = true;
self.emit('load');
});
});
});
};
Chain.prototype.open = function open(callback) {
if (this.loaded)
return utils.nextTick(callback);
this.once('load', callback);
};
Chain.prototype._lock = function _lock(func, args, force) {
var self = this;
var block, called;
@ -957,7 +962,7 @@ Chain.prototype.add = function add(initial, peer, callback, force) {
var self = this;
var total = 0;
assert(!this.loading);
assert(this.loaded);
var unlock = this._lock(add, [initial, peer, callback], force);
if (!unlock)

View File

@ -35,7 +35,6 @@ function ChainDB(chain, options) {
this.queueSize = 0;
this.size = 0;
this.fd = null;
this.loading = false;
this.loaded = false;
this.fsync = !!options.fsync;
@ -67,67 +66,73 @@ function ChainDB(chain, options) {
utils.inherits(ChainDB, EventEmitter);
ChainDB.prototype._init = function _init() {
var self = this;
var genesis, block;
if (this.loaded)
return;
this.db = bcoin.ldb((this.options.spv ? 'spv' : '') + 'chain', {
compression: false,
cacheSize: 16 * 1024 * 1024,
writeBufferSize: 8 * 1024 * 1024
});
if (!bcoin.isBrowser) {
//var DataStore = require('./data' + 'store');
//this.db = new DataStore(this.db);
}
};
ChainDB.prototype.load = function load(callback) {
var self = this;
var genesis, block;
if (this.loaded)
return callback();
this.loading = true;
utils.debug('Starting chain load.');
function finish(err) {
this.db.open(function(err) {
if (err)
return callback(err);
return self.emit('error', err);
self.loading = false;
self.loaded = true;
self.emit('load');
function finish(err) {
if (err)
return self.emit('error', err);
utils.debug('Chain successfully loaded.');
self.loaded = true;
self.emit('load');
callback();
}
utils.debug('Chain successfully loaded.');
}
this.db.get('c/b/' + network.genesis.hash, function(err, exists) {
if (err && err.type !== 'NotFoundError')
throw err;
self.db.get('c/b/' + network.genesis.hash, function(err, exists) {
if (err && err.type !== 'NotFoundError')
return self.emit('error', err);
if (exists)
return finish();
if (exists)
return finish();
genesis = new bcoin.chainblock(self.chain, {
hash: network.genesis.hash,
version: network.genesis.version,
prevBlock: network.genesis.prevBlock,
merkleRoot: network.genesis.merkleRoot,
ts: network.genesis.ts,
bits: network.genesis.bits,
nonce: network.genesis.nonce,
height: 0,
chainwork: null
}, null);
genesis = new bcoin.chainblock(self.chain, {
hash: network.genesis.hash,
version: network.genesis.version,
prevBlock: network.genesis.prevBlock,
merkleRoot: network.genesis.merkleRoot,
ts: network.genesis.ts,
bits: network.genesis.bits,
nonce: network.genesis.nonce,
height: 0,
chainwork: null
}, null);
block = bcoin.block.fromRaw(network.genesisBlock, 'hex');
block.height = 0;
block = bcoin.block.fromRaw(network.genesisBlock, 'hex');
block.height = 0;
self.save(genesis, block, finish);
self.save(genesis, block, finish);
});
});
};
ChainDB.prototype.open = function open(callback) {
if (this.loaded)
return utils.nextTick(callback);
this.once('load', callback);
};
ChainDB.prototype.close = function close(callback) {
callback = utils.ensure(callback);
this.db.close(callback);
@ -210,6 +215,7 @@ ChainDB.prototype.getHash = function getHash(height, callback) {
ChainDB.prototype.dump = function dump(callback) {
var records = {};
return callback();
var iter = this.db.db.iterator({
gte: 'c',
lte: 'c~',
@ -834,6 +840,8 @@ ChainDB.prototype.getCoinsByAddress = function getCoinsByAddress(addresses, opti
var ids = [];
var coins = [];
return callback(null, coins);
if (!callback) {
callback = options;
options = {};
@ -931,6 +939,8 @@ ChainDB.prototype.getTXByAddress = function getTXByAddress(addresses, options, c
var txs = [];
var have = {};
return callback(null, txs);
if (!callback) {
callback = options;
options = {};
@ -1220,6 +1230,8 @@ ChainDB.prototype._getTX = function _getTX(hash, callback) {
// For BIP30
// https://bitcointalk.org/index.php?topic=67738.0
ChainDB.prototype.isUnspentTX = function isUnspentTX(hash, callback) {
return callback(null, false);
if (this.options.spv)
return callback(null, false);
return this.isSpentTX(hash, function(err, spent) {
@ -1234,6 +1246,8 @@ ChainDB.prototype.isSpentTX = function isSpentTX(hash, callback) {
if (hash.hash)
hash = hash.hash('hex');
return callback(null, false);
var iter = this.db.db.iterator({
gte: 'u/t/' + hash,
lte: 'u/t/' + hash + '~',
@ -1337,6 +1351,7 @@ ChainDB.prototype._pruneQueue = function _pruneQueue(block, batch, callback) {
};
ChainDB.prototype._pruneCoinQueue = function _pruneQueue(block, batch, callback) {
return callback();
var iter = this.db.db.iterator({
gte: 'u/q/' + pad32(block.height),
lte: 'u/q/' + pad32(block.height) + '~',

View File

@ -21,7 +21,7 @@ function Fullnode(options) {
bcoin.node.call(this, options);
this.loading = false;
this.loaded = false;
Fullnode.global = this;
@ -32,11 +32,9 @@ utils.inherits(Fullnode, bcoin.node);
Fullnode.prototype._init = function _init() {
var self = this;
var pending = 3;
var pending = 5;
var options;
this.loading = true;
this.chain = new bcoin.chain(this, {
preload: false,
fsync: false,
@ -68,7 +66,9 @@ Fullnode.prototype._init = function _init() {
// and blockdb.
this.http = new bcoin.http.server(this, {
key: this.options.sslKey,
cert: this.options.sslCert
cert: this.options.sslCert,
port: this.options.httpPort || 8080,
host: '0.0.0.0'
});
// Bind to errors
@ -121,9 +121,12 @@ Fullnode.prototype._init = function _init() {
// self.walletdb.removeBlock(block);
// });
function load() {
function load(err) {
if (err)
return self.emit('error', err);
if (!--pending) {
self.loading = false;
self.loaded = true;
self.emit('load');
self.pool.startSync();
utils.debug('Node is loaded and syncing.');
@ -136,28 +139,34 @@ Fullnode.prototype._init = function _init() {
};
// Create or load the primary wallet.
this.createWallet(options, function(err, wallet) {
this.walletdb.open(function(err) {
if (err)
throw err;
return self.emit('error', err);
// Set the miner payout address if the
// programmer didn't pass one in.
if (!self.miner.address)
self.miner.address = wallet.getAddress();
self.createWallet(options, function(err, wallet) {
if (err)
throw err;
load();
// Set the miner payout address if the
// programmer didn't pass one in.
if (!self.miner.address)
self.miner.address = wallet.getAddress();
load();
});
});
this.chain.once('load', function() {
load();
});
this.chain.open(load);
this.mempool.open(load);
this.pool.open(load);
this.http.open(load);
};
this.http.listen(this.options.httpPort || 8080, '0.0.0.0', function(err) {
if (err)
throw err;
Fullnode.prototype.open = function open(callback) {
if (this.loaded)
return utils.nextTick(callback);
load();
});
this.once('load', callback);
};
Fullnode.prototype.createWallet = function createWallet(options, callback) {

View File

@ -20,6 +20,7 @@ function Client(uri) {
EventEmitter.call(this);
this.uri = uri;
this.loaded = false;
this.id = null;
this._init();
}
@ -68,9 +69,19 @@ Client.prototype._init = function _init() {
self.socket.on('error', function(err) {
self.emit('error', err);
});
self.loaded = true;
self.emit('load');
});
};
Client.prototype.open = function open(callback) {
if (this.loaded)
return utils.nextTick(callback);
this.once('load', callback);
};
Client.prototype.listenWallet = function listenWallet(id) {
this.socket.join(id);
};

View File

@ -22,6 +22,7 @@ function NodeServer(node, options) {
this.node = node;
this.walletdb = node.walletdb;
this.pool = node.pool;
this.loaded = false;
this.server = new HTTPServer(options);
this.io = null;
@ -370,6 +371,16 @@ NodeServer.prototype._init = function _init() {
});
this._initIO();
if (this.options.port != null)
this.listen(this.options.port, this.options.host);
};
NodeServer.prototype.open = function open(callback) {
if (this.loaded)
return utils.nextTick(callback);
this.once('load', callback);
};
NodeServer.prototype._initIO = function _initIO() {
@ -444,7 +455,20 @@ NodeServer.prototype.del = function del(path, callback) {
};
NodeServer.prototype.listen = function listen(port, host, callback) {
return this.server.listen(port, host, callback);
var self = this;
return this.server.listen(port, host, function(err) {
if (err) {
if (callback)
return callback(err);
return self.emit('error', err);
}
self.loaded = true;
self.emit('load');
if (callback)
callback();
});
};
/**

View File

@ -4,7 +4,9 @@
* https://github.com/indutny/bcoin
*/
var EventEmitter = require('events').EventEmitter;
var bcoin = require('../bcoin');
var utils = bcoin.utils;
var network = bcoin.protocol.network;
var db = {};
@ -12,6 +14,7 @@ module.exports = function ldb(name, options) {
var levelup = require('levelup');
var file = bcoin.prefix + '/' + name + '-' + network.type + '.db';
var backend = process.env.BCOIN_DB;
var promise;
bcoin.ensurePrefix();
@ -29,6 +32,7 @@ module.exports = function ldb(name, options) {
backend = require(backend);
}
/*
db[file] = new levelup(file, {
keyEncoding: 'ascii',
valueEncoding: 'binary',
@ -37,6 +41,26 @@ module.exports = function ldb(name, options) {
compression: options.compression !== false,
cacheSize: options.cacheSize || (8 << 20),
writeBufferSize: options.writeBufferSize || (4 << 20),
memtableSize: 10 << 20,
maxOpenFiles: options.maxOpenFiles || 8192,
// For LMDB if we decide to use it:
sync: options.sync || false,
mapSize: options.mapSize || 150 * (1024 << 20),
writeMap: options.writeMap || false,
db: backend
});
*/
db[file] = new DBWrapper(backend, file, {
keyEncoding: 'ascii',
valueEncoding: 'binary',
createIfMissing: true,
errorIfExists: false,
compression: options.compression !== false,
cacheSize: options.cacheSize || (8 << 20),
writeBufferSize: options.writeBufferSize || (4 << 20),
memtableSize: 10 << 20,
maxOpenFiles: options.maxOpenFiles || 8192,
// For LMDB if we decide to use it:
@ -50,3 +74,91 @@ module.exports = function ldb(name, options) {
return db[file];
};
/**
* DBWrapper
*/
function DBWrapper(backend, file, options) {
var self = this;
if (!(this instanceof DBWrapper))
return new DBWrapper(backend, file, options);
EventEmitter.call(this);
this.loaded = false;
this.backend = new backend(file);
this.db = this.backend;
this.backend.open(options, function(err) {
if (err)
return self.emit('error', err);
self.emit('load');
self.loaded = true;
});
}
utils.inherits(DBWrapper, EventEmitter);
DBWrapper.prototype.open = function open(callback) {
if (this.loaded)
return utils.nextTick(callback);
this.once('load', callback);
};
DBWrapper.prototype.get = function get(key, options, callback) {
if (typeof options === 'function') {
callback = options;
options = {};
}
return this.backend.get(key, options, function(err, result) {
if (err) {
if (err.notFound || /not\s*found/i.test(err.message)) {
err.notFound = true;
err.type = 'NotFoundError';
}
return callback(err);
}
return callback(null, result);
});
};
DBWrapper.prototype.close = function close(callback) {
return this.backend.close(callback);
};
DBWrapper.prototype.put = function put(key, value, options, callback) {
return this.backend.put(key, value, options, callback);
};
DBWrapper.prototype.del = function del(key, options, callback) {
return this.backend.del(key, options, callback);
};
DBWrapper.prototype.batch = function batch(ops, options, callback) {
if (!ops)
return this.backend.batch();
return this.backend.batch(ops, options, callback);
};
DBWrapper.prototype.iterator = function batch(ops, options, callback) {
return this.backend.iterator(options);
};
DBWrapper.prototype.getProperty = function getProperty(name) {
if (this.backend.getProperty)
return this.backend.getProperty(name);
if (this.backend.db && this.backend.db.getProperty)
return this.backend.db.getProperty(name);
return null;
};
DBWrapper.prototype.approximateSize = function approximateSize(start, end, callback) {
return this.backend.approximateSize(start, end, callback);
};

View File

@ -20,6 +20,8 @@ function Mempool(node, options) {
if (!(this instanceof Mempool))
return new Mempool(node, options);
EventEmitter.call(this);
if (!options)
options = {};
@ -33,6 +35,7 @@ function Mempool(node, options) {
this.size = 0;
this.count = 0;
this.locked = false;
this.loaded = false;
Mempool.global = this;
@ -42,7 +45,18 @@ function Mempool(node, options) {
utils.inherits(Mempool, EventEmitter);
Mempool.prototype._init = function _init() {
;
var self = this;
utils.nextTick(function() {
self.loaded = true;
self.emit('load');
});
};
Mempool.prototype.open = function open(callback) {
if (this.loaded)
return utils.nextTick(callback);
this.once('load', callback);
};
Mempool.prototype.addBlock = function addBlock(block) {

View File

@ -47,6 +47,7 @@ function Pool(node, options) {
this.server = null;
this.destroyed = false;
this.loaded = false;
this.size = options.size || 8;
this.chain = node.chain;
@ -133,24 +134,26 @@ function Pool(node, options) {
Pool.global = this;
this.loading = true;
this.chain.open(function(err) {
if (err)
return self.emit('error', err);
if (!this.chain.loading) {
this.loading = false;
this.emit('load');
this._init();
return;
}
this.chain.once('load', function() {
self.loading = false;
self.loaded = true;
self.emit('load');
self._init();
});
}
utils.inherits(Pool, EventEmitter);
Pool.prototype.open = function open(callback) {
if (this.loaded)
return utils.nextTick(callback);
this.once('load', callback);
};
Pool.prototype._init = function _init() {
var self = this;
var i;
@ -502,7 +505,7 @@ Pool.prototype._addLoader = function _addLoader() {
};
Pool.prototype.startSync = function startSync() {
if (this.loading)
if (!this.loaded)
return this.once('load', this.startSync.bind(this));
this.syncing = true;
@ -525,7 +528,7 @@ Pool.prototype.stopSync = function stopSync() {
this.syncing = false;
if (this.loading)
if (!this.loaded)
return;
this._stopInterval();
@ -1247,7 +1250,7 @@ Pool.prototype.isWatched = function(tx, bloom) {
Pool.prototype.addWallet = function addWallet(wallet, callback) {
var self = this;
if (this.loading)
if (!this.loaded)
return this.once('load', this.addWallet.bind(this, wallet, callback));
callback = utils.asyncify(callback);
@ -1274,7 +1277,7 @@ Pool.prototype.removeWallet = function removeWallet(wallet) {
if (!this.options.spv)
return;
assert(!this.loading);
assert(this.loaded);
this.unwatchWallet(wallet);
};
@ -1304,7 +1307,7 @@ Pool.prototype.unwatchWallet = function unwatchWallet(wallet) {
Pool.prototype.searchWallet = function(wallet, callback) {
var self = this;
assert(!this.loading);
assert(this.loaded);
callback = utils.asyncify(callback);
@ -1364,7 +1367,7 @@ Pool.prototype.searchWallet = function(wallet, callback) {
Pool.prototype.search = function search(id, range, callback) {
var self = this;
assert(!this.loading);
assert(this.loaded);
if (!this.options.spv)
return;

View File

@ -119,6 +119,8 @@ TXPool.prototype.mapAddresses = function mapAddresses(address, callback) {
var map = {};
var iter;
return callback(null, map);
if (Array.isArray(address)) {
return utils.forEachSerial(address, function(address, next) {
self.mapAddresses(address, function(err, res) {
@ -804,6 +806,8 @@ TXPool.prototype.getTXHashes = function getTXHashes(address, callback) {
var txs = [];
var iter;
return callback(null, txs);
callback = utils.ensure(callback);
if (Array.isArray(address)) {
@ -867,6 +871,8 @@ TXPool.prototype.getPendingHashes = function getPendingHashes(address, callback)
var txs = [];
var iter;
return callback(null, txs);
callback = utils.ensure(callback);
if (Array.isArray(address)) {
@ -931,6 +937,8 @@ TXPool.prototype.getCoinIDs = function getCoinIDs(address, callback) {
var coins = [];
var iter;
return callback(null, coins);
callback = utils.ensure(callback);
if (Array.isArray(address)) {
@ -993,6 +1001,8 @@ TXPool.prototype.getHeightRangeHashes = function getHeightRangeHashes(address, o
var txs = [];
var iter;
return callback(null, txs);
callback = utils.ensure(callback);
iter = this.db.db.iterator({
@ -1045,6 +1055,8 @@ TXPool.prototype.getRangeHashes = function getRangeHashes(address, options, call
var txs = [];
var iter;
return callback(null, txs);
callback = utils.ensure(callback);
iter = this.db.db.iterator({

View File

@ -1761,6 +1761,66 @@ utils.wrap = function wrap(callback, unlock) {
};
};
utils.parallel = function parallel(stack, callback) {
var pending = stack.length;
var error;
var i;
callback = utils.once(callback);
if (!pending)
return utils.nextTick(callback);
function next(err) {
assert(pending > 0);
if (err)
error = err;
if (!--pending)
callback(error);
}
for (i = 0; i < stack.length; i++) {
try {
if (stack[i].length >= 2) {
stack[i](error, next);
error = null;
} else {
if (error)
continue;
stack[i](next);
}
} catch (e) {
pending--;
error = e;
}
}
};
utils.serial = function serial(stack, callback) {
var i = 0;
(function next(err) {
if (i++ >= stack.length)
return callback(err);
if (stack[i].length >= 2) {
try {
return stack[i](err, next);
} catch (e) {
return next(e);
}
}
if (err)
return utils.nextTick(next.bind(null, err));
try {
return stack[i](next);
} catch (e) {
return next(e);
}
})();
};
function SyncBatch(db) {
this.db = db;
this.ops = [];

View File

@ -47,6 +47,7 @@ function Wallet(options) {
this.change = [];
this.receive = [];
this.witness = options.witness || false;
this.loaded = false;
this.accountIndex = options.accountIndex || 0;
this.receiveDepth = options.receiveDepth || 1;
@ -152,6 +153,21 @@ Wallet.prototype._init = function _init() {
this.provider.on('unconfirmed', function(tx) {
self.emit('unconfirmed', tx);
});
this.provider.open(function(err) {
if (err)
return self.emit('error', err);
self.loaded = true;
self.emit('load');
});
};
Wallet.prototype.open = function open(callback) {
if (this.loaded)
return utils.nextTick(callback);
this.once('load', callback);
};
Wallet.prototype.destroy = function destroy() {

View File

@ -29,6 +29,7 @@ function WalletDB(node, options) {
this.node = node;
this.options = options;
this.loaded = false;
WalletDB.global = this;
@ -42,6 +43,7 @@ WalletDB._db = {};
WalletDB.prototype.dump = function dump(callback) {
var records = {};
return callback();
var iter = this.db.db.iterator({
gte: 'w',
lte: 'w~',
@ -80,11 +82,21 @@ WalletDB.prototype.dump = function dump(callback) {
WalletDB.prototype._init = function _init() {
var self = this;
if (this.loaded)
return;
this.db = bcoin.ldb('wallet', {
cacheSize: 4 * 1024 * 1024,
writeBufferSize: 4 * 1024 * 1024
});
this.db.open(function(err) {
if (err)
return self.emit('error', err);
self.emit('load');
self.loaded = true;
});
this.tx = new bcoin.txdb('w', this.db, {
ids: true
});
@ -154,6 +166,13 @@ WalletDB.prototype._init = function _init() {
});
};
WalletDB.prototype.open = function open(callback) {
if (this.loaded)
return utils.nextTick(callback);
this.once('load', callback);
};
WalletDB.prototype.syncOutputDepth = function syncOutputDepth(id, tx, callback) {
var self = this;
@ -557,12 +576,17 @@ function Provider(db) {
EventEmitter.call(this);
this.loaded = true;
this.db = db;
this.id = null;
}
utils.inherits(Provider, EventEmitter);
Provider.prototype.open = function open(callback) {
return utils.nextTick(callback);
};
Provider.prototype.setID = function setID(id) {
var self = this;