diff --git a/contrib/streamtest.js b/contrib/streamtest.js new file mode 100644 index 00000000..937ab5e3 --- /dev/null +++ b/contrib/streamtest.js @@ -0,0 +1,149 @@ +'use strict' +var Readable = require('stream').Readable; +var Writable = require('stream').Writable; +var Transform = require('stream').Transform; +var inherits = require('util').inherits; +var async = require('async'); + +function main() { + var blockStream = new BlockStream(); + var processConcurrent = new ProcessConcurrent(); + var processSerial = new ProcessSerial(); + var writeStreamFast = new WriteStreamFast(); + var writeStreamSlow = new WriteStreamSlow(); + + var start = Date.now(); + + writeStreamFast.on('finish', function() { + var end = Date.now(); + console.log('Total time: ', (end - start) + ' ms'); + console.log('Concurrent write time: ', writeStreamSlow.writeTime + ' ms'); + console.log('Serial write time: ', writeStreamFast.writeTime + ' ms'); + }); + + blockStream + .pipe(processConcurrent) + .pipe(writeStreamSlow); + + blockStream + .pipe(processSerial) + .pipe(writeStreamFast); +} + +function BlockStream() { + Readable.call(this, {objectMode: true, highWaterMark: 10}); + this.height = 0; +} + +inherits(BlockStream, Readable); + +BlockStream.prototype._read = function() { + var self = this; + console.log('_read'); + + setTimeout(function() { + self.height++; + if(self.height > 40) { + self.push(null); + return; + } + + console.log('ReadStream block ', self.height); + console.log(self.push({height: self.height})); + }, 500); +}; + +function ProcessSerial() { + Transform.call(this, {objectMode: true, highWaterMark: 10}); +} + +inherits(ProcessSerial, Transform); + +ProcessSerial.prototype._transform = function(block, enc, callback) { + var operations = [{index1: block.height}, {index2: block.height}]; + setTimeout(function() { + var obj = { + tipHeight: block.height, + operations: operations + }; + + callback(null, obj); + }, 100); +}; + +function ProcessConcurrent() { + Transform.call(this, {objectMode: true, highWaterMark: 10}); + this.operations = []; + this.lastHeight = 0; +}; + +inherits(ProcessConcurrent, Transform); + +ProcessConcurrent.prototype._transform = function(block, enc, callback) { + var self = this; + + self.lastHeight = block.height; + + setTimeout(function() { + self.operations = self.operations.concat([{index3: block.height}, {index4: block.height}]); + + console.log(self.operations.length); + if(self.operations.length >= 10) { + var obj = { + concurrentTipHeight: self.lastHeight, + operations: self.operations + }; + self.operations = []; + + return callback(null, obj); + } + + callback(); + }, 100); +}; + +ProcessConcurrent.prototype._flush = function(callback) { + if(this.operations.length) { + var obj = { + concurrentTipHeight: this.lastHeight, + operations: this.operations + }; + + this.operations = []; + return callback(null, operations); + } +}; + +function WriteStreamSlow() { + Writable.call(this, {objectMode: true, highWaterMark: 10}); + this.writeTime = 0; +} + +inherits(WriteStreamSlow, Writable); + +WriteStreamSlow.prototype._write = function(operations, enc, callback) { + var self = this; + setTimeout(function() { + console.log('WriteStreamSlow block ', operations.concurrentTipHeight); + self.writeTime += 2000; + callback(); + }, 2000); +}; + +function WriteStreamFast() { + Writable.call(this, {objectMode: true, highWaterMark: 1}); + this.writeTime = 0; +} + +inherits(WriteStreamFast, Writable); + +WriteStreamFast.prototype._write = function(operations, enc, callback) { + var self = this; + setTimeout(function() { + console.log('WriteStreamFast block ', operations.tipHeight); + self.writeTime += 1000; + callback(); + }, 1000); +}; + +main(); \ No newline at end of file diff --git a/lib/services/address/index.js b/lib/services/address/index.js index dc5cefb1..818b7f35 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -134,7 +134,7 @@ AddressService.prototype.getPublishEvents = function() { * @param {Boolean} addOutput - If the block is being removed or added to the chain * @param {Function} callback */ -AddressService.prototype.blockHandler = function(block, connectBlock, callback) { +AddressService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) { var self = this; var txs = block.transactions; diff --git a/lib/services/db.js b/lib/services/db/index.js similarity index 79% rename from lib/services/db.js rename to lib/services/db/index.js index df163167..077e57ce 100644 --- a/lib/services/db.js +++ b/lib/services/db/index.js @@ -11,11 +11,12 @@ var BufferUtil = bitcore.util.buffer; var Networks = bitcore.Networks; var Block = bitcore.Block; var $ = bitcore.util.preconditions; -var index = require('../'); +var index = require('../../'); var errors = index.errors; var log = index.log; -var Transaction = require('../transaction'); -var Service = require('../service'); +var Transaction = require('../../transaction'); +var Service = require('../../service'); +var Sync = require('./sync'); /** * This service synchronizes a leveldb database with bitcoin block chain by connecting and @@ -65,6 +66,8 @@ function DB(options) { transaction: [], block: [] }; + + this._sync = new Sync(this.node, this); } util.inherits(DB, Service); @@ -161,15 +164,29 @@ DB.prototype.start = function(callback) { this.store = levelup(this.dataPath, { db: this.levelupStore, maxOpenFiles: this.maxOpenFiles, keyEncoding: 'binary', valueEncoding: 'binary'}); this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this)); + this._sync.on('error', function(err) { + log.error(err); + }); + + this._sync.on('reorg', function() { + throw new Error('Reorg detected! Unhandled :(') + }); + + this._sync.on('initialsync', function() { + log.info('Initial sync complete'); + }); + + this.node.on('stopping', function() { + self._sync.stop(); + }); + this.node.once('ready', function() { // start syncing - self.sync(); + self._sync.initialSync(); // Notify that there is a new tip self.node.services.bitcoind.on('tip', function() { - if(!self.node.stopping) { - self.sync(); - } + self._sync.sync(); }); }); @@ -189,7 +206,7 @@ DB.prototype.start = function(callback) { return callback(err); } - setImmediate(callback); + self.loadConcurrentTip(callback); }); }); }; @@ -319,6 +336,55 @@ DB.prototype.loadTip = function(callback) { }); }; +DB.prototype.loadConcurrentTip = function(callback) { + var self = this; + + var options = { + keyEncoding: 'string', + valueEncoding: 'binary' + }; + + self.store.get(self.dbPrefix + 'concurrentTip', options, function(err, tipData) { + if(err && err instanceof levelup.errors.NotFoundError) { + self.concurrentTip = self.genesis; + self.concurrentTip.__height = 0; + return; + } else if(err) { + return callback(err); + } + + var hash = tipData.slice(0, 32).toString('hex'); + var height = tipData.readUInt32BE(32); + + var times = 0; + async.retry({times: 3, interval: self.retryInterval}, function(done) { + self.getBlock(hash, function(err, concurrentTip) { + if(err) { + times++; + log.warn('Bitcoind does not have our concurrentTip (' + hash + '). Bitcoind may have crashed and needs to catch up.'); + if(times < 3) { + log.warn('Retrying in ' + (self.retryInterval / 1000) + ' seconds.'); + } + return done(err); + } + + done(null, concurrentTip); + }); + }, function(err, concurrentTip) { + if(err) { + log.warn('Giving up after 3 tries. Please report this bug to https://github.com/bitpay/bitcore-node/issues'); + log.warn('Please reindex your database.'); + return callback(err); + } + + concurrentTip.__height = height; + self.concurrentTip = concurrentTip; + + callback(); + }); + }); +}; + /** * Will get a block from bitcoind and give a Bitcore Block * @param {String|Number} hash - A block hash or block height @@ -454,8 +520,30 @@ DB.prototype.getPrevHash = function(blockHash, callback) { * @param {Function} callback */ DB.prototype.connectBlock = function(block, callback) { + var self = this; + log.debug('DB handling new chain block'); - this.runAllBlockHandlers(block, true, callback); + var operations = []; + self.getConcurrentBlockOperations(block, true, function(err, ops) { + if(err) { + return callback(err); + } + + operations = ops; + + self.getSerialBlockOperations(block, true, function(err, ops) { + if(err) { + return callback(err); + } + + operations = operations.concat(ops); + + operations.push(self.getTipOperation(block, true)); + operations.push(self.getConcurrentTipOperation(block, true)); + + self.store.batch(operations, callback); + }); + }); }; /** @@ -464,43 +552,70 @@ DB.prototype.connectBlock = function(block, callback) { * @param {Function} callback */ DB.prototype.disconnectBlock = function(block, callback) { + var self = this; + log.debug('DB removing chain block'); - this.runAllBlockHandlers(block, false, callback); + var operations = []; + self.getConcurrentBlockOperations(block, false, function(err, ops) { + if(err) { + return callback(err); + } + + operations = ops; + + self.getSerialBlockOperations(block, false, function(err, ops) { + if(err) { + return callback(err); + } + + operations = operations.concat(ops); + + operations.push(self.getTipOperation(block, false)); + operations.push(self.getConcurrentTipOperation(block, false)); + + self.store.batch(operations, callback); + }); + }); }; -/** - * Will collect all database operations for a block from other services that implement - * `blockHandler` methods and then save operations to the database. - * @param {Block} block - The bitcore block - * @param {Boolean} add - If the block is being added/connected or removed/disconnected - * @param {Function} callback - */ -DB.prototype.runAllBlockHandlers = function(block, add, callback) { +DB.prototype.getConcurrentBlockOperations = function(block, add, callback) { var self = this; var operations = []; - // Notify block subscribers - // for (var i = 0; i < this.subscriptions.block.length; i++) { - // this.subscriptions.block[i].emit('db/block', block.hash); - // } + async.each( + this.node.services, + function(mod, next) { + if(mod.concurrentBlockHandler) { + $.checkArgument(typeof mod.concurrentBlockHandler === 'function', 'concurrentBlockHandler must be a function'); - // Update tip - var tipData = new Buffer(36); - var heightBuffer = new Buffer(4); + mod.concurrentBlockHandler.call(mod, block, add, function(err, ops) { + if (err) { + return next(err); + } + if (ops) { + $.checkArgument(Array.isArray(ops), 'concurrentBlockHandler for ' + mod.name + ' returned non-array'); + operations = operations.concat(ops); + } - if(add) { - heightBuffer.writeUInt32BE(block.__height); - tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]); - } else { - heightBuffer.writeUInt32BE(block.__height - 1); - tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]); - } + next(); + }); + } else { + setImmediate(next); + } + }, + function(err) { + if (err) { + return callback(err); + } - operations.push({ - type: 'put', - key: self.dbPrefix + 'tip', - value: tipData - }); + callback(null, operations); + } + ); +}; + +DB.prototype.getSerialBlockOperations = function(block, add, callback) { + var self = this; + var operations = []; async.eachSeries( this.node.services, @@ -516,6 +631,7 @@ DB.prototype.runAllBlockHandlers = function(block, add, callback) { $.checkArgument(Array.isArray(ops), 'blockHandler for ' + mod.name + ' returned non-array'); operations = operations.concat(ops); } + next(); }); } else { @@ -527,145 +643,43 @@ DB.prototype.runAllBlockHandlers = function(block, add, callback) { return callback(err); } - log.debug('Updating the database with operations', operations); - self.store.batch(operations, callback); + callback(null, operations); } ); }; -/** - * This function will find the common ancestor between the current chain and a forked block, - * by moving backwards on both chains until there is a meeting point. - * @param {Block} block - The new tip that forks the current chain. - * @param {Function} done - A callback function that is called when complete. - */ -DB.prototype.findCommonAncestor = function(block, done) { +DB.prototype.getTipOperation = function(block, add) { + var heightBuffer = new Buffer(4); + var tipData; - var self = this; - - var mainPosition = self.tip.hash; - var forkPosition = block.hash; - - var mainHashesMap = {}; - var forkHashesMap = {}; - - mainHashesMap[mainPosition] = true; - forkHashesMap[forkPosition] = true; - - var commonAncestor = null; - - async.whilst( - function() { - return !commonAncestor; - }, - function(next) { - - if(mainPosition) { - var mainBlockIndex = self.node.services.bitcoind.getBlockIndex(mainPosition); - if(mainBlockIndex && mainBlockIndex.prevHash) { - mainHashesMap[mainBlockIndex.prevHash] = true; - mainPosition = mainBlockIndex.prevHash; - } else { - mainPosition = null; - } - } - - if(forkPosition) { - var forkBlockIndex = self.node.services.bitcoind.getBlockIndex(forkPosition); - if(forkBlockIndex && forkBlockIndex.prevHash) { - forkHashesMap[forkBlockIndex.prevHash] = true; - forkPosition = forkBlockIndex.prevHash; - } else { - forkPosition = null; - } - } - - if(forkPosition && mainHashesMap[forkPosition]) { - commonAncestor = forkPosition; - } - - if(mainPosition && forkHashesMap[mainPosition]) { - commonAncestor = mainPosition; - } - - if(!mainPosition && !forkPosition) { - return next(new Error('Unknown common ancestor')); - } - - setImmediate(next); - }, - function(err) { - done(err, commonAncestor); - } - ); -}; - -/** - * This function will attempt to rewind the chain to the common ancestor - * between the current chain and a forked block. - * @param {Block} block - The new tip that forks the current chain. - * @param {Function} done - A callback function that is called when complete. - */ -DB.prototype.syncRewind = function(block, done) { - - var self = this; - - self.findCommonAncestor(block, function(err, ancestorHash) { - if (err) { - return done(err); - } - log.warn('Reorg common ancestor found:', ancestorHash); - // Rewind the chain to the common ancestor - async.whilst( - function() { - // Wait until the tip equals the ancestor hash - return self.tip.hash !== ancestorHash; - }, - function(removeDone) { - - var tip = self.tip; - - // TODO: expose prevHash as a string from bitcore - var prevHash = BufferUtil.reverse(tip.header.prevHash).toString('hex'); - - self.getBlock(prevHash, function(err, previousTip) { - if (err) { - removeDone(err); - } - - // Undo the related indexes for this block - self.disconnectBlock(tip, function(err) { - if (err) { - return removeDone(err); - } - - // Set the new tip - previousTip.__height = self.tip.__height - 1; - self.tip = previousTip; - self.emit('removeblock', tip); - removeDone(); - }); - - }); - - }, done - ); - }); -}; - -/** - * This function will synchronize additional indexes for the chain based on - * the current active chain in the bitcoin daemon. In the event that there is - * a reorganization in the daemon, the chain will rewind to the last common - * ancestor and then resume syncing. - */ -DB.prototype.sync = function() { - var self = this; - - if (self.bitcoindSyncing || self.node.stopping || !self.tip) { - return; + if(add) { + heightBuffer.writeUInt32BE(block.__height); + tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]); + } else { + heightBuffer.writeUInt32BE(block.__height - 1); + tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]); } + return { + type: 'put', + key: this.dbPrefix + 'tip', + value: tipData + }; +}; + +DB.prototype.getConcurrentTipOperation = function(block, add) { + var heightBuffer = new Buffer(4); + var tipData; + + if(add) { + heightBuffer.writeUInt32BE(block.__height); + tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]); + } else { + heightBuffer.writeUInt32BE(block.__height - 1); + tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]); + } + +<<<<<<< HEAD:lib/services/db.js self.bitcoindSyncing = true; var height; @@ -733,10 +747,16 @@ DB.prototype.sync = function() { } else { self.bitcoindSyncing = false; } - - }); - +======= + return { + type: 'put', + key: this.dbPrefix + 'concurrentTip', + value: tipData + }; }; +>>>>>>> feature/concurrency:lib/services/db/index.js + + DB.prototype.getPrefix = function(service, callback) { var self = this; diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js new file mode 100644 index 00000000..c2a0b24d --- /dev/null +++ b/lib/services/db/sync.js @@ -0,0 +1,386 @@ +'use strict' +var Readable = require('stream').Readable; +var Writable = require('stream').Writable; +var Transform = require('stream').Transform; +var inherits = require('util').inherits; +var EventEmitter = require('events').EventEmitter; +var async = require('async'); +var bitcore = require('bitcore-lib'); +var BufferUtil = bitcore.util.buffer; + +function Sync(node, db) { + this.node = node; + this.db = db; + this.syncing = false; + this.highWaterMark = 100; +} + +inherits(Sync, EventEmitter); + +// Use a concurrent strategy to sync +Sync.prototype.initialSync = function() { + var self = this; + + if(this.syncing) { + return + } + + this.syncing = true; + this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip.__height); + var processConcurrent = new ProcessConcurrent(this.highWaterMark, this.db); + var writeStream = new WriteStream(this.highWaterMark, this.db); + var processSerial = new ProcessSerial(this.highWaterMark, this.db, this.db.tip); + + var start = Date.now(); + + this._handleErrors(this.blockStream); + this._handleErrors(processConcurrent); + this._handleErrors(processSerial); + this._handleErrors(writeStream); + + processSerial.on('finish', function() { + self.syncing = false; + self.emit('initialsync'); + }); + + this.blockStream + .pipe(processConcurrent) + .pipe(writeStream); + + this.blockStream + .pipe(processSerial); +}; + +// Get concurrent and serial block operations and write them together block by block +// Useful for when we are fully synced and we want to advance the concurrentTip and +// the tip together +Sync.prototype.sync = function() { + if(this.syncing) { + return + } + + this.syncing = true; + this.blockStream = new BlockStream(); + var processBoth = new ProcessBoth(this.db); + var writeStream = new WriteStream(this.db); + + var start = Date.now(); + + this._handleErrors(this.blockStream); + this._handleErrors(processBoth); + this._handleErrors(writeStream); + + writeStream.on('finish', function() { + var end = Date.now(); + console.log('Total time: ', (end - start) + ' ms'); + console.log('Concurrent write time: ', writeStreamSlow.writeTime + ' ms'); + console.log('Serial write time: ', writeStreamFast.writeTime + ' ms'); + self.emit('synced'); + }); + + this.blockStream + .pipe(processBoth) + .pipe(writeStream1); +}; + +Sync.prototype.stop = function() { + if(this.blockStream) { + this.blockStream.stopping = true; + } +}; + +Sync.prototype._handleErrors = function(stream) { + var self = this; + + stream.on('error', function(err) { + self.syncing = false; + + if(err.reorg) { + return self.emit('reorg'); + } + + self.emit('error', err); + }); +}; + +function BlockStream(highWaterMark, bitcoind, lastHeight) { + Readable.call(this, {objectMode: true, highWaterMark: highWaterMark}); + this.bitcoind = bitcoind; + this.lastHeight = lastHeight; + this.stopping = false; + this.queue = []; + this.processing = false; +} + +inherits(BlockStream, Readable); + +// BlockStream.prototype._read = function() { +// var self = this; + +// // TODO does not work :( + +// var blockCount = Math.min(self.bitcoind.height - self.lastHeight, 5); + +// if(blockCount <= 0 || this.stopping) { +// return self.push(null); +// } + +// console.log('Fetching blocks ' + (self.lastHeight + 1) + ' to ' + (self.lastHeight + blockCount)); + +// async.times(blockCount, function(n, next) { +// var height = self.lastHeight + n + 1; +// self.bitcoind.getBlock(height, function(err, block) { +// if(err) { +// return next(err); +// } + +// block.__height = height; + +// next(null, block); +// }); +// }, function(err, blocks) { +// if(err) { +// return self.emit('error', err); +// } + +// for(var i = 0; i < blocks.length; i++) { +// self.push(blocks[i]); +// } + +// self.lastHeight += blocks.length; +// }); +// }; + +// BlockStream.prototype._read = function() { +// var self = this; + +// self.lastHeight++; +// //console.log('Fetching block ' + self.lastHeight); + +// var height = self.lastHeight; + +// self.bitcoind.getBlock(height, function(err, block) { +// if(err) { +// return self.emit(err); +// } + +// block.__height = height; + +// //console.log('pushing block ' + block.__height); +// self.push(block); +// }); +// }; + +BlockStream.prototype._read = function() { + this.lastHeight++; + this.queue.push(this.lastHeight); + + this._process(); +}; + +BlockStream.prototype._process = function() { + var self = this; + + if(this.processing) { + return; + } + + this.processing = true; + + async.whilst( + function() { + return self.queue.length; + }, function(next) { + var heights = self.queue.slice(0, Math.min(5, self.queue.length)); + self.queue = self.queue.slice(heights.length); + + //console.log('fetching blocks ' + heights[0] + ' to ' + heights[heights.length - 1]); + + async.map(heights, function(height, next) { + self.bitcoind.getBlock(height, function(err, block) { + if(err) { + return next(err); + } + + block.__height = height; + + setTimeout(function() { + // we're getting blocks from bitcoind too fast? + // it pauses at 8100 + next(null, block); + }, 1); + }); + }, function(err, blocks) { + if(err) { + return next(err); + } + + for(var i = 0; i < blocks.length; i++) { + self.push(blocks[i]); + } + + next(); + }); + }, function(err) { + if(err) { + return self.emit('error', err); + } + + self.processing = false; + } + ); +} + +function ProcessSerial(highWaterMark, db, tip) { + Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); + this.db = db; + this.tip = tip; +} + +inherits(ProcessSerial, Writable); + +ProcessSerial.prototype._write = function(block, enc, callback) { + var self = this; + + //console.log('serial', block.__height); + + var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex'); + if(prevHash !== self.tip.hash) { + var err = new Error('Reorg detected'); + err.reorg = true; + return callback(err); + } + + if(check()) { + return self._process(block, callback); + } + + self.db.once('concurrentaddblock', function() { + if(!check()) { + var err = new Error('Concurrent block ' + self.db.concurrentTip.__height + ' is less than ' + block.__height); + return self.emit('error', err); + } + self._process(block, callback); + }); + + function check() { + return self.db.concurrentTip.__height < block.__height; + } +}; + +ProcessSerial.prototype._process = function(block, callback) { + var self = this; + + self.db.getSerialBlockOperations(block, true, function(err, operations) { + if(err) { + return callback(err); + } + + operations.push(self.db.getTipOperation(block, true)); + + var obj = { + tip: block, + operations: operations + }; + + self.tip = block; + + self.db.store.batch(obj.operations, function(err) { + if(err) { + return callback(err); + } + + self.db.tip = block; + self.db.emit('addblock'); + if(self.db.tip.__height % 100 === 0) { + console.log('Tip:', self.db.tip.__height); + } + + callback(); + }); + }); +}; + +function ProcessConcurrent(highWaterMark, db) { + Transform.call(this, {objectMode: true, highWaterMark: highWaterMark}); + this.db = db; + this.operations = []; + this.lastBlock = 0; + this.blockCount = 0; +}; + +inherits(ProcessConcurrent, Transform); + +ProcessConcurrent.prototype._transform = function(block, enc, callback) { + var self = this; + + //console.log('concurrent', block.__height); + + this.lastBlock = block; + + self.db.getConcurrentBlockOperations(block, true, function(err, operations) { + if(err) { + return callback(err); + } + + self.blockCount++; + self.operations = self.operations.concat(operations); + + if(self.blockCount >= 1) { //self.operations.length >= 100) { + self.operations.push(self.db.getConcurrentTipOperation(block, true)); + var obj = { + concurrentTip: block, + operations: self.operations + } + self.operations = []; + self.blockCount = 0; + + return callback(null, obj); + } + + callback(); + }); +}; + +ProcessConcurrent.prototype._flush = function(callback) { + if(this.operations.length) { + this.operations.push(this.db.getConcurrentTipOperation(this.lastBlock, true)); + var obj = { + concurrentTipHeight: this.lastBlock, + operations: this.operations + }; + + this.operations = []; + return callback(null, operations); + } +}; + +function WriteStream(highWaterMark, db) { + Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); + this.db = db; + this.writeTime = 0; + this.lastConcurrentOutputHeight = 0; +} + +inherits(WriteStream, Writable); + +WriteStream.prototype._write = function(obj, enc, callback) { + var self = this; + + self.db.store.batch(obj.operations, function(err) { + if(err) { + return callback(err); + } + + self.db.concurrentTip = obj.concurrentTip; + self.db.emit('concurrentaddblock'); + if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) { + console.log('Concurrent tip:', self.db.concurrentTip.__height); + self.lastConcurrentOutputHeight = self.db.concurrentTip.__height; + } + + callback(); + }); +}; + +module.exports = Sync;