diff --git a/lib/bcoin/workers.js b/lib/bcoin/workers.js index cbf853a0..9881ee0c 100644 --- a/lib/bcoin/workers.js +++ b/lib/bcoin/workers.js @@ -389,8 +389,8 @@ function Worker(id) { self.emit('error', e); }); - this.parser.on('packet', function(job, body) { - self.emit('packet', job, body); + this.parser.on('packet', function(packet) { + self.emit('packet', packet); }); this._init(); @@ -419,28 +419,28 @@ Worker.prototype._init = function _init() { utils.log.apply(utils, items); }); - this.on('packet', function(job, body) { + this.on('packet', function(packet) { var err, result; - if (body.name === 'event') { - self.emit.apply(self, body.items); - self.emit('event', body.items); + if (packet.cmd === 'event') { + self.emit.apply(self, packet.items); + self.emit('event', packet.items); return; } - if (body.name === 'response') { - err = body.items[0]; - result = body.items[1]; + if (packet.cmd === 'response') { + err = packet.items[0]; + result = packet.items[1]; if (err) err = toError(err); - self.emit('response ' + job, err, result); + self.emit('response ' + packet.job, err, result); return; } - err = new Error('Unknown packet: ' + body.name); + err = new Error('Unknown packet: ' + packet.cmd); self.emit('error', err); }); @@ -471,13 +471,13 @@ Worker.prototype.write = function write(data) { /** * Frame and send a packet. * @param {String} job - * @param {String} name + * @param {String} cmd * @param {Array} items * @returns {Boolean} */ -Worker.prototype.send = function send(job, name, items) { - return this.write(this.framer.packet(job, name, items)); +Worker.prototype.send = function send(job, cmd, items) { + return this.write(this.framer.packet(job, cmd, items)); }; /** @@ -612,8 +612,8 @@ function Master(options) { self.emit('error', e); }); - this.parser.on('packet', function(job, body) { - self.emit('packet', job, body); + this.parser.on('packet', function(packet) { + self.emit('packet', packet); }); } @@ -641,13 +641,13 @@ Master.prototype.write = function write(data) { /** * Frame and send a packet. * @param {String} job - * @param {String} name + * @param {String} cmd * @param {Array} items * @returns {Boolean} */ -Master.prototype.send = function send(job, name, items) { - return this.write(this.framer.packet(job, name, items)); +Master.prototype.send = function send(job, cmd, items) { + return this.write(this.framer.packet(job, cmd, items)); }; /** @@ -709,22 +709,22 @@ Master.listen = function listen(options) { master.sendEvent('worker error', fromError(err)); }); - master.on('packet', function(job, body) { + master.on('packet', function(packet) { var result; - if (body.name === 'event') { - master.emit('event', body.items); - master.emit.apply(master, body.items); + if (packet.cmd === 'event') { + master.emit('event', packet.items); + master.emit.apply(master, packet.items); return; } try { - result = jobs[body.name].apply(jobs, body.items); + result = jobs[packet.cmd].apply(jobs, packet.items); } catch (e) { - return master.send(job, 'response', [fromError(e)]); + return master.send(packet.job, 'response', [fromError(e)]); } - return master.send(job, 'response', [null, result]); + return master.send(packet.job, 'response', [null, result]); }); bcoin.master = master; @@ -819,31 +819,28 @@ function Framer() { utils.inherits(Framer, EventEmitter); -Framer.prototype.packet = function packet(job, name, items) { +Framer.prototype.packet = function packet(job, cmd, items) { var p = new BufferWriter(); - var payload = this.body(name, items); + var payload = this.body(items); + + assert(cmd.length < 12); + assert(payload.length <= 0xffffffff); + p.writeU32(0xdeadbeef); p.writeU32(job); - p.writeU32(payload.length); + p.writeString(cmd, 'ascii'); + p.fill(0, 12 - cmd.length); + p.writeU32(payload.length + 1); p.writeBytes(payload); - return p.render(); -}; - -Framer.prototype.body = function body(name, items) { - var p = new BufferWriter(); - - if (name) - p.writeVarString(name, 'ascii'); - else - p.writeVarint(0); - - Framer.item(items, p); - p.writeU8(0x0a); return p.render(); }; +Framer.prototype.body = function body(items) { + return Framer.item(items); +}; + Framer.item = function _item(item, writer) { var p = BufferWriter(writer); var i, keys; @@ -942,8 +939,8 @@ function Parser() { EventEmitter.call(this); - this.waiting = 12; - this.header = null; + this.waiting = 24; + this.packet = null; this.pending = []; this.total = 0; } @@ -998,57 +995,66 @@ Parser.prototype.feed = function feed(data) { }; Parser.prototype.parse = function parse(data) { - var header, body; + var packet; - if (!this.header) { - header = this.parseHeader(data); - - if (header.magic !== 0xdeadbeef) { - this.emit('error', new Error('Bad magic number.')); + if (!this.packet) { + try { + packet = this.parseHeader(data); + } catch (e) { + this.emit('error', e); return; } - this.header = header; - this.waiting = header.size; + this.packet = packet; + this.waiting = packet.size; return; } - header = this.header; - this.waiting = 12; - this.header = null; + packet = this.packet; + + this.waiting = 24; + this.packet = null; try { - body = this.parseBody(data); + packet.items = this.parseBody(data); } catch (e) { this.emit('error', e); return; } - this.emit('packet', header.job, body); + if (data[data.length - 1] !== 0x0a) { + this.emit('error', new Error('No trailing newline.')); + return; + } + + this.emit('packet', packet); }; Parser.prototype.parseHeader = function parseHeader(data) { - return { - magic: data.readUInt32LE(0, true), - job: data.readUInt32LE(4, true), - size: data.readUInt32LE(8, true) - }; + var magic, job, size, i, cmd; + + magic = data.readUInt32LE(0, true); + + if (magic !== 0xdeadbeef) + throw new Error('Bad magic number: ' + magic.toString(16)); + + job = data.readUInt32LE(4, true); + + for (i = 0; data[i + 8] !== 0 && i < 12; i++); + + if (i === 12) + throw new Error('Not NULL-terminated cmd'); + + cmd = data.toString('ascii', 8, 8 + i); + + size = data.readUInt32LE(20, true); + + return new Packet(job, cmd, size); }; Parser.prototype.parseBody = function parseBody(data) { - var p = new BufferReader(data, true); - var name, items; - - name = p.readVarString('ascii'); - items = Parser.parseItem(p); - - assert(p.readU8() === 0x0a); - - return { - name: name || null, - items: items - }; + return Parser.parseItem(data); }; Parser.parseItem = function parseItem(data) { @@ -1107,6 +1113,18 @@ Parser.parseItem = function parseItem(data) { } }; +/** + * Packet + * @constructor + */ + +function Packet(job, cmd, size) { + this.job = job; + this.cmd = cmd; + this.size = size; + this.items = null; +} + /* * Helpers */