sync with levelDb working!

This commit is contained in:
Matias Alejo Garcia 2014-02-03 23:30:46 -03:00
parent fc509f6e7e
commit 5734890f10
11 changed files with 507 additions and 593 deletions

View File

@ -102,54 +102,3 @@ function spec() {
}
module.defineClass(spec);
/**
* Addr Schema Idea for moogose. Not used now.
*
var AddressSchema = new Schema({
// For now we keep this as short as possible
// More fields will be propably added as we move
// forward with the UX
addr: {
type: String,
index: true,
unique: true,
},
inputs: [{
type: mongoose.Schema.Types.ObjectId,
ref: 'TransactionItem' //Edit: I'd put the schema. Silly me.
}],
output: [{
type: mongoose.Schema.Types.ObjectId,
ref: 'TransactionItem' //Edit: I'd put the schema. Silly me.
}],
});
AddressSchema.statics.load = function(id, cb) {
this.findOne({
_id: id
}).exec(cb);
};
AddressSchema.statics.fromAddr = function(hash, cb) {
this.findOne({
hash: hash,
}).exec(cb);
};
AddressSchema.statics.fromAddrWithInfo = function(hash, cb) {
this.fromHash(hash, function(err, addr) {
if (err) return cb(err);
if (!addr) { return cb(new Error('Addr not found')); }
// TODO
// addr.getInfo(function(err) { return cb(err,addr); } );
});
};
module.exports = mongoose.model('Address', AddressSchema);
*/

View File

@ -1,186 +0,0 @@
'use strict';
/**
* Module dependencies.
*/
var mongoose = require('mongoose'),
Schema = mongoose.Schema,
RpcClient = require('bitcore/RpcClient').class(),
util = require('bitcore/util/util'),
async = require('async'),
BitcoreBlock= require('bitcore/Block').class(),
TransactionOut = require('./TransactionOut'),
config = require('../../config/config')
;
/**
* Block Schema
*/
var BlockSchema = new Schema({
// For now we keep this as short as possible
// More fields will be propably added as we move
// forward with the UX
_id: {
type: Buffer,
index: true,
unique: true,
required: true,
},
time: Number,
nextBlockHash: Buffer,
isOrphan: Boolean,
});
BlockSchema.virtual('hash').get(function () {
return this._id;
});
BlockSchema.virtual('hash').set(function (hash) {
this._id = hash;
});
BlockSchema.virtual('hashStr').get(function () {
return this._id.toString('hex');
});
BlockSchema.virtual('hashStr').set(function (hashStr) {
if (hashStr)
this._id = new Buffer(hashStr,'hex');
else
this._id = null;
});
BlockSchema.virtual('nextBlockHashStr').get(function () {
return this.nextBlockHash.toString('hex');
});
BlockSchema.virtual('nextBlockHashStr').set(function (hashStr) {
if (hashStr)
this.nextBlockHash = new Buffer(hashStr,'hex');
else
this.nextBlockHash = null;
});
/*
BlockSchema.path('title').validate(function(title) {
return title.length;
},'Title cannot be blank');
*/
/**
* Statics
*/
BlockSchema.statics.customCreate = function(block, cb) {
var Self= this;
var BlockSchema = mongoose.model('Block', BlockSchema);
var newBlock = new Self();
newBlock.time = block.time ? block.time : Math.round(new Date().getTime() / 1000);
newBlock.hashStr = block.hash;
newBlock.isOrphan = block.isOrphan;
newBlock.nextBlockHashStr = block.nextBlockHash;
var insertedTxs, updateAddrs;
async.series([
function(a_cb) {
TransactionOut.createFromTxs(block.tx, block.isOrphan,
function(err, inInsertedTxs, inUpdateAddrs) {
insertedTxs = inInsertedTxs;
updateAddrs = inUpdateAddrs;
return a_cb(err);
});
}, function(a_cb) {
newBlock.save(function(err) {
return a_cb(err);
});
}],
function (err) {
return cb(err, newBlock, insertedTxs, updateAddrs);
});
};
BlockSchema.statics.blockIndex = function(height, cb) {
var rpc = new RpcClient(config.bitcoind);
var hashStr = {};
rpc.getBlockHash(height, function(err, bh){
if (err) return cb(err);
hashStr.blockHash = bh.result;
cb(null, hashStr);
});
};
BlockSchema.statics.fromHash = function(hashStr, cb) {
var hash = new Buffer(hashStr, 'hex');
this.findOne({
_id: hash,
}).exec(cb);
};
BlockSchema.statics.fromHashWithInfo = function(hashStr, cb) {
var That = this;
That.fromHash(hashStr, function(err, block) {
if (err) return cb(err);
if (!block) {
// No in mongo...but maybe in bitcoind... lets query it
block = new That();
block.hashStr = hashStr;
block.getInfo(function(err, blockInfo) {
if (err) return cb(err);
if (!blockInfo) return cb();
block.save(function(err) {
return cb(err,block);
});
});
}
else {
block.getInfo(function(err) {
return cb(err,block);
});
}
});
};
// TODO: Can we store the rpc instance in the Block object?
BlockSchema.methods.getInfo = function (next) {
var self = this;
var rpc = new RpcClient(config.bitcoind);
rpc.getBlock(self.hashStr, function(err, blockInfo) {
// Not found?
if (err && err.code === -5) return next();
if (err) return next(err);
/*
* Not sure this is the right way to do it.
* Any other way to lazy load a property in a mongoose object?
*/
self.info = blockInfo.result;
self.info.reward = BitcoreBlock.getBlockValue(self.info.height) / util.COIN ;
return next(null, self.info);
});
};
module.exports = mongoose.model('Block', BlockSchema);

View File

@ -28,8 +28,6 @@ function spec() {
});
};
Transaction.prototype._fillInfo = function(next) {
var self = this;
@ -42,8 +40,6 @@ function spec() {
});
};
Transaction._fillOutpoints = function(info, cb) {
if (!info || info.isCoinBase) return cb();
@ -51,17 +47,17 @@ function spec() {
var valueIn = 0;
var incompleteInputs = 0;
async.eachLimit(info.vin, CONCURRENCY, function(i, c_in) {
TransactionOut.fromTxIdN(i.txid, i.vout, function(err, out) {
TransactionOut.fromTxIdN(i.txid, i.vout, function(err, addr, valueSat) {
if (err || !out || ! out.addr) {
if (err || !addr || !valueSat ) {
console.log('Could not get TXouts in %s,%d from %s ', i.txid, i.vout, info.txid);
incompleteInputs = 1;
return c_in(); // error not scaled
}
i.addr = out.addr;
i.valueSat = out.value_sat;
i.value = out.value_sat / util.COIN;
i.addr = addr;
i.valueSat = valueSat;
i.value = valueSat / util.COIN;
valueIn += i.valueSat;
return c_in();

View File

@ -1,270 +0,0 @@
'use strict';
/**
* Module dependencies.
*/
var mongoose = require('mongoose'),
async = require('async'),
util = require('bitcore/util/util'),
TransactionRpc = require('../../lib/TransactionRpc').class(),
Schema = mongoose.Schema;
var CONCURRENCY = 15;
// TODO: use bitcore networks module
var genesisTXID = '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b';
var TransactionOutSchema = new Schema({
txidBuf: {
type: Buffer,
index: true,
},
index: Number,
addr: {
type: String,
index: true,
},
value_sat: Number,
fromOrphan: Boolean,
spendTxIdBuf: Buffer,
spendIndex: Number,
spendFromOrphan: Boolean,
});
// Compound index
TransactionOutSchema.index({txidBuf: 1, index: 1}, {unique: true, sparse: true});
TransactionOutSchema.index({spendTxIdBuf: 1, spendIndex: 1}, {unique: true, sparse: true});
TransactionOutSchema.virtual('txid').get(function () {
return this.txidBuf.toString('hex');
});
TransactionOutSchema.virtual('spendTxid').get(function () {
if (!this.spendTxIdBuf) return (null);
return this.spendTxIdBuf.toString('hex');
});
TransactionOutSchema.virtual('txid').set(function (txidStr) {
if (txidStr)
this.txidBuf = new Buffer(txidStr,'hex');
else
this.txidBuf = null;
});
TransactionOutSchema.statics.fromTxId = function(txid, cb) {
var txidBuf = new Buffer(txid, 'hex');
this.find({
txidBuf: txidBuf,
}).exec(function (err,items) {
// sort by index
return cb(err,items.sort(function(a,b){
return a.index - b.index;
}));
});
};
TransactionOutSchema.statics.fromTxIdOne = function(txid, cb) {
var txidBuf = new Buffer(txid, 'hex');
this.find({
txidBuf: txidBuf,
}).exec(function (err,item) {
return cb(err, item[0]);
});
};
TransactionOutSchema.statics.fromTxIdN = function(txid, n, cb) {
var txidBuf = new Buffer(txid, 'hex');
this.findOne({
txidBuf: txidBuf, index: n
}).exec(cb);
};
TransactionOutSchema.statics.removeFromTxId = function(txid, cb) {
var txidBuf = new Buffer(txid, 'hex');
this.remove({ txidBuf: txidBuf }).exec(cb);
};
TransactionOutSchema.statics.storeTransactionOuts = function(txInfo, fromOrphan, cb) {
var Self = this;
var addrs = [];
var is_new = true;
if (txInfo.hash) {
// adapt bitcore TX object to bitcoind JSON response
txInfo.txid = txInfo.hash;
var count = 0;
txInfo.vin = txInfo.in.map(function (txin) {
var i = {};
if (txin.coinbase) {
txInfo.isCoinBase = true;
}
else {
i.txid= txin.prev_out.hash;
i.vout= txin.prev_out.n;
};
i.n = count++;
return i;
});
count = 0;
txInfo.vout = txInfo.out.map(function (txout) {
var o = {};
o.value = txout.value;
o.n = count++;
if (txout.addrStr){
o.scriptPubKey = {};
o.scriptPubKey.addresses = [txout.addrStr];
}
return o;
});
}
var bTxId = new Buffer(txInfo.txid,'hex');
async.series([
// Input Outpoints (mark them as spended)
function(p_c) {
if (txInfo.isCoinBase) return p_c();
async.forEachLimit(txInfo.vin, CONCURRENCY,
function(i, next_out) {
var b = new Buffer(i.txid,'hex');
var data = {
txidBuf: b,
index: i.vout,
spendTxIdBuf: bTxId,
spendIndex: i.n,
};
if (fromOrphan) data.spendFromOrphan = true;
Self.update({txidBuf: b, index: i.vout}, data, {upsert: true}, next_out);
},
function (err) {
if (err) {
if (!err.message.match(/E11000/)) {
console.log('ERR at TX %s: %s', txInfo.txid, err);
return cb(err);
}
}
return p_c();
});
},
// Parse Outputs
function(p_c) {
async.forEachLimit(txInfo.vout, CONCURRENCY,
function(o, next_out) {
if (o.value && o.scriptPubKey &&
o.scriptPubKey.addresses &&
o.scriptPubKey.addresses[0] &&
! o.scriptPubKey.addresses[1] // TODO : not supported
){
// This is only to broadcast (WIP)
// if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) {
// addrs.push(o.scriptPubKey.addresses[0]);
// }
var data = {
txidBuf: bTxId,
index : o.n,
value_sat : o.value * util.COIN,
addr : o.scriptPubKey.addresses[0],
};
if (fromOrphan) data.fromOrphan = true;
Self.update({txidBuf: bTxId, index: o.n}, data, {upsert: true}, next_out);
}
else {
console.log ('WARN in TX: %s could not parse OUTPUT %d', txInfo.txid, o.n);
return next_out();
}
},
function (err) {
if (err) {
if (err.message.match(/E11000/)) {
is_new = false;
}
else {
console.log('ERR at TX %s: %s', txInfo.txid, err);
return cb(err);
}
}
return p_c();
});
}], function(err) {
return cb(err, addrs, is_new);
});
};
// txs can be a [hashes] or [txObjects]
TransactionOutSchema.statics.createFromTxs = function(txs, fromOrphan, next) {
var Self = this;
if (typeof fromOrphan === 'function') {
next = fromOrphan;
fromOrphan = false;
}
if (!txs) return next();
var inserted_txs = [];
var updated_addrs = {};
async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) {
var txInfo;
async.series([
function(a_cb) {
if (typeof t !== 'string') {
txInfo = t;
return a_cb();
}
// Is it from genesis block? (testnet==livenet)
// TODO: parse it from networks.genesisTX?
if (t === genesisTXID) return a_cb();
TransactionRpc.getRpcInfo(t, function(err, inInfo) {
txInfo =inInfo;
return a_cb(err);
});
},
function(a_cb) {
if (!txInfo) return a_cb();
Self.storeTransactionOuts(txInfo, fromOrphan, function(err, addrs) {
if (err) return a_cb(err);
return a_cb();
});
}],
function(err) {
return each_cb(err);
});
},
function(err) {
return next(err, inserted_txs, updated_addrs);
});
};
module.exports = mongoose.model('TransactionOut', TransactionOutSchema);

View File

@ -24,6 +24,7 @@ module.exports = {
appName: 'Insight ' + env,
port: process.env.PORT || 3000,
db: 'mongodb://localhost/insight-' + env,
leveldb: './db',
bitcoind: {
protocol: process.env.BITCOIND_PROTO || 'http',
user: process.env.BITCOIND_USER || 'user',

25
dev-util/block-level.js Executable file
View File

@ -0,0 +1,25 @@
#!/usr/bin/env node
var
config = require('../config/config'),
levelup = require('levelup');
db = levelup(config.leveldb + '/blocks');
db.createReadStream({start: 'b-'})
.on('data', function (data) {
console.log('[block-level.js.11:data:]',data); //TODO
if (data==false) c++;
})
.on('error', function (err) {
return cb(err);
})
.on('close', function () {
return cb(null);
})
.on('end', function () {
return cb(null);
});

123
lib/BlockDb.js Normal file
View File

@ -0,0 +1,123 @@
'use strict';
require('classtool');
function spec() {
var TIMESTAMP_ROOT = 'b-ts-';
var ORPHAN_FLAG_ROOT = 'b-orphan-';
/**
* Module dependencies.
*/
var RpcClient = require('bitcore/RpcClient').class(),
util = require('bitcore/util/util'),
levelup = require('levelup'),
BitcoreBlock= require('bitcore/Block').class(),
TransactionDb = require('.//TransactionDb'),
config = require('../config/config'),
fs = require('fs');
var BlockDb = function() {
this.db = levelup(config.leveldb + '/blocks');
};
BlockDb.prototype.drop = function(cb) {
var self = this;
var path = config.leveldb + '/blocks';
require('leveldown').destroy(path, function () {
fs.mkdirSync(config.leveldb);
fs.mkdirSync(path);
self.db = levelup(path);
return cb();
});
};
BlockDb.prototype.add = function(b, cb) {
var self = this;
if (!b.hash) return cb(new Error('no Hash at Block.save'));
var time_key = TIMESTAMP_ROOT +
( b.timestamp || Math.round(new Date().getTime() / 1000) );
self.db.batch()
.put(time_key, b.hash)
.put(ORPHAN_FLAG_ROOT + b.hash, b.isOrphan || 0)
.write(cb);
};
BlockDb.prototype.countNotOrphan = function(hash, cb) {
var c = 0;
this.db.createReadStream({start: ORPHAN_FLAG_ROOT})
.on('data', function (data) {
if (data === false) c++;
})
.on('error', function (err) {
return cb(err);
})
.on('close', function () {
return cb(null);
})
.on('end', function () {
return cb(null);
});
};
BlockDb.prototype.has = function(hash, cb) {
var self = this;
var k = ORPHAN_FLAG_ROOT + hash;
self.db.get(k, function (err,val) {
var ret;
if (err && err.notFound) {
err = null;
ret = false;
}
if (typeof val !== 'undefined') {
ret = true;
}
return cb(err, ret);
});
};
BlockDb.prototype.fromHashWithInfo = function(hash, cb) {
var rpc = new RpcClient(config.bitcoind);
rpc.getBlock(hash, function(err, info) {
// Not found?
if (err && err.code === -5) return cb();
if (err) return cb(err);
info.reward = BitcoreBlock.getBlockValue(info.height) / util.COIN ;
return cb(null, {
hash: hash,
info: info.result,
});
});
};
BlockDb.blockIndex = function(height, cb) {
var rpc = new RpcClient(config.bitcoind);
rpc.getBlockHash(height, function(err, bh){
if (err) return cb(err);
cb(null, { blockHash: bh.result });
});
};
return BlockDb;
}
module.defineClass(spec);

View File

@ -9,11 +9,11 @@ function spec() {
var RpcClient = require('bitcore/RpcClient').class();
var bitutil = require('bitcore/util/util');
var Address = require('bitcore/Address').class();
var Deserialize = require('bitcore/Deserialize');
var Script = require('bitcore/Script').class();
var networks = require('bitcore/networks');
var async = require('async');
var config = require('../config/config');
var Block = require('../app/models/Block');
var Sync = require('./Sync').class();
var sockets = require('../app/controllers/socket.js');
var BlockExtractor = require('./BlockExtractor.js').class();
@ -36,7 +36,7 @@ function spec() {
this.syncPercentage = 0;
this.syncedBlocks = 0;
this.skippedBlocks = 0;
this.orphanBlocks = 0;
}
function p() {
@ -95,11 +95,12 @@ function spec() {
syncPercentage: this.syncPercentage,
skippedBlocks: this.skippedBlocks,
syncedBlocks: this.syncedBlocks,
orphanBlocks: this.orphanBlocks,
error: this.error,
};
};
HistoricSync.prototype.showProgress = function(height) {
HistoricSync.prototype.showProgress = function() {
var self = this;
if (self.error) {
@ -109,11 +110,17 @@ function spec() {
self.syncPercentage = parseFloat(100 * (self.syncedBlocks + self.skippedBlocks) / self.blockChainHeight).toFixed(3);
if (self.syncPercentage > 100) self.syncPercentage = 100;
p(util.format('status: [%d%%] skipped: %d ', self.syncPercentage, self.skippedBlocks, height));
p(util.format('status: [%d%%] skipped: %d ', self.syncPercentage, self.skippedBlocks));
}
if (self.opts.shouldBroadcast) {
sockets.broadcastSyncInfo(self.info());
}
//TODO
if (self.syncPercentage>10) {
console.log(self.info());
process.exit(1);
}
};
HistoricSync.prototype.getPrevNextBlock = function(blockHash, blockEnd, scanOpts, cb) {
@ -128,14 +135,13 @@ function spec() {
async.series([
// Already got it?
function(c) {
Block.fromHash(blockHash, function(err, block) {
self.sync.hasBlock(blockHash, function(err, ret) {
if (err) {
p(err);
return c(err);
}
if (block) {
existed = true;
}
if (ret) existed = true;
return c();
});
},
@ -181,14 +187,10 @@ function spec() {
], function(err) {
if (err) {
self.err = util.format('ERROR: @%s: %s [count: syncedBlocks: %d]', blockHash, err, self.syncedBlocks);
self.status = 'aborted';
self.showProgress();
p(self.err);
self.setError(util.format('ERROR: @%s: %s [count: syncedBlocks: %d]', blockHash, err, self.syncedBlocks));
return cb(err);
}
else {
self.err = null;
self.status = 'syncing';
}
@ -259,30 +261,20 @@ function spec() {
return addrStrs;
};
var kk=0;
HistoricSync.prototype.getBlockFromFile = function(height, scanOpts, cb) {
HistoricSync.prototype.getBlockFromFile = function(scanOpts, cb) {
var self = this;
var nextHash;
var blockInfo;
var isMainChain;
var existed;
async.series([
// Is it in mainchain?
function(c) {
self.rpc.getBlockHash(height, function(err, res) {
if (err) return cb(err);
nextHash = res.result;
return c();
});
},
//show some (inacurate) status
function(c) {
if ( ( self.syncedBlocks + self.skippedBlocks) % self.step === 1) {
self.showProgress(height);
self.showProgress();
}
return c();
@ -294,6 +286,7 @@ function spec() {
if (err || ! b) return c(err);
blockInfo = b.getStandardizedObject(b.txs, self.network);
blockInfo.curWork = Deserialize.intFromCompact(b.bits);
var ti=0;
// Get TX Address
@ -320,10 +313,6 @@ function spec() {
//store it
function(c) {
isMainChain = blockInfo.hash === nextHash;
blockInfo.isOrphan = !isMainChain;
/*
* In file sync, orphan blocks are just ignored.
* This is to simplify our schema and the
@ -338,29 +327,25 @@ function spec() {
return c();
});
},
], function(err) {
function(c) {
if (err) {
self.err = util.format('ERROR: @%s: %s [count: syncedBlocks: %d]', blockInfo.hash, err, self.syncedBlocks);
self.status = 'aborted';
self.showProgress();
p(err);
return cb(err);
if (self.prevHash && blockInfo.prev_block !== self.prevHash) {
self.setError('found orphan:' + self.prevHash + ' vs. ' + blockInfo.prev_block);
}
else {
// Continue
if (blockInfo && blockInfo.hash) {
self.prevHash = blockInfo.hash;
self.syncedBlocks++;
self.err = null;
self.status = 'syncing';
return cb(null, true, isMainChain);
}
else {
self.err = null;
} else
self.status = 'finished';
}
}
return c();
},
], function(err) {
if (err) {
self.setError(util.format('ERROR: @%s: %s [count: syncedBlocks: %d]', blockInfo.hash, err, self.syncedBlocks));
}
return cb(err);
});
@ -371,8 +356,6 @@ function spec() {
HistoricSync.prototype.importHistory = function(scanOpts, next) {
var self = this;
var retry_secs = 2;
var lastBlock;
async.series([
@ -404,10 +387,7 @@ function spec() {
function(cb) {
if (scanOpts.upToExisting) {
// should be isOrphan = true or null to be more accurate.
Block.count({
isOrphan: null
},
function(err, count) {
self.sync.countNotOrphan(function(err, count) {
if (err) return cb(err);
self.syncedBlocks = count || 0;
@ -455,25 +435,19 @@ function spec() {
if (scanOpts.fromFiles) {
var keepGoing = true;
var height = 0;
self.status = 'syncing';
async.whilst(function() {
return keepGoing;
return self.status === 'syncing';
}, function (w_cb) {
self.getBlockFromFile(height, scanOpts, function(err, inKeepGoing, wasMainChain) {
keepGoing = inKeepGoing;
if (wasMainChain) height++;
//Black magic from http://stackoverflow.com/questions/20936486/node-js-maximum-call-stack-size-exceeded
self.getBlockFromFile(scanOpts, function(err) {
setImmediate(function(){
return w_cb(err);
});
})
});
}, function(err) {
console.log('[HistoricSync.js.468]'); //TODO
return next();
return next(err);
});
}
else {
@ -489,11 +463,10 @@ console.log('[HistoricSync.js.468]'); //TODO
HistoricSync.prototype.smartImport = function(scanOpts, next) {
var self = this;
Block.fromHash(self.genesis, function(err, b) {
self.sync.hasBlock(self.genesis, function(err, b) {
if (err) return next(err);
if (!b || scanOpts.destroy) {
p('Could not find Genesis block. Running FULL SYNC');
if (config.bitcoind.dataDir) {

View File

@ -6,13 +6,15 @@ require('classtool');
function spec() {
var mongoose = require('mongoose');
var config = require('../config/config');
var Block = require('../app/models/Block');
var TransactionOut = require('../app/models/TransactionOut');
var sockets = require('../app/controllers/socket.js');
var BlockDb = require('./BlockDb').class();
var TransactionDb = require('./TransactionDb').class();
var async = require('async');
function Sync() {
this.blockDb = new BlockDb();
this.txDb = new TransactionDb();
}
Sync.prototype.init = function(opts, cb) {
@ -60,30 +62,46 @@ function spec() {
Sync.prototype.destroy = function(next) {
var self = this;
async.series([
function(b) { try {self.db.collections.blocks.drop(b);} catch (e) { return b(); } },
function(b) { try {self.blockDb.drop(b);} catch (e) { return b(); } },
function(b) { try {self.db.collections.transactionitems.drop(b);} catch (e) { return b(); } },
function(b) { try {self.db.collections.transactionouts.drop(b);} catch (e) { return b(); } },
], next);
};
Sync.prototype.hasBlock = function(hash, cb) {
var self = this;
return self.blockDb.has(hash, cb);
};
Sync.prototype.countNotOrphan = function(hash, cb) {
var self = this;
return self.blockDb.countNotOrphan(hash, cb);
};
Sync.prototype.storeBlock = function(block, cb) {
var self = this;
Block.customCreate(block, function(err, block, inserted_txs, updated_addrs){
self.txDb.createFromBlock(block, function(err, insertedTxs, updateAddrs) {
if (err) return cb(err);
self._handleBroadcast(block, inserted_txs, updated_addrs);
return cb();
self.blockDb.add(block, function(err){
if (err) return cb(err);
self._handleBroadcast(block, insertedTxs, updateAddrs);
return cb();
});
});
};
Sync.prototype._handleBroadcast = function(block, inserted_txs, updated_addrs) {
Sync.prototype._handleBroadcast = function(hash, inserted_txs, updated_addrs) {
var self = this;
if (block && self.opts.broadcast_blocks) {
sockets.broadcast_block(block);
if (hash && self.opts.broadcast_blocks) {
sockets.broadcast_block({hash: hash});
}
if (inserted_txs && self.opts.broadcast_txs) {
@ -107,7 +125,8 @@ function spec() {
Sync.prototype.storeTxs = function(txs, cb) {
var self = this;
TransactionOut.createFromTxs(txs, function(err, inserted_txs, updated_addrs) {
// TODO
self.txDb.createFromTxs(txs, function(err, inserted_txs, updated_addrs) {
if (err) return cb(err);
self._handleBroadcast(null, inserted_txs, updated_addrs);

282
lib/TransactionDb.js Normal file
View File

@ -0,0 +1,282 @@
'use strict';
require('classtool');
function spec() {
var ROOT = 'tx-'; //tx-<txid>-<n> => [addr, btc_sat]
var OUTS_ROOT = 'txouts-'; //txouts-<txid>-<n> => [addr, btc_sat]
var ADDR_ROOT = 'txouts-addr-'; //txouts-addr-<addr>-<ts>-<txid>-<n> => (+/-) btc_sat
// TODO: use bitcore networks module
var genesisTXID = '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b';
var CONCURRENCY = 100;
/**
* Module dependencies.
*/
var TransactionRpc = require('./TransactionRpc').class(),
util = require('bitcore/util/util'),
levelup = require('levelup'),
async = require('async'),
config = require('../config/config'),
fs = require('fs');
var TransactionDb = function() {
this.db = levelup(config.leveldb + '/txs');
};
TransactionDb.prototype.drop = function(cb) {
var self = this;
var path = config.leveldb + '/blocks';
require('leveldown').destroy(path, function () {
fs.mkdirSync(config.leveldb);
fs.mkdirSync(path);
self.db = levelup(path);
return cb();
});
};
/*
txidBuf: {
type: Buffer,
index: true,
},
index: Number,
addr: {
type: String,
index: true,
},
value_sat: Number,
fromOrphan: Boolean,
spendTxIdBuf: Buffer,
spendIndex: Number,
spendFromOrphan: Boolean,
*/
// TransactionDb.prototype.fromTxIdOne = function(txid, cb) { TODO
TransactionDb.prototype.has = function(txid, cb) {
var self = this;
var k = ROOT + txid;
self.db.get(k, function (err,val) {
var ret;
if (err && err.notFound) {
err = null;
ret = false;
}
if (typeof val !== undefined) {
ret = true;
}
return cb(err, ret);
});
};
TransactionDb.prototype.fromTxIdN = function(txid, n, cb) {
var self = this;
var k = OUTS_ROOT + txid + '-' + n;
self.db.get(k, function (err,val) {
if (err && err.notFound) {
err = null;
}
var a = val.split('-');
return cb(err, val, a[0], a[1]);
});
};
TransactionDb.prototype.adaptTxObject = function(txInfo) {
// adapt bitcore TX object to bitcoind JSON response
txInfo.txid = txInfo.hash;
var count = 0;
txInfo.vin = txInfo.in.map(function (txin) {
var i = {};
if (txin.coinbase) {
txInfo.isCoinBase = true;
}
else {
i.txid= txin.prev_out.hash;
i.vout= txin.prev_out.n;
}
i.n = count++;
return i;
});
count = 0;
txInfo.vout = txInfo.out.map(function (txout) {
var o = {};
o.value = txout.value;
o.n = count++;
if (txout.addrStr){
o.scriptPubKey = {};
o.scriptPubKey.addresses = [txout.addrStr];
}
return o;
});
};
TransactionDb.prototype.add = function(tx, fromOrphan, cb) {
var self = this;
var addrs = [];
var is_new = true;
if (tx.hash) self.adaptTxObject(tx);
//TODO
var ts = 1;
//TODO
if (fromOrphan) return cb();
async.series([
// Input Outpoints (mark them as spended)
function(p_c) {
if (tx.isCoinBase) return p_c();
async.forEachLimit(tx.vin, CONCURRENCY,
function(i, next_out) {
// TODO
return next_out();
/* self.db.batch()
.put()
var data = {
txidBuf: b,
index: i.vout,
spendTxIdBuf: bTxId,
spendIndex: i.n,
};
if (fromOrphan) data.spendFromOrphan = true;
Self.update({txidBuf: b, index: i.vout}, data, {upsert: true}, next_out);
*/
},
function (err) {
if (err) {
if (!err.message.match(/E11000/)) {
console.log('ERR at TX %s: %s', tx.txid, err);
return cb(err);
}
}
return p_c();
});
},
// Parse Outputs
function(p_c) {
async.forEachLimit(tx.vout, CONCURRENCY,
function(o, next_out) {
if (o.value && o.scriptPubKey &&
o.scriptPubKey.addresses &&
o.scriptPubKey.addresses[0] &&
! o.scriptPubKey.addresses[1] // TODO : not supported
){
// This is only to broadcast (WIP)
// if (addrs.indexOf(o.scriptPubKey.addresses[0]) === -1) {
// addrs.push(o.scriptPubKey.addresses[0]);
// }
//if (fromOrphan) data.fromOrphan = true; // TODO
var addr = o.scriptPubKey.addresses[0];
var sat = o.value * util.COIN;
self.db.batch()
.put( OUTS_ROOT + tx.txid + o.n, addr + ':' + sat)
.put( ADDR_ROOT + addr + '-' + ts + '-' + tx.txid +
'-' + o.n, sat)
.write(next_out);
}
else {
console.log ('WARN in TX: %s could not parse OUTPUT %d', tx.txid, o.n);
return next_out();
}
},
function (err) {
if (err) {
if (err.message.match(/E11000/)) {
is_new = false;
}
else {
console.log('ERR at TX %s: %s', tx.txid, err);
return cb(err);
}
}
return p_c();
});
}], function(err) {
return cb(err, addrs, is_new);
});
};
TransactionDb.prototype.createFromArray = function(txs, fromOrphan, blockHash, next) {
var self = this;
if (!txs) return next();
// TODO
var insertedTxs = [];
var updatedAddrs = {};
async.forEachLimit(txs, CONCURRENCY, function(t, each_cb) {
if (typeof t === 'string') {
// Is it from genesis block? (testnet==livenet)
// TODO: parse it from networks.genesisTX?
if (t === genesisTXID) return each_cb();
TransactionRpc.getRpcInfo(t, function(err, inInfo) {
if (!inInfo) return each_cb(err);
self.add(inInfo, fromOrphan, function(err) {
if (err) return each_cb(err);
self.db.put(ROOT + t, blockHash, function(err) {
return each_cb(err);
});
});
});
}
else {
self.add(t, fromOrphan, function(err) {
if (err) return each_cb(err);
self.db.put(ROOT + t.txid, blockHash, function(err) {
return each_cb(err);
});
});
}
},
function(err) {
return next(err, insertedTxs, updatedAddrs);
});
};
// txs can be a [hashes] or [txObjects]
TransactionDb.prototype.createFromBlock = function(b, next) {
var self = this;
if (!b.tx) return next();
return self.createFromArray(b.tx, b.isOrphan, b.hash, next);
};
return TransactionDb;
}
module.defineClass(spec);

View File

@ -52,6 +52,8 @@
},
"dependencies": {
"async": "*",
"leveldown": "*",
"levelup": "*",
"glob": "*",
"classtool": "*",
"commander": "*",