add locker. refactor.

This commit is contained in:
Christopher Jeffrey 2016-03-23 03:52:11 -07:00
parent 3efdc904e7
commit 5023c7efe4
8 changed files with 169 additions and 111 deletions

View File

@ -83,6 +83,7 @@ bcoin.debug = function debug() {
bcoin.utils = utils;
bcoin.utils.debug = bcoin.debug;
bcoin.utils.ensurePrefix = bcoin.ensurePrefix;
bcoin.locker = require('./bcoin/locker');
bcoin.reader = require('./bcoin/reader');
bcoin.writer = require('./bcoin/writer');
bcoin.profiler = require('./bcoin/profiler');

View File

@ -558,7 +558,6 @@ Address.prototype.toJSON = function toJSON(passphrase) {
key: key.toJSON(passphrase),
type: this.type,
witness: this.witness,
redeem: this.redeem ? utils.toHex(this.redeem) : null,
keys: this.keys.map(utils.toBase58),
m: this.m,
n: this.n
@ -583,7 +582,6 @@ Address.fromJSON = function fromJSON(json, passphrase) {
key: bcoin.keypair.fromJSON(json.key, passphrase),
type: json.type,
witness: json.witness,
redeem: json.redeem ? new Buffer(json.redeem, 'hex') : null,
keys: json.keys.map(utils.fromBase58),
m: json.m,
n: json.n

View File

@ -33,14 +33,10 @@ function Chain(node, options) {
this.loaded = false;
this.mempool = node.mempool;
this.db = new bcoin.chaindb(this, options);
this.busy = false;
this.jobs = [];
this.pending = [];
this.pendingBlocks = {};
this.pendingSize = 0;
this.total = 0;
this.orphanLimit = options.orphanLimit || 20 * 1024 * 1024;
this.pendingLimit = options.pendingLimit || 20 * 1024 * 1024;
this.orphanLimit = options.orphanLimit || (20 << 20);
this.pendingLimit = options.pendingLimit || (20 << 20);
this.locker = new bcoin.locker(this, this.add, this.pendingLimit);
this.invalid = {};
this.bestHeight = -1;
this.lastUpdate = utils.now();
@ -181,61 +177,7 @@ Chain.prototype.open = function open(callback) {
};
Chain.prototype._lock = function _lock(func, args, force) {
var self = this;
var block, called;
if (force) {
assert(this.busy);
return function unlock() {
assert(!called);
called = true;
};
}
if (this.busy) {
if (func === Chain.prototype.add) {
block = args[0];
this.pending.push(block);
this.pendingBlocks[block.hash('hex')] = true;
this.pendingSize += block.getSize();
if (this.pendingSize > this.pendingLimit) {
this.purgePending();
return;
}
}
this.jobs.push([func, args]);
return;
}
this.busy = true;
return function unlock() {
var item, block;
assert(!called);
called = true;
self.busy = false;
if (func === Chain.prototype.add) {
if (self.pending.length === 0)
self.emit('flush');
}
if (self.jobs.length === 0)
return;
item = self.jobs.shift();
if (item[0] === Chain.prototype.add) {
block = item[1][0];
assert(block === self.pending.shift());
delete self.pendingBlocks[block.hash('hex')];
self.pendingSize -= block.getSize();
}
item[0].apply(self, item[1]);
};
return this.locker.lock(func, args, force);
};
// Stream headers from electrum.org for quickly
@ -962,10 +904,7 @@ Chain.prototype.resetTime = function resetTime(ts, callback, force) {
};
Chain.prototype.onFlush = function onFlush(callback) {
if (this.pending.length === 0)
return callback();
this.once('flush', callback);
return this.locker.onFlush(callback);
};
Chain.prototype.add = function add(initial, peer, callback, force) {
@ -1300,21 +1239,7 @@ Chain.prototype.pruneOrphans = function pruneOrphans(peer) {
};
Chain.prototype.purgePending = function purgePending() {
var self = this;
utils.debug('Warning: %dmb of pending blocks. Purging.',
utils.mb(this.pendingSize));
this.pending.forEach(function(block) {
delete self.pendingBlocks[block.hash('hex')];
});
this.pending.length = 0;
this.pendingSize = 0;
this.jobs = this.jobs.filter(function(item) {
return item[0] !== Chain.prototype.add;
});
return this.locker.purgePending();
};
Chain.prototype.has = function has(hash, callback) {
@ -1409,7 +1334,7 @@ Chain.prototype.hasPending = function hasPending(hash) {
else if (hash.hash)
hash = hash.hash('hex');
return !!this.pendingBlocks[hash];
return this.locker.hasPending(hash);
};
Chain.prototype.getEntry = function getEntry(hash, callback) {

125
lib/bcoin/locker.js Normal file
View File

@ -0,0 +1,125 @@
/**
* locker.js - lock and queue for bcoin
* Copyright (c) 2014-2015, Fedor Indutny (MIT License)
* https://github.com/indutny/bcoin
*/
var EventEmitter = require('events').EventEmitter;
var utils = require('./utils');
var assert = utils.assert;
/**
* Locker
*/
function Locker(parent, add, limit) {
if (!(this instanceof Locker))
return Locker(parent, add, limit);
this.parent = parent;
this.jobs = [];
this.busy = false;
this.pending = [];
this.pendingMap = {};
this.pendingSize = 0;
this.pendingLimit = limit || (20 << 20);
this.add = add;
}
utils.inherits(Locker, EventEmitter);
Locker.prototype.hasPending = function hasPending(key) {
return this.pendingMap[key] === true;
};
Locker.prototype.lock = function lock(func, args, force) {
var self = this;
var obj, called;
if (force) {
assert(this.busy);
return function unlock() {
assert(!called);
called = true;
};
}
if (this.busy) {
if (this.add && func === this.add) {
obj = args[0];
this.pending.push(obj);
this.pendingMap[obj.hash('hex')] = true;
this.pendingSize += obj.getSize();
if (this.pendingSize > this.pendingLimit) {
this.purgePending();
return;
}
}
this.jobs.push([func, args]);
return;
}
this.busy = true;
return function unlock() {
var item, obj;
assert(!called);
called = true;
self.busy = false;
if (self.add && func === self.add) {
if (self.pending.length === 0)
self.emit('flush');
}
if (self.jobs.length === 0)
return;
item = self.jobs.shift();
if (self.add && item[0] === self.add) {
obj = item[1][0];
assert(obj === self.pending.shift());
delete self.pendingMap[obj.hash('hex')];
self.pendingSize -= obj.getSize();
}
item[0].apply(self.parent, item[1]);
};
};
Locker.prototype.purgePending = function purgePending() {
var self = this;
assert(this.add);
utils.debug('Warning: %dmb of pending objects. Purging.',
utils.mb(this.pendingSize));
this.pending.forEach(function(obj) {
delete self.pendingMap[obj.hash('hex')];
});
this.pending.length = 0;
this.pendingSize = 0;
this.jobs = this.jobs.filter(function(item) {
return item[0] !== self.add;
});
};
Locker.prototype.onFlush = function onFlush(callback) {
if (this.pending.length === 0)
return callback();
this.once('flush', callback);
};
/**
* Expose
*/
module.exports = Locker;

View File

@ -13,6 +13,7 @@ var utils = require('./utils');
var assert = utils.assert;
var BufferWriter = require('./writer');
var BufferReader = require('./reader');
var DUMMY_PEER = { sendReject: function() {} };
/**
* Mempool
@ -69,9 +70,12 @@ function Mempool(node, options) {
utils.inherits(Mempool, EventEmitter);
Mempool.flags = constants.flags.STANDARD_VERIFY_FLAGS;
Mempool.mandatory = constants.flags.MANDATORY_VERIFY_FLAGS;
Mempool.prototype._lock = function _lock(func, args, force) {
var self = this;
var block, called;
var tx, called;
if (force) {
assert(this.busy);
@ -223,9 +227,6 @@ Mempool.prototype.hasTX = function hasTX(hash, callback) {
});
};
Mempool.flags = constants.flags.STANDARD_VERIFY_FLAGS;
Mempool.mandatory = constants.flags.MANDATORY_VERIFY_FLAGS;
Mempool.prototype.add =
Mempool.prototype.addTX = function addTX(tx, peer, callback, force) {
var self = this;
@ -236,6 +237,14 @@ Mempool.prototype.addTX = function addTX(tx, peer, callback, force) {
if (!unlock)
return;
if (typeof peer === 'function') {
callback = peer;
peer = null;
}
if (!peer)
peer = DUMMY_PEER;
hash = tx.hash('hex');
assert(tx.ts === 0);
@ -321,18 +330,6 @@ Mempool.prototype.addUnchecked = function addUnchecked(tx, peer, callback) {
});
};
function VerifyError(reason, score) {
Error.call(this);
if (Error.captureStackTrace)
Error.captureStackTrace(this, VerifyError);
this.type = 'VerifyError';
this.message = reason;
this.reason = score === -1 ? null : reason;
this.score = score;
}
utils.inherits(VerifyError, Error);
Mempool.prototype.verify = function verify(tx, callback) {
var self = this;
var total, input, coin, i, fee, now;
@ -607,6 +604,9 @@ Mempool.prototype.checkTX = function checkTX(tx, peer) {
var total = new bn(0);
var uniq = {};
if (!peer)
peer = DUMMY_PEER;
if (tx.inputs.length === 0)
return peer.sendReject(tx, 'bad-txns-vin-empty', 100);
@ -649,6 +649,18 @@ Mempool.prototype.checkTX = function checkTX(tx, peer) {
return true;
};
function VerifyError(reason, score) {
Error.call(this);
if (Error.captureStackTrace)
Error.captureStackTrace(this, VerifyError);
this.type = 'VerifyError';
this.message = reason;
this.reason = score === -1 ? null : reason;
this.score = score;
}
utils.inherits(VerifyError, Error);
/**
* Expose
*/

View File

@ -754,9 +754,9 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
peer.queue.block.length,
block.bits,
self.peers.all.length,
self.chain.pending.length,
self.chain.locker.pending.length,
self.chain.bestHeight,
self.chain.jobs.length);
self.chain.locker.jobs.length);
}
return callback(null, true);
@ -1145,10 +1145,7 @@ Pool.prototype.watch = function watch(id) {
else
this.watchMap[hid] = 1;
if (this.bloom.test(id, 'hex'))
return;
this.bloom.add(id, 'hex');
this.bloom.add(id);
}
// Send it to peers
@ -1528,7 +1525,7 @@ Pool.prototype.scheduleRequests = function scheduleRequests(peer) {
Pool.prototype._sendRequests = function _sendRequests(peer) {
var size, items;
if (this.chain.pending.length > 0)
if (this.chain.locker.pending.length > 0)
return;
if (peer.queue.block.length === 0)

View File

@ -1915,6 +1915,6 @@ utils.SyncBatch = SyncBatch;
if (utils.isBrowser) {
bn.prototype.toBuffer = function toBuffer(order, size) {
return new Buffer(this.toArray(order, size));
return this.toArrayLike(Buffer, order, size);
};
}

View File

@ -33,7 +33,7 @@ workers.spawn = function spawn(index) {
child = cp.spawn(process.argv[0], [__filename], {
stdio: ['pipe', 'pipe', 'inherit'],
env: utils.merge({}, process.env, {
BCOIN_WORKER: index + ''
BCOIN_WORKER_ID: index + ''
})
});
@ -121,7 +121,7 @@ bcoin.tx.prototype.verifyAsync = function verifyAsync(index, force, flags, callb
workers.listen = function listen() {
bcoin.debug = function debug() {
process.stderr.write('Worker %s: ', process.env.BCOIN_WORKER);
process.stderr.write('Worker ' + process.env.BCOIN_WORKER_ID + ':');
return console.error.apply(console.error, arguments);
};
@ -372,5 +372,5 @@ function parser(onPacket) {
};
}
if (process.env.BCOIN_WORKER)
if (process.env.BCOIN_WORKER_ID)
workers.listen();