flocore/lib/Connection.js
Ryan X. Charles ba692aaa20 add new SecureRandom class that does the right thing
Generating random numbers properly depends on the platform. The new
getRandomBuffer method does the right thing on the right platform. It will
sometimes fail due to insufficient entropy. The getPseudoRandomBuffer class is
also provided that will never fail, but it is not cryptographically secure and
should not be used for keys.
2014-04-22 22:18:59 -03:00

566 lines
15 KiB
JavaScript

var imports = require('soop').imports();
var log = imports.log || require('../util/log');
var MAX_RECEIVE_BUFFER = 10000000;
var PROTOCOL_VERSION = 70000;
var Put = imports.Put || require('bufferput');
var Buffers = imports.Buffers || require('buffers');
require('../patches/Buffers.monkey').patch(Buffers);
var bitcoreDefaults = imports.config || require('../config');
var networks = imports.networks || require('../networks');
var Block = imports.Block || require('./Block');
var Transaction = imports.Transaction || require('./Transaction');
var util = imports.util || require('../util');
var Parser = imports.Parser || require('../util/BinaryParser');
var buffertools = imports.buffertools || require('buffertools');
var doubleSha256 = imports.doubleSha256 || util.twoSha256;
var SecureRandom = imports.SecureRandom || require('./SecureRandom');
// Random 8-byte nonce, generated once when the module loads. sendVersion()
// writes it into our 'version' payload and handleMessage() compares it with the
// nonce of incoming 'version' messages to detect a connection to ourselves, so
// it has to be one stable value (expected to be a Buffer), not a generator
// function. Pseudo-random quality is enough here: this is not key material.
var nonce = SecureRandom.getPseudoRandomBuffer(8);
var BIP0031_VERSION = 60000;
/**
 * Connection to a single peer.
 *
 * @param {Object} socket - Socket used to talk to the peer. It is replaced by a
 *   SOCKS5 client socket when the configuration contains a `proxy` entry.
 * @param {Object} peer - Peer descriptor; `open()` reads its `host` and `port`.
 * @param {Object} [opts] - Configuration; the module defaults are used when
 *   omitted. `opts.network` selects the network (falls back to livenet) and
 *   `opts.proxy` ({host, port}) enables the SOCKS5 proxy.
 */
function Connection(socket, peer, opts) {
  Connection.super(this, arguments);

  this.config = opts || bitcoreDefaults;
  this.network = networks[this.config.network] || networks.livenet;
  this.socket = socket;
  this.peer = peer;

  // A configured SOCKS5 proxy takes over from the socket that was passed in.
  var proxy = this.config.proxy;
  if (proxy) {
    var Socks5Client = imports.Socks5Client || require('socks5-client');
    this.socket = new Socks5Client(proxy.host, proxy.port);
  }

  // Version 0.2 of the protocol is obsolete since 20 Feb 2012 (the cutoff
  // below is that date in milliseconds), matching the official client.
  var legacyVersionObsolete = Date.now() > 1329696000000;
  var initialVersion = legacyVersionObsolete ? 209 : 0;

  // Becomes true once 'verack' has been received.
  this.active = false;
  // Protocol version used to interpret incoming packets.
  this.recvVer = initialVersion;
  // Protocol version used to build outgoing packets.
  this.sendVer = initialVersion;
  // Block chain height claimed by the remote peer.
  this.bestHeight = 0;
  // Inbound when the socket was produced by a listening server.
  this.inbound = Boolean(this.socket.server);
  // Whether a getaddr has been sent on this connection.
  this.getaddr = false;
  // Accumulates received bytes until complete messages can be cut out.
  this.buffers = new Buffers();

  this.setupHandlers();
}
// Parent class of Connection: an injected `parent` import when one is supplied,
// otherwise Node's EventEmitter (presumably picked up by soop when the module
// is exported below -- confirm against soop's documentation).
Connection.parent = imports.parent || require('events').EventEmitter;
/**
 * Starts connecting the socket to the peer's port and host.
 *
 * @param {Function} [callback] - Registered as a one-time 'connect' listener
 *   when it is a function; any other value is ignored.
 * @returns {Connection} This connection, for chaining.
 */
Connection.prototype.open = function(callback) {
  var hasCallback = (typeof callback === 'function');
  if (hasCallback) {
    this.once('connect', callback);
  }

  this.socket.connect(this.peer.port, this.peer.host);
  return this;
};
/**
 * Subscribes this connection to its socket's 'connect', 'error', 'end' and
 * 'data' events. Two 'data' listeners are attached, in this order: a debug tap
 * that logs the size and a hex preview of each chunk, then handleData().
 */
Connection.prototype.setupHandlers = function () {
  var socket = this.socket;

  socket.addListener('connect', this.handleConnect.bind(this));
  socket.addListener('error', this.handleError.bind(this));
  socket.addListener('end', this.handleDisconnect.bind(this));

  socket.addListener('data', (function (data) {
    // Only the first dumpLen bytes are dumped to keep the debug log readable.
    var dumpLen = 35;
    var preview = data.slice(0, Math.min(dumpLen, data.length));
    log.debug('['+this.peer+'] '+
              'Received '+data.length+' bytes of data:');
    log.debug('... '+ buffertools.toHex(preview) +
              (data.length > dumpLen ? '...' : ''));
  }).bind(this));

  socket.addListener('data', this.handleData.bind(this));
};
/**
 * Socket 'connect' handler. On outbound connections the handshake is started by
 * sending our 'version' message; in every case a 'connect' event carrying this
 * connection, its socket and its peer is emitted.
 */
Connection.prototype.handleConnect = function () {
  var weInitiated = !this.inbound;
  if (weInitiated) {
    this.sendVersion();
  }

  var info = {
    conn: this,
    socket: this.socket,
    peer: this.peer
  };
  this.emit('connect', info);
};
/**
 * Socket 'error' handler: logs the failure and re-emits it as an 'error' event
 * carrying this connection, its socket, its peer and the original error.
 *
 * Timeouts and refused connections are logged at info level, anything else as
 * a warning.
 *
 * @param {Error} err - Error reported by the socket.
 */
Connection.prototype.handleError = function(err) {
  // The symbolic name is looked up in both places it can appear: `errno`
  // (checked by the original code, together with the numeric Linux values) and
  // `code`, which is where current Node.js releases report it.
  var timedOut = err.errno == 110 || err.errno == 'ETIMEDOUT' ||
                 err.code === 'ETIMEDOUT';
  var refused = err.errno == 111 || err.errno == 'ECONNREFUSED' ||
                err.code === 'ECONNREFUSED';

  if (timedOut) {
    log.info('connection timed out for '+this.peer);
  } else if (refused) {
    log.info('connection refused for '+this.peer);
  } else {
    log.warn('connection with '+this.peer+' '+err.toString());
  }

  this.emit('error', {
    conn: this,
    socket: this.socket,
    peer: this.peer,
    err: err
  });
};
/**
 * Socket 'end' handler: emits a 'disconnect' event carrying this connection,
 * its socket and its peer.
 */
Connection.prototype.handleDisconnect = function () {
  var info = {
    conn: this,
    socket: this.socket,
    peer: this.peer
  };
  this.emit('disconnect', info);
};
/**
 * Applies the connection-level side effects of a parsed message and then emits
 * an event named after the message's command, carrying this connection, its
 * socket, its peer and the message.
 *
 * - 'version': ends the socket if the nonce equals our own (self-connection),
 *   answers with our version on inbound connections, sends 'verack' to peers
 *   at version >= 209, and records the negotiated versions and peer height.
 * - 'verack': marks the connection as active.
 * - 'ping': answers with 'pong' when the ping carried a nonce.
 *
 * Exceptions raised while handling are logged and the event is not emitted.
 *
 * @param {Object} message - Parsed message as returned by parseMessage();
 *   a falsy value is ignored.
 */
Connection.prototype.handleMessage = function(message) {
  if (!message) {
    // Parser was unable to make sense of the message, drop it
    return;
  }

  try {
    switch (message.command) {
    case 'version':
      // Did we connect to ourself?
      if (buffertools.compare(nonce, message.nonce) === 0) {
        this.socket.end();
        return;
      }

      if (this.inbound) {
        this.sendVersion();
      }

      if (message.version >= 209) {
        this.sendMessage('verack', new Buffer([]));
      }
      this.sendVer = Math.min(message.version, PROTOCOL_VERSION);
      if (message.version < 209) {
        this.recvVer = Math.min(message.version, PROTOCOL_VERSION);
      } else {
        // We won't start expecting a checksum until after we've received
        // the 'verack' message.
        this.once('verack', (function () {
          this.recvVer = message.version;
        }).bind(this));
      }
      this.bestHeight = message.start_height;
      break;

    case 'verack':
      // parseMessage() produces no `version` field for 'verack', so taking
      // Math.min() of it unconditionally would store NaN in recvVer and make
      // later frames be read without their checksum. Only adopt a version
      // when the message actually carries one.
      if (typeof message.version === 'number') {
        this.recvVer = Math.min(message.version, PROTOCOL_VERSION);
      }
      this.active = true;
      break;

    case 'ping':
      if ('object' === typeof message.nonce) {
        this.sendPong(message.nonce);
      }
      break;
    }
  } catch (e) {
    log.err('Error while handling "'+message.command+'" message from ' +
            this.peer + ':\n' +
            (e.stack ? e.stack : e.toString()));
    return;
  }

  this.emit(message.command, {
    conn: this,
    socket: this.socket,
    peer: this.peer,
    message: message
  });
};
/**
 * Sends a 'pong' message whose payload is the given nonce.
 *
 * @param {Buffer} nonce - Nonce to echo back (handleMessage passes the nonce
 *   of the received 'ping').
 */
Connection.prototype.sendPong = function (nonce) {
  var payload = nonce;
  this.sendMessage('pong', payload);
};
/**
 * Builds and sends our 'version' message: protocol version, services,
 * current time in seconds, two zeroed 26-byte address fields, the module
 * nonce, the user-agent string and a trailing 32-bit zero.
 */
Connection.prototype.sendVersion = function () {
  var subversion = '/BitcoinX:0.1/';
  var payload = new Put();

  payload.word32le(PROTOCOL_VERSION);            // version
  payload.word64le(1);                           // services
  var unixSeconds = Math.round(new Date().getTime() / 1000);
  payload.word64le(unixSeconds);                 // timestamp
  payload.pad(26);                               // addr_me
  payload.pad(26);                               // addr_you
  payload.put(nonce);
  payload.varint(subversion.length);
  payload.put(new Buffer(subversion, 'ascii'));
  payload.word32le(0);

  this.sendMessage('version', payload.buffer());
};
/**
 * Sends a 'getblocks' (or 'getheaders') request.
 * See https://en.bitcoin.it/wiki/Protocol_specification#getblocks
 *
 * @param {Array} starts - Block locator hashes; each must have length 32.
 * @param {Buffer|string} [stop] - Hash to stop at; defaults to util.NULL_HASH,
 *   which asks for as many blocks as possible (500).
 * @param {boolean} [wantHeaders] - When truthy, 'getheaders' is sent instead.
 * @throws {Error} 'Invalid hash length' if a start hash or the stop hash is
 *   not 32 bytes long.
 */
Connection.prototype.sendGetBlocks = function (starts, stop, wantHeaders) {
  var stopHash = stop || util.NULL_HASH;
  var request = new Put();

  request.word32le(this.sendVer);
  request.varint(starts.length);

  var idx = 0;
  while (idx < starts.length) {
    if (starts[idx].length != 32) {
      throw new Error('Invalid hash length');
    }
    request.put(starts[idx]);
    idx++;
  }

  var stopBuffer = new Buffer(stopHash, 'binary');
  if (stopBuffer.length != 32) {
    throw new Error('Invalid hash length');
  }
  request.put(stopBuffer);

  var command = wantHeaders ? 'getheaders' : 'getblocks';
  this.sendMessage(command, request.buffer());
};
/**
 * Sends a 'getheaders' request; same arguments as sendGetBlocks(), to which
 * it delegates with the headers flag set.
 *
 * @param {Array} starts - Block locator hashes.
 * @param {Buffer|string} [stop] - Hash to stop at.
 */
Connection.prototype.sendGetHeaders = function(starts, stop) {
  var wantHeaders = true;
  this.sendGetBlocks(starts, stop, wantHeaders);
};
/**
 * Sends a 'getdata' request for the given inventory entries.
 *
 * @param {Array} invs - Entries with a numeric `type` and a `hash`; they are
 *   written in order after a varint holding the entry count.
 */
Connection.prototype.sendGetData = function (invs) {
  var request = new Put();
  request.varint(invs.length);

  var idx = 0;
  while (idx < invs.length) {
    request.word32le(invs[idx].type);
    request.put(invs[idx].hash);
    idx++;
  }

  this.sendMessage('getdata', request.buffer());
};
/**
 * Sends a 'getaddr' request. The payload is whatever an untouched Put yields.
 *
 * @param {*} invs - Unused; kept so the signature stays unchanged.
 */
Connection.prototype.sendGetAddr = function (invs) {
  var emptyPayload = new Put().buffer();
  this.sendMessage('getaddr', emptyPayload);
};
/**
 * Announces blocks and/or transactions with an 'inv' message.
 *
 * @param {Object|Array} data - One item or an array of items. Instances of
 *   Block are announced as type 2 (MSG_BLOCK), everything else as type 1
 *   (MSG_TX); each item must provide getHash().
 */
Connection.prototype.sendInv = function(data) {
  var items = Array.isArray(data) ? data : [data];

  var announcement = new Put();
  announcement.varint(items.length);

  items.forEach(function (item) {
    // 2 = MSG_BLOCK, 1 = MSG_TX
    var invType = (item instanceof Block) ? 2 : 1;
    announcement.word32le(invType);
    announcement.put(item.getHash());
  });

  this.sendMessage('inv', announcement.buffer());
};
/**
 * Sends a 'headers' message.
 *
 * @param {Array} headers - Serialized block headers. Each is written followed
 *   by a zero byte, which stands for a transaction count of 0.
 */
Connection.prototype.sendHeaders = function (headers) {
  var reply = new Put();
  reply.varint(headers.length);

  headers.forEach(function (rawHeader) {
    reply.put(rawHeader);
    reply.word8(0);   // no transactions follow a header
  });

  this.sendMessage('headers', reply.buffer());
};
/**
 * Sends a transaction as a 'tx' message.
 *
 * @param {Object} tx - Transaction; its serialize() result is the payload.
 */
Connection.prototype.sendTx = function (tx) {
  var rawTx = tx.serialize();
  this.sendMessage('tx', rawTx);
};
/**
 * Sends a full block as a 'block' message: the block header, a varint with
 * the number of transactions, then every serialized transaction.
 *
 * @param {Object} block - Block; getHeader() supplies the header bytes.
 * @param {Array} txs - Transactions of the block; each must provide serialize().
 */
Connection.prototype.sendBlock = function (block, txs) {
  var body = new Put();

  body.put(block.getHeader());
  body.varint(txs.length);
  txs.forEach(function (transaction) {
    var rawTx = transaction.serialize();
    body.put(rawTx);
  });

  this.sendMessage('block', body.buffer());
};
/**
 * Frames a payload and writes it to the socket.
 *
 * Frame layout: network magic, command name zero-padded to 12 bytes, payload
 * length (32-bit little endian), the first four bytes of the payload's double
 * SHA-256 (only when sendVer >= 209), then the payload.
 *
 * Failures, including a command name longer than 12 bytes, are logged and
 * swallowed rather than thrown to the caller.
 *
 * @param {string} command - Command name (ASCII, at most 12 bytes).
 * @param {Buffer} payload - Message body.
 */
Connection.prototype.sendMessage = function (command, payload) {
  try {
    var magic = this.network.magic;
    var commandBuf = new Buffer(command, 'ascii');
    if (commandBuf.length > 12) throw 'Command name too long';

    // Checksums only exist on the wire from protocol version 209 on.
    var usesChecksum = this.sendVer >= 209;
    var checksum = usesChecksum ? doubleSha256(payload).slice(0, 4)
                                : new Buffer([]);

    var frame = new Put();
    // -- HEADER --
    frame.put(magic);
    frame.put(commandBuf);
    frame.pad(12 - commandBuf.length);
    frame.word32le(payload.length);
    frame.put(checksum);
    // -- BODY --
    frame.put(payload);

    var wireBytes = frame.buffer();

    log.debug('['+this.peer+'] '+
              'Sending message '+command+' ('+payload.length+' bytes)');

    this.socket.write(wireBytes);
  } catch (err) {
    // TODO: We should catch this error one level higher in order to better
    //       determine how to react to it. For now though, ignoring it will do.
    var detail = err.stack ? err.stack : err.toString();
    log.err('Error while sending message to peer '+this.peer+': '+
            detail);
  }
};
/**
 * Socket 'data' handler: appends the chunk to the receive buffer and tries to
 * extract messages from it. If the buffered amount exceeds MAX_RECEIVE_BUFFER
 * the peer is considered misbehaving and the socket is destroyed instead.
 *
 * @param {Buffer} data - Bytes received from the socket.
 */
Connection.prototype.handleData = function (data) {
  this.buffers.push(data);

  if (this.buffers.length > MAX_RECEIVE_BUFFER) {
    // No error object exists in this branch; the message is the whole report.
    // (Referencing one here used to throw before the socket was destroyed.)
    log.err('Peer '+this.peer+' exceeded maxreceivebuffer, disconnecting.');
    this.socket.destroy();
    return;
  }

  this.processData();
};
/**
 * Extracts and dispatches every complete message currently held in the
 * receive buffer.
 *
 * Frame layout as read here: 4 bytes network magic, 12 bytes zero-padded
 * command, 4 bytes little-endian payload length, 4 bytes checksum (only when
 * recvVer >= 209), then the payload.
 *
 * For each frame: bytes in front of the magic are discarded as garbage, the
 * checksum (if any) is verified, the payload is parsed with parseMessage()
 * and a truthy result is passed to handleMessage(). Frames that fail the
 * checksum or cannot be parsed are logged and dropped. The method returns as
 * soon as the buffer no longer holds a complete frame.
 */
Connection.prototype.processData = function () {
  // Iterate rather than recurse once per message, so a buffer packed with
  // many small messages cannot exhaust the call stack.
  for (;;) {
    // If there are less than 20 bytes there can't be a message yet.
    if (this.buffers.length < 20) return;

    var magic = this.network.magic;
    var i = 0;
    for (;;) {
      if (this.buffers.get(i  ) === magic[0] &&
          this.buffers.get(i+1) === magic[1] &&
          this.buffers.get(i+2) === magic[2] &&
          this.buffers.get(i+3) === magic[3]) {
        if (i !== 0) {
          log.debug('['+this.peer+'] '+
                    'Received '+i+
                    ' bytes of inter-message garbage: ');
          log.debug('... '+this.buffers.slice(0,i));

          this.buffers.skip(i);
        }
        break;
      }

      if (i > (this.buffers.length - 4)) {
        // No magic found: drop what was scanned and keep only the tail,
        // which may be the beginning of a magic sequence.
        this.buffers.skip(i);
        return;
      }
      i++;
    }

    // recvVer can change while handling a message, so this is re-evaluated
    // for every frame.
    var startPos = (this.recvVer >= 209) ? 24 : 20;

    // Skipping garbage may have left less than a full header; wait for more
    // data instead of reading length bytes that are not there yet.
    if (this.buffers.length < startPos) return;

    // The top byte is multiplied instead of shifted: `<< 24` yields a signed
    // 32-bit value and would turn lengths >= 2^31 into negative numbers.
    var payloadLen = (this.buffers.get(16)      ) +
                     (this.buffers.get(17) <<  8) +
                     (this.buffers.get(18) << 16) +
                     (this.buffers.get(19) * 0x1000000);

    var endPos = startPos + payloadLen;

    if (this.buffers.length < endPos) return;

    var command = this.buffers.slice(4, 16).toString('ascii').replace(/\0+$/,'');
    var payload = this.buffers.slice(startPos, endPos);
    var checksum = (this.recvVer >= 209) ? this.buffers.slice(20, 24) : null;

    log.debug('['+this.peer+'] ' +
              'Received message ' + command +
              ' (' + payloadLen + ' bytes)');

    if (checksum !== null) {
      var checksumConfirm = doubleSha256(payload).slice(0, 4);
      if (buffertools.compare(checksumConfirm, checksum) !== 0) {
        log.err('['+this.peer+'] '+
                'Checksum failed',
                { cmd: command,
                  expected: checksumConfirm.toString('hex'),
                  actual: checksum.toString('hex') });
        // Drop the corrupt frame and carry on; leaving it at the front of
        // the buffer would block every message that follows it.
        this.buffers.skip(endPos);
        continue;
      }
    }

    var message;
    try {
      message = this.parseMessage(command, payload);
    } catch (e) {
      log.err('Error while parsing message '+command+' from ' +
              this.peer + ':\n' +
              (e.stack ? e.stack : e.toString()));
    }

    if (message) {
      this.handleMessage(message);
    }

    this.buffers.skip(endPos);
  }
};
/**
 * Decodes the payload of a message into a plain object.
 *
 * @param {string} command - Command name taken from the message header.
 * @param {Buffer} payload - Raw message body.
 * @returns {Object|null} An object with `command` plus the fields decoded for
 *   that command, or null for commands that are not implemented (which tells
 *   the caller not to issue an event).
 */
Connection.prototype.parseMessage = function (command, payload) {
  var reader = new Parser(payload);
  var msg = { command: command };
  var idx;

  switch (command) {
  case 'version':
    // https://en.bitcoin.it/wiki/Protocol_specification#version
    msg.version = reader.word32le();
    msg.services = reader.word64le();
    msg.timestamp = reader.word64le();
    msg.addr_me = reader.buffer(26);
    msg.addr_you = reader.buffer(26);
    msg.nonce = reader.buffer(8);
    msg.subversion = reader.varStr();
    msg.start_height = reader.word32le();
    return msg;

  case 'inv':
  case 'getdata':
    msg.count = reader.varInt();
    msg.invs = [];
    for (idx = 0; idx < msg.count; idx += 1) {
      var invType = reader.word32le();
      var invHash = reader.buffer(32);
      msg.invs.push({ type: invType, hash: invHash });
    }
    return msg;

  case 'headers':
    msg.count = reader.varInt();
    msg.headers = [];
    for (idx = 0; idx < msg.count; idx += 1) {
      var header = new Block();
      header.parse(reader);
      msg.headers.push(header);
    }
    return msg;

  case 'block':
    var block = new Block();
    block.parse(reader);

    // The block itself plus its header fields copied onto the message.
    msg.block = block;
    msg.version = block.version;
    msg.prev_hash = block.prev_hash;
    msg.merkle_root = block.merkle_root;
    msg.timestamp = block.timestamp;
    msg.bits = block.bits;
    msg.nonce = block.nonce;
    msg.txs = block.txs;
    msg.size = payload.length;
    return msg;

  case 'tx':
    var tx = new Transaction();
    tx.parse(reader);
    return {
      command: command,
      version: tx.version,
      lock_time: tx.lock_time,
      ins: tx.ins,
      outs: tx.outs,
      tx: tx
    };

  case 'getblocks':
  case 'getheaders':
    // The request starts with the sender's protocol version.
    msg.version = reader.word32le();

    // TODO: Limit block locator size?
    // reference implementation limits to 500 results
    var locatorCount = reader.varInt();

    msg.starts = [];
    for (idx = 0; idx < locatorCount; idx += 1) {
      msg.starts.push(reader.buffer(32));
    }
    msg.stop = reader.buffer(32);
    return msg;

  case 'addr':
    var announced = reader.varInt();
    // At most 1000 addresses are read from a single message.
    var addrCount = announced > 1000 ? 1000 : announced;

    msg.addrs = [];
    for (idx = 0; idx < addrCount; idx += 1) {
      // TODO: Time actually depends on the version of the other peer (>=31402)
      var seenAt = reader.word32le();
      var services = reader.word64le();
      var ip = reader.buffer(16);
      var port = reader.word16be();
      msg.addrs.push({ time: seenAt, services: services, ip: ip, port: port });
    }
    return msg;

  case 'alert':
    msg.payload = reader.varStr();
    msg.signature = reader.varStr();
    return msg;

  case 'ping':
    // A nonce is only read when recvVer is above BIP0031_VERSION.
    if (this.recvVer > BIP0031_VERSION) {
      msg.nonce = reader.buffer(8);
    }
    return msg;

  case 'getaddr':
  case 'verack':
  case 'reject':
    // Nothing is read from the payload for these commands.
    return msg;

  default:
    log.err('Connection.parseMessage(): Command not implemented',
            {cmd: command});

    // This tells the calling function not to issue an event
    return null;
  }
};
// Export the value returned by passing the Connection constructor to soop.
module.exports = require('soop')(Connection);