diff --git a/lib/peer.js b/lib/peer.js index eab8da7..d5fc4d9 100644 --- a/lib/peer.js +++ b/lib/peer.js @@ -8,46 +8,55 @@ var util = require('./util.js'); //Example of p2p in node from TheSeven: http://paste.pm/e54.js -var commandStringBuffer = function(s){ - var buff = new Buffer(12); +var fixedLenStringBuffer = function(s, len) { + var buff = new Buffer(len); buff.fill(0); buff.write(s); return buff; }; +var commandStringBuffer = function (s) { + return fixedLenStringBuffer(s, 12); +}; + /* Reads a set amount of bytes from a flowing stream, argument descriptions: - stream to read from, must have data emitter - amount of bytes to read - preRead argument can be used to set start with an existing data buffer - callback returns 1) data buffer and 2) lopped/over-read data */ -var readFlowingBytes = function(stream, amount, preRead, callback){ +var readFlowingBytes = function (stream, amount, preRead, callback) { var buff = preRead ? preRead : new Buffer([]); - var readData = function(data){ - buff = Buffer.concat([buff, data], 2); - if (buff.length >= amount){ + var readData = function (data) { + buff = Buffer.concat([buff, data]); + if (buff.length >= amount) { var returnData = buff.slice(0, amount); - var lopped = buff.length > amount ? buff.slice(amount): null; + var lopped = buff.length > amount ? buff.slice(amount) : null; callback(returnData, lopped); } else stream.once('data', readData); }; - readData(buff); + readData(new Buffer([])); }; -var Peer = module.exports = function(options){ +var Peer = module.exports = function (options) { var _this = this; var client; var magic = new Buffer(options.magic, 'hex'); var magicInt = magic.readUInt32LE(0); + var verack = false; //https://en.bitcoin.it/wiki/Protocol_specification#Inventory_Vectors - var invVectMsgBlock = 2; //Hash from inventory message is related to a data block - + var invCodes = { + error: 0, + tx: 1, + block: 2 + }; + var networkServices = new Buffer('0100000000000000', 'hex'); //NODE_NETWORK services (value 1 packed as uint64) var emptyNetAddress = new Buffer('010000000000000000000000000000000000ffff000000000000', 'hex'); var userAgent = util.varStringBuffer('/node-stratum/'); @@ -59,27 +68,34 @@ var Peer = module.exports = function(options){ var commands = { version: commandStringBuffer('version'), - inv: commandStringBuffer('inv') + inv: commandStringBuffer('inv'), + verack: commandStringBuffer('verack'), + addr: commandStringBuffer('addr'), + getblocks: commandStringBuffer('getblocks') }; - (function init(){ + (function init() { Connect(); })(); - function Connect(){ + function Connect() { - client = net.connect(options.host, options.port, function(){ + client = net.connect({ + host: options.host, + port: options.port + }, function () { _this.emit('connected'); SendVersion(); }); - client.on('end', function(){ + client.on('end', function () { _this.emit('disconnected'); + verack = false; Connect(); }); - client.on('error', function(){ - _this.emit('connectionFailed'); + client.on('error', function (e) { + _this.emit('connectionFailed', e); }); @@ -87,22 +103,29 @@ var Peer = module.exports = function(options){ } - function SetupMessageParser(client){ + function SetupMessageParser(client) { - var beginReadingMessage = function(preRead){ + var beginReadingMessage = function (preRead) { - readFlowingBytes(client, 24, preRead, function(header, lopped){ + readFlowingBytes(client, 24, preRead, function (header, lopped) { var msgMagic = header.readUInt32LE(0); - if (msgMagic !== magicInt){ + if (msgMagic !== magicInt) { _this.emit('error', 'bad magic number from peer'); - beginReadingMessage(null); + while (header.readUInt32LE(0) !== magicInt && header.length >= 4) { + header = header.slice(1); + } + if (header.readUInt32LE(0) === magicInt) { + beginReadingMessage(header); + } else { + beginReadingMessage(new Buffer([])); + } return; } var msgCommand = header.slice(4, 16).toString(); var msgLength = header.readUInt32LE(16); var msgChecksum = header.readUInt32LE(20); - readFlowingBytes(client, msgLength, lopped, function(payload, lopped){ - if (util.sha256d(payload).readUInt32LE(0) !== msgChecksum){ + readFlowingBytes(client, msgLength, lopped, function (payload, lopped) { + if (util.sha256d(payload).readUInt32LE(0) !== msgChecksum) { _this.emit('error', 'bad payload - failed checksum'); beginReadingMessage(null); return; @@ -118,7 +141,7 @@ var Peer = module.exports = function(options){ //Parsing inv message https://en.bitcoin.it/wiki/Protocol_specification#inv - function HandleInv(payload){ + function HandleInv(payload) { //sloppy varint decoding var count = payload.readUInt8(0); payload = payload.slice(1); @@ -127,29 +150,42 @@ var Peer = module.exports = function(options){ count = payload.readUInt16LE(0); payload = payload.slice(2); } - while (count--) - { - if (payload.readUInt32LE(0) === invVectMsgBlock){ - var block = payload.slice(4, 36).toString('hex'); - console.log('block found ' + block); - _this.emit('blockFound', block); + while (count--) { + switch(payload.readUInt32LE(0)) { + case invCodes.error: + break; + case invCodes.tx: + var tx = payload.slice(4, 36).toString('hex'); + break; + case invCodes.block: + var block = payload.slice(4, 36).toString('hex'); + _this.emit('blockFound', block); + break; } payload = payload.slice(36); } } - function HandleMessage(command, payload){ - - switch(command){ + function HandleMessage(command, payload) { + _this.emit('peerMessage', {command: command, payload: payload}); + switch (command) { case commands.inv.toString(): HandleInv(payload); break; + case commands.verack.toString(): + if(!verack) { + verack = true; + _this.emit('verack'); + } + break; + default: + break; } } //Message structure defined at: https://en.bitcoin.it/wiki/Protocol_specification#Message_structure - function SendMessage(command, payload){ + function SendMessage(command, payload) { var message = Buffer.concat([ magic, command, @@ -158,13 +194,14 @@ var Peer = module.exports = function(options){ payload ]); client.write(message); + _this.emit('sentMessage', message); } - function SendVersion(){ + function SendVersion() { var payload = Buffer.concat([ util.packUInt32LE(options.protocolVersion), networkServices, - util.packUInt32LE(Date.now() / 1000 | 0), + util.packInt64LE(Date.now() / 1000 | 0), emptyNetAddress, //addr_recv, can be empty emptyNetAddress, //addr_from, can be empty crypto.pseudoRandomBytes(8), //nonce, random unique ID diff --git a/lib/stratum.js b/lib/stratum.js index c197436..d5016f1 100644 --- a/lib/stratum.js +++ b/lib/stratum.js @@ -212,10 +212,11 @@ var StratumClient = function(options){ socket.destroy(); return; } - if (dataBuffer.slice(-1) === '\n'){ + if (dataBuffer.indexOf('\n') !== -1){ var messages = dataBuffer.split('\n'); + var incomplete = dataBuffer.slice(-1) === '\n' ? '' : messages.pop(); messages.forEach(function(message){ - if (message.trim() === '') return; + if (message === '') return; var messageJson; try { messageJson = JSON.parse(message); @@ -231,7 +232,7 @@ var StratumClient = function(options){ handleMessage(messageJson); } }); - dataBuffer = ''; + dataBuffer = incomplete; } }); socket.on('close', function() {