From 31d42f3f517e068ef2c5ddb0ba84783f2d1c1d67 Mon Sep 17 00:00:00 2001 From: Matt Date: Fri, 21 Feb 2014 17:49:59 -0700 Subject: [PATCH] Began work on creating a peer node module for more efficient block notifications --- lib/daemon.js | 3 + lib/peer.js | 163 ++++++++++++++++++++++++++++++++++++++++++++++++++ lib/pool.js | 27 ++++++++- lib/util.js | 4 ++ 4 files changed, 194 insertions(+), 3 deletions(-) create mode 100644 lib/peer.js diff --git a/lib/daemon.js b/lib/daemon.js index 0e54f21..49f4f46 100644 --- a/lib/daemon.js +++ b/lib/daemon.js @@ -17,6 +17,7 @@ function DaemonInterface(options){ //private members var _this = this; this.options = options; + var persistentConnection; (function init(){ isOnline(function(online){ @@ -71,6 +72,8 @@ function DaemonInterface(options){ } }; + + var req = http.request(options, function(res) { var data = ''; res.setEncoding('utf8'); diff --git a/lib/peer.js b/lib/peer.js new file mode 100644 index 0000000..fca80d0 --- /dev/null +++ b/lib/peer.js @@ -0,0 +1,163 @@ +var net = require('net'); +var crypto = require('crypto'); +var events = require('events'); + +var util = require('./util.js'); + + +var Peer = module.exports = function(options){ + + var _this = this; + var client; + var magic = new Buffer(options.magic, 'hex'); + var magicInt = magic.readUInt32LE(0); + + //https://en.bitcoin.it/wiki/Protocol_specification#Inventory_Vectors + var invVectMsgBlock = 2; //Hash from inventory message is related to a data block + + var networkServices = new Buffer('0100000000000000', 'hex'); //NODE_NETWORK services (value 1 packed as uint64) + var emptyNetAddress = new Buffer('010000000000000000000000000000000000ffff000000000000', 'hex'); + var userAgent = util.varStringBuffer('/node-stratum/'); + var blockStartHeight = new Buffer('00000000', 'hex'); //block start_height, can be empty + var relayTransactions = options.protocolVersion >= 70001 ? new Buffer([false]) : new Buffer([]); + + //Command strings found at: https://en.bitcoin.it/wiki/Protocol_specification + var commands = { + version: new Buffer('76657273696F6E0000000000', 'hex') + }; + + + (function init(){ + Connect(); + })(); + + + function Connect(){ + + client = net.connect(options.host, options.port, function(){ + _this.emit('connected'); + SendVersion(); + }); + client.on('end', function(){ + _this.emit('disconnected'); + Connect(); + }); + client.on('error', function(){ + _this.emit('connectionFailed'); + }); + + + SetupMessageParser(client); + + + } + + function SetupMessageParser(client){ + var buff = new Buffer([]); + var msgCommand, msgLength, msgChecksum, readingPayload = false; + + var readData = function(data){ + buff = Buffer.concat([buff, data], 2); + if (!readingPayload){ + if (buff.length < 24) return; + var msgMagic = buff.readUInt32LE(0); + msgCommand = buff.slice(4, 16).toString().split("\0", 1)[0]; + msgLength = buff.readUInt32LE(16); + msgChecksum = buff.readUInt32LE(20); + if (msgMagic !== magicInt){ + buff = new Buffer([]); + _this.emit('error', 'bad magic number from peer'); + return; + } + readingPayload = true; + if (buff.length > 24){ + var d = buff.slice(24); + buff = new Buffer([]); + readData(d); + } + else + buff = new Buffer([]); + } + else{ + if (buff.length >= msgLength){ + + var msgPayload = buff.slice(0, msgLength); + if (util.doublesha(msgPayload).readUInt32LE(0) !== msgChecksum){ + _this.emit('error', 'bad payload checksum from peer'); + } + else{ + HandleMessage(msgCommand, msgPayload); + } + + if (buff.length > msgLength){ + var d = buff.slice(msgLength); + buff = new Buffer([]); + readingPayload = false; + readData(d); + } + else{ + buff = new Buffer([]); + readingPayload = false; + } + } + } + }; + + client.on('data', readData); + + } + + function HandleMessage(command, payload){ + + //Parsing inv message https://en.bitcoin.it/wiki/Protocol_specification#inv + if (command === 'inv'){ + //sloppy varint decoding + var count = payload.readUInt8(0); + payload = payload.slice(1); + if (count >= 0xfd) + { + count = payload.readUInt16LE(0); + payload = payload.slice(2); + } + while (count--) + { + if (payload.readUInt32LE(0) === invVectMsgBlock){ + var block = payload.slice(4, 36).toString('hex'); + console.log('block found ' + block); + _this.emit('blockFound', block); + } + payload = payload.slice(36); + } + } + } + + //Message structure defined at: https://en.bitcoin.it/wiki/Protocol_specification#Message_structure + function SendMessage(command, payload){ + var message = Buffer.concat([ + magic, + command, + util.packUInt32LE(payload.length), + util.doublesha(payload).slice(0, 4), + payload + ]); + client.write(message); + } + + function SendVersion(){ + var payload = Buffer.concat([ + util.packUInt32LE(options.protocolVersion), + networkServices, + util.packUInt32LE(Date.now() / 1000 | 0), + emptyNetAddress, //addr_recv, can be empty + emptyNetAddress, //addr_from, can be empty + crypto.pseudoRandomBytes(8), //nonce, random unique ID + userAgent, + blockStartHeight, + relayTransactions + ]); + SendMessage(commands.version, payload); + } + +}; + +Peer.prototype.__proto__ = events.EventEmitter.prototype; \ No newline at end of file diff --git a/lib/pool.js b/lib/pool.js index 85931a2..9d01b2b 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -3,6 +3,7 @@ var async = require('async'); var varDiff = require('./varDiff.js'); var daemon = require('./daemon.js'); +var peer = require('./peer.js'); var stratum = require('./stratum.js'); var jobManager = require('./jobManager.js'); var util = require('./util.js'); @@ -39,6 +40,22 @@ var pool = module.exports = function pool(options, authorizeFn){ SetupDaemonInterface(); }; + function SetupPeer(){ + if (!options.p2p || !options.p2p.enabled) + return; + _this.peer = new peer(options.p2p); + _this.peer.on('connected', function(){ + emitLog('system', 'connected to daemon as a peer node'); + }).on('disconnected', function(){ + emitWarningLog('system', 'peer node disconnected - attempting reconnection...'); + }).on('connectionFailed', function(){ + emitErrorLog('system', 'failed to connect to daemon as a peer node'); + }).on('error', function(msg){ + emitWarningLog('p2p', msg); + }).on('blockFound', function(hash){ + this.processBlockNotify(hash); + }); + } function SetupVarDiff(){ if (!options.varDiff.enabled){ @@ -163,7 +180,7 @@ var pool = module.exports = function pool(options, authorizeFn){ function SetupDaemonInterface(){ - emitLog('system','Connecting to daemon'); + //emitLog('system','Connecting to daemon'); _this.daemon = new daemon.interface(options.daemon); _this.daemon.on('online', function(){ async.parallel({ @@ -210,7 +227,10 @@ var pool = module.exports = function pool(options, authorizeFn){ ); } }, function(err, results){ - if (err) return; + if (err){ + emitErrorLog('system', 'Failed to daemon'); + return; + } emitLog('system','Connected to daemon'); @@ -245,6 +265,7 @@ var pool = module.exports = function pool(options, authorizeFn){ else{ SetupBlockPolling(); StartStratumServer(); + SetupPeer(); } }); } @@ -259,7 +280,7 @@ var pool = module.exports = function pool(options, authorizeFn){ function StartStratumServer(){ - emitLog('system', 'Stratum server starting on port ' + options.stratumPort); + //emitLog('system', 'Stratum server starting on port ' + options.stratumPort); _this.stratumServer = new stratum.Server({ port: options.stratumPort, authorizeFn: authorizeFn diff --git a/lib/util.js b/lib/util.js index a3c0df8..2edeba4 100644 --- a/lib/util.js +++ b/lib/util.js @@ -96,6 +96,10 @@ exports.varIntBuffer = function(n){ } }; +exports.varStringBuffer = function(string){ + var strBuff = new Buffer(string); + return Buffer.concat([exports.varIntBuffer(strBuff.length), strBuff]); +}; /* "serialized CScript" formatting as defined here: