Merge branch 'better-notifications' into devel
This commit is contained in:
commit
e8a025e428
@ -129,8 +129,8 @@ class SessionManager(object):
|
||||
self.state = self.CATCHING_UP
|
||||
self.txs_sent = 0
|
||||
self.start_time = time.time()
|
||||
self._history_cache = pylru.lrucache(256)
|
||||
self._hc_height = 0
|
||||
self.history_cache = pylru.lrucache(256)
|
||||
self.notified_height = None
|
||||
# Cache some idea of room to avoid recounting on each subscription
|
||||
self.subs_room = 0
|
||||
# Masternode stuff only for such coins
|
||||
@ -332,6 +332,19 @@ class SessionManager(object):
|
||||
])
|
||||
return result
|
||||
|
||||
async def _electrum_and_raw_headers(self, height):
|
||||
raw_header = await self.raw_header(height)
|
||||
electrum_header = self.env.coin.electrum_header(raw_header, height)
|
||||
return electrum_header, raw_header
|
||||
|
||||
async def _refresh_hsub_results(self, height):
|
||||
'''Refresh the cached header subscription responses to be for height,
|
||||
and record that as notified_height.
|
||||
'''
|
||||
electrum, raw = await self._electrum_and_raw_headers(height)
|
||||
self.hsub_results = (electrum, {'hex': raw.hex(), 'height': height})
|
||||
self.notified_height = height
|
||||
|
||||
# --- LocalRPC command handlers
|
||||
|
||||
async def rpc_add_peer(self, real_name):
|
||||
@ -508,6 +521,19 @@ class SessionManager(object):
|
||||
except DaemonError as e:
|
||||
raise RPCError(DAEMON_ERROR, f'daemon error: {e!r}') from None
|
||||
|
||||
async def raw_header(self, height):
|
||||
'''Return the binary header at the given height.'''
|
||||
try:
|
||||
return await self.db.raw_header(height)
|
||||
except IndexError:
|
||||
raise RPCError(BAD_REQUEST, f'height {height:,d} '
|
||||
'out of range') from None
|
||||
|
||||
async def electrum_header(self, height):
|
||||
'''Return the deserialized header at the given height.'''
|
||||
electrum_header, _ = await self._electrum_and_raw_headers(height)
|
||||
return electrum_header
|
||||
|
||||
async def broadcast_transaction(self, raw_tx):
|
||||
hex_hash = await self.daemon.sendrawtransaction([raw_tx])
|
||||
self.txs_sent += 1
|
||||
@ -515,7 +541,7 @@ class SessionManager(object):
|
||||
|
||||
async def limited_history(self, hashX):
|
||||
'''A caching layer.'''
|
||||
hc = self._history_cache
|
||||
hc = self.history_cache
|
||||
if hashX not in hc:
|
||||
# History DoS limit. Each element of history is about 99
|
||||
# bytes when encoded as JSON. This limits resource usage
|
||||
@ -527,16 +553,18 @@ class SessionManager(object):
|
||||
|
||||
async def _notify_sessions(self, height, touched):
|
||||
'''Notify sessions about height changes and touched addresses.'''
|
||||
# Invalidate our history cache for touched hashXs
|
||||
if height != self._hc_height:
|
||||
self._hc_height = height
|
||||
hc = self._history_cache
|
||||
height_changed = height != self.notified_height
|
||||
if height_changed:
|
||||
# Paranoia: a reorg could race and leave db_height lower
|
||||
await self._refresh_hsub_results(min(height, self.db.db_height))
|
||||
# Invalidate our history cache for touched hashXs
|
||||
hc = self.history_cache
|
||||
for hashX in set(hc).intersection(touched):
|
||||
del hc[hashX]
|
||||
|
||||
async with TaskGroup() as group:
|
||||
for session in self.sessions:
|
||||
await group.spawn(session.notify(height, touched))
|
||||
await group.spawn(session.notify(touched, height_changed))
|
||||
|
||||
def add_session(self, session):
|
||||
self.sessions.add(session)
|
||||
@ -597,7 +625,7 @@ class SessionBase(ServerSession):
|
||||
self._receive_message_orig = self.connection.receive_message
|
||||
self.connection.receive_message = self.receive_message
|
||||
|
||||
async def notify(self, height, touched):
|
||||
async def notify(self, touched, height_changed):
|
||||
pass
|
||||
|
||||
def peer_address_str(self, *, for_log=True):
|
||||
@ -682,7 +710,6 @@ class ElectrumX(SessionBase):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.subscribe_headers = False
|
||||
self.subscribe_headers_raw = False
|
||||
self.notified_height = None
|
||||
self.connection.max_response_size = self.env.max_send
|
||||
self.max_subs = self.env.max_session_subs
|
||||
self.hashX_subs = {}
|
||||
@ -723,86 +750,55 @@ class ElectrumX(SessionBase):
|
||||
def sub_count(self):
|
||||
return len(self.hashX_subs)
|
||||
|
||||
async def notify_touched(self, our_touched):
|
||||
changed = {}
|
||||
|
||||
for hashX in our_touched:
|
||||
alias = self.hashX_subs[hashX]
|
||||
status = await self.address_status(hashX)
|
||||
changed[alias] = status
|
||||
|
||||
# Check mempool hashXs - the status is a function of the
|
||||
# confirmed state of other transactions. Note: we cannot
|
||||
# iterate over mempool_statuses as it changes size.
|
||||
for hashX in tuple(self.mempool_statuses):
|
||||
# Items can be evicted whilst await-ing below; False
|
||||
# ensures such hashXs are notified
|
||||
old_status = self.mempool_statuses.get(hashX, False)
|
||||
status = await self.address_status(hashX)
|
||||
if status != old_status:
|
||||
alias = self.hashX_subs[hashX]
|
||||
changed[alias] = status
|
||||
|
||||
for alias, status in changed.items():
|
||||
if len(alias) == 64:
|
||||
method = 'blockchain.scripthash.subscribe'
|
||||
else:
|
||||
method = 'blockchain.address.subscribe'
|
||||
await self.send_notification(method, (alias, status))
|
||||
|
||||
if changed:
|
||||
es = '' if len(changed) == 1 else 'es'
|
||||
self.logger.info('notified of {:,d} address{}'
|
||||
.format(len(changed), es))
|
||||
|
||||
async def notify(self, height, touched):
|
||||
async def notify(self, touched, height_changed):
|
||||
'''Notify the client about changes to touched addresses (from mempool
|
||||
updates or new blocks) and height.
|
||||
|
||||
Return the set of addresses the session needs to be
|
||||
asyncronously notified about. This can be empty if there are
|
||||
possible mempool status updates.
|
||||
|
||||
Returns None if nothing needs to be notified asynchronously.
|
||||
'''
|
||||
height_changed = height != self.notified_height
|
||||
if height_changed:
|
||||
self.notified_height = height
|
||||
if self.subscribe_headers:
|
||||
args = (await self.subscribe_headers_result(height), )
|
||||
await self.send_notification('blockchain.headers.subscribe',
|
||||
args)
|
||||
if height_changed and self.subscribe_headers:
|
||||
args = (await self.subscribe_headers_result(), )
|
||||
await self.send_notification('blockchain.headers.subscribe', args)
|
||||
|
||||
touched = touched.intersection(self.hashX_subs)
|
||||
if touched or (height_changed and self.mempool_statuses):
|
||||
await self.notify_touched(touched)
|
||||
changed = {}
|
||||
|
||||
async def raw_header(self, height):
|
||||
'''Return the binary header at the given height.'''
|
||||
try:
|
||||
return await self.db.raw_header(height)
|
||||
except IndexError:
|
||||
raise RPCError(BAD_REQUEST, f'height {height:,d} '
|
||||
'out of range') from None
|
||||
for hashX in touched:
|
||||
alias = self.hashX_subs[hashX]
|
||||
status = await self.address_status(hashX)
|
||||
changed[alias] = status
|
||||
|
||||
async def electrum_header(self, height):
|
||||
'''Return the deserialized header at the given height.'''
|
||||
raw_header = await self.raw_header(height)
|
||||
return self.coin.electrum_header(raw_header, height)
|
||||
# Check mempool hashXs - the status is a function of the
|
||||
# confirmed state of other transactions. Note: we cannot
|
||||
# iterate over mempool_statuses as it changes size.
|
||||
for hashX in tuple(self.mempool_statuses):
|
||||
# Items can be evicted whilst await-ing status; False
|
||||
# ensures such hashXs are notified
|
||||
old_status = self.mempool_statuses.get(hashX, False)
|
||||
status = await self.address_status(hashX)
|
||||
if status != old_status:
|
||||
alias = self.hashX_subs[hashX]
|
||||
changed[alias] = status
|
||||
|
||||
async def subscribe_headers_result(self, height):
|
||||
'''The result of a header subscription for the given height.'''
|
||||
if self.subscribe_headers_raw:
|
||||
raw_header = await self.raw_header(height)
|
||||
return {'hex': raw_header.hex(), 'height': height}
|
||||
return await self.electrum_header(height)
|
||||
for alias, status in changed.items():
|
||||
if len(alias) == 64:
|
||||
method = 'blockchain.scripthash.subscribe'
|
||||
else:
|
||||
method = 'blockchain.address.subscribe'
|
||||
await self.send_notification(method, (alias, status))
|
||||
|
||||
if changed:
|
||||
es = '' if len(changed) == 1 else 'es'
|
||||
self.logger.info(f'notified of {len(changed):,d} address{es}')
|
||||
|
||||
async def subscribe_headers_result(self):
|
||||
'''The result of a header subscription or notification.'''
|
||||
return self.session_mgr.hsub_results[self.subscribe_headers_raw]
|
||||
|
||||
async def _headers_subscribe(self, raw):
|
||||
'''Subscribe to get headers of new blocks.'''
|
||||
self.subscribe_headers = True
|
||||
self.subscribe_headers_raw = assert_boolean(raw)
|
||||
self.notified_height = self.db.db_height
|
||||
return await self.subscribe_headers_result(self.notified_height)
|
||||
self.subscribe_headers = True
|
||||
return await self.subscribe_headers_result()
|
||||
|
||||
async def headers_subscribe(self):
|
||||
'''Subscribe to get raw headers of new blocks.'''
|
||||
@ -978,7 +974,7 @@ class ElectrumX(SessionBase):
|
||||
dictionary with a merkle proof.'''
|
||||
height = non_negative_integer(height)
|
||||
cp_height = non_negative_integer(cp_height)
|
||||
raw_header_hex = (await self.raw_header(height)).hex()
|
||||
raw_header_hex = (await self.session_mgr.raw_header(height)).hex()
|
||||
if cp_height == 0:
|
||||
return raw_header_hex
|
||||
result = {'header': raw_header_hex}
|
||||
@ -1029,7 +1025,7 @@ class ElectrumX(SessionBase):
|
||||
|
||||
height: the header's height'''
|
||||
height = non_negative_integer(height)
|
||||
return await self.electrum_header(height)
|
||||
return await self.session_mgr.electrum_header(height)
|
||||
|
||||
def is_tor(self):
|
||||
'''Try to detect if the connection is to a tor hidden service we are
|
||||
@ -1313,9 +1309,9 @@ class DashElectrumX(ElectrumX):
|
||||
'masternode.list': self.masternode_list
|
||||
})
|
||||
|
||||
async def notify(self, height, touched):
|
||||
async def notify(self, touched, height_changed):
|
||||
'''Notify the client about changes in masternode list.'''
|
||||
await super().notify(height, touched)
|
||||
await super().notify(touched, height_changed)
|
||||
for mn in self.mns:
|
||||
status = await self.daemon_request('masternode_list',
|
||||
['status', mn])
|
||||
|
||||
Loading…
Reference in New Issue
Block a user