From f6a35d081029d9d347e086896eb7bf61f0050bc7 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Wed, 20 Jan 2016 18:18:27 -0800 Subject: [PATCH] more chain db work. --- lib/bcoin/chain.js | 263 +++++++++++++++++++++++++-------------------- 1 file changed, 148 insertions(+), 115 deletions(-) diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index 73f66f9a..545fb537 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -721,27 +721,36 @@ var BLOCK_SIZE = 80; // var BLOCK_SIZE = 144; function ChainDB(chain) { + var exists; + + this.chain = chain; this.file = process.env.HOME + '/bcoin-' + network.type + '.blockchain'; + + this._queue = []; + this._cache = {}; + this._bufferPool = {}; + this._nullBlock = new Buffer(BLOCK_SIZE); + this._nullBlock.fill(0); + try { fs.unlinkSync(this.file); } catch (e) { + ; } - var exists = false; + try { fs.accessSync(file); exists = true; } catch (e) { exists = false; } - if (!exists) + + if (!exists) { fs.writeFileSync(this.file, new Buffer(0)); + fs.truncateSync(this.file, 0); + } + this.fd = fs.openSync(this.file, 'r+'); - this.chain = chain; - this._buffer = []; - this._cache = {}; - this._bufferPool = {}; - this._nullBlock = new Buffer(BLOCK_SIZE); - this._nullBlock.fill(0); } ChainDB.prototype.getBuffer = function(size) { @@ -750,66 +759,162 @@ ChainDB.prototype.getBuffer = function(size) { return this._bufferPool[size]; }; -ChainDB.prototype.save = function save(entry) { +ChainDB.prototype.size = function size() { + try { + return fs.statSync(this.file).size; + } catch (e) { + return 0; + } +}; + +ChainDB.prototype.count = function count() { + return this.size() / BLOCK_SIZE | 0; +}; + +ChainDB.prototype.get = function get(height) { + var data; + + if (this._cache[height]) + return this._cache[height]; + + if (this._queue[height]) + return this._queue[height]; + + data = this._read(BLOCK_SIZE, height * BLOCK_SIZE); + + if (!data) + return; + + if (utils.read32(data, 0) === 0) + return; + + return ChainBlock.fromRaw(this.chain, height, data); +}; + +ChainDB.prototype.save = function save(entry, callback) { + var self = this; + var raw, offset; + // Cache the past 2016 blocks in memory this._cache[entry.height] = entry; delete this._cache[entry.height - network.powDiffInterval]; assert(Object.keys(this._cache).length < network.powDiffInterval + 1); - return this._write(new Buffer(entry.toRaw()), entry.height * BLOCK_SIZE); -}; - -ChainDB.prototype._write = function _write(data, offset) { - var size, bytes, index, hash; - var self = this; - - if (offset < 0 || offset == null) - return false; - - hash = offset; - size = data.length; - bytes = 0; - index = 0; // Something is already writing. Cancel it // and synchronously write the data after // it cancels. - if (this._buffer[hash]) { - assert(this._buffer[hash].data.length === data.length); - this._buffer[hash].data = data; - this._buffer[hash].queue.push([data, offset]); + if (this._queue[entry.height]) { + this._queue[entry.height] = entry; return; } // Speed up writes by doing them asynchronously // and keeping the data to be written in memory. - this._buffer[hash] = { queue: [], data: data }; + this._queue[entry.height] = entry; + + // Write asynchronously to the db. + raw = new Buffer(entry.toRaw()); + offset = entry.height * BLOCK_SIZE; + + return this._write(raw, offset, function(err, success) { + var item = self._queue[entry.height]; + + assert(item); + + // Something tried to write here but couldn't. + // Synchronously write it and get it over with. + try { + if (item !== entry) + success = self._writeSync(new Buffer(item.toRaw()), offset); + } catch (e) { + err = e; + } + + delete self._queue[entry.height]; + + if (err) { + if (callback) + return callback(err); + else + throw err; + } + + if (callback) + return callback(null, success); + }); +}; + +ChainDB.prototype.del = function del(height) { + assert(height >= 0); + + this._writeSync(this._nullBlock, height * BLOCK_SIZE); + + // If we deleted several blocks at the end, go back + // to the last non-null block and truncate the file + // beyond that point. + if (height * BLOCK_SIZE + BLOCK_SIZE === this.size()) { + while (this.isNull(height)) + height--; + fs.ftruncateSync(this.fd, height * BLOCK_SIZE + BLOCK_SIZE); + } + + return true; +}; + +ChainDB.prototype.isNull = function isNull(height) { + var data = this._read(1, height * BLOCK_SIZE); + if (!data) + return false; + return utils.read32(data, 0) === 0; +}; + +ChainDB.prototype._read = function _read(size, offset) { + var data = this.getBuffer(size); + var bytes = 0; + var index = 0; + + if (offset < 0 || offset == null) + return; + + try { + while (bytes = fs.readSync(this.fd, data, index, size, offset)) { + index += bytes; + size -= bytes; + offset += bytes; + if (index === data.length) + return data; + } + } catch (e) { + return; + } +}; + +ChainDB.prototype._write = function _write(data, offset, callback) { + var self = this; + var size, bytes, index; + + if (offset < 0 || offset == null) + return false; + + size = data.length; + bytes = 0; + index = 0; (function callee() { fs.write(self.fd, data, index, size, offset, function(err, bytes) { - // Something tried to write here but couldn't. - // Synchronously write it and get it over with. - if (self._buffer[hash].queue.length) { - self._buffer[hash].queue.forEach(function(chunk) { - self._writeSync(chunk[0], chunk[1]); - }); - delete self._buffer[hash]; - return; - } + if (err) + return callback(err); index += bytes; size -= bytes; offset += bytes; - if (index === data.length) { - delete self._buffer[hash]; - return; - } + if (index === data.length) + return callback(null, true); callee(); }); })(); - - return false; }; ChainDB.prototype._writeSync = function _writeSync(data, offset) { @@ -833,78 +938,6 @@ ChainDB.prototype._writeSync = function _writeSync(data, offset) { return false; }; -ChainDB.prototype.del = function del(height) { - this._write(this._nullBlock, height * BLOCK_SIZE); - if (height * BLOCK_SIZE + BLOCK_SIZE === this.size()) { - while (this.isNull(height)) - height--; - fs.ftruncateSync(this.fd, height * BLOCK_SIZE + BLOCK_SIZE); - } - return true; -}; - -ChainDB.prototype.isNull = function isNull(height) { - var data = this._read(1, height * BLOCK_SIZE); - if (!data) - return false; - return utils.read32(data, 0) === 0; -}; - -ChainDB.prototype._read = function _read(size, offset) { - var data = this.getBuffer(size); - var bytes = 0; - var index = 0; - - if (offset < 0 || offset == null) - return; - - if (this._buffer[offset]) { - assert(this._buffer[offset].data.length >= size); - return this._buffer[offset].data.slice(0, size); - } - - try { - while (bytes = fs.readSync(this.fd, data, index, size, offset)) { - index += bytes; - size -= bytes; - offset += bytes; - if (index === data.length) { - if (utils.read32(data, 0) === 0) - return; - return data; - } - } - } catch (e) { - return; - } -}; - -ChainDB.prototype.size = function size() { - try { - return fs.statSync(this.file).size; - } catch (e) { - return 0; - } -}; - -ChainDB.prototype.count = function count() { - return this.size() / BLOCK_SIZE | 0; -}; - -ChainDB.prototype.get = function get(height) { - var data; - - if (this._cache[height]) - return this._cache[height]; - - data = this._read(BLOCK_SIZE, height * BLOCK_SIZE); - - if (!data) - return; - - return ChainBlock.fromRaw(this.chain, height, data); -}; - /** * ChainBlock */