getutxos. arg parsing.

This commit is contained in:
Christopher Jeffrey 2016-04-04 20:27:29 -07:00
parent 4b20a87b4f
commit d423e33e03
4 changed files with 215 additions and 144 deletions

View File

@ -77,9 +77,8 @@ function Peer(pool, options) {
this._request = {
timeout: this.options.requestTimeout || 10000,
cont: {},
skip: {},
queue: []
queue: {}
};
this._ping = {
@ -316,13 +315,17 @@ Peer.prototype.updateWatch = function updateWatch() {
if (!this.pool.options.spv)
return;
if (this.ack)
this._write(this.framer.filterLoad(this.bloom, 'none'));
if (this.ack) {
this._write(this.framer.filterLoad({
filter: this.bloom.toBuffer(),
n: this.bloom.n,
tweak: this.bloom.tweak,
update: 'none'
}));
}
};
Peer.prototype.destroy = function destroy() {
var i;
if (this.destroyed)
return;
@ -340,8 +343,13 @@ Peer.prototype.destroy = function destroy() {
clearInterval(this._ping.timer);
this._ping.timer = null;
for (i = 0; i < this._request.queue.length; i++)
clearTimeout(this._request.queue[i].timer);
Object.keys(this._request).forEach(function(cmd) {
var queue = this._request[cmd];
var i;
for (i = 0; i < queue.length; i++)
clearTimeout(queue[i].timer);
}, this);
};
Peer.prototype._write = function write(chunk) {
@ -375,9 +383,16 @@ Peer.prototype._req = function _req(cmd, cb) {
cmd: cmd,
cb: cb,
ontimeout: function() {
var i = self._request.queue.indexOf(entry);
var queue = self._request.queue[cmd];
var i;
if (!queue)
return;
i = queue.indexOf(entry);
if (i !== -1) {
self._request.queue.splice(i, 1);
queue.splice(i, 1);
cb(new Error('Timed out: ' + cmd), null);
}
},
@ -386,35 +401,35 @@ Peer.prototype._req = function _req(cmd, cb) {
entry.timer = setTimeout(entry.ontimeout, this._request.timeout);
this._request.queue.push(entry);
if (!this._request.queue[cmd])
this._request.queue[cmd] = [];
this._request.queue[cmd].push(entry);
return entry;
};
Peer.prototype._res = function _res(cmd, payload) {
var i, entry, res;
var queue = this._request.queue[cmd];
var entry, res;
for (i = 0; i < this._request.queue.length; i++) {
entry = this._request.queue[i];
if (!queue)
return false;
if (!entry || (entry.cmd && entry.cmd !== cmd))
return false;
entry = queue[0];
res = entry.cb(null, payload, cmd);
if (!entry)
return false;
if (res === this._request.cont) {
assert(!entry.cmd);
res = entry.cb(null, payload, cmd);
// Restart timer
if (!this.destroyed)
entry.timer = setTimeout(entry.ontimeout, this._request.timeout);
return true;
} else if (res !== this._request.skip) {
this._request.queue.shift();
clearTimeout(entry.timer);
entry.timer = null;
return true;
}
if (res !== this._request.skip) {
queue.shift();
if (queue.length === 0)
delete this._request.queue[cmd];
clearTimeout(entry.timer);
entry.timer = null;
return true;
}
return false;
@ -543,13 +558,71 @@ Peer.prototype._handleUTXOs = function _handleUTXOs(payload) {
return new bcoin.coin(coin);
});
utils.debug('Received %d utxos from %s.', payload.coins.length, this.host);
this.emit('utxos', payload);
this._emit('utxos', payload);
};
Peer.prototype.getUTXOs = function getUTXOs(utxos, callback) {
var self = this;
var reqs = [];
var coins = [];
var i;
for (i = 0; i < utxos.length; i += 15)
reqs.push(utxos.slice(i, i + 15));
utils.forEachSerial(reqs, function(utxos, next) {
self._getUTXOs(utxos, function(err, coin) {
if (err)
return next(err);
coins = coins.concat(coin);
next();
});
}, function(err) {
if (err)
return callback(err);
return callback(null, coins);
});
};
Peer.prototype._getUTXOs = function getUTXOs(utxos, callback) {
var index = 0;
var i, prevout, coin;
this._req('utxos', function(err, payload) {
if (err)
return callback(err);
for (i = 0; i < payload.hits.length; i++) {
if (payload.hits[i]) {
prevout = utxos[i];
coin = payload.coins[index++];
if (!prevout || !coin)
return callback(new Error('Malformed utxos message.'));
coin.hash = prevout.hash;
coin.index = prevout.index;
}
}
return callback(null, payload.coins);
});
this._write(this.framer.getUTXOs({
mempool: true,
prevout: utxos.map(function(item) {
return { hash: item[0], index: item[1] };
})
}));
};
Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) {
var self = this;
var coins = [];
var notfound = [];
var hits = [];
if (this.pool.options.selfish)
return;
@ -577,6 +650,9 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) {
self.mempool.isSpent(hash, index, callback);
}
if (payload.prevout.length > 15)
return;
utils.forEachSerial(payload.prevout, function(prevout, next, i) {
var hash = prevout.hash;
var index = prevout.index;
@ -586,6 +662,7 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) {
return next(err);
if (coin) {
hits.push(1);
coins.push(coin);
return next();
}
@ -595,7 +672,7 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) {
return next(err);
if (result) {
notfound.push(i);
hits.push(0);
return next();
}
@ -604,10 +681,11 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) {
return next(err);
if (!coin) {
notfound.push(i);
hits.push(0);
return next();
}
hits.push(1);
coins.push(coin);
next();
@ -621,7 +699,7 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) {
self._write(self.framer.UTXOs({
height: self.chain.height,
tip: self.chain.tip.hash,
notfound: notfound,
hits: hits,
coins: coins
}));
});
@ -778,7 +856,7 @@ Peer.prototype._handleVersion = function handleVersion(payload) {
if (this.options.witness) {
if (!payload.witness) {
this._req('havewitness', function(err, payload) {
this._req('havewitness', function(err) {
if (err) {
self._error('Peer does not support segregated witness.');
self.setMisbehavior(100);
@ -1151,30 +1229,30 @@ Peer.prototype._handleAlert = function handleAlert(details) {
this.emit('alert', details);
};
Peer.prototype.getHeaders = function getHeaders(hashes, stop) {
Peer.prototype.getHeaders = function getHeaders(locator, stop) {
utils.debug(
'Requesting headers packet from %s with getheaders',
this.host);
utils.debug('Height: %s, Hash: %s, Stop: %s',
this.chain._getCachedHeight(hashes[0]),
hashes ? utils.revHex(hashes[0]) : 0,
locator && locator.length ? this.chain._getCachedHeight(locator[0]) : null,
locator && locator.length ? utils.revHex(locator[0]) : 0,
stop ? utils.revHex(stop) : 0);
this._write(this.framer.getHeaders(hashes, stop));
this._write(this.framer.getHeaders({ locator: locator, stop: stop }));
};
Peer.prototype.getBlocks = function getBlocks(hashes, stop) {
Peer.prototype.getBlocks = function getBlocks(locator, stop) {
utils.debug(
'Requesting inv packet from %s with getblocks',
this.host);
utils.debug('Height: %s, Hash: %s, Stop: %s',
this.chain._getCachedHeight(hashes[0]),
hashes ? utils.revHex(hashes[0]) : 0,
locator && locator.length ? this.chain._getCachedHeight(locator[0]) : null,
locator && locator.length ? utils.revHex(locator[0]) : 0,
stop ? utils.revHex(stop) : 0);
this._write(this.framer.getBlocks(hashes, stop));
this._write(this.framer.getBlocks({ locator: locator, stop: stop }));
};
Peer.prototype.getMempool = function getMempool() {

View File

@ -1730,6 +1730,39 @@ Pool.prototype.getPeer = function getPeer(addr) {
}
};
Pool.prototype.getUTXOs = function getUTXOs(utxos, callback) {
var peer = this.peers.load || this.peers.regular[0];
if (!peer)
return utils.asyncify(callback)(new Error('No peer available.'));
peer.getUTXOs(utxos, callback);
};
Pool.prototype.fillTX = function fillTX(tx, callback) {
var utxos = [];
var reqs = [];
var i, input;
for (i = 0; i < tx.inputs.length; i++) {
input = tx.inputs[i];
if (!input.coin)
utxos.push([input.prevout.hash, input.prevout.index]);
}
if (utxos.length === 0)
return utils.asyncify(callback)(null, tx);
this.getUTXOs(utxos, function(err, coins) {
if (err)
return callback(err);
tx.fillCoins(coins);
return callback(null, tx);
});
};
Pool.prototype.getSeed = function getSeed(priority) {
var addr;

View File

@ -11,7 +11,6 @@ var utils = require('../utils');
var assert = utils.assert;
var BufferWriter = require('../writer');
var DUMMY = new Buffer([]);
var bn = require('bn.js');
/**
* Framer
@ -141,16 +140,16 @@ Framer.prototype.pong = function pong(data) {
return this.packet('pong', Framer.pong(data));
};
Framer.prototype.filterLoad = function filterLoad(bloom, update) {
return this.packet('filterload', Framer.filterLoad(bloom, update));
Framer.prototype.filterLoad = function filterLoad(data) {
return this.packet('filterload', Framer.filterLoad(data));
};
Framer.prototype.getHeaders = function getHeaders(hashes, stop) {
return this.packet('getheaders', Framer.getHeaders(hashes, stop));
Framer.prototype.getHeaders = function getHeaders(data) {
return this.packet('getheaders', Framer.getHeaders(data));
};
Framer.prototype.getBlocks = function getBlocks(hashes, stop) {
return this.packet('getblocks', Framer.getBlocks(hashes, stop));
Framer.prototype.getBlocks = function getBlocks(data) {
return this.packet('getblocks', Framer.getBlocks(data));
};
Framer.prototype.utxo = function _coin(coin) {
@ -192,38 +191,30 @@ Framer.prototype.addr = function addr(peers) {
Framer.address = function address(data, full, writer) {
var p = new BufferWriter(writer);
if (!data)
data = {};
if (full) {
if (!data.ts)
p.writeU32(utils.now() - (process.uptime() | 0));
else
p.writeU32(data.ts);
}
if (!data.ts)
data.ts = utils.now() - (process.uptime() | 0);
if (!data.services)
data.services = 0;
if (!data.port)
data.port = network.port;
if (full)
p.writeU32(data.ts);
p.writeU64(data.services);
p.writeU64(data.services || 0);
if (data.ipv6) {
data.ipv6 = utils.ip2array(data.ipv6, 6);
p.writeBytes(data.ipv6);
p.writeBytes(utils.ip2array(data.ipv6, 6));
} else {
if (!data.ipv4)
data.ipv4 = new Buffer([0, 0, 0, 0]);
data.ipv4 = utils.ip2array(data.ipv4, 4);
// We don't have an ipv6, convert ipv4 to ipv4-mapped ipv6 address
// We don't have an ipv6 address: convert
// ipv4 to ipv4-mapped ipv6 address.
p.writeU32BE(0x00000000);
p.writeU32BE(0x00000000);
p.writeU32BE(0x0000ffff);
p.writeBytes(data.ipv4);
if (data.ipv4)
p.writeBytes(utils.ip2array(data.ipv4, 4));
else
p.writeU32BE(0x00000000);
}
p.writeU16BE(data.port);
p.writeU16BE(data.port || network.port);
if (!writer)
p = p.render();
@ -233,29 +224,23 @@ Framer.address = function address(data, full, writer) {
Framer.version = function version(options, writer) {
var p = new BufferWriter(writer);
var agent = options.agent || constants.userAgent;
var remote = options.remote || {};
var local = options.local || {};
if (!options.agent)
options.agent = new Buffer(constants.userAgent, 'ascii');
if (typeof agent === 'string')
agent = new Buffer(agent, 'ascii');
if (!options)
options = {};
if (!options.remote)
options.remote = {};
if (!options.local)
options.local = {};
if (options.local.services == null)
options.local.services = constants.localServices;
if (local.services == null)
local.services = constants.localServices;
p.write32(constants.version);
p.writeU64(constants.localServices);
p.write64(utils.now());
Framer.address(options.remote, false, p);
Framer.address(options.local, false, p);
Framer.address(remote, false, p);
Framer.address(local, false, p);
p.writeU64(utils.nonce());
p.writeVarString(options.agent);
p.writeVarString(agent);
p.write32(options.height || 0);
p.writeU8(options.relay ? 1 : 0);
@ -317,30 +302,18 @@ Framer.pong = function pong(data) {
return p;
};
Framer.filterLoad = function filterLoad(bloom, update, writer) {
Framer.filterLoad = function filterLoad(data, writer) {
var p = new BufferWriter(writer);
var filter, n, tweak;
if (bloom instanceof bcoin.bloom) {
filter = bloom.toBuffer();
n = bloom.n;
tweak = bloom.tweak;
} else {
writer = update;
update = bloom.update;
filter = bloom.filter;
n = bloom.n;
tweak = bloom.tweak;
}
var update = data.update;
if (typeof update === 'string')
update = constants.filterFlags[update];
assert(update != null, 'Bad filter flag.');
p.writeVarBytes(filter);
p.writeU32(n);
p.writeU32(tweak);
p.writeVarBytes(data.filter);
p.writeU32(data.n);
p.writeU32(data.tweak);
p.writeU8(update);
if (!writer)
@ -349,33 +322,33 @@ Framer.filterLoad = function filterLoad(bloom, update, writer) {
return p;
};
Framer.getHeaders = function getHeaders(locator, stop, writer) {
// NOTE: getheaders can have an empty locator.
return Framer._getBlocks(locator || [], stop, writer);
Framer.getHeaders = function getHeaders(data, writer) {
return Framer._getBlocks(data, writer, true);
};
Framer.getBlocks = function getBlocks(locator, stop, writer) {
return Framer._getBlocks(locator, stop, writer);
Framer.getBlocks = function getBlocks(data, writer) {
return Framer._getBlocks(data, writer, false);
};
Framer._getBlocks = function _getBlocks(locator, stop, writer) {
var p, i, version;
if (locator.locator) {
writer = stop;
stop = locator.stop;
version = locator.version;
locator = locator.locator;
}
Framer._getBlocks = function _getBlocks(data, writer, headers) {
var version = data.version;
var locator = data.locator;
var stop = data.stop;
var p, i;
if (!version)
version = constants.version;
if (!locator) {
if (headers)
locator = [];
else
assert(false, 'getblocks requires a locator');
}
if (!stop)
stop = constants.zeroHash;
assert(locator, 'getblocks requires a locator');
p = new BufferWriter(writer);
p.writeU32(version);
@ -819,19 +792,10 @@ Framer.UTXOs = function UTXOs(data, writer) {
var i, j, coin, height, index, map;
if (!data.map) {
assert(data.notfound);
map = new bn(0);
j = -1;
for (i = 0; i < data.notfound.length; i++) {
index = data.notfound[i];
while (++j < index) {
map.iushln(1);
map.iuorn(1);
}
map.iushln(1);
map.iuorn(0);
}
map = map.toBuffer('be');
assert(data.hits);
map = new Buffer((data.hits.length + 7) / 8 | 0);
for (i = 0; i < data.hits.length; i++)
map[i / 8 | 0] |= +data.hits[i] << (i % 8);
} else {
map = data.map;
}

View File

@ -287,7 +287,7 @@ Parser.parseGetUTXOs = function parseGetUTXOs(p) {
Parser.parseUTXOs = function parseUTXOs(p) {
var chainHeight, tip, map, count, coins;
var coin, version, height, i, notfound, ch, j;
var coin, version, height, i, hits, ch, j;
p = new BufferReader(p);
p.start();
@ -297,15 +297,11 @@ Parser.parseUTXOs = function parseUTXOs(p) {
map = p.readVarBytes();
count = p.readVarint();
coins = [];
notfound = [];
hits = [];
for (i = 0; i < map.length; i++) {
ch = map[i];
for (j = 0; j < 8; j++) {
if ((ch & 1) === 0)
notfound.push(i + j);
ch >>>= 1;
}
for (j = 0; j < 8; j++)
hits.push((map[i] >> j) & 1);
}
for (i = 0; i < count; i++) {
@ -326,7 +322,7 @@ Parser.parseUTXOs = function parseUTXOs(p) {
tip: tip,
map: map,
coins: coins,
notfound: notfound,
hits: hits,
_size: p.end()
};
};