Cleaner message stream reading
This commit is contained in:
parent
0a6f05a067
commit
47892ab6d8
144
lib/peer.js
144
lib/peer.js
@ -4,6 +4,38 @@ var events = require('events');
|
||||
|
||||
var util = require('./util.js');
|
||||
|
||||
var commandStringBuffer = function(s){
|
||||
var buff = new Buffer(12);
|
||||
buff.fill(0);
|
||||
buff.write(s);
|
||||
return buff;
|
||||
};
|
||||
|
||||
/* Reads a set amount of bytes from a flowing stream, argument descriptions:
|
||||
- stream to read from, must have data emitter
|
||||
- amount of bytes to read
|
||||
- preRead argument can be used to set start with an existing data buffer
|
||||
- callback returns 1) data buffer and 2) lopped/over-read data */
|
||||
var readFlowingBytes = function(stream, amount, preRead, callback){
|
||||
|
||||
var buff = preRead ? preRead : new Buffer([]);
|
||||
|
||||
var readData = function(data){
|
||||
buff = Buffer.concat([buff, data], 2);
|
||||
if (buff.length >= amount){
|
||||
var returnData = buff.slice(0, amount);
|
||||
var lopped;
|
||||
|
||||
if (buff.length > amount)
|
||||
lopped = buff.slice(amount);
|
||||
callback(returnData, lopped);
|
||||
}
|
||||
else
|
||||
stream.once('data', readData);
|
||||
};
|
||||
|
||||
readData(buff);
|
||||
};
|
||||
|
||||
var Peer = module.exports = function(options){
|
||||
|
||||
@ -19,13 +51,14 @@ var Peer = module.exports = function(options){
|
||||
var emptyNetAddress = new Buffer('010000000000000000000000000000000000ffff000000000000', 'hex');
|
||||
var userAgent = util.varStringBuffer('/node-stratum/');
|
||||
var blockStartHeight = new Buffer('00000000', 'hex'); //block start_height, can be empty
|
||||
|
||||
//If protocol version is new enough, add do not relay transactions flag byte, outlined in BIP37
|
||||
//https://github.com/bitcoin/bips/blob/master/bip-0037.mediawiki#extensions-to-existing-messages
|
||||
var relayTransactions = options.protocolVersion >= 70001 ? new Buffer([false]) : new Buffer([]);
|
||||
|
||||
//Command strings found at: https://en.bitcoin.it/wiki/Protocol_specification
|
||||
var commands = {
|
||||
version: new Buffer('76657273696F6E0000000000', 'hex')
|
||||
version: commandStringBuffer('version'),
|
||||
inv: commandStringBuffer('inv')
|
||||
};
|
||||
|
||||
|
||||
@ -51,86 +84,67 @@ var Peer = module.exports = function(options){
|
||||
|
||||
SetupMessageParser(client);
|
||||
|
||||
|
||||
}
|
||||
|
||||
function SetupMessageParser(client){
|
||||
var buff = new Buffer([]);
|
||||
var msgCommand, msgLength, msgChecksum, readingPayload = false;
|
||||
|
||||
var readData = function(data){
|
||||
buff = Buffer.concat([buff, data], 2);
|
||||
if (!readingPayload){
|
||||
if (buff.length < 24) return;
|
||||
var msgMagic = buff.readUInt32LE(0);
|
||||
msgCommand = buff.slice(4, 16).toString().split("\0", 1)[0];
|
||||
msgLength = buff.readUInt32LE(16);
|
||||
msgChecksum = buff.readUInt32LE(20);
|
||||
var beginReadingMessage = function(preRead){
|
||||
|
||||
readFlowingBytes(client, 24, preRead, function(header, lopped){
|
||||
var msgMagic = header.readUInt32LE(0);
|
||||
if (msgMagic !== magicInt){
|
||||
buff = new Buffer([]);
|
||||
_this.emit('error', 'bad magic number from peer');
|
||||
beginReadingMessage(null);
|
||||
return;
|
||||
}
|
||||
readingPayload = true;
|
||||
if (buff.length > 24){
|
||||
var d = buff.slice(24);
|
||||
buff = new Buffer([]);
|
||||
readData(d);
|
||||
}
|
||||
else
|
||||
buff = new Buffer([]);
|
||||
}
|
||||
else{
|
||||
if (buff.length >= msgLength){
|
||||
|
||||
var msgPayload = buff.slice(0, msgLength);
|
||||
if (util.doublesha(msgPayload).readUInt32LE(0) !== msgChecksum){
|
||||
_this.emit('error', 'bad payload checksum from peer');
|
||||
var msgCommand = header.slice(4, 16).toString();
|
||||
var msgLength = header.readUInt32LE(16);
|
||||
var msgChecksum = header.readUInt32LE(20);
|
||||
readFlowingBytes(client, msgLength, lopped, function(payload, lopped){
|
||||
if (util.doublesha(payload).readUInt32LE(0) !== msgChecksum){
|
||||
_this.emit('error', 'bad payload - failed checksum');
|
||||
beginReadingMessage(null);
|
||||
return;
|
||||
}
|
||||
else{
|
||||
HandleMessage(msgCommand, msgPayload);
|
||||
}
|
||||
|
||||
if (buff.length > msgLength){
|
||||
var d = buff.slice(msgLength);
|
||||
buff = new Buffer([]);
|
||||
readingPayload = false;
|
||||
readData(d);
|
||||
}
|
||||
else{
|
||||
buff = new Buffer([]);
|
||||
readingPayload = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
HandleMessage(msgCommand, payload);
|
||||
beginReadingMessage(lopped);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
client.on('data', readData);
|
||||
beginReadingMessage(null);
|
||||
}
|
||||
|
||||
|
||||
//Parsing inv message https://en.bitcoin.it/wiki/Protocol_specification#inv
|
||||
function HandleInv(payload){
|
||||
//sloppy varint decoding
|
||||
var count = payload.readUInt8(0);
|
||||
payload = payload.slice(1);
|
||||
if (count >= 0xfd)
|
||||
{
|
||||
count = payload.readUInt16LE(0);
|
||||
payload = payload.slice(2);
|
||||
}
|
||||
while (count--)
|
||||
{
|
||||
if (payload.readUInt32LE(0) === invVectMsgBlock){
|
||||
var block = payload.slice(4, 36).toString('hex');
|
||||
console.log('block found ' + block);
|
||||
_this.emit('blockFound', block);
|
||||
}
|
||||
payload = payload.slice(36);
|
||||
}
|
||||
}
|
||||
|
||||
function HandleMessage(command, payload){
|
||||
|
||||
//Parsing inv message https://en.bitcoin.it/wiki/Protocol_specification#inv
|
||||
if (command === 'inv'){
|
||||
//sloppy varint decoding
|
||||
var count = payload.readUInt8(0);
|
||||
payload = payload.slice(1);
|
||||
if (count >= 0xfd)
|
||||
{
|
||||
count = payload.readUInt16LE(0);
|
||||
payload = payload.slice(2);
|
||||
}
|
||||
while (count--)
|
||||
{
|
||||
if (payload.readUInt32LE(0) === invVectMsgBlock){
|
||||
var block = payload.slice(4, 36).toString('hex');
|
||||
console.log('block found ' + block);
|
||||
_this.emit('blockFound', block);
|
||||
}
|
||||
payload = payload.slice(36);
|
||||
}
|
||||
switch(command){
|
||||
case commands.inv.toString():
|
||||
HandleInv(payload);
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//Message structure defined at: https://en.bitcoin.it/wiki/Protocol_specification#Message_structure
|
||||
|
||||
Loading…
Reference in New Issue
Block a user