From 47892ab6d82ef5cb9abd9a3e52e64e4764d15366 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Sat, 22 Feb 2014 20:52:11 -0700 Subject: [PATCH] Cleaner message stream reading --- lib/peer.js | 144 ++++++++++++++++++++++++++++------------------------ 1 file changed, 79 insertions(+), 65 deletions(-) diff --git a/lib/peer.js b/lib/peer.js index 4e20b44..45ab6ff 100644 --- a/lib/peer.js +++ b/lib/peer.js @@ -4,6 +4,38 @@ var events = require('events'); var util = require('./util.js'); +var commandStringBuffer = function(s){ + var buff = new Buffer(12); + buff.fill(0); + buff.write(s); + return buff; +}; + +/* Reads a set amount of bytes from a flowing stream, argument descriptions: + - stream to read from, must have data emitter + - amount of bytes to read + - preRead argument can be used to set start with an existing data buffer + - callback returns 1) data buffer and 2) lopped/over-read data */ +var readFlowingBytes = function(stream, amount, preRead, callback){ + + var buff = preRead ? preRead : new Buffer([]); + + var readData = function(data){ + buff = Buffer.concat([buff, data], 2); + if (buff.length >= amount){ + var returnData = buff.slice(0, amount); + var lopped; + + if (buff.length > amount) + lopped = buff.slice(amount); + callback(returnData, lopped); + } + else + stream.once('data', readData); + }; + + readData(buff); +}; var Peer = module.exports = function(options){ @@ -19,13 +51,14 @@ var Peer = module.exports = function(options){ var emptyNetAddress = new Buffer('010000000000000000000000000000000000ffff000000000000', 'hex'); var userAgent = util.varStringBuffer('/node-stratum/'); var blockStartHeight = new Buffer('00000000', 'hex'); //block start_height, can be empty + //If protocol version is new enough, add do not relay transactions flag byte, outlined in BIP37 //https://github.com/bitcoin/bips/blob/master/bip-0037.mediawiki#extensions-to-existing-messages var relayTransactions = options.protocolVersion >= 70001 ? new Buffer([false]) : new Buffer([]); - //Command strings found at: https://en.bitcoin.it/wiki/Protocol_specification var commands = { - version: new Buffer('76657273696F6E0000000000', 'hex') + version: commandStringBuffer('version'), + inv: commandStringBuffer('inv') }; @@ -51,86 +84,67 @@ var Peer = module.exports = function(options){ SetupMessageParser(client); - } function SetupMessageParser(client){ - var buff = new Buffer([]); - var msgCommand, msgLength, msgChecksum, readingPayload = false; - var readData = function(data){ - buff = Buffer.concat([buff, data], 2); - if (!readingPayload){ - if (buff.length < 24) return; - var msgMagic = buff.readUInt32LE(0); - msgCommand = buff.slice(4, 16).toString().split("\0", 1)[0]; - msgLength = buff.readUInt32LE(16); - msgChecksum = buff.readUInt32LE(20); + var beginReadingMessage = function(preRead){ + + readFlowingBytes(client, 24, preRead, function(header, lopped){ + var msgMagic = header.readUInt32LE(0); if (msgMagic !== magicInt){ - buff = new Buffer([]); _this.emit('error', 'bad magic number from peer'); + beginReadingMessage(null); return; } - readingPayload = true; - if (buff.length > 24){ - var d = buff.slice(24); - buff = new Buffer([]); - readData(d); - } - else - buff = new Buffer([]); - } - else{ - if (buff.length >= msgLength){ - - var msgPayload = buff.slice(0, msgLength); - if (util.doublesha(msgPayload).readUInt32LE(0) !== msgChecksum){ - _this.emit('error', 'bad payload checksum from peer'); + var msgCommand = header.slice(4, 16).toString(); + var msgLength = header.readUInt32LE(16); + var msgChecksum = header.readUInt32LE(20); + readFlowingBytes(client, msgLength, lopped, function(payload, lopped){ + if (util.doublesha(payload).readUInt32LE(0) !== msgChecksum){ + _this.emit('error', 'bad payload - failed checksum'); + beginReadingMessage(null); + return; } - else{ - HandleMessage(msgCommand, msgPayload); - } - - if (buff.length > msgLength){ - var d = buff.slice(msgLength); - buff = new Buffer([]); - readingPayload = false; - readData(d); - } - else{ - buff = new Buffer([]); - readingPayload = false; - } - } - } + HandleMessage(msgCommand, payload); + beginReadingMessage(lopped); + }); + }); }; - client.on('data', readData); + beginReadingMessage(null); + } + + //Parsing inv message https://en.bitcoin.it/wiki/Protocol_specification#inv + function HandleInv(payload){ + //sloppy varint decoding + var count = payload.readUInt8(0); + payload = payload.slice(1); + if (count >= 0xfd) + { + count = payload.readUInt16LE(0); + payload = payload.slice(2); + } + while (count--) + { + if (payload.readUInt32LE(0) === invVectMsgBlock){ + var block = payload.slice(4, 36).toString('hex'); + console.log('block found ' + block); + _this.emit('blockFound', block); + } + payload = payload.slice(36); + } } function HandleMessage(command, payload){ - //Parsing inv message https://en.bitcoin.it/wiki/Protocol_specification#inv - if (command === 'inv'){ - //sloppy varint decoding - var count = payload.readUInt8(0); - payload = payload.slice(1); - if (count >= 0xfd) - { - count = payload.readUInt16LE(0); - payload = payload.slice(2); - } - while (count--) - { - if (payload.readUInt32LE(0) === invVectMsgBlock){ - var block = payload.slice(4, 36).toString('hex'); - console.log('block found ' + block); - _this.emit('blockFound', block); - } - payload = payload.slice(36); - } + switch(command){ + case commands.inv.toString(): + HandleInv(payload); + break; } + } //Message structure defined at: https://en.bitcoin.it/wiki/Protocol_specification#Message_structure