From a7444a92b9d1f177f22c542a055e669b1765e123 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Tue, 22 Sep 2015 17:03:02 -0400 Subject: [PATCH 1/2] Tx event firing - The ProcessMessages signal was scheduling/connecting its slots slightly out of order for what we need - First, the scan_messages needs to run and copy out the stream - Second, Bitcoin's ProcessMessages needs to run in order to run AcceptToMemoryPool - Third, scan_messages_after needs to run in order to signal our tx_monitor - In order to make this happen, we need to pair ProcessMessages slot to the scan_messages slot and order that - Then, AFTER those two complete, we can schedule scan_messages_after --- .travis.yml | 1 + etc/bitcoin.patch | 13 +++++++++++++ src/libbitcoind.cc | 4 ++-- 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index e3812578..e2df2e3b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,6 +10,7 @@ before_install: script: - _mocha -R spec integration/regtest.js - _mocha -R spec integration/regtest-node.js + - _mocha -R spec integration/p2p.js - _mocha -R spec --recursive cache: directories: diff --git a/etc/bitcoin.patch b/etc/bitcoin.patch index 99b36feb..53530f67 100644 --- a/etc/bitcoin.patch +++ b/etc/bitcoin.patch @@ -381,6 +381,19 @@ index c65e842..0e44bb5 100644 CLevelDBWrapper(const boost::filesystem::path& path, size_t nCacheSize, bool fMemory = false, bool fWipe = false); ~CLevelDBWrapper(); +diff --git a/src/main.cpp b/src/main.cpp +index 8f82abf..b3784a7 100644 +--- a/src/main.cpp ++++ b/src/main.cpp +@@ -486,7 +486,7 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { + void RegisterNodeSignals(CNodeSignals& nodeSignals) + { + nodeSignals.GetHeight.connect(&GetHeight); +- nodeSignals.ProcessMessages.connect(&ProcessMessages); ++ nodeSignals.ProcessMessages.connect(0, &ProcessMessages, boost::signals2::at_back); + nodeSignals.SendMessages.connect(&SendMessages); + nodeSignals.InitializeNode.connect(&InitializeNode); + nodeSignals.FinalizeNode.connect(&FinalizeNode); diff --git a/src/net.cpp b/src/net.cpp index 3908be6..cf3ffd4 100644 --- a/src/net.cpp diff --git a/src/libbitcoind.cc b/src/libbitcoind.cc index cc7a9e7a..7258e0f0 100644 --- a/src/libbitcoind.cc +++ b/src/libbitcoind.cc @@ -278,8 +278,8 @@ NAN_METHOD(StartTxMon) { txmon_callback_available = true; CNodeSignals& nodeSignals = GetNodeSignals(); - nodeSignals.ProcessMessages.connect(&scan_messages, boost::signals2::at_front); - nodeSignals.ProcessMessages.connect(&scan_messages_after, boost::signals2::at_back); + nodeSignals.ProcessMessages.connect(0, &scan_messages, boost::signals2::at_front); + nodeSignals.ProcessMessages.connect(1, &scan_messages_after, boost::signals2::at_back); uv_async_init(uv_default_loop(), &txmon_async, tx_notifier); From dc390b9e9f5e5d56006071cc253ce1d5406098d3 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Wed, 23 Sep 2015 15:13:24 -0400 Subject: [PATCH 2/2] Tx Notifier - There is a new signal called TxToMemPool - This signal will connect to the bindings slot, queueTx - When a tx is accepted into the memory pool, the tx will be queued and the tx_notifier will be scheduled --- etc/bitcoin.patch | 33 ++++++--- src/libbitcoind.cc | 180 +++++++++------------------------------------ 2 files changed, 56 insertions(+), 157 deletions(-) diff --git a/etc/bitcoin.patch b/etc/bitcoin.patch index 53530f67..47a78285 100644 --- a/etc/bitcoin.patch +++ b/etc/bitcoin.patch @@ -382,18 +382,17 @@ index c65e842..0e44bb5 100644 ~CLevelDBWrapper(); diff --git a/src/main.cpp b/src/main.cpp -index 8f82abf..b3784a7 100644 +index 8f82abf..42bea1c 100644 --- a/src/main.cpp +++ b/src/main.cpp -@@ -486,7 +486,7 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { - void RegisterNodeSignals(CNodeSignals& nodeSignals) - { - nodeSignals.GetHeight.connect(&GetHeight); -- nodeSignals.ProcessMessages.connect(&ProcessMessages); -+ nodeSignals.ProcessMessages.connect(0, &ProcessMessages, boost::signals2::at_back); - nodeSignals.SendMessages.connect(&SendMessages); - nodeSignals.InitializeNode.connect(&InitializeNode); - nodeSignals.FinalizeNode.connect(&FinalizeNode); +@@ -1058,6 +1058,7 @@ bool AcceptToMemoryPool(CTxMemPool& pool, CValidationState &state, const CTransa + + // Store transaction in memory + pool.addUnchecked(hash, entry, !IsInitialBlockDownload()); ++ GetNodeSignals().TxToMemPool(tx); + } + + SyncWithWallets(tx, NULL); diff --git a/src/net.cpp b/src/net.cpp index 3908be6..cf3ffd4 100644 --- a/src/net.cpp @@ -408,3 +407,17 @@ index 3908be6..cf3ffd4 100644 - nLocalHostNonce, FormatSubVersion(CLIENT_NAME, CLIENT_VERSION, std::vector()), nBestHeight, true); + nLocalHostNonce, FormatSubVersion(CLIENT_NAME, CLIENT_VERSION, bitcore_node), nBestHeight, true); } + + +diff --git a/src/net.h b/src/net.h +index 17502b9..e181d68 100644 +--- a/src/net.h ++++ b/src/net.h +@@ -99,6 +99,7 @@ struct CNodeSignals + { + boost::signals2::signal GetHeight; + boost::signals2::signal ProcessMessages; ++ boost::signals2::signal TxToMemPool; + boost::signals2::signal SendMessages; + boost::signals2::signal InitializeNode; + boost::signals2::signal FinalizeNode; diff --git a/src/libbitcoind.cc b/src/libbitcoind.cc index 7258e0f0..6fefa711 100644 --- a/src/libbitcoind.cc +++ b/src/libbitcoind.cc @@ -81,10 +81,7 @@ static void async_get_tx_and_info_after(uv_work_t *req); static bool -scan_messages(CNode* pfrom); - -static bool -scan_messages_after(CNode* pfrom); +queueTx(const CTransaction&); extern "C" void init(Handle); @@ -93,7 +90,7 @@ init(Handle); * Private Global Variables * Used only by bitcoind functions. */ -static std::vector txmon_messages; +static std::vector txQueue; static uv_async_t txmon_async; static Eternal txmon_callback; static bool txmon_callback_available; @@ -278,8 +275,7 @@ NAN_METHOD(StartTxMon) { txmon_callback_available = true; CNodeSignals& nodeSignals = GetNodeSignals(); - nodeSignals.ProcessMessages.connect(0, &scan_messages, boost::signals2::at_front); - nodeSignals.ProcessMessages.connect(1, &scan_messages_after, boost::signals2::at_back); + nodeSignals.TxToMemPool.connect(&queueTx); uv_async_init(uv_default_loop(), &txmon_async, tx_notifier); @@ -291,133 +287,49 @@ tx_notifier(uv_async_t *handle) { Isolate* isolate = Isolate::GetCurrent(); HandleScope scope(isolate); - { + Local results = Array::New(isolate); + int arrayIndex = 0; - LOCK(cs_main); + LOCK(cs_main); + BOOST_FOREACH(const CTransaction& tx, txQueue) { - Local results = Array::New(isolate); - int arrayIndex = 0; + CDataStream ssTx(SER_NETWORK, PROTOCOL_VERSION); + ssTx << tx; + std::string stx = ssTx.str(); + Local txBuffer = node::Buffer::New(isolate, stx.c_str(), stx.size()); - BOOST_FOREACH(CDataStream& vRecvCopy, txmon_messages) { + uint256 hash = tx.GetHash(); - std::string vRecvStr = vRecvCopy.str(); + Local obj = NanNew(); - Local txBuffer = node::Buffer::New(isolate, vRecvStr.c_str(), vRecvStr.size()); - - CTransaction tx; - vRecvCopy >> tx; - uint256 hash = tx.GetHash(); - - Local obj = NanNew(); - - bool existsInMempool = false; - - CTransaction mtx; - - if (mempool.lookup(hash, mtx)) - { - existsInMempool = true; - } - - obj->Set(NanNew("buffer"), txBuffer); - obj->Set(NanNew("hash"), NanNew(hash.GetHex())); - obj->Set(NanNew("mempool"), NanNew(existsInMempool)); - - results->Set(arrayIndex, obj); - arrayIndex++; - } - - const unsigned argc = 1; - Local argv[argc] = { - Local::New(isolate, results) - }; - - Local cb = txmon_callback.Get(isolate); - - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); - - txmon_messages.clear(); + obj->Set(NanNew("buffer"), txBuffer); + obj->Set(NanNew("hash"), NanNew(hash.GetHex())); + obj->Set(NanNew("mempool"), NanNew(true)); + results->Set(arrayIndex, obj); + arrayIndex++; } + const unsigned argc = 1; + Local argv[argc] = { + Local::New(isolate, results) + }; + + Local cb = txmon_callback.Get(isolate); + + cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); + + txQueue.clear(); + } - static bool -scan_messages_after(CNode* pfrom) { - if(txmon_messages.size() > 0) { - uv_async_send(&txmon_async); - } +queueTx(const CTransaction& tx) { + LOCK(cs_main); + txQueue.push_back(tx); + uv_async_send(&txmon_async); return true; } -static bool -scan_messages(CNode* pfrom) { - - bool fOk = true; - - std::deque::iterator it = pfrom->vRecvMsg.begin(); - while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { - // Don't bother if send buffer is too full to respond anyway - if (pfrom->nSendSize >= SendBufferSize()) { - break; - } - - // get next message - CNetMessage& msg = *it; - - // end, if an incomplete message is found - if (!msg.complete()) { - break; - } - - // at this point, any failure means we can delete the current message - it++; - - // Scan for message start - if (memcmp(msg.hdr.pchMessageStart, Params().MessageStart(), MESSAGE_START_SIZE) != 0) { - fOk = false; - break; - } - - // Read header - CMessageHeader& hdr = msg.hdr; - if (!hdr.IsValid(Params().MessageStart())) { - continue; - } - - std::string strCommand = hdr.GetCommand(); - - if (strCommand == (std::string)"tx") { - - // Message size - unsigned int nMessageSize = hdr.nMessageSize; - - // Checksum - CDataStream& vRecv = msg.vRecv; - uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize); - unsigned int nChecksum = 0; - memcpy(&nChecksum, &hash, sizeof(nChecksum)); - if (nChecksum != hdr.nChecksum) { - continue; - } - - // Copy the stream so that it can later be processed into the mempool - CDataStream vRecvCopy(vRecv.begin(), vRecv.end(), vRecv.GetType(), vRecv.GetVersion()); - - { - LOCK(cs_main); - txmon_messages.push_back(vRecvCopy); - } - - } - - boost::this_thread::interruption_point(); - break; - } - - return fOk; -} - /** * Functions */ @@ -1556,32 +1468,6 @@ NAN_METHOD(SendTransaction) { // Relay the transaction connect peers RelayTransaction(tx); - // Notify any listeners about the transaction - if(txmon_callback_available) { - - Local results = Array::New(isolate); - Local obj = NanNew(); - - CDataStream ssTx(SER_NETWORK, PROTOCOL_VERSION); - ssTx << tx; - std::string stx = ssTx.str(); - Local txBuffer = node::Buffer::New(isolate, stx.c_str(), stx.size()); - - obj->Set(NanNew("buffer"), txBuffer); - obj->Set(NanNew("hash"), NanNew(hashTx.GetHex())); - obj->Set(NanNew("mempool"), NanNew(true)); - - results->Set(0, obj); - - const unsigned argc = 1; - Local argv[argc] = { - Local::New(isolate, results) - }; - Local cb = txmon_callback.Get(isolate); - - cb->Call(isolate->GetCurrentContext()->Global(), argc, argv); - } - NanReturnValue(Local::New(isolate, NanNew(hashTx.GetHex()))); }