Merge branch 'develop'

commit 83f996c7c1
Author: Neil Booth
Date:   2016-12-15 12:08:39 +09:00
6 changed files with 109 additions and 102 deletions

diff --git a/RELEASE-NOTES b/RELEASE-NOTES

@@ -1,3 +1,16 @@
+version 0.9.8
+-------------
+
+- cleaned up mempool handling; notify of addresses only once when a new block
+  comes in.  Fixes issue 70.
+
+version 0.9.7
+-------------
+
+- history and UTXO requests are now processed by the executor, i.e.,
+  properly asynchronously.  This was the last of the potential latency
+  bottlenecks.
+
 version 0.9.6
 -------------
 
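The 0.9.7 entry refers to moving blocking database reads off the event loop. Below is a minimal sketch of that asyncio executor pattern; the names are hypothetical (read_history_from_db stands in for whatever blocking DB lookup is involved), not the actual ElectrumX code:

    import asyncio

    def read_history_from_db(hash168):
        # Hypothetical stand-in for a blocking LevelDB history lookup.
        return []

    async def get_history(hash168):
        loop = asyncio.get_event_loop()
        # run_in_executor(None, ...) hands the blocking call to the default
        # thread pool, so the event loop keeps serving other sessions.
        return await loop.run_in_executor(None, read_history_from_db, hash168)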

diff --git a/server/block_processor.py b/server/block_processor.py

@@ -85,9 +85,13 @@ class Prefetcher(LoggedClass):
 
     async def _prefetch(self):
         '''Prefetch blocks unless the prefetch queue is full.'''
+        # Refresh the mempool after updating the daemon height, if and
+        # only if we've caught up
         daemon_height = await self.daemon.height()
+        if self.caught_up:
+            await self.daemon.refresh_mempool_hashes()
         cache_room = self.target_cache_size // self.ave_size
         with await self.semaphore:
             # Try and catch up all blocks but limit to room in cache.
             # Constrain count to between 0 and 4000 regardless
@@ -132,7 +136,7 @@ class BlockProcessor(server.db.DB):
     Coordinate backing up in case of chain reorganisations.
     '''
 
-    def __init__(self, env, touched, touched_event):
+    def __init__(self, env):
         super().__init__(env)
 
         # The block processor reads its tasks from this queue
@@ -149,8 +153,6 @@ class BlockProcessor(server.db.DB):
         self.caught_up = False
         self._shutdown = False
         self.event = asyncio.Event()
-        self.touched = touched
-        self.touched_event = touched_event
 
         # Meta
         self.utxo_MB = env.utxo_MB
@@ -180,7 +182,7 @@ class BlockProcessor(server.db.DB):
         self.logger.info('flushing history cache at {:,d} MB'
                          .format(self.hist_MB))
 
-    async def main_loop(self):
+    async def main_loop(self, touched):
         '''Main loop for block processing.'''
 
         # Simulate a reorg if requested
@@ -195,7 +197,7 @@ class BlockProcessor(server.db.DB):
                 break
             blocks = self.prefetcher.get_blocks()
             if blocks:
-                await self.advance_blocks(blocks)
+                await self.advance_blocks(blocks, touched)
             elif not self.caught_up:
                 self.caught_up = True
                 self.first_caught_up()
@@ -209,7 +211,7 @@ class BlockProcessor(server.db.DB):
         self._shutdown = True
         self.tasks.put_nowait(None)
 
-    async def advance_blocks(self, blocks):
+    async def advance_blocks(self, blocks, touched):
         '''Strip the unspendable genesis coinbase.'''
         if self.height == -1:
             blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1)
@@ -218,7 +220,7 @@ class BlockProcessor(server.db.DB):
             for block in blocks:
                 if self._shutdown:
                     break
-                self.advance_block(block, self.touched)
+                self.advance_block(block, touched)
 
         loop = asyncio.get_event_loop()
         try:
@@ -227,14 +229,13 @@ class BlockProcessor(server.db.DB):
             else:
                 do_it()
         except ChainReorg:
-            await self.handle_chain_reorg(self.touched)
+            await self.handle_chain_reorg(touched)
 
         if self.caught_up:
             # Flush everything as queries are performed on the DB and
             # not in-memory.
             await asyncio.sleep(0)
             self.flush(True)
-            self.touched_event.set()
         elif time.time() > self.next_cache_check:
             self.check_cache_size()
             self.next_cache_check = time.time() + 60
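The thread running through these hunks is an ownership change: the block processor no longer holds its own touched set and event; the caller passes a set in and the processor only mutates it. A minimal sketch of that convention, with hypothetical helpers (tasks, addresses_touched_by), not the real class:

    import asyncio

    def addresses_touched_by(block):
        # Hypothetical: extract the addresses a block's txs touch.
        return set()

    async def block_main_loop(tasks, touched):
        # The set is owned elsewhere (in this commit, by MemPool); the block
        # processor just adds to it and lets the owner signal notifications.
        while True:
            block = await tasks.get()
            if block is None:
                break
            touched.update(addresses_touched_by(block))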

diff --git a/server/daemon.py b/server/daemon.py

@@ -36,6 +36,8 @@ class Daemon(util.LoggedClass):
         self.urls = urls
         self.url_index = 0
         self._height = None
+        self.mempool_hashes = set()
+        self.mempool_refresh_event = asyncio.Event()
         # Limit concurrent RPC calls to this number.
         # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
         self.workqueue_semaphore = asyncio.Semaphore(value=10)
@@ -148,9 +150,10 @@ class Daemon(util.LoggedClass):
         # Convert hex string to bytes
         return [bytes.fromhex(block) for block in blocks]
 
-    async def mempool_hashes(self):
-        '''Return the hashes of the txs in the daemon's mempool.'''
-        return await self._send_single('getrawmempool')
+    async def refresh_mempool_hashes(self):
+        '''Update our record of the daemon's mempool hashes.'''
+        self.mempool_hashes = set(await self._send_single('getrawmempool'))
+        self.mempool_refresh_event.set()
 
     async def estimatefee(self, params):
         '''Return the fee estimate for the given parameters.'''
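The two daemon changes implement a simple publish/notify handshake: the prefetcher refreshes the hash snapshot and sets an asyncio.Event; the mempool waits on the event, reads the snapshot, and clears it. A self-contained sketch of the pattern (the getrawmempool RPC is stubbed out so it runs standalone):

    import asyncio

    class DaemonStub:
        def __init__(self):
            self.mempool_hashes = set()
            self.mempool_refresh_event = asyncio.Event()

        async def refresh_mempool_hashes(self, hashes):
            # The real method gets hashes from a getrawmempool RPC; here
            # they are passed in so the sketch is self-contained.
            self.mempool_hashes = set(hashes)
            self.mempool_refresh_event.set()

    async def demo():
        daemon = DaemonStub()
        asyncio.ensure_future(daemon.refresh_mempool_hashes({'aa', 'bb'}))
        await daemon.mempool_refresh_event.wait()
        snapshot = daemon.mempool_hashes        # read the published snapshot
        daemon.mempool_refresh_event.clear()    # re-arm for the next refresh
        return snapshot

    if __name__ == '__main__':
        print(asyncio.get_event_loop().run_until_complete(demo()))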

diff --git a/server/mempool.py b/server/mempool.py

@@ -33,87 +33,31 @@ class MemPool(util.LoggedClass):
     A pair is a (hash168, value) tuple.  tx hashes are hex strings.
     '''
 
-    def __init__(self, daemon, coin, db, touched, touched_event):
+    def __init__(self, daemon, coin, db):
         super().__init__()
         self.daemon = daemon
         self.coin = coin
         self.db = db
-        self.touched = touched
-        self.touched_event = touched_event
+        self.touched = set()
+        self.touched_event = asyncio.Event()
         self.stop = False
         self.txs = {}
         self.hash168s = defaultdict(set)  # None can be a key
 
-    async def main_loop(self, caught_up):
-        '''Asynchronously maintain mempool status with daemon.
-
-        Waits until the caught up event is signalled.'''
-        await caught_up.wait()
-        self.logger.info('beginning processing of daemon mempool.  '
-                         'This can take some time...')
-        try:
-            await self.fetch_and_process()
-        except asyncio.CancelledError:
-            # This aids clean shutdowns
-            self.stop = True
-
-    async def fetch_and_process(self):
-        '''The inner loop unprotected by try / except.'''
-        unfetched = set()
-        unprocessed = {}
-        log_every = 150
-        log_secs = 0
-        fetch_size = 400
-        process_some = self.async_process_some(unfetched, fetch_size // 2)
-        next_refresh = 0
-        # The list of mempool hashes is fetched no more frequently
-        # than this number of seconds
-        refresh_secs = 5
-        while True:
-            try:
-                now = time.time()
-                if now >= next_refresh:
-                    await self.new_hashes(unprocessed, unfetched)
-                    next_refresh = now + refresh_secs
-                    log_secs -= refresh_secs
-                # Fetch some txs if unfetched ones remain
-                if unfetched:
-                    count = min(len(unfetched), fetch_size)
-                    hex_hashes = [unfetched.pop() for n in range(count)]
-                    unprocessed.update(await self.fetch_raw_txs(hex_hashes))
-                # Process some txs if unprocessed ones remain
-                if unprocessed:
-                    await process_some(unprocessed)
-                if self.touched:
-                    self.touched_event.set()
-                if not unprocessed:
-                    if log_secs <= 0:
-                        log_secs = log_every
-                        self.logger.info('{:,d} txs touching {:,d} addresses'
-                                         .format(len(self.txs),
-                                                 len(self.hash168s)))
-                await asyncio.sleep(1)
-            except DaemonError as e:
-                self.logger.info('ignoring daemon error: {}'.format(e))
-
-    async def new_hashes(self, unprocessed, unfetched):
-        '''Get the current list of hashes in the daemon's mempool.
-
-        Remove ones that have disappeared from self.txs and unprocessed.
+    def resync_daemon_hashes(self, unprocessed, unfetched):
+        '''Re-sync self.txs with the list of hashes in the daemon's mempool.
+
+        Additionally, remove gone hashes from unprocessed and
+        unfetched.  Add new ones to unprocessed.
         '''
         txs = self.txs
         hash168s = self.hash168s
         touched = self.touched
-        hashes = set(await self.daemon.mempool_hashes())
-        new = hashes.difference(txs)
+        hashes = self.daemon.mempool_hashes
         gone = set(txs).difference(hashes)
         for hex_hash in gone:
+            unfetched.discard(hex_hash)
             unprocessed.pop(hex_hash, None)
             item = txs.pop(hex_hash)
             if item:
@@ -126,18 +70,71 @@ class MemPool(util.LoggedClass):
                         del hash168s[hash168]
                 touched.update(tx_hash168s)
 
+        new = hashes.difference(txs)
         unfetched.update(new)
         for hex_hash in new:
             txs[hex_hash] = None
 
+    async def main_loop(self):
+        '''Asynchronously maintain mempool status with daemon.
+
+        Processes the mempool each time the daemon's mempool refresh
+        event is signalled.
+        '''
+        unprocessed = {}
+        unfetched = set()
+        txs = self.txs
+        fetch_size = 400
+        process_some = self.async_process_some(unfetched, fetch_size // 2)
+
+        await self.daemon.mempool_refresh_event.wait()
+        self.logger.info('beginning processing of daemon mempool.  '
+                         'This can take some time...')
+        next_log = time.time() + 0.1
+
+        while True:
+            try:
+                todo = len(unfetched) + len(unprocessed)
+                if todo:
+                    pct = (len(txs) - todo) * 100 // len(txs) if txs else 0
+                    self.logger.info('catchup {:d}% complete ({:,d} txs left)'
+                                     .format(pct, todo))
+                else:
+                    now = time.time()
+                    if now >= next_log:
+                        self.logger.info('{:,d} txs touching {:,d} addresses'
+                                         .format(len(txs),
+                                                 len(self.hash168s)))
+                        next_log = now + 150
+                    await self.daemon.mempool_refresh_event.wait()
+                    self.resync_daemon_hashes(unprocessed, unfetched)
+                    self.daemon.mempool_refresh_event.clear()
+
+                if unfetched:
+                    count = min(len(unfetched), fetch_size)
+                    hex_hashes = [unfetched.pop() for n in range(count)]
+                    unprocessed.update(await self.fetch_raw_txs(hex_hashes))
+                if unprocessed:
+                    await process_some(unprocessed)
+                # Avoid double notifications if processing a block
+                if self.touched and not self.processing_new_block():
+                    self.touched_event.set()
+            except DaemonError as e:
+                self.logger.info('ignoring daemon error: {}'.format(e))
+            except asyncio.CancelledError:
+                # This aids clean shutdowns
+                self.stop = True
+                break
+
     def async_process_some(self, unfetched, limit):
         loop = asyncio.get_event_loop()
         pending = []
         txs = self.txs
-        first = True
 
         async def process(unprocessed):
-            nonlocal first, pending
+            nonlocal pending
             raw_txs = {}
             while unprocessed and len(raw_txs) < limit:
@@ -164,16 +161,12 @@ class MemPool(util.LoggedClass):
                         touched.add(hash168)
                         hash168s[hash168].add(hex_hash)
 
-            to_do = len(unfetched) + len(unprocessed)
-            if to_do and txs:
-                percent = max(0, len(txs) - to_do) * 100 // len(txs)
-                self.logger.info('catchup {:d}% complete'.format(percent))
-            elif first:
-                first = False
-                self.logger.info('caught up')
-
         return process
 
+    def processing_new_block(self):
+        '''Return True if we're processing a new block.'''
+        return self.daemon.cached_height() > self.db.db_height
+
     async def fetch_raw_txs(self, hex_hashes):
         '''Fetch a list of mempool transactions.'''
         raw_txs = await self.daemon.getrawtransactions(hex_hashes)
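At the core of resync_daemon_hashes is plain set arithmetic between the hashes we already track and the daemon's current snapshot. A tiny standalone illustration (the helper name is hypothetical):

    def split_hashes(tracked, current):
        # Hashes that left the daemon's mempool (mined or evicted) ...
        gone = set(tracked) - set(current)
        # ... and hashes we have not seen before and still need to fetch.
        new = set(current) - set(tracked)
        return gone, new

    # e.g. split_hashes({'a', 'b'}, {'b', 'c'}) == ({'a'}, {'c'})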

diff --git a/server/protocol.py b/server/protocol.py

@@ -53,12 +53,9 @@ class ServerManager(util.LoggedClass):
     def __init__(self, env):
         super().__init__()
         self.loop = asyncio.get_event_loop()
-        self.touched = set()
-        self.touched_event = asyncio.Event()
         self.start = time.time()
-        self.bp = BlockProcessor(env, self.touched, self.touched_event)
-        self.mempool = MemPool(self.bp.daemon, env.coin, self.bp,
-                               self.touched, self.touched_event)
+        self.bp = BlockProcessor(env)
+        self.mempool = MemPool(self.bp.daemon, env.coin, self.bp)
         self.irc = IRC(env)
         self.env = env
         self.servers = {}
@@ -178,11 +175,11 @@ class ServerManager(util.LoggedClass):
             self.futures.append(asyncio.ensure_future(coro))
 
         # shutdown() assumes bp.main_loop() is first
-        add_future(self.bp.main_loop())
+        add_future(self.bp.main_loop(self.mempool.touched))
         add_future(self.bp.prefetcher.main_loop())
         add_future(self.irc.start(self.bp.event))
         add_future(self.start_servers(self.bp.event))
-        add_future(self.mempool.main_loop(self.bp.event))
+        add_future(self.mempool.main_loop())
         add_future(self.enqueue_delayed_sessions())
         add_future(self.notify())
         for n in range(4):
@@ -245,10 +242,10 @@ class ServerManager(util.LoggedClass):
     async def notify(self):
         '''Notify sessions about height changes and touched addresses.'''
         while True:
-            await self.touched_event.wait()
-            touched = self.touched.copy()
-            self.touched.clear()
-            self.touched_event.clear()
+            await self.mempool.touched_event.wait()
+            touched = self.mempool.touched.copy()
+            self.mempool.touched.clear()
+            self.mempool.touched_event.clear()
 
             # Invalidate caches
             hc = self.history_cache
@@ -589,9 +586,9 @@ class Session(JSONRPC):
 
     def enqueue_request(self, request):
         '''Add a request to the session's list.'''
+        if not self.requests:
+            self.manager.enqueue_session(self)
         self.requests.append(request)
-        if len(self.requests) == 1:
-            self.manager.enqueue_session(self)
 
     async def serve_requests(self):
         '''Serve requests in batches.'''
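The notify() hunk shows the consumer side of the touched-address handoff: copy the set, clear it and the event, then notify from the copy, so addresses touched while notifications are being sent land in the next batch rather than being lost. A sketch of the idiom, with notify_sessions as a hypothetical callback:

    import asyncio

    async def notify_loop(touched, touched_event, notify_sessions):
        while True:
            await touched_event.wait()
            batch = touched.copy()   # snapshot before producers add more
            touched.clear()
            touched_event.clear()    # re-arm; new touches set it again
            await notify_sessions(batch)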

diff --git a/server/version.py b/server/version.py

@@ -1 +1 @@
-VERSION = "ElectrumX 0.9.6"
+VERSION = "ElectrumX 0.9.8"