From cf3ffa375db19e211e8afc9247b1ec11e5ed99eb Mon Sep 17 00:00:00 2001 From: LucasJones Date: Wed, 16 Apr 2014 03:28:29 +0100 Subject: [PATCH 1/2] Improve stratum message handling Prevent the case where multiple messages are chunked together without being processed (occasionally leading to the buffer growing too large and the client being kicked). Recognize messages consisting only of spaces as malformed. --- lib/stratum.js | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/stratum.js b/lib/stratum.js index c197436..d5016f1 100644 --- a/lib/stratum.js +++ b/lib/stratum.js @@ -212,10 +212,11 @@ var StratumClient = function(options){ socket.destroy(); return; } - if (dataBuffer.slice(-1) === '\n'){ + if (dataBuffer.indexOf('\n') !== -1){ var messages = dataBuffer.split('\n'); + var incomplete = dataBuffer.slice(-1) === '\n' ? '' : messages.pop(); messages.forEach(function(message){ - if (message.trim() === '') return; + if (message === '') return; var messageJson; try { messageJson = JSON.parse(message); @@ -231,7 +232,7 @@ var StratumClient = function(options){ handleMessage(messageJson); } }); - dataBuffer = ''; + dataBuffer = incomplete; } }); socket.on('close', function() { From 0de1e50f59a012f013a47cef19f634ffce869d70 Mon Sep 17 00:00:00 2001 From: LucasJones Date: Wed, 16 Apr 2014 12:56:20 +0100 Subject: [PATCH 2/2] Some work on the p2p system Fix several bugs in peer.js and enable it to successfully connect to a peer (tested with bitcoind). --- lib/peer.js | 113 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 75 insertions(+), 38 deletions(-) diff --git a/lib/peer.js b/lib/peer.js index eab8da7..d5fc4d9 100644 --- a/lib/peer.js +++ b/lib/peer.js @@ -8,46 +8,55 @@ var util = require('./util.js'); //Example of p2p in node from TheSeven: http://paste.pm/e54.js -var commandStringBuffer = function(s){ - var buff = new Buffer(12); +var fixedLenStringBuffer = function(s, len) { + var buff = new Buffer(len); buff.fill(0); buff.write(s); return buff; }; +var commandStringBuffer = function (s) { + return fixedLenStringBuffer(s, 12); +}; + /* Reads a set amount of bytes from a flowing stream, argument descriptions: - stream to read from, must have data emitter - amount of bytes to read - preRead argument can be used to set start with an existing data buffer - callback returns 1) data buffer and 2) lopped/over-read data */ -var readFlowingBytes = function(stream, amount, preRead, callback){ +var readFlowingBytes = function (stream, amount, preRead, callback) { var buff = preRead ? preRead : new Buffer([]); - var readData = function(data){ - buff = Buffer.concat([buff, data], 2); - if (buff.length >= amount){ + var readData = function (data) { + buff = Buffer.concat([buff, data]); + if (buff.length >= amount) { var returnData = buff.slice(0, amount); - var lopped = buff.length > amount ? buff.slice(amount): null; + var lopped = buff.length > amount ? buff.slice(amount) : null; callback(returnData, lopped); } else stream.once('data', readData); }; - readData(buff); + readData(new Buffer([])); }; -var Peer = module.exports = function(options){ +var Peer = module.exports = function (options) { var _this = this; var client; var magic = new Buffer(options.magic, 'hex'); var magicInt = magic.readUInt32LE(0); + var verack = false; //https://en.bitcoin.it/wiki/Protocol_specification#Inventory_Vectors - var invVectMsgBlock = 2; //Hash from inventory message is related to a data block - + var invCodes = { + error: 0, + tx: 1, + block: 2 + }; + var networkServices = new Buffer('0100000000000000', 'hex'); //NODE_NETWORK services (value 1 packed as uint64) var emptyNetAddress = new Buffer('010000000000000000000000000000000000ffff000000000000', 'hex'); var userAgent = util.varStringBuffer('/node-stratum/'); @@ -59,27 +68,34 @@ var Peer = module.exports = function(options){ var commands = { version: commandStringBuffer('version'), - inv: commandStringBuffer('inv') + inv: commandStringBuffer('inv'), + verack: commandStringBuffer('verack'), + addr: commandStringBuffer('addr'), + getblocks: commandStringBuffer('getblocks') }; - (function init(){ + (function init() { Connect(); })(); - function Connect(){ + function Connect() { - client = net.connect(options.host, options.port, function(){ + client = net.connect({ + host: options.host, + port: options.port + }, function () { _this.emit('connected'); SendVersion(); }); - client.on('end', function(){ + client.on('end', function () { _this.emit('disconnected'); + verack = false; Connect(); }); - client.on('error', function(){ - _this.emit('connectionFailed'); + client.on('error', function (e) { + _this.emit('connectionFailed', e); }); @@ -87,22 +103,29 @@ var Peer = module.exports = function(options){ } - function SetupMessageParser(client){ + function SetupMessageParser(client) { - var beginReadingMessage = function(preRead){ + var beginReadingMessage = function (preRead) { - readFlowingBytes(client, 24, preRead, function(header, lopped){ + readFlowingBytes(client, 24, preRead, function (header, lopped) { var msgMagic = header.readUInt32LE(0); - if (msgMagic !== magicInt){ + if (msgMagic !== magicInt) { _this.emit('error', 'bad magic number from peer'); - beginReadingMessage(null); + while (header.readUInt32LE(0) !== magicInt && header.length >= 4) { + header = header.slice(1); + } + if (header.readUInt32LE(0) === magicInt) { + beginReadingMessage(header); + } else { + beginReadingMessage(new Buffer([])); + } return; } var msgCommand = header.slice(4, 16).toString(); var msgLength = header.readUInt32LE(16); var msgChecksum = header.readUInt32LE(20); - readFlowingBytes(client, msgLength, lopped, function(payload, lopped){ - if (util.sha256d(payload).readUInt32LE(0) !== msgChecksum){ + readFlowingBytes(client, msgLength, lopped, function (payload, lopped) { + if (util.sha256d(payload).readUInt32LE(0) !== msgChecksum) { _this.emit('error', 'bad payload - failed checksum'); beginReadingMessage(null); return; @@ -118,7 +141,7 @@ var Peer = module.exports = function(options){ //Parsing inv message https://en.bitcoin.it/wiki/Protocol_specification#inv - function HandleInv(payload){ + function HandleInv(payload) { //sloppy varint decoding var count = payload.readUInt8(0); payload = payload.slice(1); @@ -127,29 +150,42 @@ var Peer = module.exports = function(options){ count = payload.readUInt16LE(0); payload = payload.slice(2); } - while (count--) - { - if (payload.readUInt32LE(0) === invVectMsgBlock){ - var block = payload.slice(4, 36).toString('hex'); - console.log('block found ' + block); - _this.emit('blockFound', block); + while (count--) { + switch(payload.readUInt32LE(0)) { + case invCodes.error: + break; + case invCodes.tx: + var tx = payload.slice(4, 36).toString('hex'); + break; + case invCodes.block: + var block = payload.slice(4, 36).toString('hex'); + _this.emit('blockFound', block); + break; } payload = payload.slice(36); } } - function HandleMessage(command, payload){ - - switch(command){ + function HandleMessage(command, payload) { + _this.emit('peerMessage', {command: command, payload: payload}); + switch (command) { case commands.inv.toString(): HandleInv(payload); break; + case commands.verack.toString(): + if(!verack) { + verack = true; + _this.emit('verack'); + } + break; + default: + break; } } //Message structure defined at: https://en.bitcoin.it/wiki/Protocol_specification#Message_structure - function SendMessage(command, payload){ + function SendMessage(command, payload) { var message = Buffer.concat([ magic, command, @@ -158,13 +194,14 @@ var Peer = module.exports = function(options){ payload ]); client.write(message); + _this.emit('sentMessage', message); } - function SendVersion(){ + function SendVersion() { var payload = Buffer.concat([ util.packUInt32LE(options.protocolVersion), networkServices, - util.packUInt32LE(Date.now() / 1000 | 0), + util.packInt64LE(Date.now() / 1000 | 0), emptyNetAddress, //addr_recv, can be empty emptyNetAddress, //addr_from, can be empty crypto.pseudoRandomBytes(8), //nonce, random unique ID