Have peers.py use its own task group
This commit is contained in:
parent
9c5d59e997
commit
9b4276c68d
@ -16,8 +16,8 @@ from collections import defaultdict, Counter
|
|||||||
|
|
||||||
from aiorpcx import (ClientSession, SOCKSProxy,
|
from aiorpcx import (ClientSession, SOCKSProxy,
|
||||||
Notification, handler_invocation,
|
Notification, handler_invocation,
|
||||||
SOCKSError, RPCError, TaskTimeout,
|
SOCKSError, RPCError, TaskTimeout, TaskGroup, Event,
|
||||||
TaskGroup, run_in_thread, ignore_after, timeout_after)
|
sleep, run_in_thread, ignore_after, timeout_after)
|
||||||
|
|
||||||
from electrumx.lib.peer import Peer
|
from electrumx.lib.peer import Peer
|
||||||
from electrumx.lib.util import class_logger, protocol_tuple
|
from electrumx.lib.util import class_logger, protocol_tuple
|
||||||
@ -74,7 +74,7 @@ class PeerManager(object):
|
|||||||
self.peers = set()
|
self.peers = set()
|
||||||
self.permit_onion_peer_time = time.time()
|
self.permit_onion_peer_time = time.time()
|
||||||
self.proxy = None
|
self.proxy = None
|
||||||
self.task_group = None
|
self.group = TaskGroup()
|
||||||
|
|
||||||
def _my_clearnet_peer(self):
|
def _my_clearnet_peer(self):
|
||||||
'''Returns the clearnet peer representing this server, if any.'''
|
'''Returns the clearnet peer representing this server, if any.'''
|
||||||
@ -150,7 +150,7 @@ class PeerManager(object):
|
|||||||
self.logger.info(f'detected {proxy}')
|
self.logger.info(f'detected {proxy}')
|
||||||
return
|
return
|
||||||
self.logger.info('no proxy detected, will try later')
|
self.logger.info('no proxy detected, will try later')
|
||||||
await asyncio.sleep(900)
|
await sleep(900)
|
||||||
|
|
||||||
async def _note_peers(self, peers, limit=2, check_ports=False,
|
async def _note_peers(self, peers, limit=2, check_ports=False,
|
||||||
source=None):
|
source=None):
|
||||||
@ -178,9 +178,9 @@ class PeerManager(object):
|
|||||||
use_peers = new_peers
|
use_peers = new_peers
|
||||||
for peer in use_peers:
|
for peer in use_peers:
|
||||||
self.logger.info(f'accepted new peer {peer} from {source}')
|
self.logger.info(f'accepted new peer {peer} from {source}')
|
||||||
peer.retry_event = asyncio.Event()
|
peer.retry_event = Event()
|
||||||
self.peers.add(peer)
|
self.peers.add(peer)
|
||||||
await self.task_group.spawn(self._monitor_peer(peer))
|
await self.group.spawn(self._monitor_peer(peer))
|
||||||
|
|
||||||
async def _monitor_peer(self, peer):
|
async def _monitor_peer(self, peer):
|
||||||
# Stop monitoring if we were dropped (a duplicate peer)
|
# Stop monitoring if we were dropped (a duplicate peer)
|
||||||
@ -372,7 +372,7 @@ class PeerManager(object):
|
|||||||
#
|
#
|
||||||
# External interface
|
# External interface
|
||||||
#
|
#
|
||||||
async def discover_peers(self, task_group):
|
async def discover_peers(self):
|
||||||
'''Perform peer maintenance. This includes
|
'''Perform peer maintenance. This includes
|
||||||
|
|
||||||
1) Forgetting unreachable peers.
|
1) Forgetting unreachable peers.
|
||||||
@ -385,9 +385,14 @@ class PeerManager(object):
|
|||||||
|
|
||||||
self.logger.info(f'beginning peer discovery. Force use of '
|
self.logger.info(f'beginning peer discovery. Force use of '
|
||||||
f'proxy: {self.env.force_proxy}')
|
f'proxy: {self.env.force_proxy}')
|
||||||
self.task_group = task_group
|
forever = Event()
|
||||||
await task_group.spawn(self._detect_proxy())
|
async with self.group as group:
|
||||||
await task_group.spawn(self._import_peers())
|
await group.spawn(forever.wait())
|
||||||
|
await group.spawn(self._detect_proxy())
|
||||||
|
await group.spawn(self._import_peers())
|
||||||
|
# Consume tasks as they complete
|
||||||
|
async for task in group:
|
||||||
|
task.result()
|
||||||
|
|
||||||
def info(self):
|
def info(self):
|
||||||
'''The number of peers.'''
|
'''The number of peers.'''
|
||||||
|
|||||||
@ -430,8 +430,8 @@ class SessionManager(object):
|
|||||||
await self._start_external_servers()
|
await self._start_external_servers()
|
||||||
# Peer discovery should start after the external servers
|
# Peer discovery should start after the external servers
|
||||||
# because we connect to ourself
|
# because we connect to ourself
|
||||||
async with TaskGroup(wait=object) as group:
|
async with TaskGroup() as group:
|
||||||
await group.spawn(self.peer_mgr.discover_peers(group))
|
await group.spawn(self.peer_mgr.discover_peers())
|
||||||
await group.spawn(self._clear_stale_sessions())
|
await group.spawn(self._clear_stale_sessions())
|
||||||
await group.spawn(self._log_sessions())
|
await group.spawn(self._log_sessions())
|
||||||
await group.spawn(self._restart_if_paused())
|
await group.spawn(self._restart_if_paused())
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user