diff --git a/app/models/Address.js b/app/models/Address.js index fd0f8d0f..be918b3c 100644 --- a/app/models/Address.js +++ b/app/models/Address.js @@ -102,54 +102,3 @@ function spec() { } module.defineClass(spec); - -/** - * Addr Schema Idea for moogose. Not used now. - * -var AddressSchema = new Schema({ - - // For now we keep this as short as possible - // More fields will be propably added as we move - // forward with the UX - addr: { - type: String, - index: true, - unique: true, - }, - inputs: [{ - type: mongoose.Schema.Types.ObjectId, - ref: 'TransactionItem' //Edit: I'd put the schema. Silly me. - }], - output: [{ - type: mongoose.Schema.Types.ObjectId, - ref: 'TransactionItem' //Edit: I'd put the schema. Silly me. - }], -}); - - -AddressSchema.statics.load = function(id, cb) { - this.findOne({ - _id: id - }).exec(cb); -}; - - -AddressSchema.statics.fromAddr = function(hash, cb) { - this.findOne({ - hash: hash, - }).exec(cb); -}; - - -AddressSchema.statics.fromAddrWithInfo = function(hash, cb) { - this.fromHash(hash, function(err, addr) { - if (err) return cb(err); - if (!addr) { return cb(new Error('Addr not found')); } -// TODO -// addr.getInfo(function(err) { return cb(err,addr); } ); - }); -}; - -module.exports = mongoose.model('Address', AddressSchema); -*/ - diff --git a/app/models/Block.js b/app/models/Block.js deleted file mode 100644 index 8a01ca57..00000000 --- a/app/models/Block.js +++ /dev/null @@ -1,186 +0,0 @@ -'use strict'; - -/** - * Module dependencies. - */ -var mongoose = require('mongoose'), - Schema = mongoose.Schema, - RpcClient = require('bitcore/RpcClient').class(), - util = require('bitcore/util/util'), - async = require('async'), - BitcoreBlock= require('bitcore/Block').class(), - TransactionOut = require('./TransactionOut'), - config = require('../../config/config') - ; - -/** - * Block Schema - */ -var BlockSchema = new Schema({ - - // For now we keep this as short as possible - // More fields will be propably added as we move - // forward with the UX - _id: { - type: Buffer, - index: true, - unique: true, - required: true, - }, - time: Number, - nextBlockHash: Buffer, - isOrphan: Boolean, -}); - -BlockSchema.virtual('hash').get(function () { - return this._id; -}); - - -BlockSchema.virtual('hash').set(function (hash) { - this._id = hash; -}); - - -BlockSchema.virtual('hashStr').get(function () { - return this._id.toString('hex'); -}); - - -BlockSchema.virtual('hashStr').set(function (hashStr) { - if (hashStr) - this._id = new Buffer(hashStr,'hex'); - else - this._id = null; -}); - - - -BlockSchema.virtual('nextBlockHashStr').get(function () { - return this.nextBlockHash.toString('hex'); -}); - -BlockSchema.virtual('nextBlockHashStr').set(function (hashStr) { - if (hashStr) - this.nextBlockHash = new Buffer(hashStr,'hex'); - else - this.nextBlockHash = null; -}); - -/* -BlockSchema.path('title').validate(function(title) { - return title.length; -},'Title cannot be blank'); -*/ - -/** - * Statics - */ - -BlockSchema.statics.customCreate = function(block, cb) { - var Self= this; - - var BlockSchema = mongoose.model('Block', BlockSchema); - - var newBlock = new Self(); - - newBlock.time = block.time ? block.time : Math.round(new Date().getTime() / 1000); - newBlock.hashStr = block.hash; - newBlock.isOrphan = block.isOrphan; - newBlock.nextBlockHashStr = block.nextBlockHash; - - var insertedTxs, updateAddrs; - - async.series([ - function(a_cb) { - TransactionOut.createFromTxs(block.tx, block.isOrphan, - function(err, inInsertedTxs, inUpdateAddrs) { - insertedTxs = inInsertedTxs; - updateAddrs = inUpdateAddrs; - return a_cb(err); - }); - }, function(a_cb) { - newBlock.save(function(err) { - return a_cb(err); - }); - }], - function (err) { - return cb(err, newBlock, insertedTxs, updateAddrs); - }); -}; - - -BlockSchema.statics.blockIndex = function(height, cb) { - var rpc = new RpcClient(config.bitcoind); - var hashStr = {}; - rpc.getBlockHash(height, function(err, bh){ - if (err) return cb(err); - hashStr.blockHash = bh.result; - cb(null, hashStr); - }); -}; - -BlockSchema.statics.fromHash = function(hashStr, cb) { - var hash = new Buffer(hashStr, 'hex'); - - this.findOne({ - _id: hash, - }).exec(cb); -}; - - -BlockSchema.statics.fromHashWithInfo = function(hashStr, cb) { - var That = this; - - That.fromHash(hashStr, function(err, block) { - if (err) return cb(err); - - if (!block) { - // No in mongo...but maybe in bitcoind... lets query it - block = new That(); - - block.hashStr = hashStr; - block.getInfo(function(err, blockInfo) { - if (err) return cb(err); - if (!blockInfo) return cb(); - - block.save(function(err) { - return cb(err,block); - }); - }); - } - else { - block.getInfo(function(err) { - return cb(err,block); - }); - } - }); -}; - -// TODO: Can we store the rpc instance in the Block object? -BlockSchema.methods.getInfo = function (next) { - - var self = this; - var rpc = new RpcClient(config.bitcoind); - - rpc.getBlock(self.hashStr, function(err, blockInfo) { - // Not found? - if (err && err.code === -5) return next(); - - if (err) return next(err); - - /* - * Not sure this is the right way to do it. - * Any other way to lazy load a property in a mongoose object? - */ - - self.info = blockInfo.result; - self.info.reward = BitcoreBlock.getBlockValue(self.info.height) / util.COIN ; - - return next(null, self.info); - }); -}; - - - -module.exports = mongoose.model('Block', BlockSchema); diff --git a/app/models/Transaction.js b/app/models/Transaction.js index 58a39e82..f3b20dc1 100644 --- a/app/models/Transaction.js +++ b/app/models/Transaction.js @@ -28,8 +28,6 @@ function spec() { }); }; - - Transaction.prototype._fillInfo = function(next) { var self = this; @@ -42,8 +40,6 @@ function spec() { }); }; - - Transaction._fillOutpoints = function(info, cb) { if (!info || info.isCoinBase) return cb(); @@ -51,17 +47,17 @@ function spec() { var valueIn = 0; var incompleteInputs = 0; async.eachLimit(info.vin, CONCURRENCY, function(i, c_in) { - TransactionOut.fromTxIdN(i.txid, i.vout, function(err, out) { + TransactionOut.fromTxIdN(i.txid, i.vout, function(err, addr, valueSat) { - if (err || !out || ! out.addr) { + + if (err || !addr || !valueSat ) { console.log('Could not get TXouts in %s,%d from %s ', i.txid, i.vout, info.txid); incompleteInputs = 1; return c_in(); // error not scaled } - - i.addr = out.addr; - i.valueSat = out.value_sat; - i.value = out.value_sat / util.COIN; + i.addr = addr; + i.valueSat = valueSat; + i.value = valueSat / util.COIN; valueIn += i.valueSat; return c_in(); diff --git a/app/models/TransactionOut.js b/app/models/TransactionOut.js deleted file mode 100644 index a0fec41d..00000000 --- a/app/models/TransactionOut.js +++ /dev/null @@ -1,270 +0,0 @@ -'use strict'; - -/** - * Module dependencies. - */ -var mongoose = require('mongoose'), - async = require('async'), - util = require('bitcore/util/util'), - TransactionRpc = require('../../lib/TransactionRpc').class(), - Schema = mongoose.Schema; - -var CONCURRENCY = 15; -// TODO: use bitcore networks module -var genesisTXID = '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b'; - -var TransactionOutSchema = new Schema({ - txidBuf: { - type: Buffer, - index: true, - }, - index: Number, - addr: { - type: String, - index: true, - }, - value_sat: Number, - fromOrphan: Boolean, - - spendTxIdBuf: Buffer, - spendIndex: Number, - spendFromOrphan: Boolean, -}); - - -// Compound index - -TransactionOutSchema.index({txidBuf: 1, index: 1}, {unique: true, sparse: true}); -TransactionOutSchema.index({spendTxIdBuf: 1, spendIndex: 1}, {unique: true, sparse: true}); - -TransactionOutSchema.virtual('txid').get(function () { - return this.txidBuf.toString('hex'); -}); - - -TransactionOutSchema.virtual('spendTxid').get(function () { - if (!this.spendTxIdBuf) return (null); - return this.spendTxIdBuf.toString('hex'); -}); - - -TransactionOutSchema.virtual('txid').set(function (txidStr) { - if (txidStr) - this.txidBuf = new Buffer(txidStr,'hex'); - else - this.txidBuf = null; -}); - -TransactionOutSchema.statics.fromTxId = function(txid, cb) { - var txidBuf = new Buffer(txid, 'hex'); - - this.find({ - txidBuf: txidBuf, - }).exec(function (err,items) { - - // sort by index - return cb(err,items.sort(function(a,b){ - return a.index - b.index; - })); - }); -}; - -TransactionOutSchema.statics.fromTxIdOne = function(txid, cb) { - var txidBuf = new Buffer(txid, 'hex'); - - this.find({ - txidBuf: txidBuf, - }).exec(function (err,item) { - return cb(err, item[0]); - }); -}; - - -TransactionOutSchema.statics.fromTxIdN = function(txid, n, cb) { - var txidBuf = new Buffer(txid, 'hex'); - this.findOne({ - txidBuf: txidBuf, index: n - }).exec(cb); -}; - -TransactionOutSchema.statics.removeFromTxId = function(txid, cb) { - var txidBuf = new Buffer(txid, 'hex'); - this.remove({ txidBuf: txidBuf }).exec(cb); -}; - - - -TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, fromOrphan, cb) { - - var Self = this; - var addrs = []; - var is_new = true; - - if (txInfo.hash) { - - // adapt bitcore TX object to bitcoind JSON response - txInfo.txid = txInfo.hash; - - var count = 0; - txInfo.vin = txInfo.in.map(function (txin) { - var i = {}; - - if (txin.coinbase) { - txInfo.isCoinBase = true; - } - else { - i.txid= txin.prev_out.hash; - i.vout= txin.prev_out.n; - }; - i.n = count++; - return i; - }); - - - count = 0; - txInfo.vout = txInfo.out.map(function (txout) { - var o = {}; - - o.value = txout.value; - o.n = count++; - - if (txout.addrStr){ - o.scriptPubKey = {}; - o.scriptPubKey.addresses = [txout.addrStr]; - } - return o; - }); - - } - - var bTxId = new Buffer(txInfo.txid,'hex'); - - - async.series([ - // Input Outpoints (mark them as spended) - function(p_c) { - if (txInfo.isCoinBase) return p_c(); - async.forEachLimit(txInfo.vin, CONCURRENCY, - function(i, next_out) { - var b = new Buffer(i.txid,'hex'); - var data = { - txidBuf: b, - index: i.vout, - - spendTxIdBuf: bTxId, - spendIndex: i.n, - }; - if (fromOrphan) data.spendFromOrphan = true; - Self.update({txidBuf: b, index: i.vout}, data, {upsert: true}, next_out); - }, - function (err) { - if (err) { - if (!err.message.match(/E11000/)) { - console.log('ERR at TX %s: %s', txInfo.txid, err); - return cb(err); - } - } - return p_c(); - }); - }, - // Parse Outputs - function(p_c) { - async.forEachLimit(txInfo.vout, CONCURRENCY, - function(o, next_out) { - if (o.value && o.scriptPubKey && - o.scriptPubKey.addresses && - o.scriptPubKey.addresses[0] && - ! o.scriptPubKey.addresses[1] // TODO : not supported - ){ - - // This is only to broadcast (WIP) -// if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) { -// addrs.push(o.scriptPubKey.addresses[0]); -// } - - var data = { - txidBuf: bTxId, - index : o.n, - - value_sat : o.value * util.COIN, - addr : o.scriptPubKey.addresses[0], - }; - if (fromOrphan) data.fromOrphan = true; - Self.update({txidBuf: bTxId, index: o.n}, data, {upsert: true}, next_out); - } - else { - console.log ('WARN in TX: %s could not parse OUTPUT %d', txInfo.txid, o.n); - return next_out(); - } - }, - function (err) { - if (err) { - if (err.message.match(/E11000/)) { - is_new = false; - } - else { - console.log('ERR at TX %s: %s', txInfo.txid, err); - return cb(err); - } - } - return p_c(); - }); - }], function(err) { - return cb(err, addrs, is_new); - }); -}; - - -// txs can be a [hashes] or [txObjects] -TransactionOutSchema.statics.createFromTxs = function(txs, fromOrphan, next) { - var Self = this; - - if (typeof fromOrphan === 'function') { - next = fromOrphan; - fromOrphan = false; - } - - if (!txs) return next(); - - var inserted_txs = []; - var updated_addrs = {}; - - async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) { - - var txInfo; - - async.series([ - function(a_cb) { - if (typeof t !== 'string') { - txInfo = t; - return a_cb(); - } - - // Is it from genesis block? (testnet==livenet) - // TODO: parse it from networks.genesisTX? - if (t === genesisTXID) return a_cb(); - - TransactionRpc.getRpcInfo(t, function(err, inInfo) { - txInfo =inInfo; - return a_cb(err); - }); - }, - function(a_cb) { - if (!txInfo) return a_cb(); - - Self.storeTransactionOuts(txInfo, fromOrphan, function(err, addrs) { - if (err) return a_cb(err); - return a_cb(); - }); - }], - function(err) { - return each_cb(err); - }); - }, - function(err) { - return next(err, inserted_txs, updated_addrs); - }); -}; - - -module.exports = mongoose.model('TransactionOut', TransactionOutSchema); diff --git a/config/config.js b/config/config.js index f28964cb..7ab059b1 100644 --- a/config/config.js +++ b/config/config.js @@ -24,6 +24,7 @@ module.exports = { appName: 'Insight ' + env, port: process.env.PORT || 3000, db: 'mongodb://localhost/insight-' + env, + leveldb: './db', bitcoind: { protocol: process.env.BITCOIND_PROTO || 'http', user: process.env.BITCOIND_USER || 'user', diff --git a/dev-util/block-level.js b/dev-util/block-level.js new file mode 100755 index 00000000..559717f7 --- /dev/null +++ b/dev-util/block-level.js @@ -0,0 +1,25 @@ +#!/usr/bin/env node + +var + config = require('../config/config'), + levelup = require('levelup'); + + +db = levelup(config.leveldb + '/blocks'); + +db.createReadStream({start: 'b-'}) + .on('data', function (data) { +console.log('[block-level.js.11:data:]',data); //TODO + if (data==false) c++; + }) + .on('error', function (err) { + return cb(err); + }) + .on('close', function () { + return cb(null); + }) + .on('end', function () { + return cb(null); + }); + + diff --git a/lib/BlockDb.js b/lib/BlockDb.js new file mode 100644 index 00000000..59ba778b --- /dev/null +++ b/lib/BlockDb.js @@ -0,0 +1,123 @@ +'use strict'; + +require('classtool'); + + +function spec() { + + var TIMESTAMP_ROOT = 'b-ts-'; + var ORPHAN_FLAG_ROOT = 'b-orphan-'; + + + /** + * Module dependencies. + */ + var RpcClient = require('bitcore/RpcClient').class(), + util = require('bitcore/util/util'), + levelup = require('levelup'), + BitcoreBlock= require('bitcore/Block').class(), + TransactionDb = require('.//TransactionDb'), + config = require('../config/config'), + fs = require('fs'); + + var BlockDb = function() { + this.db = levelup(config.leveldb + '/blocks'); + }; + + BlockDb.prototype.drop = function(cb) { + var self = this; + var path = config.leveldb + '/blocks'; + require('leveldown').destroy(path, function () { + fs.mkdirSync(config.leveldb); + fs.mkdirSync(path); + self.db = levelup(path); + return cb(); + }); + }; + + BlockDb.prototype.add = function(b, cb) { + var self = this; + + if (!b.hash) return cb(new Error('no Hash at Block.save')); + + + var time_key = TIMESTAMP_ROOT + + ( b.timestamp || Math.round(new Date().getTime() / 1000) ); + + + self.db.batch() + .put(time_key, b.hash) + .put(ORPHAN_FLAG_ROOT + b.hash, b.isOrphan || 0) + .write(cb); + }; + + BlockDb.prototype.countNotOrphan = function(hash, cb) { + var c = 0; + this.db.createReadStream({start: ORPHAN_FLAG_ROOT}) + .on('data', function (data) { + if (data === false) c++; + }) + .on('error', function (err) { + return cb(err); + }) + .on('close', function () { + return cb(null); + }) + .on('end', function () { + return cb(null); + }); + }; + + BlockDb.prototype.has = function(hash, cb) { + var self = this; + + var k = ORPHAN_FLAG_ROOT + hash; + self.db.get(k, function (err,val) { + + var ret; + + if (err && err.notFound) { + err = null; + ret = false; + } + if (typeof val !== 'undefined') { + ret = true; + } + return cb(err, ret); + }); + }; + + + BlockDb.prototype.fromHashWithInfo = function(hash, cb) { + var rpc = new RpcClient(config.bitcoind); + + rpc.getBlock(hash, function(err, info) { + // Not found? + if (err && err.code === -5) return cb(); + if (err) return cb(err); + + info.reward = BitcoreBlock.getBlockValue(info.height) / util.COIN ; + + return cb(null, { + hash: hash, + info: info.result, + }); + }); + }; + + BlockDb.blockIndex = function(height, cb) { + var rpc = new RpcClient(config.bitcoind); + rpc.getBlockHash(height, function(err, bh){ + if (err) return cb(err); + + cb(null, { blockHash: bh.result }); + }); + }; + + + + return BlockDb; +} +module.defineClass(spec); + + diff --git a/lib/HistoricSync.js b/lib/HistoricSync.js index 9d22e1bf..6bf14d8a 100644 --- a/lib/HistoricSync.js +++ b/lib/HistoricSync.js @@ -9,11 +9,11 @@ function spec() { var RpcClient = require('bitcore/RpcClient').class(); var bitutil = require('bitcore/util/util'); var Address = require('bitcore/Address').class(); + var Deserialize = require('bitcore/Deserialize'); var Script = require('bitcore/Script').class(); var networks = require('bitcore/networks'); var async = require('async'); var config = require('../config/config'); - var Block = require('../app/models/Block'); var Sync = require('./Sync').class(); var sockets = require('../app/controllers/socket.js'); var BlockExtractor = require('./BlockExtractor.js').class(); @@ -36,7 +36,7 @@ function spec() { this.syncPercentage = 0; this.syncedBlocks = 0; this.skippedBlocks = 0; - + this.orphanBlocks = 0; } function p() { @@ -95,11 +95,12 @@ function spec() { syncPercentage: this.syncPercentage, skippedBlocks: this.skippedBlocks, syncedBlocks: this.syncedBlocks, + orphanBlocks: this.orphanBlocks, error: this.error, }; }; - HistoricSync.prototype.showProgress = function(height) { + HistoricSync.prototype.showProgress = function() { var self = this; if (self.error) { @@ -109,11 +110,17 @@ function spec() { self.syncPercentage = parseFloat(100 * (self.syncedBlocks + self.skippedBlocks) / self.blockChainHeight).toFixed(3); if (self.syncPercentage > 100) self.syncPercentage = 100; - p(util.format('status: [%d%%] skipped: %d ', self.syncPercentage, self.skippedBlocks, height)); + p(util.format('status: [%d%%] skipped: %d ', self.syncPercentage, self.skippedBlocks)); } if (self.opts.shouldBroadcast) { sockets.broadcastSyncInfo(self.info()); } + +//TODO +if (self.syncPercentage>10) { + console.log(self.info()); + process.exit(1); +} }; HistoricSync.prototype.getPrevNextBlock = function(blockHash, blockEnd, scanOpts, cb) { @@ -128,14 +135,13 @@ function spec() { async.series([ // Already got it? function(c) { - Block.fromHash(blockHash, function(err, block) { + self.sync.hasBlock(blockHash, function(err, ret) { if (err) { p(err); return c(err); } - if (block) { - existed = true; - } + + if (ret) existed = true; return c(); }); }, @@ -181,14 +187,10 @@ function spec() { ], function(err) { if (err) { - self.err = util.format('ERROR: @%s: %s [count: syncedBlocks: %d]', blockHash, err, self.syncedBlocks); - self.status = 'aborted'; - self.showProgress(); - p(self.err); + self.setError(util.format('ERROR: @%s: %s [count: syncedBlocks: %d]', blockHash, err, self.syncedBlocks)); return cb(err); } else { - self.err = null; self.status = 'syncing'; } @@ -259,30 +261,20 @@ function spec() { return addrStrs; }; +var kk=0; - - HistoricSync.prototype.getBlockFromFile = function(height, scanOpts, cb) { + HistoricSync.prototype.getBlockFromFile = function(scanOpts, cb) { var self = this; var nextHash; var blockInfo; - var isMainChain; var existed; async.series([ - // Is it in mainchain? - function(c) { - self.rpc.getBlockHash(height, function(err, res) { - if (err) return cb(err); - - nextHash = res.result; - return c(); - }); - }, //show some (inacurate) status function(c) { if ( ( self.syncedBlocks + self.skippedBlocks) % self.step === 1) { - self.showProgress(height); + self.showProgress(); } return c(); @@ -294,6 +286,7 @@ function spec() { if (err || ! b) return c(err); blockInfo = b.getStandardizedObject(b.txs, self.network); + blockInfo.curWork = Deserialize.intFromCompact(b.bits); var ti=0; // Get TX Address @@ -320,10 +313,6 @@ function spec() { //store it function(c) { - isMainChain = blockInfo.hash === nextHash; - - blockInfo.isOrphan = !isMainChain; - /* * In file sync, orphan blocks are just ignored. * This is to simplify our schema and the @@ -338,29 +327,25 @@ function spec() { return c(); }); }, - ], function(err) { + function(c) { - if (err) { - self.err = util.format('ERROR: @%s: %s [count: syncedBlocks: %d]', blockInfo.hash, err, self.syncedBlocks); - self.status = 'aborted'; - self.showProgress(); - p(err); - return cb(err); + if (self.prevHash && blockInfo.prev_block !== self.prevHash) { + self.setError('found orphan:' + self.prevHash + ' vs. ' + blockInfo.prev_block); } else { - // Continue if (blockInfo && blockInfo.hash) { + self.prevHash = blockInfo.hash; self.syncedBlocks++; - self.err = null; - self.status = 'syncing'; - - return cb(null, true, isMainChain); - } - else { - self.err = null; + } else self.status = 'finished'; - } + } + + return c(); + }, + ], function(err) { + if (err) { + self.setError(util.format('ERROR: @%s: %s [count: syncedBlocks: %d]', blockInfo.hash, err, self.syncedBlocks)); } return cb(err); }); @@ -371,8 +356,6 @@ function spec() { HistoricSync.prototype.importHistory = function(scanOpts, next) { var self = this; - var retry_secs = 2; - var lastBlock; async.series([ @@ -404,10 +387,7 @@ function spec() { function(cb) { if (scanOpts.upToExisting) { // should be isOrphan = true or null to be more accurate. - Block.count({ - isOrphan: null - }, - function(err, count) { + self.sync.countNotOrphan(function(err, count) { if (err) return cb(err); self.syncedBlocks = count || 0; @@ -455,25 +435,19 @@ function spec() { if (scanOpts.fromFiles) { - var keepGoing = true; var height = 0; + self.status = 'syncing'; async.whilst(function() { - return keepGoing; + return self.status === 'syncing'; }, function (w_cb) { - self.getBlockFromFile(height, scanOpts, function(err, inKeepGoing, wasMainChain) { - keepGoing = inKeepGoing; - if (wasMainChain) height++; - //Black magic from http://stackoverflow.com/questions/20936486/node-js-maximum-call-stack-size-exceeded + self.getBlockFromFile(scanOpts, function(err) { setImmediate(function(){ - return w_cb(err); }); - }) + }); }, function(err) { - -console.log('[HistoricSync.js.468]'); //TODO - return next(); + return next(err); }); } else { @@ -489,11 +463,10 @@ console.log('[HistoricSync.js.468]'); //TODO HistoricSync.prototype.smartImport = function(scanOpts, next) { var self = this; - Block.fromHash(self.genesis, function(err, b) { + self.sync.hasBlock(self.genesis, function(err, b) { if (err) return next(err); - if (!b || scanOpts.destroy) { p('Could not find Genesis block. Running FULL SYNC'); if (config.bitcoind.dataDir) { diff --git a/lib/Sync.js b/lib/Sync.js index bb445fad..39b4348b 100644 --- a/lib/Sync.js +++ b/lib/Sync.js @@ -6,13 +6,15 @@ require('classtool'); function spec() { var mongoose = require('mongoose'); var config = require('../config/config'); - var Block = require('../app/models/Block'); - var TransactionOut = require('../app/models/TransactionOut'); var sockets = require('../app/controllers/socket.js'); + var BlockDb = require('./BlockDb').class(); + var TransactionDb = require('./TransactionDb').class(); var async = require('async'); function Sync() { + this.blockDb = new BlockDb(); + this.txDb = new TransactionDb(); } Sync.prototype.init = function(opts, cb) { @@ -60,30 +62,46 @@ function spec() { Sync.prototype.destroy = function(next) { var self = this; async.series([ - function(b) { try {self.db.collections.blocks.drop(b);} catch (e) { return b(); } }, + function(b) { try {self.blockDb.drop(b);} catch (e) { return b(); } }, function(b) { try {self.db.collections.transactionitems.drop(b);} catch (e) { return b(); } }, function(b) { try {self.db.collections.transactionouts.drop(b);} catch (e) { return b(); } }, ], next); }; + + Sync.prototype.hasBlock = function(hash, cb) { + var self = this; + + return self.blockDb.has(hash, cb); + }; + + Sync.prototype.countNotOrphan = function(hash, cb) { + var self = this; + return self.blockDb.countNotOrphan(hash, cb); + }; + + + Sync.prototype.storeBlock = function(block, cb) { var self = this; - Block.customCreate(block, function(err, block, inserted_txs, updated_addrs){ + self.txDb.createFromBlock(block, function(err, insertedTxs, updateAddrs) { if (err) return cb(err); - self._handleBroadcast(block, inserted_txs, updated_addrs); - - return cb(); + self.blockDb.add(block, function(err){ + if (err) return cb(err); + self._handleBroadcast(block, insertedTxs, updateAddrs); + return cb(); + }); }); }; - Sync.prototype._handleBroadcast = function(block, inserted_txs, updated_addrs) { + Sync.prototype._handleBroadcast = function(hash, inserted_txs, updated_addrs) { var self = this; - if (block && self.opts.broadcast_blocks) { - sockets.broadcast_block(block); + if (hash && self.opts.broadcast_blocks) { + sockets.broadcast_block({hash: hash}); } if (inserted_txs && self.opts.broadcast_txs) { @@ -107,7 +125,8 @@ function spec() { Sync.prototype.storeTxs = function(txs, cb) { var self = this; - TransactionOut.createFromTxs(txs, function(err, inserted_txs, updated_addrs) { + // TODO + self.txDb.createFromTxs(txs, function(err, inserted_txs, updated_addrs) { if (err) return cb(err); self._handleBroadcast(null, inserted_txs, updated_addrs); diff --git a/lib/TransactionDb.js b/lib/TransactionDb.js new file mode 100644 index 00000000..4ca43e62 --- /dev/null +++ b/lib/TransactionDb.js @@ -0,0 +1,282 @@ +'use strict'; + +require('classtool'); + + +function spec() { + var ROOT = 'tx-'; //tx-- => [addr, btc_sat] + var OUTS_ROOT = 'txouts-'; //txouts-- => [addr, btc_sat] + var ADDR_ROOT = 'txouts-addr-'; //txouts-addr---- => (+/-) btc_sat + + // TODO: use bitcore networks module + var genesisTXID = '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b'; + var CONCURRENCY = 100; + + /** + * Module dependencies. + */ + var TransactionRpc = require('./TransactionRpc').class(), + util = require('bitcore/util/util'), + levelup = require('levelup'), + async = require('async'), + config = require('../config/config'), + fs = require('fs'); + + var TransactionDb = function() { + this.db = levelup(config.leveldb + '/txs'); + }; + + TransactionDb.prototype.drop = function(cb) { + var self = this; + var path = config.leveldb + '/blocks'; + require('leveldown').destroy(path, function () { + fs.mkdirSync(config.leveldb); + fs.mkdirSync(path); + self.db = levelup(path); + return cb(); + }); + }; + +/* + txidBuf: { + type: Buffer, + index: true, + }, + index: Number, + addr: { + type: String, + index: true, + }, + value_sat: Number, + fromOrphan: Boolean, + + spendTxIdBuf: Buffer, + spendIndex: Number, + spendFromOrphan: Boolean, + */ + +// TransactionDb.prototype.fromTxIdOne = function(txid, cb) { TODO + TransactionDb.prototype.has = function(txid, cb) { + var self = this; + + var k = ROOT + txid; + self.db.get(k, function (err,val) { + + var ret; + + if (err && err.notFound) { + err = null; + ret = false; + } + if (typeof val !== undefined) { + ret = true; + } + return cb(err, ret); + }); + }; + + + TransactionDb.prototype.fromTxIdN = function(txid, n, cb) { + var self = this; + + var k = OUTS_ROOT + txid + '-' + n; + + self.db.get(k, function (err,val) { + if (err && err.notFound) { + err = null; + } + + var a = val.split('-'); + + return cb(err, val, a[0], a[1]); + }); + }; + + TransactionDb.prototype.adaptTxObject = function(txInfo) { + + // adapt bitcore TX object to bitcoind JSON response + txInfo.txid = txInfo.hash; + + var count = 0; + txInfo.vin = txInfo.in.map(function (txin) { + var i = {}; + + if (txin.coinbase) { + txInfo.isCoinBase = true; + } + else { + i.txid= txin.prev_out.hash; + i.vout= txin.prev_out.n; + } + i.n = count++; + return i; + }); + + + count = 0; + txInfo.vout = txInfo.out.map(function (txout) { + var o = {}; + + o.value = txout.value; + o.n = count++; + + if (txout.addrStr){ + o.scriptPubKey = {}; + o.scriptPubKey.addresses = [txout.addrStr]; + } + return o; + }); + }; + + + TransactionDb.prototype.add = function(tx, fromOrphan, cb) { + var self = this; + var addrs = []; + var is_new = true; + + if (tx.hash) self.adaptTxObject(tx); + + //TODO + var ts = 1; + + + //TODO + if (fromOrphan) return cb(); + + async.series([ + // Input Outpoints (mark them as spended) + function(p_c) { + if (tx.isCoinBase) return p_c(); + async.forEachLimit(tx.vin, CONCURRENCY, + function(i, next_out) { + + // TODO +return next_out(); + +/* self.db.batch() + .put() + var data = { + txidBuf: b, + index: i.vout, + + spendTxIdBuf: bTxId, + spendIndex: i.n, + }; + if (fromOrphan) data.spendFromOrphan = true; + Self.update({txidBuf: b, index: i.vout}, data, {upsert: true}, next_out); +*/ + }, + function (err) { + if (err) { + if (!err.message.match(/E11000/)) { + console.log('ERR at TX %s: %s', tx.txid, err); + return cb(err); + } + } + return p_c(); + }); + }, + // Parse Outputs + function(p_c) { + async.forEachLimit(tx.vout, CONCURRENCY, + function(o, next_out) { + if (o.value && o.scriptPubKey && + o.scriptPubKey.addresses && + o.scriptPubKey.addresses[0] && + ! o.scriptPubKey.addresses[1] // TODO : not supported + ){ + + // This is only to broadcast (WIP) + // if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) { + // addrs.push(o.scriptPubKey.addresses[0]); + // } + + //if (fromOrphan) data.fromOrphan = true; // TODO + + var addr = o.scriptPubKey.addresses[0]; + var sat = o.value * util.COIN; + self.db.batch() + .put( OUTS_ROOT + tx.txid + o.n, addr + ':' + sat) + .put( ADDR_ROOT + addr + '-' + ts + '-' + tx.txid + + '-' + o.n, sat) + .write(next_out); + + } + else { + console.log ('WARN in TX: %s could not parse OUTPUT %d', tx.txid, o.n); + return next_out(); + } + }, + function (err) { + if (err) { + if (err.message.match(/E11000/)) { + is_new = false; + } + else { + console.log('ERR at TX %s: %s', tx.txid, err); + return cb(err); + } + } + return p_c(); + }); + }], function(err) { + return cb(err, addrs, is_new); + }); + }; + + TransactionDb.prototype.createFromArray = function(txs, fromOrphan, blockHash, next) { + var self = this; + + if (!txs) return next(); + + // TODO + var insertedTxs = []; + var updatedAddrs = {}; + + async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) { + if (typeof t === 'string') { + // Is it from genesis block? (testnet==livenet) + // TODO: parse it from networks.genesisTX? + if (t === genesisTXID) return each_cb(); + + TransactionRpc.getRpcInfo(t, function(err, inInfo) { + if (!inInfo) return each_cb(err); + + self.add(inInfo, fromOrphan, function(err) { + if (err) return each_cb(err); + + self.db.put(ROOT + t, blockHash, function(err) { + return each_cb(err); + }); + }); + }); + } + else { + self.add(t, fromOrphan, function(err) { + if (err) return each_cb(err); + + self.db.put(ROOT + t.txid, blockHash, function(err) { + return each_cb(err); + }); + }); + } + }, + function(err) { + + + return next(err, insertedTxs, updatedAddrs); + }); +}; + + +// txs can be a [hashes] or [txObjects] + TransactionDb.prototype.createFromBlock = function(b, next) { + var self = this; + if (!b.tx) return next(); + + return self.createFromArray(b.tx, b.isOrphan, b.hash, next); + }; + + return TransactionDb; +} +module.defineClass(spec); diff --git a/package.json b/package.json index faf621ff..6860d54e 100644 --- a/package.json +++ b/package.json @@ -52,6 +52,8 @@ }, "dependencies": { "async": "*", + "leveldown": "*", + "levelup": "*", "glob": "*", "classtool": "*", "commander": "*",