diff --git a/.gitignore b/.gitignore index b4d08d78..47fb7864 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ coverage/* **/*.creator *.log *.tmp +*.tmp.* .DS_Store bin/florincoin* bin/SHA256SUMS.asc diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 84ba9da0..f58ccb6f 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -16,7 +16,7 @@ var utils = require('../../utils'); var LRU = require('lru-cache'); var XXHash = require('xxhash'); - +const MAX_TX_QUERY_LIMIT = 1000; // See rationale about this cache at function getTxList(next) const TXID_LIST_CACHE_ITEMS = 250; // nr of items (this translates to: consecutive @@ -67,8 +67,8 @@ AddressService.dependencies = [ // then I would pass back [tx1, tx2] in that order // // Instead of passing addresses, with from>0, options.cacheKey can be used to define the address set. -// -AddressService.prototype.getAddressHistory = function(addresses, options, callback) { +//(old one: non-optimized for large data) +AddressService.prototype.__getAddressHistory = function(addresses, options, callback) { var self = this; var cacheUsed = false; @@ -173,8 +173,87 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba }; +AddressService.prototype.getAddressHistory = function(addresses, options, streamer, callback) { + var self = this; + + options = options || {}; + //options.from = options.from || 0; //Deprecated, use `after` option + //options.to = options.to || 0xffffffff; //Deprecated, use `after` option + + if(typeof callback !== 'function'){ //if only 3 args, then streamer is callback + callback = streamer; + streamer = () => null; //NULL fn + } + + if (_.isUndefined(options.queryMempool)) { + options.queryMempool = true; + } + + //Quick support for `from` and `to` options (DEPRECATED! Not recommeded to use) + if( !_.isUndefined(options.from) || !_.isUndefined(options.to)) { + options.old_support = true; + options.from = options.from || 0; + options.to = options.to || 0xffffffff; //Max value of to will actually be MAX_TX_QUERY_LIMIT + } + + if (_.isString(addresses)) { + addresses = [addresses]; + } + + var results = { + totalCount: 0, + items: [], + } + + async.eachLimit(addresses, 4, function(address, next) { + + self._streamAddressSummary(address, options, function(err, tx){ + + if(err) + return log.error(err); + + if(!options.txNotNeeded) { + results.totalCount++; + + if(!results.items.some(x => x.txid() === tx.txid())) //push only if tx not already in array + results.items.unshift(tx); //using unshift, so that recent tx (low) are at front + + if(results.items.length > MAX_TX_QUERY_LIMIT) { //remove items from array when overflown + results.items.sort((a, b) => b.__height - a.__height || b.txid().localeCompare(a.txid())); + let del_count = options.old_support ? results.items.length : results.items.length - MAX_TX_QUERY_LIMIT; + let start_index = options.old_support ? MAX_TX_QUERY_LIMIT : 0; + results.items.splice(start_index, del_count); + } + + } + + streamer(null, tx); + + }, next); + + }, function(err) { + + if (err) { + return callback(err); + } + + //sort items in desc block-height, then asc txid (if same height) + results.items.sort((a, b) => b.__height - a.__height || b.txid().localeCompare(a.txid())); + results.totalCount = parseInt(results.totalCount.toFixed()); + + //Quick support for `from` and `to` options (DEPRECATED! Not recommeded to use) + if(options.old_support) { + results.items = results.items.slice(options.from, options.to); + } + + callback(null, results); + + }) + +} // this is basically the same as _getAddressHistory apart from the summary -AddressService.prototype.getAddressSummary = function(address, options, callback) { +//(old one: non-optimized for large data) +AddressService.prototype.__getAddressSummary = function(address, options, callback) { var self = this; @@ -200,7 +279,7 @@ AddressService.prototype.getAddressSummary = function(address, options, callback txApperances: 0, }; - self.getAddressHistory(address, options, function(err, results) { + self.__getAddressHistory(address, options, function(err, results) { //old fn if (err) { return callback(err); @@ -218,6 +297,72 @@ AddressService.prototype.getAddressSummary = function(address, options, callback }; +AddressService.prototype.getAddressSummary = function(address, options, streamer, callback) { + + var self = this; + + options = options || {}; + //options.from = options.from || 0; //Deprecated + //options.to = options.to || 0xffffffff; //Deprecated + options.txNotNeeded = true; //no need to store tx details in result + + if (_.isUndefined(options.queryMempool)) { + options.queryMempool = true; + } + + if(typeof callback !== 'function'){ //if only 3 args, then streamer is callback + callback = streamer; + streamer = () => null; //NULL fn + } + + var result = { + addrStr: address, + balance: 0, + balanceSat: 0, + totalReceived: 0, + totalReceivedSat: 0, + totalSent: 0, + totalSentSat: 0, + unconfirmedBalance: 0, + unconfirmedBalanceSat: 0, + unconfirmedTxApperances: 0, + txApperances: 0, + }; + + self.getAddressHistory(address, options, function(err, tx) { + + if(err) + return log.error(err); + + if(tx) + self._aggregateAddressSummaryResult(tx, address, result, options); + + streamer(null, tx); + + }, function(err) { + + if (err) { + return callback(err); + } + + result.balanceSat = parseInt(result.balanceSat.toFixed()); + result.totalReceivedSat = parseInt(result.totalReceivedSat.toFixed()); + result.totalSentSat = parseInt(result.totalSentSat.toFixed()); + result.txApperances = parseInt(result.txApperances.toFixed()); + result.unconfirmedBalanceSat = parseInt(result.unconfirmedBalanceSat.toFixed()); + result.unconfirmedTxApperances = parseInt(result.unconfirmedTxApperances.toFixed()); + + result.balance = Unit.fromSatoshis(result.balanceSat).toBTC(); + result.totalReceived = Unit.fromSatoshis(result.totalReceivedSat).toBTC(); + result.totalSent = Unit.fromSatoshis(result.totalSentSat).toBTC(); + result.unconfirmedBalance = Unit.fromSatoshis(result.unconfirmedBalanceSat).toBTC(); + + callback(null, result); + + }); + +} + AddressService.prototype._setOutputResults = function(tx, address, result) { for(var j = 0; j < tx.outputs.length; j++) { @@ -265,7 +410,6 @@ AddressService.prototype._getAddressSummaryResult = function(txs, address, resul var self = this; for(var i = 0; i < txs.length; i++) { - var tx = txs[i]; self._setOutputResults(tx, address, result); @@ -283,6 +427,107 @@ AddressService.prototype._getAddressSummaryResult = function(txs, address, resul return result; }; +AddressService.prototype._getOccurrenceCount = function(tx, address) { + let count = 0; + + for(var i = 0; i < tx.inputs.length; i++) { + + var input = tx.inputs[i]; + + if(utils.getAddress(input, this._network) === address) + count++; + + } + + for(var j = 0; j < tx.outputs.length; j++) { + + var output = tx.outputs[j]; + + if(utils.getAddress(output, this._network) === address) + count++; + + } + + return count; + +} + +AddressService.prototype._getOutputResults = function(tx, address) { + + let value = 0; + + for(var j = 0; j < tx.outputs.length; j++) { + + var output = tx.outputs[j]; + + if (utils.getAddress(output, this._network) === address) + value += output.value; + + } + + return value; + +}; + +AddressService.prototype._getInputResults = function(tx, address) { + + let value = 0; + + for(var i = 0; i < tx.inputs.length; i++) { + + var input = tx.inputs[i]; + + if (utils.getAddress(input, this._network) === address) + value += tx.__inputValues[i]; + + } + + return value; + +}; + +AddressService.prototype._aggregateAddressSummaryResult = function (tx, address, result, options) { + + var self = this; + + let output_val = self._getOutputResults(tx, address); + let input_val = self._getInputResults(tx, address); + + //aggregate the result + result.txApperances++; + + result.totalReceivedSat += output_val; + result.balanceSat += output_val; + + result.totalSentSat += input_val; + result.balanceSat -= input_val; + + if(!tx.confirmations){ + result.unconfirmedTxApperances++; + result.unconfirmedBalanceSat += output_val; + result.unconfirmedBalanceSat -= input_val; + } + + if (!options.noTxList) { + + if (!result.transactions) { + result.transactions = []; + } + + let txid = tx.txid(); + if(!result.transactions.includes(txid)) { //push txid only if its not in the array + + result.transactions.unshift(txid); //using unshift, so that recent tx (low confirmation) are at front + + if(result.transactions.length > MAX_TX_QUERY_LIMIT) + result.transactions.pop(); //pop the oldest tx in list (when list limit is maxed out) + + } + + } + +} + AddressService.prototype.getAddressUnspentOutputs = function(address, options, callback) { var self = this; @@ -365,6 +610,11 @@ AddressService.prototype.getAddressUnspentOutputs = function(address, options, c utxoStream.on('data', function(data) { + if(results.length >= MAX_TX_QUERY_LIMIT) { //Max array limit reached, end response + utxoStream.emit('end'); + return; + } + var key = self._encoding.decodeUtxoIndexKey(data.key); var value = self._encoding.decodeUtxoIndexValue(data.value); @@ -440,18 +690,28 @@ AddressService.prototype.stop = function(callback) { AddressService.prototype._getTxidStream = function(address, options) { - var start = this._encoding.encodeAddressIndexKey(address); + var start; + if(options.after) { + start = this._encoding.encodeAddressIndexKey(address, options.start, options.after, 0xffffffff, 1, 0xffffffff); //0xffffffff is for getting after the txid + } else { + start = this._encoding.encodeAddressIndexKey(address, options.start); + } + + var endHeightBuf = new Buffer(4); + endHeightBuf.writeUInt32BE(options.end); + var end = Buffer.concat([ start.slice(0, address.length + 4), - options.endHeightBuf, + endHeightBuf, new Buffer(new Array(83).join('f'), 'hex') ]); var criteria = { gte: start, - lte: end, - reverse: true // txids stream from low confirmations to high confirmations + lte: end + //reverse: true // txids stream from low confirmations to high confirmations }; + //NOTE: commentted reverse to keep the order in asc when reading to preserve continuity when using `after` option // txid stream var txidStream = this._db.createKeyStream(criteria); @@ -463,6 +723,7 @@ AddressService.prototype._getTxidStream = function(address, options) { return txidStream; }; +//(used by old fn) AddressService.prototype._getAddressTxHistory = function(options, callback) { var self = this; @@ -491,6 +752,7 @@ AddressService.prototype._getAddressTxHistory = function(options, callback) { }; +//(used by old fn) AddressService.prototype._getAddressTxidHistory = function(address, options, callback) { var self = this; @@ -500,9 +762,6 @@ AddressService.prototype._getAddressTxidHistory = function(address, options, cal var results = []; - options.endHeightBuf = new Buffer(4); - options.endHeightBuf.writeUInt32BE(options.end); - if (_.isUndefined(options.queryMempool)) { options.queryMempool = true; } @@ -551,7 +810,15 @@ AddressService.prototype._getAddressTxidHistory = function(address, options, cal txIdTransformStream._transform = function(chunk, enc, callback) { var txInfo = self._encoding.decodeAddressIndexKey(chunk); - results.push({ txid: txInfo.txid, height: txInfo.height }); + + if(results.length >= MAX_TX_QUERY_LIMIT) { //Max array limit reached, end response + txIdTransformStream.emit('end'); + return; + } + + if(!results.some(r => r.txid == txInfo.txid)) //add txid to array only if its not already there + results.push({ txid: txInfo.txid, height: txInfo.height }); + callback(); }; @@ -563,6 +830,179 @@ AddressService.prototype._getAddressTxidHistory = function(address, options, cal }; +AddressService.prototype._streamAddressSummary = function(address, options, streamer, callback) { + var self = this; + + options = options || {}; + options.start = options.start || 0; + options.end = options.end || 0xffffffff; + + //options.from = options.from || 0; //Deprecated, use `after` option + //options.to = options.to || 0xffffffff; //Deprecated, use `after` option + + if (_.isUndefined(options.queryMempool)) { + options.queryMempool = true; + } + + //declare the queue to process tx data + var tmpTxList = {}; //store processed txid temporarily to ignore duplication + + var q = async.queue(function(id, cb) { + + //duplication finding + if(id.txid in tmpTxList){ + + tmpTxList[id.txid][0]++; + + if(tmpTxList[id.txid][1] !== null && tmpTxList[id.txid][0] >= tmpTxList[id.txid][1]) //all duplications are found for this txid + delete tmpTxList[id.txid]; + + return cb(); + + } else tmpTxList[id.txid] = [1, null]; + + if (id.height === 0xffffffff) { + + return self._mempool.getMempoolTransaction(id.txid, function(err, tx) { + + if (err || !tx) { + return cb(err || new Error('Address Service: could not find tx: ' + id.txid)); + } + self._transaction.setTxMetaInfo(tx, options, cb); + + }); + + } + + self._transaction.getDetailedTransaction(id.txid, options, cb); + + }, 4); + + //q.pause(); //pause and wait until queue is set (not needed) + + function chunkCallback(err, tx){ + + if(q.killed || (!err && !tx)) //no error or tx data (duplicate calls will have empty tx value) + return; + + if(tx){ + let txid = tx.txid(); + tmpTxList[txid][1] = self._getOccurrenceCount(tx, address); + + if(tmpTxList[txid][0] >= tmpTxList[txid][1]) //all duplications are found for this txid + delete tmpTxList[txid]; + } + + streamer(err, tx); + + if((err || options.flag_stop) && !q.killed){ + + q.kill(); + q.killed = true; + + return callback(); + } + + } + + async.waterfall([ + + //Find start height if `after` option is passed + function(next){ + + if(_.isUndefined(options.after)) { + return next(); + } + + self._transaction.getTransaction(options.after, options, function(err, tx) { + + if(tx && tx.confirmations && tx.height >= options.start) { + + options.start = tx.height; + + } else { + + delete options.after; + + } + + next(); + + }); + + }, + + // stream the rest of the confirmed txids out of the address index + function(next) { + + var txIdTransformStream = new Transform({ objectMode: true }); + + txIdTransformStream._flush = function(cb) { + txIdTransformStream.emit('end'); + cb(); + }; + + txIdTransformStream.on('error', function(err) { + log.error('Address Service: txstream err: ' + err); + txIdTransformStream.unpipe(); + }); + + txIdTransformStream.on('end', function() { + next(); + }); + + txIdTransformStream._transform = function(chunk, enc, cb) { + + if(options.flag_stop)//stop data query + return txIdTransformStream.unpipe(); + + var txInfo = self._encoding.decodeAddressIndexKey(chunk); + q.push({ txid: txInfo.txid, height: txInfo.height }, chunkCallback); + + cb(); + }; + + var txidStream = self._getTxidStream(address, options); + txidStream.pipe(txIdTransformStream); + + }, + + // query the mempool for relevant txs for this address + function(next) { + + if (!options.queryMempool) { + return next(null, []); + } + + self._mempool.getTxidsByAddress(address, 'both', next); + }, + + // add the meta data such as input values, etc. + function(mempoolTxids, next) { + + if (mempoolTxids.length <= 0) { + return next(); + } + + mempoolTxids.map(id => q.push(id, chunkCallback)); + next(); + }, + + //wait for queue to complete + function(next) { + + if(!q.started) //No tx in query + return next(); + + else + q.drain = () => next(); + + } + + ], callback); + +} + AddressService.prototype._removeBlock = function(block, callback) { var self = this; diff --git a/lib/services/web/index.js b/lib/services/web/index.js index d9a5b48f..bd28a786 100644 --- a/lib/services/web/index.js +++ b/lib/services/web/index.js @@ -4,6 +4,7 @@ var fs = require('fs'); var http = require('http'); var https = require('https'); var express = require('express'); +var express_ws = require('express-ws'); var bodyParser = require('body-parser'); var socketio = require('socket.io'); var inherits = require('util').inherits; @@ -105,7 +106,7 @@ WebService.prototype.setupAllRoutes = function() { if(service.getRoutePrefix && service.setupRoutes) { this.app.use('/' + this.node.services[key].getRoutePrefix(), subApp); - this.node.services[key].setupRoutes(subApp, express); + this.node.services[key].setupRoutes(subApp, express, express_ws); } else { log.debug('No routes defined for: ' + key); } diff --git a/package.json b/package.json index 47ae3fe0..e39e9222 100644 --- a/package.json +++ b/package.json @@ -5,12 +5,12 @@ "node": ">=8.0.0" }, "author": "BitPay ", - "version": "5.0.8", + "version": "5.0.9-beta-rm", "main": "./index.js", - "repository": "git://github.com/oipwg/flocore-node.git", - "homepage": "https://github.com/oipwg/flocore-node", + "repository": "git://github.com/ranchimall/flocore-node.git", + "homepage": "https://github.com/ranchimall/flocore-node", "bugs": { - "url": "https://github.com/oipwg/flocore-node/issues" + "url": "https://github.com/ranchimall/flocore-node/issues" }, "bin": { "flocore-node": "./bin/flocore-node" @@ -36,13 +36,14 @@ "commander": "^2.8.1", "errno": "^0.1.4", "express": "^4.13.3", + "express-ws": "^5.0.2", "fcoin": "^1.1.4", "flocore-lib": "^0.15.2", "flocore-message": "^1.0.7", "flocore-p2p": "^5.0.0-beta.8", "florincoind-rpc": "0.7.1", - "flosight-api": "^5.0.0-beta.75", - "flosight-ui": "^5.0.0-beta.72", + "flosight-api": "github:ranchimall/flosight-api", + "flosight-ui": "github:ranchimall/flosight-ui", "leveldown": "^2.0.0", "levelup": "^2.0.0", "liftoff": "^2.2.0",