From 9a41658f18e6b5984bbb1286ae1ef1c9cb7a9d9c Mon Sep 17 00:00:00 2001 From: Manuel Araoz Date: Wed, 1 Apr 2015 15:39:28 -0300 Subject: [PATCH] make events be processed sequencially --- lib/eventbus.js | 16 ++++++++++++-- lib/networkmonitor.js | 4 +--- lib/services/block.js | 49 ++++++++++++------------------------------- 3 files changed, 28 insertions(+), 41 deletions(-) diff --git a/lib/eventbus.js b/lib/eventbus.js index 2859d9a8..90730674 100644 --- a/lib/eventbus.js +++ b/lib/eventbus.js @@ -37,13 +37,25 @@ EventBus.prototype.process = function(e) { ); }); }; - var eventsEmitted = processEvent(e) + + var whenPreviousFinishes = Promise.resolve(); + if (this.previous && !this.previous.isFulfilled()) { + whenPreviousFinishes = this.previous; + } + + var current = whenPreviousFinishes + .then(function() { + return processEvent(e); + }) .then(function() { done.forEach(function(event) { self.emit(event.name || event.constructor.name, event); }); }); - return eventsEmitted; + current.e = e; + this.previous = current; + return current; + }; diff --git a/lib/networkmonitor.js b/lib/networkmonitor.js index 85a5acd5..e0a1c2ba 100644 --- a/lib/networkmonitor.js +++ b/lib/networkmonitor.js @@ -49,7 +49,6 @@ NetworkMonitor.prototype.setupPeer = function(peer) { }); }); peer.on('block', function(m) { - console.log('block', m.block.id); self.bus.process(m.block) .catch(function(err) { self.abort(err); @@ -74,8 +73,7 @@ NetworkMonitor.prototype.stop = function(reason) { }; NetworkMonitor.prototype.abort = function(reason) { this.peer.disconnect(); - console.log('Unexpected error, aborting:', reason); - process.exit(1); + throw reason; }; NetworkMonitor.prototype.syncFrom = function(start) { diff --git a/lib/services/block.js b/lib/services/block.js index b49e5b09..39468652 100644 --- a/lib/services/block.js +++ b/lib/services/block.js @@ -14,7 +14,6 @@ var $ = bitcore.util.preconditions; var JSUtil = bitcore.util.js; var _ = bitcore.deps._; -var LOCK = 'lock-'; var NULLBLOCKHASH = bitcore.util.buffer.emptyBuffer(32).toString('hex'); var GENESISPARENT = { height: -1, @@ -61,27 +60,6 @@ function BlockService(opts) { }); } -BlockService.prototype.writeLock = function() { - var self = this; - return new Promise(function(resolve) { - console.log('getting lock'); - var checkLock = function() { - if (self.lock) { - setImmediate(checkLock); - return; - } - self.lock = true; - console.log('lock acquired'); - resolve(); - - }; - checkLock(); - }); -}; - -BlockService.prototype.unlock = function() { - this.lock = false; -}; /** * Transforms data as received from an RPC result structure for `getblock`, @@ -233,7 +211,7 @@ BlockService.prototype.onBlock = function(block) { var events = []; return this.save(block) .then(function(block) { - console.log('block', block.id, 'saved'); + console.log('block', block.id, 'saved with height', block.height); block.transactions.forEach(function(tx) { events.push(tx); }); @@ -266,51 +244,49 @@ BlockService.prototype._confirmBlock = function(block) { var ops = []; - return this.writeLock() - .then(function() { - console.log(1); + return Promise.try(function() { + //console.log(1); return self._setNextBlock(ops, block.header.prevHash, block); }).then(function() { - console.log(2); + //console.log(2); if (block.header.prevHash.toString('hex') !== NULLBLOCKHASH) { - console.log(2.1); + //console.log(2.1); return self.getBlock(block.header.prevHash, { withoutTransactions: true }); } else { - console.log(2.2); + //console.log(2.2); return GENESISPARENT; } }).then(function(parent) { - console.log(3); + //console.log(3); return self._setBlockHeight(ops, block, parent.height + 1); }).then(function() { - console.log(4); + //console.log(4); return self._setBlockByTs(ops, block); }).then(function() { - console.log(5); + //console.log(5); return Promise.all(block.transactions.map(function(transaction) { return self.transactionService._confirmTransaction(ops, block, transaction); })); }).then(function() { - console.log(6); + //console.log(6); var p = self.database.batchAsync(ops); - console.log(6.5); + //console.log(6.5); return p; }) - .then(this.unlock.bind(this)) .then(function() { - console.log(7); + //console.log(7); return block; }); }; @@ -334,6 +310,7 @@ BlockService.prototype._setNextBlock = function(ops, prevBlockHash, block) { }; BlockService.prototype._setBlockHeight = function(ops, block, height) { + block.height = height; return Promise.try(function() { ops.push({ type: 'put',