p2p block and transaction importing

This commit is contained in:
Manuel Araoz 2014-01-10 16:02:33 -03:00
parent ec0c78d914
commit df397936ea
14 changed files with 369 additions and 440 deletions

View File

@ -34,7 +34,7 @@
"inject", "inject",
"expect" "expect"
], ],
"indent": 2, // Specify indentation spacing "indent": false, // Specify indentation spacing
"devel": true, // Allow development statements e.g. `console.log();`. "devel": true, // Allow development statements e.g. `console.log();`.
"noempty": true // Prohibit use of empty blocks. "noempty": true // Prohibit use of empty blocks.
} }

View File

@ -13,10 +13,10 @@ var mongoose = require('mongoose'),
* Find block by hash ... * Find block by hash ...
*/ */
exports.block = function(req, res, next, hash) { exports.block = function(req, res, next, hash) {
Block.fromHash(hash, function(err, block) { Block.fromHashWithInfo(hash, function(err, block) {
if (err) return next(err); if (err) return next(err);
if (!block) return next(new Error('Failed to load block ' + hash)); if (!block) return next(new Error('Failed to load block ' + hash));
req.block = block; req.block = block.info;
next(); next();
}); });
}; };

View File

@ -15,10 +15,10 @@ var Transaction = require('../models/Transaction');
* Find block by hash ... * Find block by hash ...
*/ */
exports.transaction = function(req, res, next, txid) { exports.transaction = function(req, res, next, txid) {
Transaction.fromID(txid, function(err, tx) { Transaction.fromIdWithInfo(txid, function(err, tx) {
if (err) return next(err); if (err) return next(err);
if (!tx) return next(new Error('Failed to load TX ' + txid)); if (!tx) return next(new Error('Failed to load TX ' + txid));
req.transaction = tx; req.transaction = tx.info;
next(); next();
}); });
}; };

View File

@ -5,7 +5,6 @@
*/ */
var mongoose = require('mongoose'), var mongoose = require('mongoose'),
Schema = mongoose.Schema, Schema = mongoose.Schema,
async = require('async'),
RpcClient = require('bitcore/RpcClient').class(), RpcClient = require('bitcore/RpcClient').class(),
config = require('../../config/config') config = require('../../config/config')
; ;
@ -23,6 +22,7 @@ var BlockSchema = new Schema({
index: true, index: true,
unique: true, unique: true,
}, },
time: Number,
}); });

View File

@ -5,7 +5,10 @@
*/ */
var mongoose = require('mongoose'), var mongoose = require('mongoose'),
Schema = mongoose.Schema, Schema = mongoose.Schema,
async = require('async'); async = require('async'),
RpcClient = require('bitcore/RpcClient').class(),
config = require('../../config/config');
/** /**
*/ */
@ -31,14 +34,14 @@ TransactionSchema.statics.load = function(id, cb) {
}; };
TransactionSchema.statics.fromID = function(txid, cb) { TransactionSchema.statics.fromId = function(txid, cb) {
this.findOne({ this.findOne({
txid: txid, txid: txid,
}).exec(cb); }).exec(cb);
}; };
TransactionSchema.statics.fromIDWithInfo = function(txid, cb) { TransactionSchema.statics.fromIdWithInfo = function(txid, cb) {
this.fromHash(hash, function(err, tx) { this.fromId(txid, function(err, tx) {
if (err) return cb(err); if (err) return cb(err);
tx.getInfo(function(err) { return cb(err,tx); } ); tx.getInfo(function(err) { return cb(err,tx); } );
@ -46,31 +49,20 @@ TransactionSchema.statics.fromIDWithInfo = function(txid, cb) {
}; };
TransactionSchema.statics.createFromArray = function(txs, next) { TransactionSchema.statics.createFromArray = function(txs, next) {
var that = this; var that = this;
if (!txs) return next(); if (!txs) return next();
// console.log('exploding ', txs);
async.forEach( txs, async.forEach( txs,
function(tx, callback) { function(tx, callback) {
// console.log('procesing TX %s', tx);
that.create({ txid: tx }, function(err) { that.create({ txid: tx }, function(err) {
if (err && ! err.toString().match(/E11000/)) { if (err && ! err.toString().match(/E11000/)) {
return callback();
}
if (err) {
return callback(err); return callback(err);
} }
return callback(); return callback();
}); });
}, },
function(err) { function(err) {
if (err) return next(err); return next(err);
return next();
} }
); );
}; };
@ -82,7 +74,7 @@ TransactionSchema.methods.getInfo = function (next) {
var that = this; var that = this;
var rpc = new RpcClient(config.bitcoind); var rpc = new RpcClient(config.bitcoind);
rpc.getRawTransaction(this.txid, function(err, txInfo) { rpc.getRawTransaction(this.txid, 1, function(err, txInfo) {
if (err) return next(err); if (err) return next(err);
that.info = txInfo.result; that.info = txInfo.result;

View File

@ -10,6 +10,6 @@ module.exports = {
pass: 'real_mystery', pass: 'real_mystery',
protocol: 'http', protocol: 'http',
host: process.env.BITCOIND_HOST || '127.0.0.1', host: process.env.BITCOIND_HOST || '127.0.0.1',
port: process.env.BITCOIND_PORT || '8332', port: process.env.BITCOIND_PORT || '18332',
} }
} }

View File

@ -1,8 +1,6 @@
rpcuser=mystery rpcuser=mystery
rpcpassword=real_mystery rpcpassword=real_mystery
server=1 server=1
rpcport=8332
testnet=3
txindex=1 txindex=1
# Allow connections outsite localhost? # Allow connections outsite localhost?

View File

@ -1,11 +1,12 @@
require('classtool'); 'use strict';
require('classtool');
/* We dont sync any contents from TXs, only their IDs are stored */ /* We dont sync any contents from TXs, only their IDs are stored */
var isSyncTxEnabled = 0; var isSyncTxEnabled = 0;
function spec(b) { function spec() {
var mongoose = require('mongoose'); var mongoose = require('mongoose');
var util = require('util'); var util = require('util');
@ -18,74 +19,78 @@ function spec(b) {
var Transaction = require('../app/models/Transaction'); var Transaction = require('../app/models/Transaction');
function Sync(config) { function Sync(config) {
this.network = config.networkName == 'testnet' ? networks.testnet : networks.livenet; this.network = config.networkName === 'testnet' ? networks.testnet: networks.livenet;
} }
var progress_bar = function(string, current, total) { var progress_bar = function(string, current, total) {
console.log( util.format("\t%s %d/%d [%d%%]", console.log(util.format('\t%s %d/%d [%d%%]', string, current, total, parseInt(100 * current / total)));
string, current, total, parseInt(100 * current/total)) };
);
}
Sync.prototype.getNextBlock = function(blockHash, cb) { Sync.prototype.getNextBlock = function(blockHash, cb) {
var that = this; var that = this;
if (!blockHash) { if (!blockHash) {
return cb(); return cb();
} }
this.rpc.getBlock(blockHash, function(err, blockInfo) { this.rpc.getBlock(blockHash, function(err, blockInfo) {
if (err) return cb(err); if (err) return cb(err);
if (blockInfo.result.height % 1000 === 0) {
if ( ! ( blockInfo.result.height % 1000) ) {
var h = blockInfo.result.height, var h = blockInfo.result.height,
d = blockInfo.result.confirmations; d = blockInfo.result.confirmations;
progress_bar('height', h, h + d); progress_bar('height', h, h + d);
} }
Block.create( blockInfo.result, function(err, inBlock) { that.storeBlock(blockInfo.result, function(err) {
if (!err) {
var txs = blockInfo.result.tx;
that.storeTxs(txs, function(err) {
if (!err) {
return that.getNextBlock(blockInfo.result.nextblockhash, cb);
}
});
}
});
});
};
Sync.prototype.storeBlock = function(block, cb) {
Block.create(block, function(err, inBlock) {
// E11000 => already exists // E11000 => already exists
if (err && ! err.toString().match(/E11000/)) { if (err && ! err.toString().match(/E11000/)) {
return cb(err); return cb(err);
} }
cb();
});
};
if (inBlock) { Sync.prototype.storeTxs = function(txs, cb) {
Transaction.createFromArray( blockInfo.result.tx,function (err) { Transaction.createFromArray(txs, cb);
return that.getNextBlock(blockInfo.result.nextblockhash, cb); };
});
}
else
return that.getNextBlock(blockInfo.result.nextblockhash, cb);
});
});
}
Sync.prototype.syncBlocks = function(reindex, cb) { Sync.prototype.syncBlocks = function(reindex, cb) {
var that = this; var that = this;
var genesisHash = this.network.genesisBlock.hash.reverse().toString('hex'); var genesisHash = this.network.genesisBlock.hash.reverse().toString('hex');
console.log("Syncing Blocks..."); console.log('Syncing Blocks... '+reindex);
if (reindex) if (reindex) {
return this.getNextBlock(genesisHash, cb); return this.getNextBlock(genesisHash, cb);
}
Block.findOne({},
Block.findOne({}, {}, { sort: { 'confirmations' : 1 } }, function(err, block) { {},
{
sort: {
'time': - 1
}
},
function(err, block) {
if (err) return cb(err); if (err) return cb(err);
var nextHash = var nextHash = block && block.hash ? block.hash: genesisHash;
block && block.hash
? block.hash
: genesisHash
;
console.log('\tStarting at hash: ' + nextHash); console.log('\tStarting at hash: ' + nextHash);
return that.getNextBlock(nextHash, cb); return that.getNextBlock(nextHash, cb);
}); });
} };
// This is not currently used. Transactions are represented by txid only // This is not currently used. Transactions are represented by txid only
// in mongodb // in mongodb
@ -93,45 +98,45 @@ function spec(b) {
var that = this; var that = this;
console.log("Syncing TXs..."); console.log('Syncing TXs...');
if (reindex) { if (reindex) {
// TODO? // TODO?
} }
Transaction.find({
Transaction.find({blockhash: null}, function(err, txs) { blockhash: null
},
function(err, txs) {
if (err) return cb(err); if (err) return cb(err);
var read = 0; var read = 0;
var pull = 0; var pull = 0;
var write = 0; var write = 0;
var total = txs.length; var total = txs.length;
console.log("\tneed to pull %d txs", total); console.log('\tneed to pull %d txs', total);
if (!total) return cb(); if (!total) return cb();
async.each(txs, async.each(txs, function(tx, next) {
function(tx, next){
if (!tx.txid) { if (!tx.txid) {
console.log("NO TXID skipping...", tx); console.log('NO TXID skipping...', tx);
return next(); return next();
} }
if ( ! ( read++ % 1000) ) if (read++ % 1000 === 0) progress_bar('read', read, total);
progress_bar('read', read, total);
that.rpc.getRawTransaction(tx.txid, 1, function(err, txInfo) { that.rpc.getRawTransaction(tx.txid, 1, function(err, txInfo) {
if ( ! ( pull++ % 1000) ) if (pull++ % 1000 === 0) progress_bar('\tpull', pull, total);
progress_bar('\tpull', pull, total);
if (!err && txInfo) { if (!err && txInfo) {
Transaction.update({txid: tx.txid}, txInfo.result, function(err) { Transaction.update({
txid: tx.txid
},
txInfo.result, function(err) {
if (err) return next(err); if (err) return next(err);
if ( ! ( write++ % 1000) ) if (write++ % 1000 === 0) progress_bar('\t\twrite', write, total);
progress_bar('\t\twrite', write, total);
return next(); return next();
}); });
@ -142,85 +147,67 @@ function spec(b) {
function(err) { function(err) {
if (err) return cb(err); if (err) return cb(err);
return cb(err); return cb(err);
}
);
}); });
} });
};
Sync.prototype.start = function (opts, next) {
Sync.prototype.init = function(opts) {
mongoose.connect(config.db); mongoose.connect(config.db);
var db = mongoose.connection; this.db = mongoose.connection;
this.rpc = new RpcClient(config.bitcoind); this.rpc = new RpcClient(config.bitcoind);
this.db.on('error', console.error.bind(console, 'connection error:'));
};
Sync.prototype.import_history = function(opts, next) {
var that = this; var that = this;
this.db.once('open', function() {
db.on('error', console.error.bind(console, 'connection error:'));
db.once('open', function (){
async.series([ async.series([
function(cb) { function(cb) {
if (opts.destroy) { if (opts.destroy) {
console.log("Deleting Blocks..."); console.log('Deleting Blocks...');
return db.collections['blocks'].drop(cb); that.db.collections.blocks.drop(cb);
} else {
cb();
} }
return cb();
}, },
function(cb) { function(cb) {
if (opts.destroy) { if (opts.destroy) {
console.log("Deleting TXs..."); console.log('Deleting TXs...');
return db.collections['transactions'].drop(cb); that.db.collections.transactions.drop(cb);
} else {
cb();
} }
return cb();
}, },
function(cb) { function(cb) {
if (!opts.skip_blocks) { if (!opts.skip_blocks) {
that.syncBlocks(opts.reindex, function(err) { that.syncBlocks(opts.reindex, cb);
if (err) { } else {
return cb(err); cb();
}
console.log("\tBlocks done.");
return cb();
});
}
else {
return cb();
} }
}, },
function(cb) { function(cb) {
if (isSyncTxEnabled && ! opts.skip_txs) { if (isSyncTxEnabled && ! opts.skip_txs) {
that.syncTXs(opts.reindex, function(err) { that.syncTXs(opts.reindex, cb);
if (err) {
return cb(err);
}
return cb();
});
} }
else { else {
return cb(); return cb();
} }
}, }], function(err) {
function(cb) {
db.close();
return cb();
},
],
function(err) {
if (err) {
db.close();
return next(err); return next(err);
}
return next();
}); });
}); });
}
return Sync;
}; };
Sync.prototype.close = function() {
console.log("closing connection");
this.db.close();
};
return Sync;
}
module.defineClass(spec); module.defineClass(spec);

150
p2p.js
View File

@ -1,5 +1,8 @@
'use strict'; 'use strict';
process.env.NODE_ENV = process.env.NODE_ENV || 'development';
var fs = require('fs'); var fs = require('fs');
var HeaderDB = require('./HeaderDB').class(); var HeaderDB = require('./HeaderDB').class();
var Block = require('bitcore/Block').class(); var Block = require('bitcore/Block').class();
@ -7,6 +10,8 @@ var CoinConst = require('bitcore/const');
var coinUtil = require('bitcore/util/util'); var coinUtil = require('bitcore/util/util');
var networks = require('bitcore/networks'); var networks = require('bitcore/networks');
var Parser = require('bitcore/util/BinaryParser').class(); var Parser = require('bitcore/util/BinaryParser').class();
var async = require('async');
var Sync = require('./lib/Sync').class();
var peerdb_fn = 'peerdb.json'; var peerdb_fn = 'peerdb.json';
@ -29,15 +34,17 @@ function peerdb_load() {
peerdb = [{ peerdb = [{
ipv4: '127.0.0.1', ipv4: '127.0.0.1',
port: 18333 port: 18333
}, ]; },
];
fs.writeFileSync(peerdb_fn, JSON.stringify(peerdb)); fs.writeFileSync(peerdb_fn, JSON.stringify(peerdb));
} }
} }
function hdrdb_load() function hdrdb_load() {
{ hdrdb = new HeaderDB({
hdrdb = new HeaderDB({network: network}); network: network
});
} }
function get_more_headers(info) { function get_more_headers(info) {
@ -66,12 +73,10 @@ function handle_headers(info) {
// We persist the header DB after each batch // We persist the header DB after each batch
//hdrdb.writeFile(hdrdb_fn); //hdrdb.writeFile(hdrdb_fn);
// Only one request per batch of headers we receive. // Only one request per batch of headers we receive.
get_more_headers(info); get_more_headers(info);
} }
function handle_verack(info) { function handle_verack(info) {
var inv = { var inv = {
type: CoinConst.MSG.BLOCK, type: CoinConst.MSG.BLOCK,
@ -82,31 +87,53 @@ function handle_verack(info) {
// Asks for the genesis block // Asks for the genesis block
// console.log('p2psync: Asking for the genesis block'); // console.log('p2psync: Asking for the genesis block');
// info.conn.sendGetData(invs); // info.conn.sendGetData(invs);
} }
function handle_inv(info) { function handle_inv(info) {
console.log('handle inv');
// TODO: should limit the invs to objects we haven't seen yet // TODO: should limit the invs to objects we haven't seen yet
var invs = info.message.invs; var invs = info.message.invs;
invs.forEach(function(inv) { invs.forEach(function(inv) {
console.log('Received inv for a '+CoinConst.MSG.to_str(inv.type)); console.log('Handle inv for a ' + CoinConst.MSG.to_str(inv.type));
} });
);
console.log('requesting getData');
info.conn.sendGetData(invs); info.conn.sendGetData(invs);
} }
var sync = new Sync({
networkName: networks.testnet
});
sync.init();
function handle_tx(info) { function handle_tx(info) {
var tx = info.message.tx.getStandardizedObject(); var tx = info.message.tx.getStandardizedObject();
console.log('handle tx: '+JSON.stringify(tx)); console.log('Handle tx: ' + tx.hash);
sync.storeTxs([tx.hash], function(err) {
if (err) {
console.log('error in handle TX: '+err);
}
});
} }
function handle_block(info) { function handle_block(info) {
console.log('handle block');
var block = info.message.block; var block = info.message.block;
add_header(info, block); var now = Math.round(new Date().getTime() / 1000);
var blockHash = coinUtil.formatHashFull(block.calcHash());
console.log('Handle block: '+ blockHash);
sync.storeBlock({
'hash': blockHash,
'time': now
},
function(err) {
if (err) {
console.log('error in handle Block: ' + err);
} else {
var hashes = block.txs.map(function(tx) {
return coinUtil.formatHashFull(tx.hash);
});
sync.storeTxs(hashes, function(){});
}
});
} }
function handle_connected(data) { function handle_connected(data) {
@ -135,96 +162,6 @@ function p2psync() {
peerman.start(); peerman.start();
} }
function filesync_block_buf(blkdir, fn, buf) {
var parser = new Parser(buf);
var block = new Block();
block.parse(parser, true);
var hashStr = coinUtil.formatHashFull(block.calcHash());
try {
hdrdb.add(block);
} catch (e) {
var height = hdrdb.size();
console.log('HeaderDB failed adding block #' + height + ', ' + hashStr);
console.log(' Reason: ' + e);
return;
}
var height = hdrdb.size() - 1;
if ((height % 1000) == 0)
console.log('HeaderDB added block #' + height + ', ' + hashStr);
}
function filesync_open_cb(err, fd, blkdir, fn) {
if (err)
throw err;
var hdrbuf = new Buffer(4 * 2);
while (1) {
// read 2x 32-bit header
var bread = fs.readSync(fd, hdrbuf, 0, 4 * 2, null);
if (bread < (4 * 2)) {
console.log('Short read/EOF, ending scan of ' + fn);
break;
}
// check magic matches
var magic = hdrbuf.slice(0, 4);
if (magic.toString() != network.magic.toString()) {
console.log('Invalid network magic, ending scan of ' + fn);
break;
}
// block size
var blkSize = hdrbuf.readUInt32LE(4);
if (blkSize > (1 * 1024 * 1024))
throw new Error('Invalid block size ' + blkSize);
// read raw block data
var blkBuf = new Buffer(blkSize);
bread = fs.readSync(fd, blkBuf, 0, blkSize, null);
if (bread != blkSize)
throw new Error('Failed to read block');
// process block
filesync_block_buf(blkdir, fn, blkBuf);
}
fs.closeSync(fd);
hdrdb.writeFile(hdrdb_fn);
console.log('Wrote header db');
}
function filesync_block_file(blkdir, fn) {
console.log('Scanning ' + fn + ' for block data.');
var pathname = blkdir + '/' + fn;
fs.open(pathname, 'r', function(err, fd) {
filesync_open_cb(err, fd, blkdir, fn);
});
}
function cmd_filesync_rd(err, files, blkdir) {
if (err)
throw err;
files = files.sort();
var scanned = 0;
files.forEach(function(fn) {
var re = /^blk\d+\.dat$/;
if (fn.match(re)) {
filesync_block_file(blkdir, fn);
scanned++;
}
});
console.log('Scanned ' + scanned + ' of ' + files.length + ' files in '
+ blkdir);
}
function main() { function main() {
peerdb_load(); peerdb_load();
hdrdb_load(); hdrdb_load();
@ -233,3 +170,4 @@ function main() {
} }
main(); main();

View File

@ -11,7 +11,12 @@
{ {
"name": "Matias Alejo Garcia", "name": "Matias Alejo Garcia",
"email": "ematiu@gmail.com" "email": "ematiu@gmail.com"
},
{
"name": "Manuel Araoz",
"email": "manuelaraoz@gmail.com"
} }
], ],
"bugs": { "bugs": {
"url": "https://github.com/bitpay/mystery/issues" "url": "https://github.com/bitpay/mystery/issues"

View File

@ -1,4 +1,5 @@
--require should --require should
-R spec -R spec
--ui bdd --ui bdd
--recursive

View File

@ -11,31 +11,37 @@ var
config = require('../../config/config'), config = require('../../config/config'),
Block = require('../../app/models/Block'); Block = require('../../app/models/Block');
mongoose.connection.on('error', function(err) { console.log(err); });
describe('Block getInfo', function(){
before(function(done) {
mongoose.connect(config.db); mongoose.connect(config.db);
var db = mongoose.connection;
describe('getInfo', function(){
var block_hash = TESTING_BLOCK;
db.on('error', console.error.bind(console, 'connection error:'));
db.once('open', function (){
var block2 = Block.fromHashWithInfo(block_hash, function(err, b2) {
if (err) done(err);
console.log("Block obj:");
console.log(b2);
console.log("Block.info:");
console.log(b2.info);
db.close();
done(); done();
}); });
after(function(done) {
mongoose.connection.close();
done();
});
it('should poll block\'s info from mongoose', function(done) {
var block2 = Block.fromHashWithInfo(TESTING_BLOCK, function(err, b2) {
if (err) done(err);
assert.equal(b2.hash, TESTING_BLOCK);
done();
});
});
it('should poll block\'s info from bitcoind', function(done) {
var block2 = Block.fromHashWithInfo(TESTING_BLOCK, function(err, b2) {
if (err) done(err);
assert.equal(b2.info.hash, TESTING_BLOCK);
assert.equal(b2.info.chainwork, '00000000000000000000000000000000000000000000000000446af21d50acd3');
done();
});
}); });
}); });

View File

@ -1,9 +0,0 @@
var assert = require("assert")
describe('Array', function(){
describe('#indexOf()', function(){
it('should return -1 when the value is not present', function(){
assert.equal(-1, [1,2,3].indexOf(5));
assert.equal(-1, [1,2,3].indexOf(0));
})
})
})

View File

@ -1,5 +1,7 @@
#! /usr/bin / env node #! /usr/bin / env node
'use strict';
process.env.NODE_ENV = process.env.NODE_ENV || 'development'; process.env.NODE_ENV = process.env.NODE_ENV || 'development';
require('buffertools').extend(); require('buffertools').extend();
@ -7,28 +9,37 @@ require('buffertools').extend();
var SYNC_VERSION = '0.1'; var SYNC_VERSION = '0.1';
var program = require('commander'); var program = require('commander');
var Sync = require('../lib/Sync').class(); var Sync = require('../lib/Sync').class();
var async = require('async');
program program.version(SYNC_VERSION).option('-N --network [livenet]', 'Set bitcoin network [testnet]', 'testnet').option('-R --reindex', 'Force reindexing', '0').option('-D --destroy', 'Remove current DB', '0').option('--skip_blocks', 'Sync blocks').option('--skip_txs', 'Sync transactions').parse(process.argv);
.version(SYNC_VERSION)
.option('-N --network [livenet]', 'Set bitcoin network [testnet]', 'testnet')
.option('-R --reindex', 'Force reindexing', '0')
.option('-D --destroy', 'Remove current DB', '0')
.option('--skip_blocks', 'Sync blocks')
.option('--skip_txs', 'Sync transactions')
.parse(process.argv);
var sync = new Sync({ networkName: program.network }); var sync = new Sync({
networkName: program.network
});
if (program.remove) { if (program.remove) {
} }
sync.start( program, function(err){
async.series([
function(cb) {
sync.init(program);
cb();
},
function(cb) {
sync.import_history(program, function(err) {
if (err) { if (err) {
console.log("CRITICAL ERROR: ", err); console.log('CRITICAL ERROR: ', err);
} }
else { else {
console.log('Done!'); console.log('Done!');
} }
cb();
}); });
},
function(cb) {
sync.close();
cb();
}]);