diff --git a/example/index.js b/example/index.js index 7f618458..131acd15 100755 --- a/example/index.js +++ b/example/index.js @@ -23,6 +23,10 @@ bitcoind.start(function(err) { // print('Found tx:'); // print(tx); // }); + bitcoind.on('mptx', function(mptx) { + print('Found mempool tx:'); + print(mptx); + }); }); }); diff --git a/lib/bitcoind.js b/lib/bitcoind.js index 94b800fd..b97f274d 100644 --- a/lib/bitcoind.js +++ b/lib/bitcoind.js @@ -132,6 +132,38 @@ Bitcoin.prototype.start = function(callback) { this._emitted = {}; + this.on('newListener', function(name) { + if (name === 'block') { + self._pollBlocks(); + return; + } + if (name === 'tx') { + self._pollBlocks(); + self._pollMempool(); + return; + } + if (name === 'mptx') { + self._pollMempool(); + return; + } + }); + + if (this.log_pipe !== -1) { + this.log('log pipe opened: %d', this.log_pipe); + this._pipe = new net.Socket(this.log_pipe); + this._pipe.on('data', function(data) { + return process.stdout.write('bitcoind: ' + data + '\n'); + }); + this._pipe.on('error', function(err) { + ; // ignore for now + }); + this._pipe.resume(); + } +}; + +Bitcoin.prototype._pollBlocks = function() { + if (this._pollingBlocks) return; + this._pollingBlocks = true; (function next() { return bitcoindjs.pollBlocks(function(err, blocks) { if (err) return setTimeout(next, self.pollInterval); @@ -161,18 +193,34 @@ Bitcoin.prototype.start = function(callback) { }); }); })(); +}; - if (this.log_pipe !== -1) { - this.log('log pipe opened: %d', this.log_pipe); - this._pipe = new net.Socket(this.log_pipe); - this._pipe.on('data', function(data) { - return process.stdout.write('bitcoind: ' + data + '\n'); +Bitcoin.prototype._pollMempool = function() { + if (this._pollingMempool) return; + this._pollingMempool = true; + (function next() { + return bitcoindjs.pollMempool(function(err, txs) { + if (err) return setTimeout(next, self.pollInterval); + return utils.forEach(txs, function(tx, nextTx) { + // XXX Bad workaround + if (self._emitted[tx.hash]) { + return setImmediate(function() { + return nextTx(); + }); + } + self._emitted[tx.hash] = true; + + self.emit('mptx', tx); + self.emit('tx', tx); + + return setImmediate(function() { + return nextTx(); + }); + }, function() { + return setTimeout(next, self.pollInterval); + }); }); - this._pipe.on('error', function(err) { - ; // ignore for now - }); - this._pipe.resume(); - } + })(); }; Bitcoin.prototype.getBlock = function(blockHash, callback) { diff --git a/src/bitcoindjs.cc b/src/bitcoindjs.cc index 03a50e4f..5aeb427c 100644 --- a/src/bitcoindjs.cc +++ b/src/bitcoindjs.cc @@ -124,6 +124,7 @@ NAN_METHOD(StopBitcoind); NAN_METHOD(GetBlock); NAN_METHOD(GetTx); NAN_METHOD(PollBlocks); +NAN_METHOD(PollMempool); static void async_start_node_work(uv_work_t *req); @@ -162,9 +163,15 @@ static void async_poll_blocks_after(uv_work_t *req); static void -ctx_to_js(const CTransaction& tx, uint256 hashBlock, Local entry); +async_poll_mempool(uv_work_t *req); static void +async_poll_mempool_after(uv_work_t *req); + +static inline void +ctx_to_js(const CTransaction& tx, uint256 hashBlock, Local entry); + +static inline void cblock_to_js(const CBlock& block, const CBlockIndex* blockindex, Local obj); extern "C" void @@ -236,6 +243,19 @@ struct async_poll_blocks_data { Persistent callback; }; +/** + * async_poll_mempool_data + */ + +struct async_poll_mempool_data { + std::string err_msg; + int poll_saved_height; + int poll_top_height; + Persistent result_array; + Persistent callback; +}; + + /** * StartBitcoind * bitcoind.start(callback) @@ -809,6 +829,107 @@ async_poll_blocks_after(uv_work_t *req) { delete req; } +/** + * PollMempool(callback) + * bitcoind.pollMempool(callback) + */ + +NAN_METHOD(PollMempool) { + NanScope(); + + if (args.Length() < 1 || !args[0]->IsFunction()) { + return NanThrowError( + "Usage: bitcoindjs.pollMempool(callback)"); + } + + Local callback = Local::Cast(args[0]); + + async_poll_mempool_data *data = new async_poll_mempool_data(); + data->poll_saved_height = -1; + data->poll_top_height = -1; + data->err_msg = std::string(""); + data->callback = Persistent::New(callback); + + uv_work_t *req = new uv_work_t(); + req->data = data; + + int status = uv_queue_work(uv_default_loop(), + req, async_poll_mempool, + (uv_after_work_cb)async_poll_mempool_after); + + assert(status == 0); + + NanReturnValue(Undefined()); +} + +static void +async_poll_mempool(uv_work_t *req) { + // async_poll_blocks_data* data = static_cast(req->data); + // Nothing really async to do here. It's all in memory. Placeholder for now. + useconds_t usec = 20 * 1000; + usleep(usec); +} + +static void +async_poll_mempool_after(uv_work_t *req) { + NanScope(); + async_poll_blocks_data* data = static_cast(req->data); + + if (!data->err_msg.empty()) { + Local err = Exception::Error(String::New(data->err_msg.c_str())); + const unsigned argc = 1; + Local argv[argc] = { err }; + TryCatch try_catch; + data->callback->Call(Context::GetCurrent()->Global(), argc, argv); + if (try_catch.HasCaught()) { + node::FatalException(try_catch); + } + } else { + int ti = 0; + Local txs = NanNew(); + + { + std::map::const_iterator it = mempool.mapTx.begin(); + for (; it != mempool.mapTx.end(); it++) { + const CTransaction& tx = it->second.GetTx(); + // uint256 hash = it->second.GetTx().GetHash(); + Local entry = NanNew(); + ctx_to_js(tx, 0, entry); + txs->Set(ti, entry); + ti++; + } + } + + { + std::map::const_iterator it = mempool.mapNextTx.begin(); + for (; it != mempool.mapNextTx.end(); it++) { + const CTransaction tx = *it->second.ptx; + // uint256 hash = it->second.ptx->GetHash(); + Local entry = NanNew(); + ctx_to_js(tx, 0, entry); + txs->Set(ti, entry); + ti++; + } + } + + const unsigned argc = 2; + Local argv[argc] = { + Local::New(Null()), + Local::New(txs) + }; + TryCatch try_catch; + data->callback->Call(Context::GetCurrent()->Global(), argc, argv); + if (try_catch.HasCaught()) { + node::FatalException(try_catch); + } + } + + data->callback.Dispose(); + + delete data; + delete req; +} + /** * Conversions */ @@ -1034,6 +1155,7 @@ init(Handle target) { NODE_SET_METHOD(target, "getBlock", GetBlock); NODE_SET_METHOD(target, "getTx", GetTx); NODE_SET_METHOD(target, "pollBlocks", PollBlocks); + NODE_SET_METHOD(target, "pollMempool", PollMempool); } NODE_MODULE(bitcoindjs, init)