From b1232593d89a4fb18408d82975a737d1b1e84c87 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Tue, 1 Mar 2016 19:35:15 -0800 Subject: [PATCH] txdb. --- lib/bcoin.js | 1 + lib/bcoin/node.js | 1 + lib/bcoin/tx.js | 18 +- lib/bcoin/txdb.js | 1288 +++++++++++++++++++++++++++++++++++++++++ lib/bcoin/wallet.js | 74 +-- lib/bcoin/walletdb.js | 183 +++++- test/wallet-test.js | 104 +++- 7 files changed, 1592 insertions(+), 77 deletions(-) create mode 100644 lib/bcoin/txdb.js diff --git a/lib/bcoin.js b/lib/bcoin.js index e75e1694..63e415a4 100644 --- a/lib/bcoin.js +++ b/lib/bcoin.js @@ -69,6 +69,7 @@ bcoin.coin = require('./bcoin/coin'); bcoin.tx = require('./bcoin/tx'); bcoin.mtx = require('./bcoin/mtx'); bcoin.txpool = require('./bcoin/tx-pool'); +bcoin.txdb = require('./bcoin/txdb'); bcoin.abstractblock = require('./bcoin/abstractblock'); bcoin.compactblock = require('./bcoin/compactblock'); bcoin.block = require('./bcoin/block'); diff --git a/lib/bcoin/node.js b/lib/bcoin/node.js index d67c31a8..94beb55f 100644 --- a/lib/bcoin/node.js +++ b/lib/bcoin/node.js @@ -100,6 +100,7 @@ Fullnode.prototype._init = function _init() { // Emit events for any TX we see that's // is relevant to one of our wallets. this.on('tx', function(tx) { + return; self.walletdb.ownTX(tx, function(err, input, output) { if (err) return self.emit('error', err); diff --git a/lib/bcoin/tx.js b/lib/bcoin/tx.js index ad1e9061..d6f6a6b2 100644 --- a/lib/bcoin/tx.js +++ b/lib/bcoin/tx.js @@ -1145,11 +1145,12 @@ TX.prototype.toExtended = function toExtended(coins) { buf._witnessSize = tx._witnessSize; buf._size = tx._size; + buf._extendedSize = off; return buf; }; -TX._fromExtended = function _fromExtended(buf) { +TX._fromExtended = function _fromExtended(buf, coins) { var tx, coinCount, chunkSize, coin, i; var off = 0; @@ -1161,7 +1162,7 @@ TX._fromExtended = function _fromExtended(buf) { tx.height = utils.readU32(buf, off); off += 4; - tx.block = buf.slice(off, off + 32); + tx.block = buf.slice(off, off + 32).toString('hex'); off += 32; tx.index = utils.readU32(buf, off); off += 4; @@ -1170,12 +1171,15 @@ TX._fromExtended = function _fromExtended(buf) { tx.ps = utils.readU32(buf, off); off += 4; + if (+tx.block === 0) + tx.block = null; + if (tx.height === 0x7fffffff) tx.height = -1; - if (buf.length > off) { + if (coins) { coinCount = utils.readIntv(buf, off); - off = cointCount.off; + off = coinCount.off; coinCount = coinCount.r; for (i = 0; i < coinCount; i++) { chunkSize = utils.readIntv(buf, off); @@ -1189,11 +1193,13 @@ TX._fromExtended = function _fromExtended(buf) { } } + tx._extendedSize = off; + return tx; }; -TX.fromExtended = function fromExtended(buf) { - return new TX(TX._fromExtended(buf)); +TX.fromExtended = function fromExtended(buf, coins) { + return new TX(TX._fromExtended(buf, coins)); }; /** diff --git a/lib/bcoin/txdb.js b/lib/bcoin/txdb.js new file mode 100644 index 00000000..f4b39425 --- /dev/null +++ b/lib/bcoin/txdb.js @@ -0,0 +1,1288 @@ +/** + * txdb.js - persistent transaction pool + * Copyright (c) 2014-2015, Fedor Indutny (MIT License) + * https://github.com/indutny/bcoin + */ + +var bn = require('bn.js'); +var bcoin = require('../bcoin'); +var utils = bcoin.utils; +var assert = bcoin.utils.assert; +var EventEmitter = require('events').EventEmitter; + +/** + * TXPool + */ + +function TXPool(prefix, db) { + var self = this; + + if (!(this instanceof TXPool)) + return new TXPool(wallet, txs); + + EventEmitter.call(this); + + this.db = db; + this.prefix = prefix || 'pool'; +} + +utils.inherits(TXPool, EventEmitter); + +TXPool.prototype.add = function add(tx, callback) { + var self = this; + + if (Array.isArray(tx)) { + return utils.forEachSerial(tx, function(tx, next) { + self.add(tx, next); + }, callback); + } + + this._filter(tx, function(err, result) { + if (err) + return callback(err); + + if (!result) + return callback(null, false); + + return self._add(tx, callback); + }); +}; + +TXPool.prototype._filter = function _filter(tx, callback) { + return callback(null, true); +}; + +TXPool.prototype._hasAddress = function _hasAddress(address, callback) { + var p = this.prefix + '/'; + var self = this; + + callback = utils.ensure(callback); + + if (!address) + return callback(null, false); + + var iter = this.db.db.iterator({ + gte: p + 'a/' + address, + lte: p + 'a/' + address + '~', + keys: true, + values: false, + fillCache: false, + keyAsBuffer: false + }); + + (function next() { + iter.next(function(err, key, value) { + if (err) { + return iter.end(function() { + callback(err); + }); + } + + if (key === undefined) { + return iter.end(function(err) { + if (err) + return callback(err); + return callback(null, ids); + }); + } + + return iter.end(function(err) { + if (err) + return callback(err); + callback(null, true); + }); + }); + })(); +}; + +TXPool.prototype._addOrphan = function _addOrphan(orphans, orphan) { + var tx = orphan.tx.toExtended(true); + var buf; + + buf = new Buffer(tx.length + 4); + utils.copy(tx, buf, 0); + utils.writeU32(buf, orphan.index, tx.length); + + if (!orphans) + return buf; + + return Buffer.concat([orphans, buf]); +}; + +TXPool.prototype._fromOrphans = function _fromOrphans(buf) { + var txs = []; + var tx, index, buf; + + if (!buf) + return txs; + + while (buf.length) { + tx = bcoin.tx.fromExtended(buf, true); + index = utils.readU32(buf, tx._extendedSize); + buf = buf.slice(tx._extendedSize + 4); + txs.push({ tx: tx, index: index }); + } + + return txs; +}; + +TXPool.prototype._add = function add(tx, callback) { + var self = this; + var p = this.prefix + '/'; + var hash = tx.hash('hex'); + var updated = false; + + callback = utils.ensure(callback); + + var batch = this.db.batch(); + + self.getTX(hash, function(err, existing) { + if (err) + return callback(err); + + if (existing) { + // Tricky - update the tx and coin in storage, + // and remove pending flag to mark as confirmed. + if (existing.ts === 0 && tx.ts !== 0) { + batch.put(p + 't/t/' + hash, tx.toExtended()); + batch.del(p + 'p/' + hash); + + tx.inputs.forEach(function(input) { + var type = input.getType(); + var address = input.getAddress(); + var uaddr; + + if (input.isCoinbase()) + return; + + if (type === 'pubkey' || type === 'multisig') + address = null; + + uaddr = address; + + if (uaddr) { + if (!uniq[uaddr]) + uniq[uaddr] = true; + else + uaddr = null; + } + + if (uaddr) + batch.del(p + 'p/a/' + uaddr + '/' + hash); + }); + + tx.outputs.forEach(function(output) { + var type = output.getType(); + var address = output.getAddress(); + var uaddr, coinRaw; + + if (type === 'pubkey' || type === 'multisig') + address = null; + + uaddr = address; + + if (uaddr) { + if (!uniq[uaddr]) + uniq[uaddr] = true; + else + uaddr = null; + } + + if (uaddr) + batch.del(p + 'p/a/' + uaddr + '/' + hash); + + coinRaw = bcoin.protocol.framer.coin({ + version: tx.version, + height: tx.height, + value: output.value, + script: output.script, + hash: hash, + index: i, + spent: false + }, true); + + batch.put(p + 'u/t/' + hash + '/' + i, coinRaw); + }); + + batch.write(function(err) { + if (err) + return callback(err); + self.emit('confirmed', tx); + self.emit('tx', tx); + return callback(null, true); + }); + } + return callback(null, false); + } + + // Consume unspent money or add orphans + utils.forEachSerial(tx.inputs, function(input, next, i) { + var key = input.prevout.hash + '/' + input.prevout.index; + self.getCoin(input.prevout.hash, input.prevout.index, function(err, coin) { + if (err) + return next(err); + + if (coin) { + // Add TX to inputs and spend money + input.output = coin; + + assert(input.prevout.hash === coin.hash); + assert(input.prevout.index === coin.index); + + // Skip invalid transactions + if (!tx.verify(i)) + return callback(null, false); + + updated = true; + + var type = input.getType(); + var address = input.getAddress(); + + if (type === 'pubkey' || type === 'multisig') + address = null; + + if (input.isCoinbase()) + return next(); + + if (address) { + batch.del( + p + 'u/a/' + address + + '/' + input.prevout.hash + + '/' + input.prevout.index); + } + + batch.del(p + 'u/t/' + input.prevout.hash + '/' + input.prevout.index); + return next(); + } + + // Only add orphans if this input is ours. + self._hasAddress(input.getAddress(), function(err, result) { + if (err) + return callback(err); + + if (!result) + return next(); + + // Add orphan, if no parent transaction is yet known + self.db.get(p + 'o/' + key, function(err, orphans) { + if (err && err.type !== 'NotFoundError') + return callback(err); + + // orphans = self._addOrphan(orphans, { tx: tx, index: i }); + + if (orphans) { + try { + orphans = JSON.parse(orphans.toString('utf8')); + } catch (e) { + return callback(e); + } + } else { + orphans = []; + } + + orphans.push({ + tx: tx.toExtended(true).toString('hex'), + index: i + }); + + orphans = new Buffer(JSON.stringify(orphans), 'utf8'); + + batch.put(p + 'o/' + key, orphans); + + return next(); + }); + }); + }); + }, function(err) { + if (err) + return callback(err); + + // Add unspent outputs or resolve orphans + utils.forEachSerial(tx.outputs, function(output, next, i) { + // Do not add unspents for outputs that aren't ours. + self._hasAddress(output.getAddress(), function(err, result) { + if (err) + return callback(err); + + if (!result) + return next(); + + var coin = bcoin.coin(tx, i); + + var key = hash + '/' + i; + + self.db.get(p + 'o/' + key, function(err, orphans) { + var some; + + if (err && err.type !== 'NotFoundError') + return callback(err); + + if (orphans) { + try { + orphans = JSON.parse(orphans.toString('utf8')).map(function(orphan) { + orphan.tx = bcoin.tx.fromExtended(new Buffer(orphan.tx, 'hex'), true); + return orphan; + }); + } catch (e) { + return next(e); + } + } + + // Add input to orphan + if (orphans) { + some = false; + + utils.forEachSerial(orphans, function(orphan, next, j) { + if (some) + return next(); + + orphan.tx.inputs[orphan.index].output = coin; + + assert(orphan.tx.inputs[orphan.index].prevout.hash === hash); + assert(orphan.tx.inputs[orphan.index].prevout.index === i); + + // Verify that input script is correct, if not - add + // output to unspent and remove orphan from storage + if (orphan.tx.verify(orphan.index)) { + some = true; + return next(); + } + + self.remove(orphan.tx, function(err) { + if (err) + return next(err); + return next(); + }); + }, function(err) { + if (err) + return next(err); + + if (!some) + orphans = null; + + self.db.del(p + 'o/' + key, finish); + }); + } else { + finish(); + } + + function finish(err) { + if (err) + return next(err); + + if (!orphans) { + var type = output.getType(); + var address = output.getAddress(); + if (type === 'pubkey' || type === 'multisig') + address = null; + if (address) + batch.put(p + 'u/a/' + address + '/' + hash + '/' + i, new Buffer([])); + batch.put(p + 'u/t/' + hash + '/' + i, coin.toRaw()); + updated = true; + } + + next(); + } + }); + + return true; + }); + }, function(err) { + if (err) + return callback(err); + + batch.put(p + 't/t/' + hash, tx.toExtended()); + if (tx.ts === 0) + batch.put(p + 'p/' + hash, new Buffer([])); + + tx.getAddresses().forEach(function(address) { + batch.put(p + 't/a/' + address + '/' + hash, new Buffer([])); + if (tx.ts === 0) + batch.put(p + 'p/a/' + address + '/' + hash, new Buffer([])); + }); + + batch.write(function(err) { + if (err) + return callback(err); + + self.emit('tx', tx); + + if (tx.ts !== 0) + self.emit('confirmed', tx); + + return callback(null, true); + }); + }); + }); + }); +}; + +TXPool.prototype._add_ = function _add(tx, callback) { + var self = this; + var p = this.prefix + '/'; + var hash = tx.hash('hex'); + var uniq = {}; + + callback = utils.ensure(callback); + + this.getTX(hash, function(err, existing) { + var batch; + + if (err) + return callback(err); + + batch = self.db.batch(); + + if (existing) { + // Tricky - update the tx and coin in storage, + // and remove pending flag to mark as confirmed. + if (existing.ts === 0 && tx.ts !== 0) { + batch.put(p + 't/t/' + hash, tx.toExtended()); + batch.del(p + 'p/' + hash); + + tx.inputs.forEach(function(input) { + var type = input.getType(); + var address = input.getAddress(); + var uaddr; + + if (input.isCoinbase()) + return; + + if (type === 'pubkey' || type === 'multisig') + address = null; + + uaddr = address; + + if (uaddr) { + if (!uniq[uaddr]) + uniq[uaddr] = true; + else + uaddr = null; + } + + if (uaddr) + batch.del(p + 'p/a/' + uaddr + '/' + hash); + }); + + tx.outputs.forEach(function(output) { + var type = output.getType(); + var address = output.getAddress(); + var uaddr, coinRaw; + + if (type === 'pubkey' || type === 'multisig') + address = null; + + uaddr = address; + + if (uaddr) { + if (!uniq[uaddr]) + uniq[uaddr] = true; + else + uaddr = null; + } + + if (uaddr) + batch.del(p + 'p/a/' + uaddr + '/' + hash); + + coinRaw = bcoin.protocol.framer.coin({ + version: tx.version, + height: tx.height, + value: output.value, + script: output.script, + hash: hash, + index: i, + spent: false + }, true); + + batch.put(p + 'u/t/' + hash + '/' + i, coinRaw); + }); + + batch.write(function(err) { + if (err) + return callback(err); + self.emit('confirmed', tx); + self.emit('tx', tx); + return callback(null, true); + }); + } + return callback(null, false); + } + + batch.put(p + 't/t/' + hash, tx.toExtended()); + if (tx.ts === 0) + batch.put(p + 'p/' + hash, new Buffer([])); + + tx.inputs.forEach(function(input) { + var type = input.getType(); + var address = input.getAddress(); + var uaddr; + + if (input.isCoinbase()) + return; + + if (type === 'pubkey' || type === 'multisig') + address = null; + + uaddr = address; + + if (uaddr) { + if (!uniq[uaddr]) + uniq[uaddr] = true; + else + uaddr = null; + } + + if (uaddr) { + batch.put(p + 't/a/' + uaddr + '/' + hash, new Buffer([])); + if (tx.ts === 0) + batch.put(p + 'p/a/' + uaddr + '/' + hash, new Buffer([])); + } + + if (address) { + batch.del( + p + 'u/a/' + address + + '/' + input.prevout.hash + + '/' + input.prevout.index); + } + + batch.del(p + 'u/t/' + input.prevout.hash + '/' + input.prevout.index); + }); + + tx.outputs.forEach(function(output, i) { + var type = output.getType(); + var address = output.getAddress(); + var uaddr, coinRaw; + + if (type === 'pubkey' || type === 'multisig') + address = null; + + uaddr = address; + + if (uaddr) { + if (!uniq[uaddr]) + uniq[uaddr] = true; + else + uaddr = null; + } + + coinRaw = bcoin.protocol.framer.coin({ + version: tx.version, + height: tx.height, + value: output.value, + script: output.script, + hash: hash, + index: i, + spent: false + }, true); + + if (uaddr) { + batch.put(p + 't/a/' + uaddr + '/' + hash, new Buffer([])); + if (tx.ts === 0) + batch.put(p + 'p/a/' + uaddr + '/' + hash, new Buffer([])); + } + + if (address) + batch.put(p + 'u/a/' + address + '/' + hash + '/' + i, new Buffer([])); + + batch.put(p + 'u/t/' + hash + '/' + i, coinRaw); + }); + + batch.write(function(err) { + if (err) + return callback(err); + + self.emit('tx', tx); + + if (tx.ts !== 0) + self.emit('confirmed', tx); + + return callback(null, true); + }); + }); +}; + +TXPool.prototype.remove = function remove(hash, callback) { + var self = this; + var p = this.prefix + '/'; + var uniq = {}; + + if (hash.hash) + hash = hash.hash('hex'); + + this.getTX(hash, function(err, tx) { + var batch; + + if (err) + return callback(err); + + if (!tx) + return callback(null, true); + + batch = self.db.batch(); + + assert(tx.hash('hex') === hash); + + batch.del(p + 't/t/' + hash); + if (tx.ts === 0) + batch.del(p + 'p/' + hash); + + self.fillTX(tx, function(err) { + if (err) + return next(err); + + tx.inputs.forEach(function(input) { + var type = input.getType(); + var address = input.getAddress(); + var uaddr, coinRaw; + + if (input.isCoinbase()) + return; + + if (!input.output) + return; + + if (type === 'pubkey' || type === 'multisig') + address = null; + + uaddr = address; + + if (uaddr) { + if (!uniq[uaddr]) + uniq[uaddr] = true; + else + uaddr = null; + } + + coinRaw = bcoin.protocol.framer.coin(input.output, true); + + if (uaddr) { + batch.del(p + 't/a/' + uaddr + '/' + hash); + if (tx.ts === 0) + batch.del(p + 'p/a/' + uaddr + '/' + hash); + } + + if (address) { + batch.put(p + 'u/a/' + address + + '/' + input.prevout.hash + + '/' + input.prevout.index, + new Buffer([])); + } + + batch.put(p + 'u/t/' + + input.prevout.hash + + '/' + input.prevout.index, + coinRaw); + }); + + tx.outputs.forEach(function(output, i) { + var type = output.getType(); + var address = output.getAddress(); + var uaddr; + + if (type === 'pubkey' || type === 'multisig') + address = null; + + uaddr = address; + + if (uaddr) { + if (!uniq[uaddr]) + uniq[uaddr] = true; + else + uaddr = null; + } + + if (uaddr) { + batch.del(p + 't/a/' + uaddr + '/' + hash); + if (tx.ts === 0) + batch.del(p + 'p/a/' + uaddr + '/' + hash); + } + + if (address) + batch.del(p + 'u/a/' + address + '/' + hash + '/' + i); + + batch.del(p + 'u/t/' + hash + '/' + i); + }); + + batch.write(function(err) { + if (err) + return callback(err); + + self.emit('remove tx', tx); + return callback(null, true); + }); + }); + }); +}; + +TXPool.prototype.getTXHashes = function getTXHashes(address, callback) { + var p = this.prefix + '/'; + var self = this; + var txs = []; + + callback = utils.ensure(callback); + + if (Array.isArray(address)) { + return utils.forEachSerial(address, function(address, next) { + self.getTXHashes(address, function(err, tx) { + if (err) + return next(err); + + txs = txs.concat(tx); + + next(); + }); + }, function(err) { + if (err) + return callback(err); + + txs = utils.uniqs(txs); + + return callback(null, txs); + }); + } + + var iter = this.db.db.iterator({ + gte: p + 't/a/' + address, + lte: p + 't/a/' + address + '~', + keys: true, + values: false, + fillCache: false, + keyAsBuffer: false + }); + + (function next() { + iter.next(function(err, key, value) { + if (err) { + return iter.end(function() { + callback(err); + }); + } + + if (key === undefined) { + return iter.end(function(err) { + if (err) + return callback(err); + return callback(null, txs); + }); + } + + txs.push(key.split('/')[4]); + + next(); + }); + })(); +}; + +TXPool.prototype.getPendingHashes = function getPendingHashes(address, callback) { + var p = this.prefix + '/'; + var self = this; + var txs = []; + + callback = utils.ensure(callback); + + if (Array.isArray(address)) { + return utils.forEachSerial(address, function(address, next) { + assert(address); + self.getPendingHashes(address, function(err, tx) { + if (err) + return next(err); + + txs = txs.concat(tx); + + next(); + }); + }, function(err) { + if (err) + return callback(err); + + txs = utils.uniqs(txs); + + return callback(null, txs); + }); + } + + var iter = this.db.db.iterator({ + gte: address ? p + 'p/a/' + address : p + 'p', + lte: address ? p + 'p/a/' + address + '~' : p + 'p~', + keys: true, + values: false, + fillCache: false, + keyAsBuffer: false + }); + + (function next() { + iter.next(function(err, key, value) { + if (err) { + return iter.end(function() { + callback(err); + }); + } + + if (key === undefined) { + return iter.end(function(err) { + if (err) + return callback(err); + return callback(null, txs); + }); + } + + if (address) + txs.push(key.split('/')[4]); + else + txs.push(key.split('/')[2]); + + next(); + }); + })(); +}; + +TXPool.prototype.getCoinIDs = function getCoinIDs(address, callback) { + var p = this.prefix + '/'; + var self = this; + var coins = []; + + callback = utils.ensure(callback); + + if (Array.isArray(address)) { + return utils.forEachSerial(address, function(address, next) { + self.getCoinIDs(address, function(err, coin) { + if (err) + return next(err); + + coins = coins.concat(coin); + + next(); + }); + }, function(err) { + if (err) + return callback(err); + + coins = utils.uniqs(coins); + + return callback(null, coins); + }); + } + + var iter = this.db.db.iterator({ + gte: p + 'u/a/' + address, + lte: p + 'u/a/' + address + '~', + keys: true, + values: false, + fillCache: false, + keyAsBuffer: false + }); + + (function next() { + iter.next(function(err, key, value) { + if (err) { + return iter.end(function() { + callback(err); + }); + } + + if (key === undefined) { + return iter.end(function(err) { + if (err) + return callback(err); + return callback(null, coins); + }); + } + + coins.push(key.split('/').slice(4).join('/')); + + next(); + }); + })(); +}; + +TXPool.prototype.getTXByAddress = function getTXByAddress(address, callback) { + var self = this; + var txs = []; + + return this.getTXHashes(address, function(err, hashes) { + if (err) + return callback(err); + + if (!hashes.length) + return callback(null, hashes); + + utils.forEachSerial(hashes, function(hash, next) { + self.getTX(hash, function(err, tx) { + if (err) + return callback(err); + + if (!tx) + return next(); + + txs.push(tx); + + next(); + }); + }, function(err) { + if (err) + return callback(err); + + return callback(null, txs); + }); + }); +}; + +TXPool.prototype.getLast = function getLast(address, callback) { + return this.getTXByAddress(address, function(err, txs) { + var lastTs, lastHeight; + + if (err) + return callback(err); + + lastTs = -1; + lastHeight = -1; + + txs.forEach(function(tx) { + if (tx.ts > lastTs) + lastTs = tx.ts; + + if (tx.height > lastHeight) + lastHeight = tx.height; + }); + + if (lastTs === -1) + lastTs = utils.now() - 2 * 7 * 24 * 60 * 60; + + return callback(null, lastTs, lastHeight); + }); +}; + +TXPool.prototype.getPendingByAddress = function getPendingByAddress(address, callback) { + var txs = []; + + return this.getPendingHashes(address, function(err, hashes) { + if (err) + return callback(err); + + if (!hashes.length) + return callback(null, hashes); + + utils.forEachSerial(hashes, function(hash, next) { + self.getTX(hash, function(err, tx) { + if (err) + return callback(err); + + if (!tx) + return next(); + + txs.push(tx); + + next(); + }); + }, function(err) { + if (err) + return callback(err); + + return callback(null, txs); + }); + }); +}; + +TXPool.prototype.getCoinByAddress = function getCoinByAddress(address, callback) { + var self = this; + var coins = []; + + return this.getCoinIDs(address, function(err, ids) { + if (err) + return callback(err); + + if (!ids.length) + return callback(null, ids); + + utils.forEachSerial(ids, function(id, next) { + var parts = id.split('/'); + self.getCoin(parts[0], +parts[1], function(err, coin) { + if (err) + return callback(err); + + if (!coin) + return next(); + + coins.push(coin); + + next(); + }); + }, function(err) { + if (err) + return callback(err); + + return callback(null, coins); + }); + }); +}; + +TXPool.prototype.fillTX = function fillTX(tx, callback) { + var self = this; + + if (Array.isArray(tx)) { + return utils.forEachSerial(tx, function(tx, next) { + self.fillTX(tx, function(err) { + if (err) + return next(err); + + next(); + }); + }, callback); + } + + callback = utils.asyncify(callback); + + if (tx.isCoinbase()) + return callback(null, tx); + + utils.forEach(tx.inputs, function(input, next) { + if (input.output) + return next(); + + self.getTX(input.prevout.hash, function(err, tx) { + if (err) + return next(err); + + if (tx) + input.output = bcoin.coin(tx, input.prevout.index); + + next(); + }); + }, function(err) { + if (err) + return callback(err); + return callback(null, tx); + }); +}; + +TXPool.prototype.fillCoin = function fillCoin(tx, callback) { + var self = this; + + if (Array.isArray(tx)) { + return utils.forEachSerial(tx, function(tx, next) { + self.fillCoin(tx, function(err) { + if (err) + return next(err); + + next(); + }); + }, callback); + } + + callback = utils.asyncify(callback); + + if (tx.isCoinbase()) + return callback(null, tx); + + utils.forEach(tx.inputs, function(input, next) { + if (input.output) + return next(); + + self.getCoin(input.prevout.hash, input.prevout.index, function(err, coin) { + if (err) + return callback(err); + + if (coin) + input.output = coin; + + next(); + }); + }, function(err) { + if (err) + return callback(err); + return callback(null, tx); + }); +}; + +TXPool.prototype.getTX = function getTX(hash, callback) { + var p = this.prefix + '/'; + var id = p + 't/t/' + hash; + + this.db.get(id, function(err, tx) { + if (err) { + if (err.type === 'NotFoundError') + return callback(); + return callback(err); + } + return callback(null, bcoin.tx.fromExtended(tx)); + }); +}; + +TXPool.prototype.getCoin = function getCoin(hash, index, callback) { + var p = this.prefix + '/'; + var id = p + 'u/t/' + hash + '/' + index; + + this.db.get(id, function(err, coin) { + if (err) { + if (err.type === 'NotFoundError') + return callback(); + return callback(err); + } + + return callback(null, bcoin.coin.fromRaw(coin)); + }); +}; + +TXPool.prototype.getAllByAddress = function getAllByAddress(address, callback) { + return this.getTXByAddress(address, callback); +}; + +TXPool.prototype.getUnspentByAddress = function getUnspentByAddress(address, callback) { + return this.getCoinByAddress(address, callback); +}; + +TXPool.prototype.getPendingByAddress = function getPendingByAddress(address) { + return this.getPendingByAddress(address, callback); +}; + +TXPool.prototype.getBalanceByAddress = function getBalanceByAddress(address, callback) { + return this.getCoinByAddress(address, function(err, coins) { + if (err) + return callback(err); + + coins = coins.reduce(function(acc, coin) { + return acc.iadd(coin.value); + }, new bn(0)); + + return callback(null, coins); + }); +}; + +function WalletPool(db) { + TXPool.call(this, 'w', db); +} + +utils.inherits(WalletPool, TXPool); + +WalletPool.prototype._filter = function _filter(tx, callback) { + return this.testTX(tx, callback); +}; + +WalletPool.prototype.getAll = function getAll(id, callback) { + var self = this; + return this.getAddresses(id, function(err, addresses) { + if (err) + return callback(err); + return self.getAllByAddress(addresses, callback); + }); +}; + +WalletPool.prototype.getUnspent = function getUnspent(id, callback) { + var self = this; + return this.getAddresses(id, function(err, addresses) { + if (err) + return callback(err); + return self.getUnspentByAddress(addresses, callback); + }); +}; + +WalletPool.prototype.getPending = function getPending(id, callback) { + var self = this; + return this.getAddresses(id, function(err, addresses) { + if (err) + return callback(err); + return self.getPendingByAddress(addresses, callback); + }); +}; + +WalletPool.prototype.getBalance = function getBalance(id, callback) { + var self = this; + return this.getAddresses(id, function(err, addresses) { + if (err) + return callback(err); + return self.getBalanceByAddress(addresses, callback); + }); +}; + +WalletPool.prototype.getAddresses = function getAddresses(id, callback) { + if (typeof id === 'string') + return callback(null, [id]); + + if (Array.isArray(id)) + return callback(null, id); + + if (id.addressMap) + return callback(null, Object.keys(id.addressMap)); + + if (typeof id === 'object') + return callback(null, Object.keys(id)); + + return this.db.get('w/w/' + id, function(err, buf) { + var json; + + if (err) + return callback(err); + + try { + json = JSON.parse(buf.toString('utf8')); + } catch (e) { + return callback(e); + } + + return callback(null, Object.keys(json.addressMap)); + }); +}; + +WalletPool.prototype.getIDs = function _getIDs(address, callback) { + var self = this; + var ids = []; + + var iter = this.db.db.iterator({ + gte: 'w/a/' + address, + lte: 'w/a/' + address + '~', + keys: true, + values: false, + fillCache: false, + keyAsBuffer: false + }); + + callback = utils.ensure(callback); + + (function next() { + iter.next(function(err, key, value) { + if (err) { + return iter.end(function() { + callback(err); + }); + } + + if (key === undefined) { + return iter.end(function(err) { + if (err) + return callback(err); + return callback(null, ids); + }); + } + + ids.push(key.split('/')[3]); + + next(); + }); + })(); +}; + +WalletPool.prototype.testTX = function test(tx, callback) { + var self = this; + + utils.forEachSerial(tx.getAddresses(), function(address, next) { + self.getIDs(address, function(err, ids) { + if (err) + return next(err); + + if (ids.length > 0) + return callback(null, true); + + next(); + }); + }, function(err) { + if (err) + return callback(err); + return callback(null, false); + }); +}; + +/** + * Expose + */ + +module.exports = TXPool; diff --git a/lib/bcoin/wallet.js b/lib/bcoin/wallet.js index a7afbbce..344f9e48 100644 --- a/lib/bcoin/wallet.js +++ b/lib/bcoin/wallet.js @@ -106,6 +106,7 @@ Wallet.prototype._init = function _init() { assert(!this._initialized); this._initialized = true; + this.id = this.getID(); if (Object.keys(this.addressMap).length === 0) { for (i = 0; i < this.receiveDepth - 1; i++) @@ -128,20 +129,7 @@ Wallet.prototype._init = function _init() { assert(!this.receiveAddress.change); assert(this.changeAddress.change); - this.id = this.getID(); - this.tx = new bcoin.txpool(this); - - // Notify owners about new accepted transactions - this.tx.on('update', function(lastTs, lastHeight, tx) { - var b = this.getBalance(); - if (prevBalance && prevBalance.cmp(b) !== 0) - self.emit('balance', b); - if (tx) - self.emit('update', tx); - self.lastTs = Math.max(lastTs, self.lastTs); - self.lastHeight = Math.max(lastHeight, self.lastHeight); - prevBalance = b; - }); + this.tx = this.db.tx; this.tx.on('tx', function(tx) { self.emit('tx', tx); @@ -158,12 +146,6 @@ Wallet.prototype._init = function _init() { self.emit('error', err); }); - if (options.txs) - this.tx.populate(options.txs); - - this.lastTs = this.tx._lastTs; - this.lastHeight = this.tx._lastHeight; - this.save(function(err) { if (err) throw err; @@ -552,8 +534,9 @@ Wallet.prototype.fill = function fill(tx, options) { return true; }; -Wallet.prototype.fillPrevout = function fillPrevout(tx) { +Wallet.prototype.fillPrevout = function fillPrevout(tx, callback) { return tx.fillPrevout(this); + return this.db.fillTX(tx, callback); }; Wallet.prototype.createTX = function createTX(options, outputs) { @@ -797,32 +780,34 @@ Wallet.prototype.sign = function sign(tx, type, index) { }, 0); }; -Wallet.prototype.addTX = function addTX(tx, block) { - return this.tx.add(tx); +Wallet.prototype.addTX = function addTX(tx, callback) { + if (!this.db) + return; + return this.db.addTX(tx, callback); }; -Wallet.prototype.getAll = function getAll(address) { - return this.tx.getAll(address); +Wallet.prototype.getAll = function getAll(callback) { + if (!this.db) + return; + return this.db.getAll(this, callback); }; -Wallet.prototype.getUnspent = function getUnspent(address) { - return this.tx.getUnspent(address); +Wallet.prototype.getUnspent = function getUnspent(callback) { + if (!this.db) + return; + return this.db.getUnspent(this, callback); }; -Wallet.prototype.getPending = function getPending(address) { - return this.tx.getPending(address); +Wallet.prototype.getPending = function getPending(callback) { + if (!this.db) + return; + return this.db.getPending(this, callback); }; -Wallet.prototype.getSent = function getSent(address) { - return this.tx.getSent(address); -}; - -Wallet.prototype.getReceived = function getReceived(address) { - return this.tx.getReceived(address); -}; - -Wallet.prototype.getBalance = function getBalance(address) { - return this.tx.getBalance(address); +Wallet.prototype.getBalance = function getBalance(callback) { + if (!this.db) + return; + return this.db.getBalance(this, callback); }; Wallet.prototype.__defineGetter__('script', function() { @@ -901,10 +886,7 @@ Wallet.prototype.toJSON = function toJSON() { keys: this.keys.map(function(key) { return key.xpubkey; }), - balance: utils.btc(this.getBalance()), - txs: this.options.noPool ? [] : this.tx.getAll().map(function(tx) { - return tx.toCompact(); - }) + txs: [] }; }; @@ -945,7 +927,7 @@ Wallet.fromJSON = function fromJSON(json, passphrase) { Wallet.prototype._saveAddress = function _saveAddress(address, callback) { callback = utils.ensure(callback); - if (!this.options.store || !this.options.db) + if (!this.db) return utils.nextTick(callback); return this.db.saveAddress(this.id, address, callback); @@ -954,10 +936,10 @@ Wallet.prototype._saveAddress = function _saveAddress(address, callback) { Wallet.prototype.save = function save(callback) { callback = utils.ensure(callback); - if (!this.options.store || !this.options.db) + if (!this.db) return utils.nextTick(callback); - return this.db.save(this.id, this, callback); + return this.db.save(this, callback); }; /** diff --git a/lib/bcoin/walletdb.js b/lib/bcoin/walletdb.js index de23686f..e23c01cc 100644 --- a/lib/bcoin/walletdb.js +++ b/lib/bcoin/walletdb.js @@ -20,6 +20,8 @@ var fs = bcoin.fs; */ function WalletDB(node, options) { + var self = this; + if (!(this instanceof WalletDB)) return new WalletDB(node, options); @@ -49,7 +51,47 @@ utils.inherits(WalletDB, EventEmitter); WalletDB._db = {}; +WalletDB.prototype.dump = function dump(callback) { + var self = this; + var records = []; + + var iter = this.db.db.iterator({ + gte: 'w', + lte: 'w~', + keys: true, + values: true, + fillCache: false, + keyAsBuffer: false, + valueAsBuffer: true + }); + + callback = utils.ensure(callback); + + (function next() { + iter.next(function(err, key, value) { + if (err) { + return iter.end(function() { + callback(err); + }); + } + + if (key === undefined) { + return iter.end(function(err) { + if (err) + return callback(err); + return callback(null, records); + }); + } + + records.push([key, value.slice(0, 200).toString('hex')]); + + next(); + }); + })(); +}; + WalletDB.prototype._init = function _init() { + var self = this; var levelup; if (!WalletDB._db[this.file]) { @@ -73,6 +115,8 @@ WalletDB.prototype._init = function _init() { } this.db = WalletDB._db[this.file]; + + this.tx = new bcoin.txdb('w', this.db); }; WalletDB.prototype.getJSON = function getJSON(id, callback) { @@ -96,7 +140,10 @@ WalletDB.prototype.saveJSON = function saveJSON(id, json, callback) { json = utils.merge({}, json); delete json.store; delete json.db; + var save = bcoin.wallet.prototype.save; + bcoin.wallet.prototype.save = function() {}; json = new bcoin.wallet(json).toJSON(); + bcoin.wallet.prototype.save = save; } } @@ -106,7 +153,7 @@ WalletDB.prototype.saveJSON = function saveJSON(id, json, callback) { if (err) return callback(err); - if (json && self.type === 'leveldb') { + if (json) { batch = self.db.batch(); Object.keys(json.addressMap).forEach(function(address) { batch.put('w/a/' + address + '/' + json.id, new Buffer([])); @@ -138,7 +185,7 @@ WalletDB.prototype.removeJSON = function removeJSON(id, callback) { if (err) return callback(err); - if (json && self.type === 'leveldb') { + if (json) { batch = self.db.batch(); Object.keys(json.addressMap).forEach(function(address) { batch.del('w/a/' + address + '/' + json.id); @@ -264,7 +311,7 @@ WalletDB.prototype.save = function save(options, callback) { return this.saveJSON(options.id, options, callback); }; -WalletDB.prototype.remove = function save(id, callback) { +WalletDB.prototype.remove = function remove(id, callback) { var self = this; callback = utils.ensure(callback); @@ -313,6 +360,7 @@ WalletDB.prototype.removeAddress = function removeAddress(id, address, callback) this.db.del('w/a/' + address + '/' + id, callback); }; +/* WalletDB.prototype._getIDs = function _getIDs(address, callback) { var self = this; var ids = []; @@ -424,6 +472,135 @@ WalletDB.prototype.ownTX = function ownTX(tx, callback) { }); }); }; +*/ + +WalletDB.prototype.addTX = function addTX(tx, callback) { + return this.tx.add(tx, callback); +}; + +WalletDB.prototype.getAll = function getAll(id, callback) { + var self = this; + return this.getAddresses(id, function(err, addresses) { + if (err) + return callback(err); + return self.tx.getAllByAddress(addresses, callback); + }); +}; + +WalletDB.prototype.getUnspent = function getUnspent(id, callback) { + var self = this; + return this.getAddresses(id, function(err, addresses) { + if (err) + return callback(err); + return self.tx.getUnspentByAddress(addresses, callback); + }); +}; + +WalletDB.prototype.getPending = function getPending(id, callback) { + var self = this; + return this.getAddresses(id, function(err, addresses) { + if (err) + return callback(err); + return self.tx.getPendingByAddress(addresses, callback); + }); +}; + +WalletDB.prototype.getBalance = function getBalance(id, callback) { + var self = this; + return this.getAddresses(id, function(err, addresses) { + if (err) + return callback(err); + return self.tx.getBalanceByAddress(addresses, callback); + }); +}; + +WalletDB.prototype.getAddresses = function getAddresses(id, callback) { + if (typeof id === 'string') + return callback(null, [id]); + + if (Array.isArray(id)) + return callback(null, id); + + if (id.addressMap) + return callback(null, Object.keys(id.addressMap)); + + if (typeof id === 'object') + return callback(null, Object.keys(id)); + + return this.db.get('w/w/' + id, function(err, buf) { + var json; + + if (err) + return callback(err); + + try { + json = JSON.parse(buf.toString('utf8')); + } catch (e) { + return callback(e); + } + + return callback(null, Object.keys(json.addressMap)); + }); +}; + +WalletDB.prototype.getIDs = function _getIDs(address, callback) { + var self = this; + var ids = []; + + var iter = this.db.db.iterator({ + gte: 'w/a/' + address, + lte: 'w/a/' + address + '~', + keys: true, + values: false, + fillCache: false, + keyAsBuffer: false + }); + + callback = utils.ensure(callback); + + (function next() { + iter.next(function(err, key, value) { + if (err) { + return iter.end(function() { + callback(err); + }); + } + + if (key === undefined) { + return iter.end(function(err) { + if (err) + return callback(err); + return callback(null, ids); + }); + } + + ids.push(key.split('/')[2]); + + next(); + }); + })(); +}; + +WalletDB.prototype.testTX = function test(tx, callback) { + var self = this; + + return callback(null, true); + utils.forEachSerial(tx.getAddresses(), function(address, next) { + self.getIDs(address, function(err, ids) { + if (err) + return next(err); + + if (ids.length > 0) + return callback(null, true); + + next(); + }); + }, function(err) { + if (err) + return callback(err); + return callback(null, false); + }); +}; /** * Expose diff --git a/test/wallet-test.js b/test/wallet-test.js index 60e86719..db4ff28d 100644 --- a/test/wallet-test.js +++ b/test/wallet-test.js @@ -124,9 +124,11 @@ describe('Wallet', function() { assert(tx.verify()); }); - it('should have TX pool and be serializable', function() { - var w = bcoin.wallet(); - var f = bcoin.wallet(); + it('should have TX pool and be serializable', function(cb) { + bcoin.walletdb().create({ id: 'w' }, function(err, w) { + assert(w); + bcoin.walletdb().create({ id: 'f' }, function(err, f) { + assert(f); // Coinbase var t1 = bcoin.mtx().addOutput(w, 50000).addOutput(w, 1000); @@ -170,27 +172,85 @@ describe('Wallet', function() { fake.hint = 'fake'; // Fake TX should temporarly change output - w.addTX(fake); + // w.addTX(fake); - w.addTX(t4); - assert.equal(w.getBalance().toString(10), '22500'); - w.addTX(t1); - assert.equal(w.getBalance().toString(10), '73000'); - w.addTX(t2); - assert.equal(w.getBalance().toString(10), '47000'); - w.addTX(t3); - assert.equal(w.getBalance().toString(10), '22000'); - w.addTX(f1); - assert.equal(w.getBalance().toString(10), '11000'); - assert(w.getAll().some(function(tx) { - return tx.hash('hex') === f1.hash('hex'); - })); + process.on('uncaughtException', function() { + return; + w.db.dump(function(err, records) { + console.log(records); + process.exit(1); + }); + }); - var w2 = bcoin.wallet.fromJSON(w.toJSON()); - assert.equal(w2.getBalance().toString(10), '11000'); - assert(w2.getAll().some(function(tx) { - return tx.hash('hex') === f1.hash('hex'); - })); + w.addTX(fake, function(err) { + if (err) throw err; + assert(!err); + w.addTX(t4, function(err) { + if (err) throw err; + assert(!err); + if (0) { + w.db.tx.getCoinByAddress(Object.keys(w.addressMap), function(err, coins) { + console.log(coins); + cb(); + }); + return; + } + if (0) { + w.db.dump(function(err, records) { + console.log(records); + //process.exit(1); + return cb(); + }); + return; + } + w.getBalance(function(err, balance) { + assert(!err); + assert.equal(balance.toString(10), '22500'); + // assert.equal(balance.toString(10), '22000'); + w.addTX(t1, function(err) { + w.getBalance(function(err, balance) { + assert(!err); + assert.equal(balance.toString(10), '73000'); + w.addTX(t2, function(err) { + assert(!err); + w.getBalance(function(err, balance) { + assert(!err); + assert.equal(balance.toString(10), '47000'); + w.addTX(t3, function(err) { + assert(!err); + w.getBalance(function(err, balance) { + assert(!err); + assert.equal(balance.toString(10), '22000'); + w.addTX(f1, function(err) { + assert(!err); + w.getBalance(function(err, balance) { + assert(!err); + assert.equal(balance.toString(10), '11000'); + w.getAll(function(err, txs) { + assert(txs.some(function(tx) { + return tx.hash('hex') === f1.hash('hex'); + })); + + var w2 = bcoin.wallet.fromJSON(w.toJSON()); + // assert.equal(w2.getBalance().toString(10), '11000'); + // assert(w2.getAll().some(function(tx) { + // return tx.hash('hex') === f1.hash('hex'); + // })); + cb(); + }); + }); + }); + }); + }); + }); + }); + }); + }); + }); + }); + }); + }); + }); }); it('should fill tx with inputs', function(cb) {