diff --git a/electrum/network.py b/electrum/network.py index ee6c7af2..e56765bd 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -20,6 +20,7 @@ # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import asyncio import time import queue import os @@ -171,6 +172,8 @@ class Network(util.DaemonThread): """ def __init__(self, config=None): + self.lightninglock = threading.Lock() + self.lightninglock.acquire() if config is None: config = {} # Do not use mutables as default values! util.DaemonThread.__init__(self) @@ -1059,12 +1062,27 @@ class Network(util.DaemonThread): def run(self): self.init_headers_file() + loop = asyncio.new_event_loop() + def asyncioThread(): + asyncio.set_event_loop(loop) + self.lightninglock.acquire() + task = asyncio.ensure_future(asyncio.gather(self.lightningrpc.run(networkAndWalletLock), self.lightningworker.run(networkAndWalletLock))) + loop.run_forever() + threading.Thread(target=asyncioThread).start() + networkAndWalletLock = threading.Lock() + networkAndWalletLock.acquire() while self.is_running(): self.maintain_sockets() self.wait_on_sockets() self.maintain_requests() self.run_jobs() # Synchronizer and Verifier self.process_pending_sends() + networkAndWalletLock.release() + networkAndWalletLock.acquire() + loop.stop() + while loop.is_running(): + pass + loop.close() self.stop_network() self.on_stop() diff --git a/electrum/wallet.py b/electrum/wallet.py index 416deffd..3e5c5ada 100644 --- a/electrum/wallet.py +++ b/electrum/wallet.py @@ -64,6 +64,9 @@ from .paymentrequest import PR_PAID, PR_UNPAID, PR_UNKNOWN, PR_EXPIRED from .paymentrequest import InvoiceStore from .contacts import Contacts +from .lightning import LightningRPC +from .lightning import LightningWorker + TX_STATUS = [ _('Unconfirmed'), _('Unconfirmed parent'), @@ -1298,6 +1301,9 @@ class Abstract_Wallet(PrintError): self.verifier = SPV(self.network, self) self.synchronizer = Synchronizer(self, network) network.add_jobs([self.verifier, self.synchronizer]) + network.lightningworker = LightningWorker(lambda: self, lambda: network, lambda: network.config) + network.lightningrpc = LightningRPC() + network.lightninglock.release() else: self.verifier = None self.synchronizer = None diff --git a/lib/lightning.py b/lib/lightning.py index 50b3ad74..8606d663 100644 --- a/lib/lightning.py +++ b/lib/lightning.py @@ -18,8 +18,6 @@ from . import keystore import queue -from .util import ForeverCoroutineJob - import threading import json import base64 @@ -481,7 +479,7 @@ async def PublishTransaction(json): json_format.Parse(json, req) global NETWORK tx = transaction.Transaction(binascii.hexlify(req.tx).decode("utf-8")) - suc, has = await NETWORK.broadcast_async(tx) + suc, has = NETWORK.broadcast(tx) m = rpc_pb2.PublishTransactionResponse() m.success = suc m.error = str(has) if not suc else "" @@ -593,15 +591,14 @@ def computeInputScript(tx, signdesc): from collections import namedtuple QueueItem = namedtuple("QueueItem", ["methodName", "args"]) -class LightningRPC(ForeverCoroutineJob): +class LightningRPC: def __init__(self): super(LightningRPC, self).__init__() self.queue = queue.Queue() self.subscribers = [] # overridden - async def run(self, is_running): - print("RPC STARTED") - while is_running(): + async def run(self, netAndWalLock): + while True: try: qitem = self.queue.get(block=False) except queue.Empty: @@ -654,7 +651,7 @@ class LightningUI(): privateKeyHash = None -class LightningWorker(ForeverCoroutineJob): +class LightningWorker: def __init__(self, wallet, network, config): global privateKeyHash super(LightningWorker, self).__init__() @@ -678,13 +675,13 @@ class LightningWorker(ForeverCoroutineJob): assert deser[0] == "p2wpkh", deser self.subscribers = [] - async def run(self, is_running): + async def run(self, netAndWalLock): global WALLET, NETWORK global CONFIG wasAlreadyUpToDate = False - while is_running(): + while True: WALLET = self.wallet() NETWORK = self.network() CONFIG = self.config() diff --git a/testserver.py b/testserver.py deleted file mode 100644 index 25c92ca4..00000000 --- a/testserver.py +++ /dev/null @@ -1,21 +0,0 @@ -import asyncio - -async def handler(reader, writer): - magic = await reader.read(5+6) - await asyncio.sleep(5) - print("in five sec!") - await asyncio.sleep(5) - writer.write(b'{\n "r_preimage": "6UNoNhDZ/0awtaDTM7KuCtlYcNkNljscxMLleoJv9+o=",\n "r_hash": "lQDtsJlLe8IzSRk0hrJcgglwRdtkHzX6mIwOhJrN7Ck=",\n "value": "8192",\n "settled": true,\n "creation_date": "1519994196",\n "settle_date": "1519994199",\n "payment_request": "lntb81920n1pdfj325pp5k7erq3avatceq8ca43h5uulxrhw2ma3a442a7c8fxrsw059c3m3sdqqcqzysdpwv4dn2xd74lfmea3taxj6pjfxrdl42t8w7ceptgv5ds0td0ypk47llryl6t4a48x54d7mnwremgcmljced4dhwty9g3pfywr307aqpwtkzf4",\n "expiry": "3600",\n "cltv_expiry": "144"\n}\n'.replace(b"\n",b"")) - await writer.drain() - print(magic) - -async def handler2(reader, writer): - while True: - data = await reader.read(2048) - if data != b'': - writer.write(b"HTTP/1.0 200 OK\r\nContent-length: 16\r\n\r\n{\"result\":\"lol\"}") - await writer.drain() - -asyncio.ensure_future(asyncio.start_server(handler, "127.0.0.1", 1080)) -asyncio.ensure_future(asyncio.start_server(handler2, "127.0.0.1", 8090)) -asyncio.get_event_loop().run_forever()