workers: refactor worker protocol.

This commit is contained in:
Christopher Jeffrey 2016-07-27 05:42:36 -07:00
parent cd8adbcc74
commit 0b620c2e47
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD

View File

@ -389,8 +389,8 @@ function Worker(id) {
self.emit('error', e);
});
this.parser.on('packet', function(job, body) {
self.emit('packet', job, body);
this.parser.on('packet', function(packet) {
self.emit('packet', packet);
});
this._init();
@ -419,28 +419,28 @@ Worker.prototype._init = function _init() {
utils.log.apply(utils, items);
});
this.on('packet', function(job, body) {
this.on('packet', function(packet) {
var err, result;
if (body.name === 'event') {
self.emit.apply(self, body.items);
self.emit('event', body.items);
if (packet.cmd === 'event') {
self.emit.apply(self, packet.items);
self.emit('event', packet.items);
return;
}
if (body.name === 'response') {
err = body.items[0];
result = body.items[1];
if (packet.cmd === 'response') {
err = packet.items[0];
result = packet.items[1];
if (err)
err = toError(err);
self.emit('response ' + job, err, result);
self.emit('response ' + packet.job, err, result);
return;
}
err = new Error('Unknown packet: ' + body.name);
err = new Error('Unknown packet: ' + packet.cmd);
self.emit('error', err);
});
@ -471,13 +471,13 @@ Worker.prototype.write = function write(data) {
/**
* Frame and send a packet.
* @param {String} job
* @param {String} name
* @param {String} cmd
* @param {Array} items
* @returns {Boolean}
*/
Worker.prototype.send = function send(job, name, items) {
return this.write(this.framer.packet(job, name, items));
Worker.prototype.send = function send(job, cmd, items) {
return this.write(this.framer.packet(job, cmd, items));
};
/**
@ -612,8 +612,8 @@ function Master(options) {
self.emit('error', e);
});
this.parser.on('packet', function(job, body) {
self.emit('packet', job, body);
this.parser.on('packet', function(packet) {
self.emit('packet', packet);
});
}
@ -641,13 +641,13 @@ Master.prototype.write = function write(data) {
/**
* Frame and send a packet.
* @param {String} job
* @param {String} name
* @param {String} cmd
* @param {Array} items
* @returns {Boolean}
*/
Master.prototype.send = function send(job, name, items) {
return this.write(this.framer.packet(job, name, items));
Master.prototype.send = function send(job, cmd, items) {
return this.write(this.framer.packet(job, cmd, items));
};
/**
@ -709,22 +709,22 @@ Master.listen = function listen(options) {
master.sendEvent('worker error', fromError(err));
});
master.on('packet', function(job, body) {
master.on('packet', function(packet) {
var result;
if (body.name === 'event') {
master.emit('event', body.items);
master.emit.apply(master, body.items);
if (packet.cmd === 'event') {
master.emit('event', packet.items);
master.emit.apply(master, packet.items);
return;
}
try {
result = jobs[body.name].apply(jobs, body.items);
result = jobs[packet.cmd].apply(jobs, packet.items);
} catch (e) {
return master.send(job, 'response', [fromError(e)]);
return master.send(packet.job, 'response', [fromError(e)]);
}
return master.send(job, 'response', [null, result]);
return master.send(packet.job, 'response', [null, result]);
});
bcoin.master = master;
@ -819,31 +819,28 @@ function Framer() {
utils.inherits(Framer, EventEmitter);
Framer.prototype.packet = function packet(job, name, items) {
Framer.prototype.packet = function packet(job, cmd, items) {
var p = new BufferWriter();
var payload = this.body(name, items);
var payload = this.body(items);
assert(cmd.length < 12);
assert(payload.length <= 0xffffffff);
p.writeU32(0xdeadbeef);
p.writeU32(job);
p.writeU32(payload.length);
p.writeString(cmd, 'ascii');
p.fill(0, 12 - cmd.length);
p.writeU32(payload.length + 1);
p.writeBytes(payload);
return p.render();
};
Framer.prototype.body = function body(name, items) {
var p = new BufferWriter();
if (name)
p.writeVarString(name, 'ascii');
else
p.writeVarint(0);
Framer.item(items, p);
p.writeU8(0x0a);
return p.render();
};
Framer.prototype.body = function body(items) {
return Framer.item(items);
};
Framer.item = function _item(item, writer) {
var p = BufferWriter(writer);
var i, keys;
@ -942,8 +939,8 @@ function Parser() {
EventEmitter.call(this);
this.waiting = 12;
this.header = null;
this.waiting = 24;
this.packet = null;
this.pending = [];
this.total = 0;
}
@ -998,57 +995,66 @@ Parser.prototype.feed = function feed(data) {
};
Parser.prototype.parse = function parse(data) {
var header, body;
var packet;
if (!this.header) {
header = this.parseHeader(data);
if (header.magic !== 0xdeadbeef) {
this.emit('error', new Error('Bad magic number.'));
if (!this.packet) {
try {
packet = this.parseHeader(data);
} catch (e) {
this.emit('error', e);
return;
}
this.header = header;
this.waiting = header.size;
this.packet = packet;
this.waiting = packet.size;
return;
}
header = this.header;
this.waiting = 12;
this.header = null;
packet = this.packet;
this.waiting = 24;
this.packet = null;
try {
body = this.parseBody(data);
packet.items = this.parseBody(data);
} catch (e) {
this.emit('error', e);
return;
}
this.emit('packet', header.job, body);
if (data[data.length - 1] !== 0x0a) {
this.emit('error', new Error('No trailing newline.'));
return;
}
this.emit('packet', packet);
};
Parser.prototype.parseHeader = function parseHeader(data) {
return {
magic: data.readUInt32LE(0, true),
job: data.readUInt32LE(4, true),
size: data.readUInt32LE(8, true)
};
var magic, job, size, i, cmd;
magic = data.readUInt32LE(0, true);
if (magic !== 0xdeadbeef)
throw new Error('Bad magic number: ' + magic.toString(16));
job = data.readUInt32LE(4, true);
for (i = 0; data[i + 8] !== 0 && i < 12; i++);
if (i === 12)
throw new Error('Not NULL-terminated cmd');
cmd = data.toString('ascii', 8, 8 + i);
size = data.readUInt32LE(20, true);
return new Packet(job, cmd, size);
};
Parser.prototype.parseBody = function parseBody(data) {
var p = new BufferReader(data, true);
var name, items;
name = p.readVarString('ascii');
items = Parser.parseItem(p);
assert(p.readU8() === 0x0a);
return {
name: name || null,
items: items
};
return Parser.parseItem(data);
};
Parser.parseItem = function parseItem(data) {
@ -1107,6 +1113,18 @@ Parser.parseItem = function parseItem(data) {
}
};
/**
* Packet
* @constructor
*/
function Packet(job, cmd, size) {
this.job = job;
this.cmd = cmd;
this.size = size;
this.items = null;
}
/*
* Helpers
*/