From 6209300e35597f84bec67f6ba819f22e9e754c82 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 5 Nov 2018 16:13:26 -0400 Subject: [PATCH] Use session.spawn() Fixes #632 properly Requires aiorpcX 0.10.x --- electrumx/server/controller.py | 4 ++-- electrumx/server/session.py | 27 ++++++++++++--------------- setup.py | 2 +- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 5e2dfbb..35dec33 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -82,8 +82,8 @@ class Controller(ServerBase): '''Start the RPC server and wait for the mempool to synchronize. Then start serving external clients. ''' - if not (0, 9, 1) <= aiorpcx_version < (0, 10): - raise RuntimeError('aiorpcX version 0.9.x with x >= 1 required') + if not (0, 10, 0) <= aiorpcx_version < (0, 11): + raise RuntimeError('aiorpcX version 0.10.x required') env = self.env min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 35c0d96..1e6d967 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -262,9 +262,8 @@ class SessionManager(object): for session in stale_sessions) self.logger.info(f'closing stale connections {text}') # Give the sockets some time to close gracefully - async with TaskGroup() as group: - for session in stale_sessions: - await group.spawn(session.close()) + for session in stale_sessions: + await session.spawn(session.close()) # Consolidate small groups bw_limit = self.env.bandwidth_limit @@ -512,9 +511,8 @@ class SessionManager(object): finally: # Close servers and sessions await self._close_servers(list(self.servers.keys())) - async with TaskGroup() as group: - for session in list(self.sessions): - await group.spawn(session.close(force_after=1)) + for session in self.sessions: + await session.spawn(session.close(force_after=1)) def session_count(self): '''The number of connections that we've sent something to.''' @@ -567,9 +565,8 @@ class SessionManager(object): for hashX in set(hc).intersection(touched): del hc[hashX] - async with TaskGroup() as group: - for session in self.sessions: - await group.spawn(session.notify(touched, height_changed)) + for session in self.sessions: + await session.spawn(session.notify, touched, height_changed) def add_session(self, session): self.sessions.add(session) @@ -649,7 +646,7 @@ class SessionBase(RPCSession): status += 'C' if self.log_me: status += 'L' - status += str(self.concurrency.max_concurrent) + status += str(self._concurrency.max_concurrent) return status def connection_made(self, transport): @@ -664,12 +661,11 @@ class SessionBase(RPCSession): def connection_lost(self, exc): '''Handle client disconnection.''' - super().connection_lost(exc) self.session_mgr.remove_session(self) msg = '' - if not self.can_send.is_set(): - msg += ' whilst paused' - if self.concurrency.max_concurrent != self.max_concurrent: + if not self._can_send.is_set(): + msg += ' with full socket buffer' + if self._concurrency.max_concurrent != self.max_concurrent: msg += ' whilst throttled' if self.send_size >= 1024*1024: msg += ('. Sent {:,d} bytes in {:,d} messages' @@ -677,12 +673,13 @@ class SessionBase(RPCSession): if msg: msg = 'disconnected' + msg self.logger.info(msg) + super().connection_lost(exc) def count_pending_items(self): return len(self.connection.pending_requests()) def semaphore(self): - return Semaphores([self.concurrency.semaphore, self.group.semaphore]) + return Semaphores([self._concurrency.semaphore, self.group.semaphore]) def sub_count(self): return 0 diff --git a/setup.py b/setup.py index 98b784a..897f4e7 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ setuptools.setup( # "blake256" package is required to sync Decred network. # "xevan_hash" package is required to sync Xuez network. # "groestlcoin_hash" package is required to sync Groestlcoin network. - install_requires=['aiorpcX>=0.9.1,<0.10', 'attrs', + install_requires=['aiorpcX>=0.10.0,<0.11', 'attrs', 'plyvel', 'pylru', 'aiohttp >= 2'], packages=setuptools.find_packages(include=('electrumx*',)), description='ElectrumX Server',