make events be processed sequencially

This commit is contained in:
Manuel Araoz 2015-04-01 15:39:28 -03:00
parent c58d415655
commit 9a41658f18
3 changed files with 28 additions and 41 deletions

View File

@ -37,13 +37,25 @@ EventBus.prototype.process = function(e) {
); );
}); });
}; };
var eventsEmitted = processEvent(e)
var whenPreviousFinishes = Promise.resolve();
if (this.previous && !this.previous.isFulfilled()) {
whenPreviousFinishes = this.previous;
}
var current = whenPreviousFinishes
.then(function() {
return processEvent(e);
})
.then(function() { .then(function() {
done.forEach(function(event) { done.forEach(function(event) {
self.emit(event.name || event.constructor.name, event); self.emit(event.name || event.constructor.name, event);
}); });
}); });
return eventsEmitted; current.e = e;
this.previous = current;
return current;
}; };

View File

@ -49,7 +49,6 @@ NetworkMonitor.prototype.setupPeer = function(peer) {
}); });
}); });
peer.on('block', function(m) { peer.on('block', function(m) {
console.log('block', m.block.id);
self.bus.process(m.block) self.bus.process(m.block)
.catch(function(err) { .catch(function(err) {
self.abort(err); self.abort(err);
@ -74,8 +73,7 @@ NetworkMonitor.prototype.stop = function(reason) {
}; };
NetworkMonitor.prototype.abort = function(reason) { NetworkMonitor.prototype.abort = function(reason) {
this.peer.disconnect(); this.peer.disconnect();
console.log('Unexpected error, aborting:', reason); throw reason;
process.exit(1);
}; };
NetworkMonitor.prototype.syncFrom = function(start) { NetworkMonitor.prototype.syncFrom = function(start) {

View File

@ -14,7 +14,6 @@ var $ = bitcore.util.preconditions;
var JSUtil = bitcore.util.js; var JSUtil = bitcore.util.js;
var _ = bitcore.deps._; var _ = bitcore.deps._;
var LOCK = 'lock-';
var NULLBLOCKHASH = bitcore.util.buffer.emptyBuffer(32).toString('hex'); var NULLBLOCKHASH = bitcore.util.buffer.emptyBuffer(32).toString('hex');
var GENESISPARENT = { var GENESISPARENT = {
height: -1, height: -1,
@ -61,27 +60,6 @@ function BlockService(opts) {
}); });
} }
BlockService.prototype.writeLock = function() {
var self = this;
return new Promise(function(resolve) {
console.log('getting lock');
var checkLock = function() {
if (self.lock) {
setImmediate(checkLock);
return;
}
self.lock = true;
console.log('lock acquired');
resolve();
};
checkLock();
});
};
BlockService.prototype.unlock = function() {
this.lock = false;
};
/** /**
* Transforms data as received from an RPC result structure for `getblock`, * Transforms data as received from an RPC result structure for `getblock`,
@ -233,7 +211,7 @@ BlockService.prototype.onBlock = function(block) {
var events = []; var events = [];
return this.save(block) return this.save(block)
.then(function(block) { .then(function(block) {
console.log('block', block.id, 'saved'); console.log('block', block.id, 'saved with height', block.height);
block.transactions.forEach(function(tx) { block.transactions.forEach(function(tx) {
events.push(tx); events.push(tx);
}); });
@ -266,51 +244,49 @@ BlockService.prototype._confirmBlock = function(block) {
var ops = []; var ops = [];
return this.writeLock() return Promise.try(function() {
.then(function() { //console.log(1);
console.log(1);
return self._setNextBlock(ops, block.header.prevHash, block); return self._setNextBlock(ops, block.header.prevHash, block);
}).then(function() { }).then(function() {
console.log(2); //console.log(2);
if (block.header.prevHash.toString('hex') !== NULLBLOCKHASH) { if (block.header.prevHash.toString('hex') !== NULLBLOCKHASH) {
console.log(2.1); //console.log(2.1);
return self.getBlock(block.header.prevHash, { return self.getBlock(block.header.prevHash, {
withoutTransactions: true withoutTransactions: true
}); });
} else { } else {
console.log(2.2); //console.log(2.2);
return GENESISPARENT; return GENESISPARENT;
} }
}).then(function(parent) { }).then(function(parent) {
console.log(3); //console.log(3);
return self._setBlockHeight(ops, block, parent.height + 1); return self._setBlockHeight(ops, block, parent.height + 1);
}).then(function() { }).then(function() {
console.log(4); //console.log(4);
return self._setBlockByTs(ops, block); return self._setBlockByTs(ops, block);
}).then(function() { }).then(function() {
console.log(5); //console.log(5);
return Promise.all(block.transactions.map(function(transaction) { return Promise.all(block.transactions.map(function(transaction) {
return self.transactionService._confirmTransaction(ops, block, transaction); return self.transactionService._confirmTransaction(ops, block, transaction);
})); }));
}).then(function() { }).then(function() {
console.log(6); //console.log(6);
var p = self.database.batchAsync(ops); var p = self.database.batchAsync(ops);
console.log(6.5); //console.log(6.5);
return p; return p;
}) })
.then(this.unlock.bind(this))
.then(function() { .then(function() {
console.log(7); //console.log(7);
return block; return block;
}); });
}; };
@ -334,6 +310,7 @@ BlockService.prototype._setNextBlock = function(ops, prevBlockHash, block) {
}; };
BlockService.prototype._setBlockHeight = function(ops, block, height) { BlockService.prototype._setBlockHeight = function(ops, block, height) {
block.height = height;
return Promise.try(function() { return Promise.try(function() {
ops.push({ ops.push({
type: 'put', type: 'put',