This commit is contained in:
Chris Kleeschulte 2017-01-20 17:28:25 -05:00
parent bc5c3f54e8
commit 5a372f268c
5 changed files with 266 additions and 64 deletions

View File

@ -3,32 +3,46 @@
var levelup = require('levelup');
var leveldown = require('leveldown');
var Encoding = require('../lib/services/address/encoding');
var dbPath = '/Users/patrick/.bitcore/bitcore-node.db';
var dbPath = '/Users/chrisk/.bwdb/bitcore-node.db';
var bitcore = require('bitcore-lib');
var db = levelup(dbPath, {keyEncoding: 'binary', valueEncoding: 'binary'});
var prefix = new Buffer('0002', 'hex');
var encoding = new Encoding(prefix);
var address = '19k8nToWwMGuF4HkNpzgoVAYk4viBnEs5D';
var address = '1MfDRRVVKXUe5KNVZzu8CBzUZDHTTYZM94';
var addressLength = new Buffer(1);
addressLength.writeUInt8(address.length);
//var startBuffer = prefix;
//var endBuffer = Buffer.concat([prefix, new Buffer('ff', 'hex')]);
var startBuffer = Buffer.concat([prefix, addressLength, new Buffer(address, 'utf8'), new Buffer('00', 'hex')]);
var endBuffer = Buffer.concat([prefix, addressLength, new Buffer(address, 'utf8'), new Buffer('ff', 'hex')]);
//var startBuffer = Buffer.concat([prefix, addressLength, new Buffer(address, 'utf8'), new Buffer('00', 'hex')]);
//var endBuffer = Buffer.concat([prefix, addressLength, new Buffer(address, 'utf8'), new Buffer('01', 'hex')]);
var start = Buffer.concat([prefix, new Buffer('0437cd7f8525ceed2324359c2d0ba26006d92d856a9c20fa0241106ee5a597c9', 'hex')]);
var end = Buffer.concat([prefix, new Buffer('0437cd7f8525ceed2324359c2d0ba26006d92d856a9c20fa0241106ee5a597c9', 'hex'), new Buffer('01', 'hex')]);
var stream = db.createReadStream({
gte: startBuffer,
lt: endBuffer
gte: start,
lt: end
});
stream.on('data', function(data) {
console.log(encoding.decodeAddressIndexKey(data.key), encoding.decodeAddressIndexValue(data.value));
var txkey = data.key.slice(2).toString('hex');
var height = data.value.readUInt32BE();
var timestamp = data.value.readDoubleBE(4);
var inputValues = [];
var inputValuesLength = data.value.readUInt16BE(12);
for(var i = 0; i < inputValuesLength / 8; i++) {
inputValues.push(buffer.readDoubleBE(i * 8 + 14));
}
var transaction = new bitcore.Transaction(data.value.slice(inputValues.length * 8 + 14));
transaction.__height = height;
transaction.__inputValues = inputValues;
transaction.__timestamp = timestamp;
//console.log(txkey, transaction.toObject());
console.log(data.value);
console.log(transaction.__height, transaction.__inputValues, transaction.__timestamp);
//console.log(data.key.toString('hex'), data.value.toString('hex'));
});
stream.on('end', function() {
console.log('end');
});
});

View File

@ -466,12 +466,12 @@ AddressService.prototype.getInputForOutput = function(txid, outputIndex, options
txidBuffer = new Buffer(txid, 'hex');
}
if (options.queryMempool) {
var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey(txidBuffer, outputIndex);
var spentIndexSyncKey = self.encoding.encodeSpentIndexSyncKey(txidBuffer, outputIndex);
if (this.mempoolSpentIndex[spentIndexSyncKey]) {
return this._getSpentMempool(txidBuffer, outputIndex, callback);
}
}
var key = encoding.encodeInputKeyMap(txidBuffer, outputIndex);
var key = self.encoding.encodeInputKeyMap(txidBuffer, outputIndex);
var dbOptions = {
valueEncoding: 'binary',
keyEncoding: 'binary'
@ -482,7 +482,7 @@ AddressService.prototype.getInputForOutput = function(txid, outputIndex, options
} else if (err) {
return callback(err);
}
var value = encoding.decodeInputValueMap(buffer);
var value = self.encoding.decodeInputValueMap(buffer);
callback(null, {
inputTxId: value.inputTxId.toString('hex'),
inputIndex: value.inputIndex
@ -519,7 +519,7 @@ AddressService.prototype.createInputsStream = function(addressStr, options) {
AddressService.prototype.createInputsDBStream = function(addressStr, options) {
var stream;
var addrObj = encoding.getAddressInfo(addressStr);
var addrObj = this.encoding.getAddressInfo(addressStr);
var hashBuffer = addrObj.hashBuffer;
var hashTypeBuffer = addrObj.hashTypeBuffer;
@ -588,7 +588,7 @@ AddressService.prototype.getInputs = function(addressStr, options, callback) {
var inputs = [];
var addrObj = encoding.getAddressInfo(addressStr);
var addrObj = self.encoding.getAddressInfo(addressStr);
var hashBuffer = addrObj.hashBuffer;
var hashTypeBuffer = addrObj.hashTypeBuffer;
@ -733,7 +733,7 @@ AddressService.prototype.createOutputsStream = function(addressStr, options) {
AddressService.prototype.createOutputsDBStream = function(addressStr, options) {
var addrObj = encoding.getAddressInfo(addressStr);
var addrObj = this.encoding.getAddressInfo(addressStr);
var hashBuffer = addrObj.hashBuffer;
var hashTypeBuffer = addrObj.hashTypeBuffer;
var stream;
@ -805,7 +805,7 @@ AddressService.prototype.getOutputs = function(addressStr, options, callback) {
$.checkArgument(_.isObject(options), 'Second argument is expected to be an options object.');
$.checkArgument(_.isFunction(callback), 'Third argument is expected to be a callback function.');
var addrObj = encoding.getAddressInfo(addressStr);
var addrObj = self.encoding.getAddressInfo(addressStr);
var hashBuffer = addrObj.hashBuffer;
var hashTypeBuffer = addrObj.hashTypeBuffer;
if (!hashTypeBuffer) {
@ -880,7 +880,7 @@ AddressService.prototype._getOutputsMempool = function(addressStr, hashBuffer, h
// prefix: 1, hashBuffer: 20, hashTypeBuffer: 1, txid: 32, outputIndex: 4
var txid = data.key.slice(22, 54);
var outputIndex = data.key.readUInt32BE(54);
var value = encoding.decodeOutputMempoolValue(data.value);
var value = self.encoding.decodeOutputMempoolValue(data.value);
var output = {
address: addressStr,
hashType: constants.HASH_TYPES_READABLE[hashTypeBuffer.toString('hex')],
@ -953,25 +953,36 @@ AddressService.prototype.getUnspentOutputsForAddress = function(address, queryMe
var self = this;
this.getOutputs(address, {queryMempool: queryMempool}, function(err, outputs) {
if (err) {
return callback(err);
} else if(!outputs.length) {
return callback(new errors.NoOutputs('Address ' + address + ' has no outputs'), []);
}
var addressLengthBuffer = new Buffer(1);
addressLengthBuffer.writeUInt8(address.length);
var start = Buffer.concat([ self.prefix, addressLengthBuffer, new Buffer(address, 'utf8'), new Buffer('00', 'hex') ]);
var end = Buffer.concat([ self.prefix, addressLengthBuffer, new Buffer(address, 'utf8'), new Buffer('01', 'hex') ]);
var stream = self.store.createReadStream({
gte: start,
lt: end
});
var opts = {
queryMempool: queryMempool
};
var isUnspent = function(output, callback) {
self.isUnspent(output, opts, callback);
};
async.filter(outputs, isUnspent, function(results) {
callback(null, results);
var utxos = [];
stream.on('data', function(data) {
var key = self.encoding.decodeAddressIndexKey(data.key);
var value = self.encoding.decodeAddressIndexValue(data.value);
utxos.push({
address: key.address,
txid: key.txid,
outputIndex: key.index,
satoshis: value.satoshis,
height: key.height
});
});
stream.on('end', function() {
return callback(null, utxos);
});
stream.on('error', function(err) {
if(err) {
return callback(err);
}
});
};
/**
@ -1003,7 +1014,7 @@ AddressService.prototype.isSpent = function(output, options, callback) {
var spent = self.node.services.bitcoind.isSpent(txid, output.outputIndex);
if (!spent && queryMempool) {
var txidBuffer = new Buffer(txid, 'hex');
var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey(txidBuffer, output.outputIndex);
var spentIndexSyncKey = self.encoding.encodeSpentIndexSyncKey(txidBuffer, output.outputIndex);
spent = self.mempoolSpentIndex[spentIndexSyncKey] ? true : false;
}
setImmediate(function() {
@ -1047,12 +1058,13 @@ AddressService.prototype.isSpent = function(output, options, callback) {
* @param {Function} callback
*/
AddressService.prototype.getAddressHistory = function(addresses, options, callback) {
var history = new AddressHistory({
node: this.node,
options: options,
addresses: addresses
});
history.get(callback);
//var history = new AddressHistory({
// node: this.node,
// options: options,
// addresses: addresses
//});
//history.get(callback);
};
/**
@ -1193,7 +1205,7 @@ AddressService.prototype._getAddressConfirmedOutputsSummary = function(address,
if(options.queryMempool) {
// Check to see if this output is spent in the mempool and if so
// we will subtract it from the unconfirmedBalance (a.k.a unconfirmedDelta)
var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey(
var spentIndexSyncKey = self.encoding.encodeSpentIndexSyncKey(
new Buffer(txid, 'hex'), // TODO: get buffer directly
outputIndex
);
@ -1252,7 +1264,7 @@ AddressService.prototype._getAddressMempoolSummary = function(address, options,
var addressStr = address.toString();
var hashBuffer = address.hashBuffer;
var hashTypeBuffer = constants.HASH_TYPES_MAP[address.type];
var addressIndexKey = encoding.encodeMempoolAddressIndexKey(hashBuffer, hashTypeBuffer);
var addressIndexKey = self.encoding.encodeMempoolAddressIndexKey(hashBuffer, hashTypeBuffer);
if(!this.mempoolAddressIndex[addressIndexKey]) {
return callback(null, result);
@ -1282,7 +1294,7 @@ AddressService.prototype._getAddressMempoolSummary = function(address, options,
result.unconfirmedAppearanceIds[output.txid] = output.timestamp;
if(!options.noBalance) {
var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey(
var spentIndexSyncKey = self.encoding.encodeSpentIndexSyncKey(
new Buffer(output.txid, 'hex'), // TODO: get buffer directly
output.outputIndex
);

View File

@ -4,7 +4,8 @@ var inherits = require('util').inherits;
function TimestampService(options) {
BaseService.call(this, options);
this.currentBlock = null;
this.currentTimestamp = null;
}
inherits(TimestampService, BaseService);
@ -65,6 +66,9 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback
timestamp = lastTimestamp + 1;
}
self.currentBlock = block.hash;
self.currentTimestamp = timestamp;
operations = operations.concat(
[
{
@ -87,6 +91,12 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback
TimestampService.prototype.getTimestamp = function(hash, callback) {
var self = this;
if (hash === self.currentBlock) {
return setImmediate(function() {
callback(null, self.currentTimestamp);
});
}
var key = self._encodeBlockTimestampKey(hash);
self.store.get(key, function(err, buffer) {
if(err) {
@ -133,4 +143,4 @@ TimestampService.prototype._decodeTimestampBlockValue = function(buffer) {
return buffer.toString('hex');
};
module.exports = TimestampService;
module.exports = TimestampService;

View File

@ -1,18 +1,21 @@
'use strict';
var async = require('async');
var BaseService = require('../service');
var inherits = require('util').inherits;
var bitcore = require('bitcore-lib');
function TransactionService(options) {
BaseService.call(this, options);
this.concurrency = options.concurrency || 20;
this.currentTransactions = {};
}
inherits(TransactionService, BaseService);
TransactionService.dependencies = [
'db'
'db',
'timestamp'
];
TransactionService.prototype.start = function(callback) {
@ -36,6 +39,7 @@ TransactionService.prototype.stop = function(callback) {
};
TransactionService.prototype.blockHandler = function(block, connectBlock, callback) {
var self = this;
var action = 'put';
if (!connectBlock) {
action = 'del';
@ -45,22 +49,68 @@ TransactionService.prototype.blockHandler = function(block, connectBlock, callba
this.currentTransactions = {};
for(var i = 0; i < block.transactions.length; i++) {
var tx = block.transactions[i];
tx.__height = block.__height;
async.series([
function(next) {
self.node.services.timestamp.getTimestamp(block.hash, function(err, timestamp) {
if(err) {
return next(err);
}
block.__timestamp = timestamp;
next();
});
}, function(next) {
async.eachSeries(block.transactions, function(tx, next) {
tx.__timestamp = block.__timestamp;
tx.__height = block.__height;
this.currentTransactions[tx.id] = tx;
self._getInputValues(tx, function(err, inputValues) {
if(err) {
return next(err);
}
tx.__inputValues = inputValues;
self.currentTransactions[tx.id] = tx;
operations.push({
type: action,
key: this._encodeTransactionKey(tx.id),
value: this._encodeTransactionValue(tx)
operations.push({
type: action,
key: self._encodeTransactionKey(tx.id),
value: self._encodeTransactionValue(tx)
});
next();
});
}, function(err) {
if(err) {
return next(err);
}
next();
});
}], function(err) {
if(err) {
return callback(err);
}
callback(null, operations);
});
};
TransactionService.prototype._getInputValues = function(tx, callback) {
var self = this;
if (tx.isCoinbase()) {
return callback(null, []);
}
setImmediate(function() {
callback(null, operations);
});
async.mapLimit(tx.inputs, this.concurrency, function(input, next) {
self.getTransaction(input.prevTxId.toString('hex'), function(err, prevTx) {
if(err) {
return next(err);
}
if (!prevTx.outputs[input.outputIndex]) {
return next(new Error('Input did not have utxo.'));
}
var satoshis = prevTx.outputs[input.outputIndex].satoshis;
next(null, satoshis);
});
}, callback);
};
TransactionService.prototype.getTransaction = function(txid, callback) {
@ -92,16 +142,39 @@ TransactionService.prototype._decodeTransactionKey = function(buffer) {
return buffer.slice(2).toString('hex');
};
TransactionService.prototype._encodeTransactionValue = function(transaction, height) {
TransactionService.prototype._encodeTransactionValue = function(transaction) {
var heightBuffer = new Buffer(4);
heightBuffer.writeUInt32BE();
return new Buffer.concat([heightBuffer, transaction.toBuffer()]);
heightBuffer.writeUInt32BE(transaction.__height);
var timestampBuffer = new Buffer(8);
timestampBuffer.writeDoubleBE(transaction.__timestamp);
var inputValues = transaction.__inputValues;
var inputValuesBuffer = new Buffer(8 * inputValues.length);
for(var i = 0; i < inputValues.length; i++) {
inputValuesBuffer.writeDoubleBE(inputValues[i], i * 8);
}
var inputValuesLengthBuffer = new Buffer(2);
inputValuesLengthBuffer.writeUInt16BE(inputValues.length * 8);
return new Buffer.concat([heightBuffer, timestampBuffer, inputValuesLengthBuffer, inputValuesBuffer, transaction.toBuffer()]);
};
TransactionService.prototype._decodeTransactionValue = function(buffer) {
var height = buffer.readUInt32BE();
var transaction = new bitcore.Transaction(buffer.slice(4));
var timestamp = buffer.readDoubleBE(4);
var inputValues = [];
var inputValuesLength = buffer.readUInt16BE(12);
for(var i = 0; i < inputValuesLength / 8; i++) {
inputValues.push(buffer.readDoubleBE(i * 8 + 14));
}
var transaction = new bitcore.Transaction(buffer.slice(inputValues.length * 8 + 14));
transaction.__height = height;
transaction.__inputValues = inputValues;
transaction.__timestamp = timestamp;
return transaction;
};

93
livetest/index.js Normal file
View File

@ -0,0 +1,93 @@
'use strict';
var assert = require('assert');
var async = require('async');
var BitcoreNode = require('../');
var db = require('../lib/services/db');
var config = {
"network": "livenet",
"port": 3001,
"datadir": "/Users/chrisk/.bwdb/",
"services": [
{
"name": "bitcoind",
"config": {
"connect": [
{
"rpcport": 8332,
"rpcuser": "bitcoin",
"rpcpassword": "local321",
"zmqpubrawtx": "tcp://127.0.0.1:28332"
}
]
},
"module": require('../lib/services/bitcoind')
},
{
"name": "db",
"config": {},
"module": db
},
{
"name": "transaction",
"config": {},
"module": require('../lib/services/transaction')
},
{
"name": "address",
"config": {},
"module": require('../lib/services/address')
},
{
"name": "timestamp",
"config": {},
"module": require('../lib/services/timestamp')
}
],
"servicesConfig": {
"bitcoind": {
"connect": [
{
"rpcport": 8332,
"rpcuser": "bitcoin",
"rpcpassword": "local321",
"zmqpubrawtx": "tcp://127.0.0.1:28332"
}
]
}
},
"path": "/Users/chrisk/source/zzbitcore_node/bitcore-node.json"
}
db.prototype.sync = function(){};
var node = new BitcoreNode.Node(config);
node.start(function(err) {
if(err) {
throw err;
}
var addresses = [ '1MfDRRVVKXUe5KNVZzu8CBzUZDHTTYZM94' ];
async.series([function(next) {
node.services.address.getUnspentOutputs(addresses, false, function(err, results) {
if(err) {
throw err;
}
console.log(results);
next();
});
}, function(next) {
node.services.address.getAddressHistory(addresses, false, function(err, results) {
if(err) {
return callback(err);
}
console.log(results);
next();
});
}], function(err) {
node.stop(function(err) {
if(err) {
return callback(err);
}
process.exit(0);
});
});
});