lightning: misc patches, launch asyncio loop on separate thread
This commit is contained in:
parent
cce6421340
commit
920371e350
@ -20,6 +20,7 @@
|
||||
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
import asyncio
|
||||
import time
|
||||
import queue
|
||||
import os
|
||||
@ -171,6 +172,8 @@ class Network(util.DaemonThread):
|
||||
"""
|
||||
|
||||
def __init__(self, config=None):
|
||||
self.lightninglock = threading.Lock()
|
||||
self.lightninglock.acquire()
|
||||
if config is None:
|
||||
config = {} # Do not use mutables as default values!
|
||||
util.DaemonThread.__init__(self)
|
||||
@ -1059,12 +1062,27 @@ class Network(util.DaemonThread):
|
||||
|
||||
def run(self):
|
||||
self.init_headers_file()
|
||||
loop = asyncio.new_event_loop()
|
||||
def asyncioThread():
|
||||
asyncio.set_event_loop(loop)
|
||||
self.lightninglock.acquire()
|
||||
task = asyncio.ensure_future(asyncio.gather(self.lightningrpc.run(networkAndWalletLock), self.lightningworker.run(networkAndWalletLock)))
|
||||
loop.run_forever()
|
||||
threading.Thread(target=asyncioThread).start()
|
||||
networkAndWalletLock = threading.Lock()
|
||||
networkAndWalletLock.acquire()
|
||||
while self.is_running():
|
||||
self.maintain_sockets()
|
||||
self.wait_on_sockets()
|
||||
self.maintain_requests()
|
||||
self.run_jobs() # Synchronizer and Verifier
|
||||
self.process_pending_sends()
|
||||
networkAndWalletLock.release()
|
||||
networkAndWalletLock.acquire()
|
||||
loop.stop()
|
||||
while loop.is_running():
|
||||
pass
|
||||
loop.close()
|
||||
self.stop_network()
|
||||
self.on_stop()
|
||||
|
||||
|
||||
@ -64,6 +64,9 @@ from .paymentrequest import PR_PAID, PR_UNPAID, PR_UNKNOWN, PR_EXPIRED
|
||||
from .paymentrequest import InvoiceStore
|
||||
from .contacts import Contacts
|
||||
|
||||
from .lightning import LightningRPC
|
||||
from .lightning import LightningWorker
|
||||
|
||||
TX_STATUS = [
|
||||
_('Unconfirmed'),
|
||||
_('Unconfirmed parent'),
|
||||
@ -1298,6 +1301,9 @@ class Abstract_Wallet(PrintError):
|
||||
self.verifier = SPV(self.network, self)
|
||||
self.synchronizer = Synchronizer(self, network)
|
||||
network.add_jobs([self.verifier, self.synchronizer])
|
||||
network.lightningworker = LightningWorker(lambda: self, lambda: network, lambda: network.config)
|
||||
network.lightningrpc = LightningRPC()
|
||||
network.lightninglock.release()
|
||||
else:
|
||||
self.verifier = None
|
||||
self.synchronizer = None
|
||||
|
||||
@ -18,8 +18,6 @@ from . import keystore
|
||||
|
||||
import queue
|
||||
|
||||
from .util import ForeverCoroutineJob
|
||||
|
||||
import threading
|
||||
import json
|
||||
import base64
|
||||
@ -481,7 +479,7 @@ async def PublishTransaction(json):
|
||||
json_format.Parse(json, req)
|
||||
global NETWORK
|
||||
tx = transaction.Transaction(binascii.hexlify(req.tx).decode("utf-8"))
|
||||
suc, has = await NETWORK.broadcast_async(tx)
|
||||
suc, has = NETWORK.broadcast(tx)
|
||||
m = rpc_pb2.PublishTransactionResponse()
|
||||
m.success = suc
|
||||
m.error = str(has) if not suc else ""
|
||||
@ -593,15 +591,14 @@ def computeInputScript(tx, signdesc):
|
||||
from collections import namedtuple
|
||||
QueueItem = namedtuple("QueueItem", ["methodName", "args"])
|
||||
|
||||
class LightningRPC(ForeverCoroutineJob):
|
||||
class LightningRPC:
|
||||
def __init__(self):
|
||||
super(LightningRPC, self).__init__()
|
||||
self.queue = queue.Queue()
|
||||
self.subscribers = []
|
||||
# overridden
|
||||
async def run(self, is_running):
|
||||
print("RPC STARTED")
|
||||
while is_running():
|
||||
async def run(self, netAndWalLock):
|
||||
while True:
|
||||
try:
|
||||
qitem = self.queue.get(block=False)
|
||||
except queue.Empty:
|
||||
@ -654,7 +651,7 @@ class LightningUI():
|
||||
|
||||
privateKeyHash = None
|
||||
|
||||
class LightningWorker(ForeverCoroutineJob):
|
||||
class LightningWorker:
|
||||
def __init__(self, wallet, network, config):
|
||||
global privateKeyHash
|
||||
super(LightningWorker, self).__init__()
|
||||
@ -678,13 +675,13 @@ class LightningWorker(ForeverCoroutineJob):
|
||||
assert deser[0] == "p2wpkh", deser
|
||||
self.subscribers = []
|
||||
|
||||
async def run(self, is_running):
|
||||
async def run(self, netAndWalLock):
|
||||
global WALLET, NETWORK
|
||||
global CONFIG
|
||||
|
||||
wasAlreadyUpToDate = False
|
||||
|
||||
while is_running():
|
||||
while True:
|
||||
WALLET = self.wallet()
|
||||
NETWORK = self.network()
|
||||
CONFIG = self.config()
|
||||
|
||||
@ -1,21 +0,0 @@
|
||||
import asyncio
|
||||
|
||||
async def handler(reader, writer):
|
||||
magic = await reader.read(5+6)
|
||||
await asyncio.sleep(5)
|
||||
print("in five sec!")
|
||||
await asyncio.sleep(5)
|
||||
writer.write(b'{\n "r_preimage": "6UNoNhDZ/0awtaDTM7KuCtlYcNkNljscxMLleoJv9+o=",\n "r_hash": "lQDtsJlLe8IzSRk0hrJcgglwRdtkHzX6mIwOhJrN7Ck=",\n "value": "8192",\n "settled": true,\n "creation_date": "1519994196",\n "settle_date": "1519994199",\n "payment_request": "lntb81920n1pdfj325pp5k7erq3avatceq8ca43h5uulxrhw2ma3a442a7c8fxrsw059c3m3sdqqcqzysdpwv4dn2xd74lfmea3taxj6pjfxrdl42t8w7ceptgv5ds0td0ypk47llryl6t4a48x54d7mnwremgcmljced4dhwty9g3pfywr307aqpwtkzf4",\n "expiry": "3600",\n "cltv_expiry": "144"\n}\n'.replace(b"\n",b""))
|
||||
await writer.drain()
|
||||
print(magic)
|
||||
|
||||
async def handler2(reader, writer):
|
||||
while True:
|
||||
data = await reader.read(2048)
|
||||
if data != b'':
|
||||
writer.write(b"HTTP/1.0 200 OK\r\nContent-length: 16\r\n\r\n{\"result\":\"lol\"}")
|
||||
await writer.drain()
|
||||
|
||||
asyncio.ensure_future(asyncio.start_server(handler, "127.0.0.1", 1080))
|
||||
asyncio.ensure_future(asyncio.start_server(handler2, "127.0.0.1", 8090))
|
||||
asyncio.get_event_loop().run_forever()
|
||||
Loading…
Reference in New Issue
Block a user