lightning: use queueing lock
This commit is contained in:
parent
a13fb91ea4
commit
ee576d3ead
@ -28,7 +28,7 @@ import errno
|
||||
import random
|
||||
import re
|
||||
import select
|
||||
from collections import defaultdict
|
||||
from collections import defaultdict, deque
|
||||
import threading
|
||||
import socket
|
||||
import json
|
||||
@ -157,6 +157,34 @@ def deserialize_server(server_str):
|
||||
def serialize_server(host, port, protocol):
|
||||
return str(':'.join([host, port, protocol]))
|
||||
|
||||
class QLock:
|
||||
def __init__(self):
|
||||
self.lock = threading.Lock()
|
||||
self.waiters = deque()
|
||||
self.count = 0
|
||||
|
||||
def acquire(self):
|
||||
self.lock.acquire()
|
||||
if self.count:
|
||||
new_lock = threading.Lock()
|
||||
new_lock.acquire()
|
||||
self.waiters.append(new_lock)
|
||||
self.lock.release()
|
||||
new_lock.acquire()
|
||||
self.lock.acquire()
|
||||
self.count += 1
|
||||
self.lock.release()
|
||||
|
||||
def release(self):
|
||||
with self.lock:
|
||||
if not self.count:
|
||||
raise ValueError("lock not acquired")
|
||||
self.count -= 1
|
||||
if self.waiters:
|
||||
self.waiters.popleft().release()
|
||||
|
||||
def locked(self):
|
||||
return self.count > 0
|
||||
|
||||
class Network(util.DaemonThread):
|
||||
"""The Network class manages a set of connections to remote electrum
|
||||
@ -1063,13 +1091,13 @@ class Network(util.DaemonThread):
|
||||
def run(self):
|
||||
self.init_headers_file()
|
||||
loop = asyncio.new_event_loop()
|
||||
networkAndWalletLock = QLock()
|
||||
def asyncioThread():
|
||||
asyncio.set_event_loop(loop)
|
||||
self.lightninglock.acquire()
|
||||
task = asyncio.ensure_future(asyncio.gather(self.lightningrpc.run(networkAndWalletLock), self.lightningworker.run(networkAndWalletLock)))
|
||||
loop.run_forever()
|
||||
threading.Thread(target=asyncioThread).start()
|
||||
networkAndWalletLock = threading.Lock()
|
||||
networkAndWalletLock.acquire()
|
||||
while self.is_running():
|
||||
self.maintain_sockets()
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import functools
|
||||
import datetime
|
||||
import sys
|
||||
import struct
|
||||
import traceback
|
||||
@ -602,7 +603,7 @@ class LightningRPC:
|
||||
try:
|
||||
qitem = self.queue.get(block=False)
|
||||
except queue.Empty:
|
||||
await asyncio.sleep(1)
|
||||
await asyncio.sleep(5)
|
||||
pass
|
||||
else:
|
||||
def lightningRpcNetworkRequestThreadTarget(qitem):
|
||||
@ -686,31 +687,22 @@ class LightningWorker:
|
||||
NETWORK = self.network()
|
||||
CONFIG = self.config()
|
||||
|
||||
netAndWalLock.acquire()
|
||||
synced, local, server = isSynced()
|
||||
netAndWalLock.release()
|
||||
if not synced:
|
||||
await asyncio.sleep(5)
|
||||
continue
|
||||
else:
|
||||
if not wasAlreadyUpToDate:
|
||||
print("UP TO DATE FOR THE FIRST TIME")
|
||||
print(NETWORK.get_status_value("updated"))
|
||||
wasAlreadyUpToDate = True
|
||||
|
||||
writer = None
|
||||
print("OPENING CONNECTION")
|
||||
try:
|
||||
reader, writer = await asyncio.wait_for(asyncio.open_connection(machine, 1080), 5)
|
||||
writer.write(b"MAGIC")
|
||||
writer.write(privateKeyHash[:6])
|
||||
await asyncio.wait_for(writer.drain(), 5)
|
||||
while asyncio.get_event_loop().is_running():
|
||||
print(datetime.datetime.now(), "READING REQUEST")
|
||||
obj = await readJson(reader)
|
||||
if not obj: continue
|
||||
if "id" not in obj:
|
||||
print("Invoice update?", obj)
|
||||
for i in self.subscribers: i(obj)
|
||||
continue
|
||||
print(datetime.datetime.now(), "making reply")
|
||||
await asyncio.wait_for(readReqAndReply(obj, writer, netAndWalLock), 10)
|
||||
except:
|
||||
traceback.print_exc()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user