From ee576d3ead5d191068997a7baf5c4cd5f12bade5 Mon Sep 17 00:00:00 2001 From: Janus Date: Mon, 19 Mar 2018 12:14:39 +0100 Subject: [PATCH] lightning: use queueing lock --- electrum/network.py | 32 ++++++++++++++++++++++++++++++-- lib/lightning.py | 18 +++++------------- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/electrum/network.py b/electrum/network.py index 0f06a425..be0ccd41 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -28,7 +28,7 @@ import errno import random import re import select -from collections import defaultdict +from collections import defaultdict, deque import threading import socket import json @@ -157,6 +157,34 @@ def deserialize_server(server_str): def serialize_server(host, port, protocol): return str(':'.join([host, port, protocol])) +class QLock: + def __init__(self): + self.lock = threading.Lock() + self.waiters = deque() + self.count = 0 + + def acquire(self): + self.lock.acquire() + if self.count: + new_lock = threading.Lock() + new_lock.acquire() + self.waiters.append(new_lock) + self.lock.release() + new_lock.acquire() + self.lock.acquire() + self.count += 1 + self.lock.release() + + def release(self): + with self.lock: + if not self.count: + raise ValueError("lock not acquired") + self.count -= 1 + if self.waiters: + self.waiters.popleft().release() + + def locked(self): + return self.count > 0 class Network(util.DaemonThread): """The Network class manages a set of connections to remote electrum @@ -1063,13 +1091,13 @@ class Network(util.DaemonThread): def run(self): self.init_headers_file() loop = asyncio.new_event_loop() + networkAndWalletLock = QLock() def asyncioThread(): asyncio.set_event_loop(loop) self.lightninglock.acquire() task = asyncio.ensure_future(asyncio.gather(self.lightningrpc.run(networkAndWalletLock), self.lightningworker.run(networkAndWalletLock))) loop.run_forever() threading.Thread(target=asyncioThread).start() - networkAndWalletLock = threading.Lock() networkAndWalletLock.acquire() while self.is_running(): self.maintain_sockets() diff --git a/lib/lightning.py b/lib/lightning.py index 65b93b6c..5261da90 100644 --- a/lib/lightning.py +++ b/lib/lightning.py @@ -1,4 +1,5 @@ import functools +import datetime import sys import struct import traceback @@ -602,7 +603,7 @@ class LightningRPC: try: qitem = self.queue.get(block=False) except queue.Empty: - await asyncio.sleep(1) + await asyncio.sleep(5) pass else: def lightningRpcNetworkRequestThreadTarget(qitem): @@ -686,31 +687,22 @@ class LightningWorker: NETWORK = self.network() CONFIG = self.config() - netAndWalLock.acquire() - synced, local, server = isSynced() - netAndWalLock.release() - if not synced: - await asyncio.sleep(5) - continue - else: - if not wasAlreadyUpToDate: - print("UP TO DATE FOR THE FIRST TIME") - print(NETWORK.get_status_value("updated")) - wasAlreadyUpToDate = True - writer = None + print("OPENING CONNECTION") try: reader, writer = await asyncio.wait_for(asyncio.open_connection(machine, 1080), 5) writer.write(b"MAGIC") writer.write(privateKeyHash[:6]) await asyncio.wait_for(writer.drain(), 5) while asyncio.get_event_loop().is_running(): + print(datetime.datetime.now(), "READING REQUEST") obj = await readJson(reader) if not obj: continue if "id" not in obj: print("Invoice update?", obj) for i in self.subscribers: i(obj) continue + print(datetime.datetime.now(), "making reply") await asyncio.wait_for(readReqAndReply(obj, writer, netAndWalLock), 10) except: traceback.print_exc()