parent
e61490a5ba
commit
7126864052
@ -125,9 +125,8 @@ class Controller(ServerBase):
|
|||||||
return self.mempool.value(hashX)
|
return self.mempool.value(hashX)
|
||||||
|
|
||||||
def sent_tx(self, tx_hash):
|
def sent_tx(self, tx_hash):
|
||||||
'''Call when a TX is sent. Tells mempool to prioritize it.'''
|
'''Call when a TX is sent.'''
|
||||||
self.txs_sent += 1
|
self.txs_sent += 1
|
||||||
self.mempool.prioritize(tx_hash)
|
|
||||||
|
|
||||||
def setup_bands(self):
|
def setup_bands(self):
|
||||||
bands = []
|
bands = []
|
||||||
@ -206,14 +205,15 @@ class Controller(ServerBase):
|
|||||||
self.next_log_sessions = time.time() + self.env.log_sessions
|
self.next_log_sessions = time.time() + self.env.log_sessions
|
||||||
|
|
||||||
async def wait_for_bp_catchup(self):
|
async def wait_for_bp_catchup(self):
|
||||||
'''Wait for the block processor to catch up, then kick off server
|
'''Wait for the block processor to catch up, and for the mempool to
|
||||||
background processes.'''
|
synchronize, then kick off server background processes.'''
|
||||||
await self.bp.caught_up_event.wait()
|
await self.bp.caught_up_event.wait()
|
||||||
self.logger.info('block processor has caught up')
|
self.logger.info('block processor has caught up')
|
||||||
|
self.ensure_future(self.mempool.main_loop())
|
||||||
|
await self.mempool.synchronized_event.wait()
|
||||||
self.ensure_future(self.peer_mgr.main_loop())
|
self.ensure_future(self.peer_mgr.main_loop())
|
||||||
self.ensure_future(self.log_start_external_servers())
|
self.ensure_future(self.log_start_external_servers())
|
||||||
self.ensure_future(self.housekeeping())
|
self.ensure_future(self.housekeeping())
|
||||||
self.ensure_future(self.mempool.main_loop())
|
|
||||||
|
|
||||||
def close_servers(self, kinds):
|
def close_servers(self, kinds):
|
||||||
'''Close the servers of the given kinds (TCP etc.).'''
|
'''Close the servers of the given kinds (TCP etc.).'''
|
||||||
|
|||||||
@ -38,15 +38,10 @@ class MemPool(util.LoggedClass):
|
|||||||
self.coin = bp.coin
|
self.coin = bp.coin
|
||||||
self.db = bp
|
self.db = bp
|
||||||
self.touched = set()
|
self.touched = set()
|
||||||
self.prioritized = set()
|
|
||||||
self.stop = False
|
self.stop = False
|
||||||
self.txs = {}
|
self.txs = {}
|
||||||
self.hashXs = defaultdict(set) # None can be a key
|
self.hashXs = defaultdict(set) # None can be a key
|
||||||
|
self.synchronized_event = asyncio.Event()
|
||||||
def prioritize(self, tx_hash):
|
|
||||||
'''Prioritize processing the given hash. This is important during
|
|
||||||
initial mempool sync.'''
|
|
||||||
self.prioritized.add(tx_hash)
|
|
||||||
|
|
||||||
def _resync_daemon_hashes(self, unprocessed, unfetched):
|
def _resync_daemon_hashes(self, unprocessed, unfetched):
|
||||||
'''Re-sync self.txs with the list of hashes in the daemon's mempool.
|
'''Re-sync self.txs with the list of hashes in the daemon's mempool.
|
||||||
@ -91,9 +86,9 @@ class MemPool(util.LoggedClass):
|
|||||||
fetch_size = 800
|
fetch_size = 800
|
||||||
process_some = self._async_process_some(fetch_size // 2)
|
process_some = self._async_process_some(fetch_size // 2)
|
||||||
|
|
||||||
await self.daemon.mempool_refresh_event.wait()
|
|
||||||
self.logger.info('beginning processing of daemon mempool. '
|
self.logger.info('beginning processing of daemon mempool. '
|
||||||
'This can take some time...')
|
'This can take some time...')
|
||||||
|
await self.daemon.mempool_refresh_event.wait()
|
||||||
next_log = 0
|
next_log = 0
|
||||||
loops = -1 # Zero during initial catchup
|
loops = -1 # Zero during initial catchup
|
||||||
|
|
||||||
@ -111,6 +106,8 @@ class MemPool(util.LoggedClass):
|
|||||||
'({:,d} txs left)'.format(pct, todo))
|
'({:,d} txs left)'.format(pct, todo))
|
||||||
if not todo:
|
if not todo:
|
||||||
loops += 1
|
loops += 1
|
||||||
|
if loops > 0:
|
||||||
|
self.synchronized_event.set()
|
||||||
now = time.time()
|
now = time.time()
|
||||||
if now >= next_log and loops:
|
if now >= next_log and loops:
|
||||||
self.logger.info('{:,d} txs touching {:,d} addresses'
|
self.logger.info('{:,d} txs touching {:,d} addresses'
|
||||||
@ -119,7 +116,6 @@ class MemPool(util.LoggedClass):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
if not todo:
|
if not todo:
|
||||||
self.prioritized.clear()
|
|
||||||
await self.daemon.mempool_refresh_event.wait()
|
await self.daemon.mempool_refresh_event.wait()
|
||||||
|
|
||||||
self._resync_daemon_hashes(unprocessed, unfetched)
|
self._resync_daemon_hashes(unprocessed, unfetched)
|
||||||
@ -148,10 +144,6 @@ class MemPool(util.LoggedClass):
|
|||||||
|
|
||||||
raw_txs = {}
|
raw_txs = {}
|
||||||
|
|
||||||
for hex_hash in self.prioritized:
|
|
||||||
if hex_hash in unprocessed:
|
|
||||||
raw_txs[hex_hash] = unprocessed.pop(hex_hash)
|
|
||||||
|
|
||||||
while unprocessed and len(raw_txs) < limit:
|
while unprocessed and len(raw_txs) < limit:
|
||||||
hex_hash, raw_tx = unprocessed.popitem()
|
hex_hash, raw_tx = unprocessed.popitem()
|
||||||
raw_txs[hex_hash] = raw_tx
|
raw_txs[hex_hash] = raw_tx
|
||||||
@ -213,7 +205,7 @@ class MemPool(util.LoggedClass):
|
|||||||
db_utxo_lookup = self.db.db_utxo_lookup
|
db_utxo_lookup = self.db.db_utxo_lookup
|
||||||
txs = self.txs
|
txs = self.txs
|
||||||
|
|
||||||
# Deserialize each tx and put it in our priority queue
|
# Deserialize each tx and put it in a pending list
|
||||||
for tx_hash, raw_tx in raw_tx_map.items():
|
for tx_hash, raw_tx in raw_tx_map.items():
|
||||||
if tx_hash not in txs:
|
if tx_hash not in txs:
|
||||||
continue
|
continue
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user