Merge pull request #59 from LucasJones/master

Fixed p2p system
This commit is contained in:
Matthew Little 2014-04-16 08:06:31 -06:00
commit c72d5e01ae
2 changed files with 79 additions and 41 deletions

View File

@ -8,46 +8,55 @@ var util = require('./util.js');
//Example of p2p in node from TheSeven: http://paste.pm/e54.js //Example of p2p in node from TheSeven: http://paste.pm/e54.js
var commandStringBuffer = function(s){ var fixedLenStringBuffer = function(s, len) {
var buff = new Buffer(12); var buff = new Buffer(len);
buff.fill(0); buff.fill(0);
buff.write(s); buff.write(s);
return buff; return buff;
}; };
var commandStringBuffer = function (s) {
return fixedLenStringBuffer(s, 12);
};
/* Reads a set amount of bytes from a flowing stream, argument descriptions: /* Reads a set amount of bytes from a flowing stream, argument descriptions:
- stream to read from, must have data emitter - stream to read from, must have data emitter
- amount of bytes to read - amount of bytes to read
- preRead argument can be used to set start with an existing data buffer - preRead argument can be used to set start with an existing data buffer
- callback returns 1) data buffer and 2) lopped/over-read data */ - callback returns 1) data buffer and 2) lopped/over-read data */
var readFlowingBytes = function(stream, amount, preRead, callback){ var readFlowingBytes = function (stream, amount, preRead, callback) {
var buff = preRead ? preRead : new Buffer([]); var buff = preRead ? preRead : new Buffer([]);
var readData = function(data){ var readData = function (data) {
buff = Buffer.concat([buff, data], 2); buff = Buffer.concat([buff, data]);
if (buff.length >= amount){ if (buff.length >= amount) {
var returnData = buff.slice(0, amount); var returnData = buff.slice(0, amount);
var lopped = buff.length > amount ? buff.slice(amount): null; var lopped = buff.length > amount ? buff.slice(amount) : null;
callback(returnData, lopped); callback(returnData, lopped);
} }
else else
stream.once('data', readData); stream.once('data', readData);
}; };
readData(buff); readData(new Buffer([]));
}; };
var Peer = module.exports = function(options){ var Peer = module.exports = function (options) {
var _this = this; var _this = this;
var client; var client;
var magic = new Buffer(options.magic, 'hex'); var magic = new Buffer(options.magic, 'hex');
var magicInt = magic.readUInt32LE(0); var magicInt = magic.readUInt32LE(0);
var verack = false;
//https://en.bitcoin.it/wiki/Protocol_specification#Inventory_Vectors //https://en.bitcoin.it/wiki/Protocol_specification#Inventory_Vectors
var invVectMsgBlock = 2; //Hash from inventory message is related to a data block var invCodes = {
error: 0,
tx: 1,
block: 2
};
var networkServices = new Buffer('0100000000000000', 'hex'); //NODE_NETWORK services (value 1 packed as uint64) var networkServices = new Buffer('0100000000000000', 'hex'); //NODE_NETWORK services (value 1 packed as uint64)
var emptyNetAddress = new Buffer('010000000000000000000000000000000000ffff000000000000', 'hex'); var emptyNetAddress = new Buffer('010000000000000000000000000000000000ffff000000000000', 'hex');
var userAgent = util.varStringBuffer('/node-stratum/'); var userAgent = util.varStringBuffer('/node-stratum/');
@ -59,27 +68,34 @@ var Peer = module.exports = function(options){
var commands = { var commands = {
version: commandStringBuffer('version'), version: commandStringBuffer('version'),
inv: commandStringBuffer('inv') inv: commandStringBuffer('inv'),
verack: commandStringBuffer('verack'),
addr: commandStringBuffer('addr'),
getblocks: commandStringBuffer('getblocks')
}; };
(function init(){ (function init() {
Connect(); Connect();
})(); })();
function Connect(){ function Connect() {
client = net.connect(options.host, options.port, function(){ client = net.connect({
host: options.host,
port: options.port
}, function () {
_this.emit('connected'); _this.emit('connected');
SendVersion(); SendVersion();
}); });
client.on('end', function(){ client.on('end', function () {
_this.emit('disconnected'); _this.emit('disconnected');
verack = false;
Connect(); Connect();
}); });
client.on('error', function(){ client.on('error', function (e) {
_this.emit('connectionFailed'); _this.emit('connectionFailed', e);
}); });
@ -87,22 +103,29 @@ var Peer = module.exports = function(options){
} }
function SetupMessageParser(client){ function SetupMessageParser(client) {
var beginReadingMessage = function(preRead){ var beginReadingMessage = function (preRead) {
readFlowingBytes(client, 24, preRead, function(header, lopped){ readFlowingBytes(client, 24, preRead, function (header, lopped) {
var msgMagic = header.readUInt32LE(0); var msgMagic = header.readUInt32LE(0);
if (msgMagic !== magicInt){ if (msgMagic !== magicInt) {
_this.emit('error', 'bad magic number from peer'); _this.emit('error', 'bad magic number from peer');
beginReadingMessage(null); while (header.readUInt32LE(0) !== magicInt && header.length >= 4) {
header = header.slice(1);
}
if (header.readUInt32LE(0) === magicInt) {
beginReadingMessage(header);
} else {
beginReadingMessage(new Buffer([]));
}
return; return;
} }
var msgCommand = header.slice(4, 16).toString(); var msgCommand = header.slice(4, 16).toString();
var msgLength = header.readUInt32LE(16); var msgLength = header.readUInt32LE(16);
var msgChecksum = header.readUInt32LE(20); var msgChecksum = header.readUInt32LE(20);
readFlowingBytes(client, msgLength, lopped, function(payload, lopped){ readFlowingBytes(client, msgLength, lopped, function (payload, lopped) {
if (util.sha256d(payload).readUInt32LE(0) !== msgChecksum){ if (util.sha256d(payload).readUInt32LE(0) !== msgChecksum) {
_this.emit('error', 'bad payload - failed checksum'); _this.emit('error', 'bad payload - failed checksum');
beginReadingMessage(null); beginReadingMessage(null);
return; return;
@ -118,7 +141,7 @@ var Peer = module.exports = function(options){
//Parsing inv message https://en.bitcoin.it/wiki/Protocol_specification#inv //Parsing inv message https://en.bitcoin.it/wiki/Protocol_specification#inv
function HandleInv(payload){ function HandleInv(payload) {
//sloppy varint decoding //sloppy varint decoding
var count = payload.readUInt8(0); var count = payload.readUInt8(0);
payload = payload.slice(1); payload = payload.slice(1);
@ -127,29 +150,42 @@ var Peer = module.exports = function(options){
count = payload.readUInt16LE(0); count = payload.readUInt16LE(0);
payload = payload.slice(2); payload = payload.slice(2);
} }
while (count--) while (count--) {
{ switch(payload.readUInt32LE(0)) {
if (payload.readUInt32LE(0) === invVectMsgBlock){ case invCodes.error:
var block = payload.slice(4, 36).toString('hex'); break;
console.log('block found ' + block); case invCodes.tx:
_this.emit('blockFound', block); var tx = payload.slice(4, 36).toString('hex');
break;
case invCodes.block:
var block = payload.slice(4, 36).toString('hex');
_this.emit('blockFound', block);
break;
} }
payload = payload.slice(36); payload = payload.slice(36);
} }
} }
function HandleMessage(command, payload){ function HandleMessage(command, payload) {
_this.emit('peerMessage', {command: command, payload: payload});
switch(command){ switch (command) {
case commands.inv.toString(): case commands.inv.toString():
HandleInv(payload); HandleInv(payload);
break; break;
case commands.verack.toString():
if(!verack) {
verack = true;
_this.emit('verack');
}
break;
default:
break;
} }
} }
//Message structure defined at: https://en.bitcoin.it/wiki/Protocol_specification#Message_structure //Message structure defined at: https://en.bitcoin.it/wiki/Protocol_specification#Message_structure
function SendMessage(command, payload){ function SendMessage(command, payload) {
var message = Buffer.concat([ var message = Buffer.concat([
magic, magic,
command, command,
@ -158,13 +194,14 @@ var Peer = module.exports = function(options){
payload payload
]); ]);
client.write(message); client.write(message);
_this.emit('sentMessage', message);
} }
function SendVersion(){ function SendVersion() {
var payload = Buffer.concat([ var payload = Buffer.concat([
util.packUInt32LE(options.protocolVersion), util.packUInt32LE(options.protocolVersion),
networkServices, networkServices,
util.packUInt32LE(Date.now() / 1000 | 0), util.packInt64LE(Date.now() / 1000 | 0),
emptyNetAddress, //addr_recv, can be empty emptyNetAddress, //addr_recv, can be empty
emptyNetAddress, //addr_from, can be empty emptyNetAddress, //addr_from, can be empty
crypto.pseudoRandomBytes(8), //nonce, random unique ID crypto.pseudoRandomBytes(8), //nonce, random unique ID

View File

@ -212,10 +212,11 @@ var StratumClient = function(options){
socket.destroy(); socket.destroy();
return; return;
} }
if (dataBuffer.slice(-1) === '\n'){ if (dataBuffer.indexOf('\n') !== -1){
var messages = dataBuffer.split('\n'); var messages = dataBuffer.split('\n');
var incomplete = dataBuffer.slice(-1) === '\n' ? '' : messages.pop();
messages.forEach(function(message){ messages.forEach(function(message){
if (message.trim() === '') return; if (message === '') return;
var messageJson; var messageJson;
try { try {
messageJson = JSON.parse(message); messageJson = JSON.parse(message);
@ -231,7 +232,7 @@ var StratumClient = function(options){
handleMessage(messageJson); handleMessage(messageJson);
} }
}); });
dataBuffer = ''; dataBuffer = incomplete;
} }
}); });
socket.on('close', function() { socket.on('close', function() {