utils: rewrite async object.

This commit is contained in:
Christopher Jeffrey 2016-11-10 13:33:40 -08:00
parent d088412380
commit 06b35d4ab2
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD

View File

@ -6,8 +6,9 @@
'use strict';
var utils = require('../utils/utils');
var co = require('../utils/co');
var utils = require('./utils');
var co = require('./co');
var Locker = require('./locker');
var assert = require('assert');
var EventEmitter = require('events').EventEmitter;
@ -25,10 +26,11 @@ function AsyncObject() {
EventEmitter.call(this);
this._asyncLock = new Locker();
this.loading = false;
this.closing = false;
this.loaded = false;
this.locker = null;
}
utils.inherits(AsyncObject, EventEmitter);
@ -39,18 +41,23 @@ utils.inherits(AsyncObject, EventEmitter);
*/
AsyncObject.prototype.open = co(function* open() {
var err, unlock;
var unlock = yield this._asyncLock.lock();
try {
return yield this.__open();
} finally {
unlock();
}
});
assert(!this.closing, 'Cannot open while closing.');
/**
* Open the object (without a lock).
* @private
* @returns {Promise}
*/
AsyncObject.prototype.__open = co(function* open() {
if (this.loaded)
return yield co.wait();
if (this.loading)
return yield this._onOpen();
if (this.locker)
unlock = yield this.locker.lock();
return;
this.emit('preopen');
@ -59,25 +66,15 @@ AsyncObject.prototype.open = co(function* open() {
try {
yield this._open();
} catch (e) {
err = e;
}
yield co.wait();
if (err) {
this.loading = false;
this._error('open', err);
if (unlock)
unlock();
throw err;
this.emit('error', e);
throw e;
}
this.loading = false;
this.loaded = true;
this.emit('open');
if (unlock)
unlock();
this.emit('open');
});
/**
@ -86,45 +83,40 @@ AsyncObject.prototype.open = co(function* open() {
*/
AsyncObject.prototype.close = co(function* close() {
var unlock, err;
var unlock = yield this._asyncLock.lock();
try {
return yield this.__close();
} finally {
unlock();
}
});
assert(!this.loading, 'Cannot close while loading.');
/**
* Close the object (without a lock).
* @private
* @returns {Promise}
*/
AsyncObject.prototype.__close = co(function* close() {
if (!this.loaded)
return yield co.wait();
if (this.closing)
return yield this._onClose();
if (this.locker)
unlock = yield this.locker.lock();
return;
this.emit('preclose');
this.closing = true;
this.loaded = false;
try {
yield this._close();
} catch (e) {
err = e;
}
yield co.wait();
if (err) {
this.closing = false;
this._error('close', err);
if (unlock)
unlock();
throw err;
this.emit('error', e);
throw e;
}
this.closing = false;
this.emit('close');
this.loaded = false;
if (unlock)
unlock();
this.emit('close');
});
/**
@ -135,25 +127,6 @@ AsyncObject.prototype.close = co(function* close() {
AsyncObject.prototype.destroy = AsyncObject.prototype.close;
/**
* Emit an error for `open` or `close` listeners.
* @private
* @param {String} event
* @param {Error} err
*/
AsyncObject.prototype._error = function _error(event, err) {
var listeners = this.listeners(event);
var i;
this.removeAllListeners(event);
for (i = 0; i < listeners.length; i++)
listeners[i](err);
this.emit('error', err);
};
/**
* Initialize the object.
* @private
@ -174,32 +147,6 @@ AsyncObject.prototype._close = function _close(callback) {
throw new Error('Abstract method.');
};
/**
* Wait for open event.
* @private
* @returns {Promise}
*/
AsyncObject.prototype._onOpen = function _onOpen() {
var self = this;
return new Promise(function(resolve, reject) {
return self.once('open', resolve);
});
};
/**
* Wait for close event.
* @private
* @returns {Promise}
*/
AsyncObject.prototype._onClose = function _onClose() {
var self = this;
return new Promise(function(resolve, reject) {
return self.once('close', resolve);
});
};
/*
* Expose
*/