lib: save progress

This commit is contained in:
Fedor Indutny 2014-04-30 21:45:55 +04:00
parent a20cb1688f
commit 6cb1594424
8 changed files with 372 additions and 76 deletions

View File

@ -7,5 +7,7 @@ bcoin.bloom = require('./bcoin/bloom');
bcoin.protocol = require('./bcoin/protocol');
bcoin.tx = require('./bcoin/tx');
bcoin.block = require('./bcoin/block');
bcoin.chain = require('./bcoin/chain');
bcoin.wallet = require('./bcoin/wallet');
bcoin.peer = require('./bcoin/peer');
bcoin.pool = require('./bcoin/pool');

View File

@ -14,14 +14,17 @@ function Block(data) {
this.nonce = data.nonce;
this._hash = null;
this._hexHash = null;
}
module.exports = Block;
Block.prototype.hash = function hash(enc) {
// Hash it
if (!this._hash)
if (!this._hash) {
this._hash = utils.dsha256(this.abbr());
return enc === 'hex' ? utils.toHex(this._hash) : this._hash;
this._hexHash = utils.toHex(this._hash);
}
return enc === 'hex' ? this._hexHash : this._hash;
};
Block.prototype.abbr = function abbr() {
@ -36,7 +39,11 @@ Block.prototype.abbr = function abbr() {
return res;
};
Block.prototype.verify = function verify() {
// TODO(indutny): verify nonce
return true;
};
Block.prototype.render = function render(framer) {
console.log('here');
return [];
};

63
lib/bcoin/chain.js Normal file
View File

@ -0,0 +1,63 @@
var bcoin = require('../bcoin');
var constants = bcoin.protocol.constants;
var utils = bcoin.utils;
function Chain() {
if (!(this instanceof Chain))
return new Chain();
this.chain = [];
this.orphan = {
map: {},
count: 0
};
this.map = {};
this.last = null;
this.add(new bcoin.block(constants.genesis));
}
module.exports = Chain;
Chain.prototype.add = function add(block) {
var res = false;
do {
var hash = block.hash('hex');
var prev = utils.toHex(block.prevBlock);
// No need to revalidate orphans
if (!res && !block.verify())
break;
// Add orphan
if (this.last && prev !== this.last) {
if (!this.orphan.map[prev]) {
this.orphan.count++;
this.orphan.map[prev] = block;
}
break;
}
this.map[hash] = block;
this.chain.push(block);
this.last = hash;
res = true;
// We have orphan child for this block - add it to chain
if (this.orphan.map[hash]) {
block = this.orphan.map[hash];
delete this.orphan.map[hash];
this.orphan.count--;
continue;
}
break;
} while (true);
};
Chain.prototype.getLast = function getLast() {
return this.chain[this.chain.length - 1];
};
Chain.prototype.has = function hash(hash) {
hash = utils.toHex(hash);
return !!this.map[hash] && !!this.orphan.map[hash];
};

View File

@ -12,19 +12,21 @@ try {
} catch (e) {
}
function Peer(socket, options) {
function Peer(pool, socket, options) {
if (!(this instanceof Peer))
return new Peer(socket, options);
return new Peer(pool, socket, options);
EventEmitter.call(this);
this.pool = pool;
this.socket = socket;
this.parser = new bcoin.protocol.parser();
this.framer = new bcoin.protocol.framer();
this.bloom = new bcoin.bloom(8 * 10 * 1024,
10,
(Math.random() * 0xffffffff) | 0),
this.chain = this.pool.chain;
this.bloom = this.pool.bloom;
this.version = null;
this.destroyed = false;
this.ack = false;
this.options = options || {};
this._broadcast = {
@ -33,7 +35,9 @@ function Peer(socket, options) {
};
this._request = {
timeout: this.options.requestTimeout || 30000,
timeout: this.options.requestTimeout || 10000,
cont: {},
skip: {},
queue: []
};
@ -77,6 +81,7 @@ Peer.prototype._init = function init() {
this._req('verack', function(err, payload) {
if (err)
return self._error(err);
self.ack = true;
self.emit('ack');
});
};
@ -106,16 +111,9 @@ Peer.prototype.broadcast = function broadcast(items) {
this._write(this.framer.inv(items));
};
Peer.prototype.watch = function watch(id) {
this.bloom.add(id);
this._write(this.framer.filterLoad(this.bloom, 'pubkeyOnly'));
};
Peer.prototype.loadBlocks = function loadBlocks() {
if (this.loadingBlocks)
return;
this.loadingBlocks = true;
this._write(this.framer.getBlocks([ constants.genesis ]));
Peer.prototype.updateWatch = function updateWatch() {
if (this.ack)
this._write(this.framer.filterLoad(this.bloom, 'pubkeyOnly'));
};
Peer.prototype.destroy = function destroy() {
@ -157,41 +155,49 @@ Peer.prototype._req = function _req(cmd, cb) {
cb: cb,
ontimeout: function() {
var i = self._request.queue.indexOf(entry);
if (i !== -1)
self.request.queue.splice(i, 1);
cb(new Error('Timed out'), null);
if (i !== -1) {
self._request.queue.splice(i, 1);
cb(new Error('Timed out: ' + cmd), null);
}
},
timer: null
};
entry.timer = setTimeout(entry.ontimeout, this._request.timeout)
this._request.queue.push(entry);
return entry;
};
Peer.prototype._res = function _res(cmd, payload) {
var entry = this._request.queue[0];
if (!entry || entry.cmd && entry.cmd !== cmd)
return;
for (var i = 0; i < this._request.queue.length; i++) {
var entry = this._request.queue[i];
if (!entry || entry.cmd && entry.cmd !== cmd)
return false;
var res = entry.cb(null, payload, cmd);
var res = entry.cb(null, payload, cmd);
// If callback returns false - it hasn't finished processing responses
if (res === false) {
assert(!entry.cmd);
if (res === this._request.cont) {
assert(!entry.cmd);
// Restart timer
entry.timer = setTimeout(entry.ontimeout, this._request.timeout)
} else {
this._request.queue.shift();
clearTimeout(entry.timer);
entry.timer = null;
// Restart timer
entry.timer = setTimeout(entry.ontimeout, this._request.timeout)
return true;
} else if (res !== this._request.skip) {
this._request.queue.shift();
clearTimeout(entry.timer);
entry.timer = null;
return true;
}
}
return false;
};
Peer.prototype._getData = function _getData(items, cb) {
Peer.prototype.getData = function getData(items, cb) {
var map = {};
var waiting = items.length;
items.forEach(function(item) {
map[utils.toHex(item.hash)] = {
map[item.hash.join('.')] = {
item: item,
once: false,
result: null
@ -199,13 +205,10 @@ Peer.prototype._getData = function _getData(items, cb) {
});
var self = this;
function markEntry(hash, result) {
var entry = map[utils.toHex(hash)];
if (!entry || entry.once) {
done(new Error('Invalid notfound entry hash'));
var entry = map[hash.join('.')];
if (!entry || entry.once)
return false;
}
entry.once = true;
entry.result = result;
@ -215,40 +218,38 @@ Peer.prototype._getData = function _getData(items, cb) {
this._write(this.framer.getData(items));
// Process all incoming data, until all data is returned
this._req(null, function(err, payload, cmd) {
var ok = true;
var req = this._req(null, function(err, payload, cmd) {
if (err)
return done(err);
var ok = false;
if (cmd === 'notfound') {
ok = payload.every(function(item) {
return markEntry(item.hash, null);
ok = true;
payload.forEach(function(item) {
if (!markEntry(item.hash, null))
ok = false;
});
} else if (cmd === 'tx') {
var tx = bcoin.tx(payload);
ok = markEntry(tx.hash(), b);
} else if (cmd === 'merkleblock') {
var b = bcoin.block(payload);
ok = markEntry(b.hash(), b);
} else if (cmd === 'block') {
var b = bcoin.block(payload);
ok = markEntry(b.hash(), b);
} else {
done(new Error('Unknown packet in reply to getdata: ' + cmd));
return;
} else if (payload.hash) {
ok = markEntry(payload.hash(), payload);
}
if (!ok)
return;
return self._request.skip;
if (waiting === 0)
done();
else
return false;
return self._request.cont;
});
// Just for debugging purposes
req._getDataMap = map;
function done(err) {
if (err)
return cb(err);
cb(null, items.map(function(item) {
return map[utils.toHex(item.hash)].result;
return map[item.hash.join('.')].result;
}));
}
};
@ -263,8 +264,22 @@ Peer.prototype._onPacket = function onPacket(packet) {
return this._handleInv(payload);
else if (cmd === 'getdata')
return this._handleGetData(payload);
else
return this._res(cmd, payload);
else if (cmd === 'addr')
return this._handleAddr(payload);
else if (cmd === 'ping')
return this._handlePing(payload);
else if (cmd === 'pong')
return this._handlePong(payload);
if (cmd === 'merkleblock' || cmd === 'block')
payload = bcoin.block(payload);
else if (cmd === 'tx')
payload = bcoin.tx(payload);
if (this._res(cmd, payload)) {
return;
} else if (cmd === 'tx') {
console.log('Omg', this.socket._handle.fd, cmd, payload.hash('hex'));
}
};
Peer.prototype._handleVersion = function handleVersion(payload) {
@ -288,25 +303,50 @@ Peer.prototype._handleGetData = function handleGetData(items) {
}, this);
};
Peer.prototype._handleAddr = function handleAddr(addr) {
// No-op for now
};
Peer.prototype._handlePing = function handlePing() {
// No-op for now
};
Peer.prototype._handlePong = function handlePong() {
// No-op for now
};
Peer.prototype._handleInv = function handleInv(items) {
// Always request what advertised
var req = items.filter(function(item) {
return item.type === 'tx' || item.type === 'block';
// Always request advertised TXs
var txs = items.filter(function(item) {
return item.type === 'tx';
}).map(function(item) {
if (item.type === 'tx')
return item;
if (item.type === 'block')
return { type: 'filtered', hash: item.hash };
});
// Emit new blocks to schedule them between multiple peers
var blocks = items.filter(function(item) {
return item.type === 'block';
}, this).map(function(item) {
return item.hash;
});
this.emit('blocks', blocks);
if (txs.length === 0)
return;
var self = this;
this._getData(req, function(err, data) {
this.getData(txs, function(err, data) {
if (err)
return self._error(err);
console.log(data.join(', '));
data.forEach(function(item) {
console.log(item.type, item.hash('hex'));
});
});
};
Peer.prototype._handleMerkleBlock = function handleMerkleBlock(block) {
console.log(utils.toHex(block.prevBlock));
Peer.prototype.loadBlocks = function loadBlocks(hash) {
console.log(this.socket._handle.fd, 'loading from: ', utils.toHex(hash), this.chain.chain.length, this.chain.orphan.count);
this._write(this.framer.getBlocks([ hash ]));
};

153
lib/bcoin/pool.js Normal file
View File

@ -0,0 +1,153 @@
var assert = require('assert');
var bcoin = require('../bcoin');
var utils = bcoin.utils;
function Pool(options) {
if (!(this instanceof Pool))
return new Pool(options);
this.options = options || {};
this.size = options.size || 16;
this.redundancy = options.redundancy || 3;
this.parallel = options.parallel || this.size;
this.pendingBlocks = {};
this.chain = new bcoin.chain();
this.bloom = new bcoin.bloom(8 * 10 * 1024,
10,
(Math.random() * 0xffffffff) | 0),
this.peers = [];
this.pending = [];
// Peers that are loading block ids
this.loadPeers = [];
this.createConnection = options.createConnection;
assert(this.createConnection);
this._init();
}
module.exports = Pool;
Pool.prototype._init = function _init() {
for (var i = 0; i < this.size; i++)
this._addPeer();
};
Pool.prototype._addPeer = function _addPeer() {
if (this.peers.length + this.pending.length >= this.size)
return;
var socket = this.createConnection();
var peer = bcoin.peer(this, socket, this.options.peer);
this.pending.push(peer);
var load = false;
if (this.loadPeers.length < this.redundancy) {
this.loadPeers.push(peer);
load = true;
}
// Create new peer on failure
var self = this;
peer.once('error', function(err) {
self._removePeer(peer);
self._addPeer();
});
peer.once('ack', function() {
var i = self.pending.indexOf(peer);
if (i !== -1) {
self.pending.splice(i, 1);
self.peers.push(peer);
}
peer.updateWatch();
if (load)
peer.loadBlocks(self.chain.getLast().hash());
});
// Split blocks and request them using multiple peers
peer.on('blocks', function(hashes) {
if (hashes.length === 0)
return;
self._requestBlocks(hashes);
if (load)
peer.loadBlocks(hashes[hashes.length - 1]);
});
};
Pool.prototype._removePeer = function _removePeer(peer) {
var i = this.pending.indexOf(peer);
if (i !== -1)
this.pending.splice(i, 1);
i = this.peers.indexOf(peer);
if (i !== -1)
this.peers.splice(i, 1);
i = this.loadPeers.indexOf(peer);
if (i !== -1)
this.loadPeers.splice(i, 1);
};
Pool.prototype.watch = function watch(id) {
if (id)
this.bloom.add(id);
for (var i = 0; i < this.peers.length; i++)
this.peers[i].updateWatch();
};
Pool.prototype._getPeers = function _getPeers() {
var res = [];
if (this.peers.length <= this.redundancy)
return this.peers.slice();
for (var i = 0; i < this.redundancy; i++) {
var peer = this.peers[(Math.random() * this.peers.length) | 0];
if (res.indexOf(peer) !== -1)
continue;
res.push(peer);
}
return res;
};
Pool.prototype._requestBlocks = function _requestBlocks(hashes, force) {
// Do not request blocks that either already in chain, or are
// already requested from some of the peers
hashes = hashes.filter(function(hash) {
return !this.chain.has(hash) &&
(force || !this.pendingBlocks[utils.toHex(hash)]);
}, this).map(function(hash) {
this.pendingBlocks[utils.toHex(hash)] = true;
return { type: 'filtered', hash: hash };
}, this);
// Split blocks into chunks and send each chunk to some peers
var chunks = [];
var count = Math.ceil(hashes.length / this.parallel);
for (var i = 0; i < hashes.length; i += count)
chunks.push(hashes.slice(i, i + count));
var self = this;
chunks.forEach(function(chunk) {
var peers = self._getPeers();
peers.forEach(function(peer) {
peer.getData(chunk, function(err, blocks) {
// Re-request blocks on failure
if (err) {
return self._requestBlocks(chunk.map(function(item) {
return item.hash;
}), true);
}
// Add blocks to chain on success
blocks.forEach(function(block) {
delete self.pendingBlocks[block.hash('hex')];
self.chain.add(block);
});
});
});
});
};

View File

@ -4,8 +4,20 @@ var utils = bcoin.utils;
exports.minVersion = 70001;
exports.version = 70002;
exports.magic = 0xd9b4bef9;
exports.genesis = utils.toArray(
'000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f', 'hex');
exports.genesis = {
version: 1,
prevBlock: [ 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0 ],
merkleRoot: utils.toArray(
'4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b',
'hex'
).reverse(),
ts: 1231006505,
bits: 0x1d00ffff,
nonce: 2083236893
};
// version - services field
exports.services = {

View File

@ -90,6 +90,8 @@ Parser.prototype.parsePayload = function parsePayload(cmd, p) {
return this.parseInvList(p);
else if (cmd === 'merkleblock')
return this.parseMerkleBlock(p);
else if (cmd === 'block')
return this.parseBlock(p);
else
return p;
};
@ -162,7 +164,7 @@ Parser.prototype.parseInvList = function parseInvList(p) {
return items;
};
Parser.prototype.parseMerkleBlock = function parsMerkleBlock(p) {
Parser.prototype.parseMerkleBlock = function parseMerkleBlock(p) {
if (p.length < 84)
return this.emit('error', new Error('Invalid merkleblock size'));
@ -179,3 +181,21 @@ Parser.prototype.parseMerkleBlock = function parsMerkleBlock(p) {
// flags:
};
};
Parser.prototype.parseBlock = function parseBlock(p) {
if (p.length < 84)
return this.emit('error', new Error('Invalid block size'));
return {
version: readU32(p, 0),
prevBlock: p.slice(4, 36),
merkleRoot: p.slice(36, 68),
ts: readU32(p, 68),
bits: readU32(p, 72),
nonce: readU32(p, 76),
totalTx: readU32(p, 80)
// hashes:
// flags:
};
};

View File

@ -23,6 +23,5 @@ TX.prototype.hash = function hash(enc) {
};
TX.prototype.render = function render(framer) {
console.log('here');
return [];
};