poll: wip

This commit is contained in:
Fedor Indutny 2014-05-01 22:43:38 +04:00
parent 98bf123f5d
commit 1432608813
4 changed files with 281 additions and 168 deletions

View File

@ -11,7 +11,7 @@ function Chain() {
map: {},
count: 0
};
this.map = {};
this.bloom = new bcoin.bloom(28 * 1024 * 1024, 33, 0);
this.last = null;
this.add(new bcoin.block(constants.genesis));
}
@ -20,7 +20,7 @@ module.exports = Chain;
Chain.prototype.add = function add(block) {
var res = false;
do {
var hash = block.hash('hex');
var rhash = block.hash();
var prev = utils.toHex(block.prevBlock);
// No need to revalidate orphans
@ -29,14 +29,16 @@ Chain.prototype.add = function add(block) {
// Add orphan
if (this.last && prev !== this.last) {
if (!this.map[hash] && !this.orphan.map[prev]) {
if (!this.bloom.test(rhash) && !this.orphan.map[prev]) {
this.orphan.count++;
this.orphan.map[prev] = block;
}
break;
}
this.map[hash] = block;
var hash = block.hash('hex');
this.bloom.add(rhash);
this.chain.push(block);
this.last = hash;
res = true;
@ -51,13 +53,15 @@ Chain.prototype.add = function add(block) {
break;
} while (true);
return res;
};
Chain.prototype.getLast = function getLast() {
return this.chain[this.chain.length - 1];
};
Chain.prototype.has = function hash(hash) {
Chain.prototype.has = function has(hash) {
hash = utils.toHex(hash);
return !!this.map[hash] && !!this.orphan.map[hash];
return this.bloom.test(hash) || !!this.orphan.map[hash];
};

View File

@ -122,6 +122,7 @@ Peer.prototype.destroy = function destroy() {
this.destroyed = true;
this.socket.destroy();
this.socket = null;
this.emit('close');
// Clean-up timeouts
Object.keys(this._broadcast.map).forEach(function(key) {
@ -193,65 +194,8 @@ Peer.prototype._res = function _res(cmd, payload) {
return false;
};
Peer.prototype.getData = function getData(items, cb) {
var map = {};
var waiting = items.length;
items.forEach(function(item) {
map[item.hash.join('.')] = {
item: item,
once: false,
result: null
};
});
var self = this;
function markEntry(hash, result) {
var entry = map[hash.join('.')];
if (!entry || entry.once)
return false;
entry.once = true;
entry.result = result;
waiting--;
return true;
}
Peer.prototype.getData = function getData(items) {
this._write(this.framer.getData(items));
// Process all incoming data, until all data is returned
var req = this._req(null, function(err, payload, cmd) {
if (err)
return done(err);
var ok = false;
if (cmd === 'notfound') {
ok = true;
payload.forEach(function(item) {
if (!markEntry(item.hash, null))
ok = false;
});
} else if (payload.hash) {
ok = markEntry(payload.hash(), payload);
}
if (!ok)
return self._request.skip;
if (waiting === 0)
done();
else
return self._request.cont;
});
// Just for debugging purposes
req._getDataMap = map;
function done(err) {
if (err)
return cb(err);
cb(null, items.map(function(item) {
return map[item.hash.join('.')].result;
}));
}
};
Peer.prototype._onPacket = function onPacket(packet) {
@ -277,8 +221,8 @@ Peer.prototype._onPacket = function onPacket(packet) {
payload = bcoin.tx(payload);
if (this._res(cmd, payload)) {
return;
} else if (cmd === 'tx') {
console.log('Omg', this.socket._handle.fd, cmd, payload.hash('hex'));
} else {
this.emit(cmd, payload);
}
};
@ -335,18 +279,9 @@ Peer.prototype._handleInv = function handleInv(items) {
if (txs.length === 0)
return;
var self = this;
this.getData(txs, function(err, data) {
if (err)
return self._error(err);
data.forEach(function(item) {
console.log(item.type, item.hash('hex'));
});
});
this.getData(txs);
};
Peer.prototype.loadBlocks = function loadBlocks(hash) {
console.log(this.socket._handle.fd, 'loading from: ', utils.toHex(hash), this.chain.chain.length, this.chain.orphan.count);
this._write(this.framer.getBlocks([ hash ]));
};

View File

@ -8,146 +8,319 @@ function Pool(options) {
this.options = options || {};
this.size = options.size || 16;
this.redundancy = options.redundancy || 3;
this.parallel = options.parallel || this.size;
this.pendingBlocks = {};
this.parallel = options.parallel || 4000;
this.loadTimeout = options.loadTimeout || 3000;
this.loadDelay = options.loadDelay || 750;
this.loadTimer = null;
this.loadWatermark = {
lo: options.lwm || 1000,
hi: options.hwm || 4000
};
this.maxRetries = options.maxRetries || 1000;
this.requestTimeout = options.requestTimeout || 30000;
this.chain = new bcoin.chain();
this.bloom = new bcoin.bloom(8 * 10 * 1024,
10,
(Math.random() * 0xffffffff) | 0),
this.peers = [];
this.pending = [];
this.peers = {
// Peers that are loading blocks themselves
block: [],
// Peers that are still connecting
pending: [],
// Peers that are loading block ids
load: null
};
this.block = {
lastSeen: null,
queue: [],
active: 0,
requests: {}
};
// Peers that are loading block ids
this.loadPeers = [];
this.createConnection = options.createConnection;
assert(this.createConnection);
this._init();
var self = this;
setInterval(function() {
console.log('clen %d ocnt %d active %d queue %d reqs %d mem %d',
self.chain.chain.length, self.chain.orphan.count,
self.block.active,
self.block.queue.length,
Object.keys(self.block.requests).length,
process.memoryUsage().heapUsed);
}, 5000);
}
module.exports = Pool;
Pool.prototype._init = function _init() {
this._addLoader();
for (var i = 0; i < this.size; i++)
this._addPeer();
};
Pool.prototype._addPeer = function _addPeer() {
if (this.peers.length + this.pending.length >= this.size)
return;
Pool.prototype._addLoader = function _addLoader() {
assert(this.peers.load === null);
var socket = this.createConnection();
var peer = bcoin.peer(this, socket, this.options.peer);
this.pending.push(peer);
this.peers.load = peer;
var load = false;
if (this.loadPeers.length < this.redundancy) {
this.loadPeers.push(peer);
load = true;
}
// Create new peer on failure
var self = this;
peer.once('error', function(err) {
peer.once('error', function() {
// Just ignore, it will result in `close` anyway
});
peer.once('close', function() {
clearTimeout(timer);
self._removePeer(peer);
self._addPeer();
self._addLoader();
});
peer.once('ack', function() {
var i = self.pending.indexOf(peer);
if (i !== -1) {
self.pending.splice(i, 1);
self.peers.push(peer);
}
peer.updateWatch();
if (load)
peer.loadBlocks(self.chain.getLast().hash());
if (!self._load())
clearTimeout(timer);
});
function destroy() {
peer.destroy();
}
var timer = setTimeout(destroy, this.loadTimeout);
// Split blocks and request them using multiple peers
peer.on('blocks', function(hashes) {
if (hashes.length === 0)
return;
self._requestBlocks(hashes);
if (load)
peer.loadBlocks(hashes[hashes.length - 1]);
// Request each block
hashes.forEach(function(hash) {
self._requestBlock(hash);
});
self._scheduleRequests();
self.block.lastSeen = hashes[hashes.length - 1];
clearTimeout(timer);
// Reinstantiate timeout
if (self._load())
timer = setTimeout(destroy, self.loadTimeout);
});
};
Pool.prototype._load = function load() {
if (this.block.queue.length >= this.loadWatermark.hi)
return false;
// Load more blocks, starting from last hash
var hash;
if (this.block.lastSeen)
hash = this.block.lastSeen;
else
hash = this.chain.getLast().hash();
this.peers.load.loadBlocks(hash);
return true;
};
Pool.prototype._addPeer = function _addPeer() {
if (this.peers.block.length + this.peers.pending.length >= this.size)
return;
var socket = this.createConnection();
var peer = bcoin.peer(this, socket, this.options.peer);
this.peers.pending.push(peer);
peer._retry = 0;
// Create new peer on failure
var self = this;
peer.once('error', function(err) {
// Just ignore, it will result in `close` anyway
});
peer.once('close', function() {
self._removePeer(peer);
self._addPeer();
});
peer.once('ack', function() {
var i = self.peers.pending.indexOf(peer);
if (i !== -1) {
self.peers.pending.splice(i, 1);
self.peers.block.push(peer);
}
peer.updateWatch();
});
peer.on('merkleblock', function(block) {
self.chain.add(block);
var req = self.block.requests[block.hash('hex')];
if (!req)
return;
req.finish(block);
});
peer.on('tx', function(tx) {
console.log('got tx', tx.hash('hex'));
});
};
Pool.prototype._removePeer = function _removePeer(peer) {
var i = this.pending.indexOf(peer);
var i = this.peers.pending.indexOf(peer);
if (i !== -1)
this.pending.splice(i, 1);
this.peers.pending.splice(i, 1);
i = this.peers.indexOf(peer);
i = this.peers.block.indexOf(peer);
if (i !== -1)
this.peers.splice(i, 1);
this.peers.block.splice(i, 1);
i = this.loadPeers.indexOf(peer);
if (i !== -1)
this.loadPeers.splice(i, 1);
if (this.peers.load === peer)
this.peers.load = null;
};
Pool.prototype.watch = function watch(id) {
if (id)
this.bloom.add(id);
for (var i = 0; i < this.peers.length; i++)
this.peers[i].updateWatch();
for (var i = 0; i < this.peers.block.length; i++)
this.peers.block[i].updateWatch();
};
Pool.prototype._getPeers = function _getPeers() {
var res = [];
Pool.prototype._requestBlock = function _requestBlock(hash) {
// Block is already in chain, or being requested
if (this.chain.has(hash) || this.block.requests[utils.toHex(hash)])
return;
if (this.peers.length <= this.redundancy)
return this.peers.slice();
for (var i = 0; i < this.redundancy; i++) {
var peer = this.peers[(Math.random() * this.peers.length) | 0];
if (res.indexOf(peer) !== -1)
continue;
res.push(peer);
}
return res;
var req = new BlockRequest(this, hash);
this.block.requests[utils.toHex(hash)] = req;
this.block.queue.push(req);
};
Pool.prototype._requestBlocks = function _requestBlocks(hashes, force) {
// Do not request blocks that either already in chain, or are
// already requested from some of the peers
hashes = hashes.filter(function(hash) {
return !this.chain.has(hash) &&
(force || !this.pendingBlocks[utils.toHex(hash)]);
}, this).map(function(hash) {
this.pendingBlocks[utils.toHex(hash)] = true;
return { type: 'filtered', hash: hash };
}, this);
// Split blocks into chunks and send each chunk to some peers
var chunks = [];
var count = Math.ceil(hashes.length / this.parallel);
for (var i = 0; i < hashes.length; i += count)
chunks.push(hashes.slice(i, i + count));
Pool.prototype._scheduleRequests = function _scheduleRequests() {
if (this.loadTimer !== null)
return;
var self = this;
chunks.forEach(function(chunk) {
var peers = self._getPeers();
peers.forEach(function(peer) {
peer.getData(chunk, function(err, blocks) {
// Re-request blocks on failure
if (err) {
return self._requestBlocks(chunk.map(function(item) {
return item.hash;
}), true);
}
// Add blocks to chain on success
blocks.forEach(function(block) {
delete self.pendingBlocks[block.hash('hex')];
self.chain.add(block);
});
});
});
});
this.loadTimer = setTimeout(function() {
self.loadTimer = null;
self._doRequests();
}, this.loadDelay);
};
Pool.prototype._doRequests = function _doRequests() {
if (this.block.active >= this.parallel)
return;
var above = this.block.queue.length >= this.loadWatermark.lo;
var items = this.block.queue.slice(0, this.parallel - this.block.active);
this.block.queue = this.block.queue.slice(items.length);
var below = this.block.queue.length < this.loadWatermark.lo;
// Watermark boundary crossed, load more blocks
if (above && below)
this._load();
// Split list between nodes
var count = this.peers.block.length;
var split = Math.ceil(items.length / count);
for (var i = 0, off = 0; i < count; i++, off += split) {
var peer = this.peers.block[i];
peer.getData(items.slice(off, off + split).map(function(item) {
return item.start(peer);
}));
}
};
function BlockRequest(pool, hash) {
this.pool = pool
this.hash = hash;
this.timer = null;
this.peer = null;
this.ts = +new Date;
this.active = false;
}
function binaryInsert(list, item, compare) {
var start = 0,
end = list.length;
while (start < end) {
var pos = (start + end) >> 1;
var cmp = compare(item, list[pos]);
if (cmp === 0) {
start = pos;
end = pos;
break;
} else if (cmp < 0) {
end = pos;
} else {
start = pos + 1;
}
}
list.splice(start, 0, item);
}
BlockRequest.prototype.start = function start(peer) {
var self = this;
this.peer = peer;
if (this.timer)
clearTimeout(this.timer);
this.timer = setTimeout(function() {
self.timer = null;
self.retry();
}, this.pool.requestTimeout);
if (!this.active)
this.pool.block.active++;
this.active = true;
return {
type: 'filtered',
hash: this.hash
};
};
BlockRequest.compare = function compare(a, b) {
return a.ts - b.ts;
};
BlockRequest.prototype.retry = function retry() {
assert(this.active);
this.pool.block.active--;
// Put block into the queue, ensure that the queue is always sorted by ts
binaryInsert(this.pool.block.queue, this, BlockRequest.compare);
this.active = false;
// And schedule requesting blocks again
this.pool._scheduleRequests();
// Kill peer, if it misbehaves
if (++this.peer._retry > this.pool._maxRetries)
this.peer.destroy();
};
BlockRequest.prototype.finish = function finish(block) {
if (this.active) {
this.pool.block.active--;
this.active = false;
} else {
// It could be that request was never sent to the node, remove it from
// queue and forget about it
var index = this.pool.block.queue.indexOf(this);
if (index !== -1)
this.pool.block.queue.splice(index, 1);
}
delete this.pool.block.requests[block.hash('hex')];
if (this.timer)
clearTimeout(this.timer);
this.timer = null;
// We may have some free slots in queue now
this.pool._scheduleRequests();
};

View File

@ -75,8 +75,8 @@ Framer.prototype.version = function version(packet) {
this._addr(p, 46);
// Nonce, very dramatic
writeU32(p, 0xdeadbeef, 72);
writeU32(p, 0xabbadead, 76);
writeU32(p, (Math.random() * 0xffffffff) | 0, 72);
writeU32(p, (Math.random() * 0xffffffff) | 0, 76);
// No user-agent
p[80] = 0;
@ -120,6 +120,7 @@ function varint(arr, value, off) {
Framer.prototype._inv = function _inv(command, items) {
var res = [];
var off = varint(res, items.length, 0);
assert(items.length <= 50000);
for (var i = 0; i < items.length; i++) {
// Type