concurrency wip
This commit is contained in:
parent
7662bf1bf5
commit
25c1492580
@ -2,19 +2,20 @@
|
||||
var BaseService = require('../service');
|
||||
var inherits = require('util').inherits;
|
||||
|
||||
function TimestampService(options) {
|
||||
function BlockService(options) {
|
||||
BaseService.call(this, options);
|
||||
this.currentBlock = null;
|
||||
this.currentTimestamp = null;
|
||||
}
|
||||
|
||||
inherits(TimestampService, BaseService);
|
||||
inherits(BlockService, BaseService);
|
||||
|
||||
TimestampService.dependencies = [
|
||||
'db'
|
||||
BlockService.dependencies = [
|
||||
'db',
|
||||
'transaction'
|
||||
];
|
||||
|
||||
TimestampService.prototype.start = function(callback) {
|
||||
BlockService.prototype.start = function(callback) {
|
||||
var self = this;
|
||||
|
||||
this.store = this.node.services.db.store;
|
||||
@ -30,11 +31,11 @@ TimestampService.prototype.start = function(callback) {
|
||||
});
|
||||
};
|
||||
|
||||
TimestampService.prototype.stop = function(callback) {
|
||||
BlockService.prototype.stop = function(callback) {
|
||||
setImmediate(callback);
|
||||
};
|
||||
|
||||
TimestampService.prototype.blockHandler = function(block, connectBlock, callback) {
|
||||
BlockService.prototype.blockHandler = function(block, connectBlock, callback) {
|
||||
var self = this;
|
||||
|
||||
var action = 'put';
|
||||
@ -69,6 +70,8 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback
|
||||
self.currentBlock = block.hash;
|
||||
self.currentTimestamp = timestamp;
|
||||
|
||||
// TODO combine with block header
|
||||
|
||||
operations = operations.concat(
|
||||
[
|
||||
{
|
||||
@ -88,7 +91,16 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback
|
||||
});
|
||||
};
|
||||
|
||||
TimestampService.prototype.getTimestamp = function(hash, callback) {
|
||||
BlockService.prototype.getBlock = function(hash, callback) {
|
||||
// get block header
|
||||
// get individual transactions
|
||||
};
|
||||
|
||||
BlockService.prototype.getBlockByHeight = function(height, callback) {
|
||||
|
||||
};
|
||||
|
||||
BlockService.prototype.getTimestamp = function(hash, callback) {
|
||||
var self = this;
|
||||
|
||||
if (hash === self.currentBlock) {
|
||||
@ -107,40 +119,40 @@ TimestampService.prototype.getTimestamp = function(hash, callback) {
|
||||
});
|
||||
};
|
||||
|
||||
TimestampService.prototype._encodeBlockTimestampKey = function(hash) {
|
||||
BlockService.prototype._encodeBlockTimestampKey = function(hash) {
|
||||
return Buffer.concat([this.prefix, new Buffer(hash, 'hex')]);
|
||||
};
|
||||
|
||||
TimestampService.prototype._decodeBlockTimestampKey = function(buffer) {
|
||||
BlockService.prototype._decodeBlockTimestampKey = function(buffer) {
|
||||
return buffer.slice(2).toString('hex');
|
||||
};
|
||||
|
||||
TimestampService.prototype._encodeBlockTimestampValue = function(timestamp) {
|
||||
BlockService.prototype._encodeBlockTimestampValue = function(timestamp) {
|
||||
var timestampBuffer = new Buffer(new Array(8));
|
||||
timestampBuffer.writeDoubleBE(timestamp);
|
||||
return timestampBuffer;
|
||||
};
|
||||
|
||||
TimestampService.prototype._decodeBlockTimestampValue = function(buffer) {
|
||||
BlockService.prototype._decodeBlockTimestampValue = function(buffer) {
|
||||
return buffer.readDoubleBE(0);
|
||||
};
|
||||
|
||||
TimestampService.prototype._encodeTimestampBlockKey = function(timestamp) {
|
||||
BlockService.prototype._encodeTimestampBlockKey = function(timestamp) {
|
||||
var timestampBuffer = new Buffer(new Array(8));
|
||||
timestampBuffer.writeDoubleBE(timestamp);
|
||||
return Buffer.concat([this.prefix, timestampBuffer]);
|
||||
};
|
||||
|
||||
TimestampService.prototype._decodeTimestampBlockKey = function(buffer) {
|
||||
BlockService.prototype._decodeTimestampBlockKey = function(buffer) {
|
||||
return buffer.readDoubleBE(2);
|
||||
};
|
||||
|
||||
TimestampService.prototype._encodeTimestampBlockValue = function(hash) {
|
||||
BlockService.prototype._encodeTimestampBlockValue = function(hash) {
|
||||
return new Buffer(hash, 'hex');
|
||||
};
|
||||
|
||||
TimestampService.prototype._decodeTimestampBlockValue = function(buffer) {
|
||||
BlockService.prototype._decodeTimestampBlockValue = function(buffer) {
|
||||
return buffer.toString('hex');
|
||||
};
|
||||
|
||||
module.exports = TimestampService;
|
||||
module.exports = BlockService;
|
||||
@ -533,6 +533,40 @@ DB.prototype.runAllBlockHandlers = function(block, add, callback) {
|
||||
);
|
||||
};
|
||||
|
||||
DB.prototype.runAllConcurrentBlockHandlers = function(block, add, callback) {
|
||||
var self = this;
|
||||
var operations = [];
|
||||
|
||||
async.each(
|
||||
this.node.services,
|
||||
function(mod, next) {
|
||||
if(mod.concurrenctBlockHandler) {
|
||||
$.checkArgument(typeof mod.concurrenctBlockHandler === 'function', 'concurrentBlockHandler must be a function');
|
||||
|
||||
mod.concurrentBlockHandler.call(mod, block, add, function(err, ops) {
|
||||
if (err) {
|
||||
return next(err);
|
||||
}
|
||||
if (ops) {
|
||||
$.checkArgument(Array.isArray(ops), 'concurrentBlockHandler for ' + mod.name + ' returned non-array');
|
||||
operations = operations.concat(ops);
|
||||
}
|
||||
next();
|
||||
});
|
||||
} else {
|
||||
setImmediate(next);
|
||||
}
|
||||
},
|
||||
function(err) {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
callback(null, operations);
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
/**
|
||||
* This function will find the common ancestor between the current chain and a forked block,
|
||||
* by moving backwards on both chains until there is a meeting point.
|
||||
@ -653,13 +687,165 @@ DB.prototype.syncRewind = function(block, done) {
|
||||
});
|
||||
};
|
||||
|
||||
DB.prototype.sync = function() {
|
||||
if (self.bitcoindSyncing || self.node.stopping || !self.tip) {
|
||||
return;
|
||||
}
|
||||
|
||||
self.bitcoindSyncing = true;
|
||||
|
||||
self._getBitcoindBlocks();
|
||||
self._concurrentSync();
|
||||
self._serialSync();
|
||||
};
|
||||
|
||||
DB.prototype._concurrentSync = function() {
|
||||
var self = this;
|
||||
|
||||
async.whilst(function() {
|
||||
return self.concurrentHeight < self.node.services.bitcoind.height && !self.node.stopping;
|
||||
}, function(done) {
|
||||
if(!self.blockBuffer.length) {
|
||||
// wait to get more blocks from bitcoind
|
||||
return setTimeout(done, 1000);
|
||||
}
|
||||
|
||||
// get x blocks off block buffer
|
||||
var blocks = self.blockBuffer.slice(0, 10);
|
||||
self.blockBuffer = self.blockBuffer.slice(10);
|
||||
|
||||
var operations = [];
|
||||
async.each(blocks, function(block, next) {
|
||||
self.runAllConcurrentBlockHandlers(block, add, true, function(err, ops) {
|
||||
if(err) {
|
||||
return next(err);
|
||||
}
|
||||
|
||||
operations = operations.concat(ops);
|
||||
next();
|
||||
});
|
||||
}, function(err) {
|
||||
if(err) {
|
||||
return done(err);
|
||||
}
|
||||
|
||||
// push concurrency height
|
||||
self.concurrentHeight += blocks.length;
|
||||
|
||||
operations.push({
|
||||
|
||||
});
|
||||
|
||||
log.debug('Updating the database with operations', operations);
|
||||
self.store.batch(operations, done);
|
||||
});
|
||||
}, function(err) {
|
||||
if (err) {
|
||||
Error.captureStackTrace(err);
|
||||
return self.node.emit('error', err);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
DB.prototype._serialSync = function() {
|
||||
var self = this;
|
||||
// depends on block and transaction services
|
||||
|
||||
var height = self.tip.__height;
|
||||
async.whilst(function() {
|
||||
return height < self.node.services.bitcoind.height && !self.node.stopping;
|
||||
}, function(done) {
|
||||
if(!self.node.services.block || !self.node.services.transaction || height >= self.concurrentHeight) {
|
||||
// wait
|
||||
return setTimeout(done, 1000);
|
||||
}
|
||||
|
||||
// get block from block services
|
||||
self.node.services.block.getBlockByHeight(height + 1, function(err, block) {
|
||||
if(err) {
|
||||
return done(err);
|
||||
}
|
||||
|
||||
if(block.prevHash.reverse().toString('hex') !== self.tip.hash) {
|
||||
// TODO need to rewind our serial blocks as well as concurrent blocks!
|
||||
}
|
||||
|
||||
|
||||
// This block appends to the current chain tip and we can
|
||||
// immediately add it to the chain and create indexes.
|
||||
|
||||
// Populate height
|
||||
block.__height = self.tip.__height + 1;
|
||||
|
||||
// Create indexes
|
||||
self.connectBlock(block, function(err) {
|
||||
if (err) {
|
||||
return done(err);
|
||||
}
|
||||
self.tip = block;
|
||||
log.debug('Chain added block to main chain');
|
||||
self.emit('addblock', block);
|
||||
setImmediate(done);
|
||||
});
|
||||
});
|
||||
}, function(err) {
|
||||
if (err) {
|
||||
Error.captureStackTrace(err);
|
||||
return self.node.emit('error', err);
|
||||
}
|
||||
|
||||
if(self.node.stopping) {
|
||||
self.bitcoindSyncing = false;
|
||||
return;
|
||||
}
|
||||
|
||||
if (self.node.services.bitcoind.isSynced()) {
|
||||
self.bitcoindSyncing = false;
|
||||
self.node.emit('synced');
|
||||
} else {
|
||||
self.bitcoindSyncing = false;
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
DB.prototype._getBitcoindBlocks = function() {
|
||||
var self = this;
|
||||
|
||||
// keep n blocks in block buffer
|
||||
// when we run out get more from bitcoind
|
||||
|
||||
var lastHeight = self.tip.__height;
|
||||
|
||||
async.whilst(function() {
|
||||
return self.lastHeight < self.node.services.bitcoind.height && !self.node.stopping;
|
||||
}, function(done) {
|
||||
var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 30 - self.blockBuffer.length);
|
||||
|
||||
async.timesLimit(blockCount, 5, function(n, next) {
|
||||
self.node.services.bitcoind.getBlock(lastHeight + n + 1, function(err, block) {
|
||||
if(err) {
|
||||
return next(err);
|
||||
}
|
||||
|
||||
self.blockBuffer.push(block);
|
||||
next();
|
||||
});
|
||||
}, done);
|
||||
}, function(err) {
|
||||
if (err) {
|
||||
Error.captureStackTrace(err);
|
||||
return self.node.emit('error', err);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* This function will synchronize additional indexes for the chain based on
|
||||
* the current active chain in the bitcoin daemon. In the event that there is
|
||||
* a reorganization in the daemon, the chain will rewind to the last common
|
||||
* ancestor and then resume syncing.
|
||||
*/
|
||||
DB.prototype.sync = function() {
|
||||
DB.prototype.syncOld = function() {
|
||||
var self = this;
|
||||
|
||||
if (self.bitcoindSyncing || self.node.stopping || !self.tip) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user