From 6ed9348d4b058876cca3fd8fafac36c72c3ad230 Mon Sep 17 00:00:00 2001 From: Janus Date: Thu, 1 Feb 2018 16:57:52 +0100 Subject: [PATCH] lightning: enable usage through daemon --- lib/bitcoin.py | 5 ++--- lib/commands.py | 14 ++++++++++++++ lib/interface.py | 14 ++++++++------ lib/lightning.py | 8 +++++++- lib/network.py | 38 +++++++++++++++++--------------------- lib/util.py | 2 +- 6 files changed, 49 insertions(+), 32 deletions(-) diff --git a/lib/bitcoin.py b/lib/bitcoin.py index 3a0dd069..1328c2f0 100644 --- a/lib/bitcoin.py +++ b/lib/bitcoin.py @@ -71,15 +71,14 @@ XPUB_HEADERS = { class NetworkConstants: @classmethod - def set_simnet(): + def set_simnet(cls): cls.TESTNET = True cls.ADDRTYPE_P2PKH = 0x3f cls.ADDRTYPE_P2SH = 0x7b cls.SEGWIT_HRP = "sb" - cls.HEADERS_URL = None cls.GENESIS = "683e86bd5c6d110d91b94b97137ba6bfe02dbbdb8e3dff722a669b5d69d77af6" cls.DEFAULT_PORTS = {'t':'50001', 's':'50002'} - cls.DEFAULT_SERVERS = { '127.0.0.1': DEFAULT_PORTS, } + cls.DEFAULT_SERVERS = { '127.0.0.1': cls.DEFAULT_PORTS, } @classmethod def set_mainnet(cls): diff --git a/lib/commands.py b/lib/commands.py index 63f63c76..13892701 100644 --- a/lib/commands.py +++ b/lib/commands.py @@ -23,6 +23,7 @@ # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import queue import sys import datetime import copy @@ -41,6 +42,7 @@ from .i18n import _ from .transaction import Transaction, multisig_script from .paymentrequest import PR_PAID, PR_UNPAID, PR_UNKNOWN, PR_EXPIRED from .plugins import run_hook +from .import lightning known_commands = {} @@ -691,6 +693,18 @@ class Commands: # for the python console return sorted(known_commands.keys()) + @command("wn") + def lightning(self, lcmd, *args): + q = queue.Queue() + class FakeQtSignal: + def emit(self, data): + q.put(data) + class MyConsole: + newResult = FakeQtSignal() + self.wallet.lightning.setConsole(MyConsole()) + lightning.lightningCall(self.wallet.lightning, lcmd)(*args) + return q.get() + param_descriptions = { 'privkey': 'Private key. Type \'?\' to get a prompt.', 'destination': 'Bitcoin address, contact or alias', diff --git a/lib/interface.py b/lib/interface.py index ae413ec3..3f2516f2 100644 --- a/lib/interface.py +++ b/lib/interface.py @@ -134,7 +134,9 @@ class Interface(util.PrintError): await asyncio.sleep(1) continue except: - if self.is_running(): traceback.print_exc() + if self.is_running(): + traceback.print_exc() + print("Previous exception from _save_certificate") continue break if not self.is_running(): return @@ -145,7 +147,7 @@ class Interface(util.PrintError): writer.close() except OSError as e: # not ConnectionError because we need socket.gaierror too if self.is_running(): - print(self.server, "Exception in _save_certificate", type(e)) + self.print_error(self.server, "Exception in _save_certificate", type(e)) return except TimeoutError: return @@ -194,13 +196,13 @@ class Interface(util.PrintError): context = get_ssl_context(cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_certs) if self.use_ssl else None self.reader, self.writer = await asyncio.wait_for(self.conn_coro(context), 5) except TimeoutError: - print("TimeoutError after getting certificate successfully...") + self.print_error("TimeoutError after getting certificate successfully...") raise except BaseException as e: if self.is_running(): if not isinstance(e, OSError): traceback.print_exc() - print("Previous exception will now be reraised") + self.print_error("Previous exception will now be reraised") raise e if self.use_ssl and is_new: self.print_error("saving new certificate for", self.host) @@ -214,7 +216,7 @@ class Interface(util.PrintError): w.write(json.dumps(i).encode("ascii") + b"\n") await w.drain() if time.time() - starttime > 2.5: - print("send_all: sending is taking too long. Used existing connection: ", usedExisting) + self.print_error("send_all: sending is taking too long. Used existing connection: ", usedExisting) raise ConnectionError("sending is taking too long") def close(self): @@ -279,7 +281,7 @@ class Interface(util.PrintError): await self.send_all([make_dict(*request)]) except (SocksError, OSError, TimeoutError) as e: if type(e) is SocksError: - print(e) + self.print_error(e) await self.unsent_requests.put((prio, request)) return False if self.debug: diff --git a/lib/lightning.py b/lib/lightning.py index 39751f22..5561d7ad 100644 --- a/lib/lightning.py +++ b/lib/lightning.py @@ -694,7 +694,13 @@ class LightningWorker(ForeverCoroutineJob): self.wallet = wallet self.network = network self.config = config - privateKeyHash = bitcoin.Hash(self.wallet().keystore.get_private_key([152,152,152,152], None)[0]) + ks = self.wallet().keystore + assert hasattr(ks, "xprv"), "Wallet must have xprv, can't be e.g. imported" + xprv, xpub = bitcoin.bip32_private_derivation(ks.xprv, "m/", "m/152/152/152/152") + tupl = bitcoin.deserialize_xprv(xprv) + privKey = tupl[-1] + assert type(privKey) is type(bytes([])) + privateKeyHash = bitcoin.Hash(privKey) deser = bitcoin.deserialize_xpub(wallet().keystore.xpub) assert deser[0] == "p2wpkh", deser diff --git a/lib/network.py b/lib/network.py index 42594284..954c4f7d 100644 --- a/lib/network.py +++ b/lib/network.py @@ -394,7 +394,7 @@ class Network(util.DaemonThread): async def start_interfaces(self): await self.start_interface(self.default_server) - print("started default server interface") + self.print_error("started default server interface") for i in range(self.num_server - 1): await self.start_random_interface() @@ -459,16 +459,16 @@ class Network(util.DaemonThread): async with self.restartLock: # Restart the network defaulting to the given server await self.stop_network() - print("STOOOOOOOOOOOOOOOOOOOOOOOOOOPPED") + self.print_error("STOOOOOOOOOOOOOOOOOOOOOOOOOOPPED") self.default_server = server async with self.all_server_locks("restart job"): self.disconnected_servers = {} await self.start_network(protocol, proxy) except BaseException as e: traceback.print_exc() - print("exception from restart job") + self.print_error("exception from restart job") if self.restartLock.locked(): - print("NOT RESTARTING, RESTART IN PROGRESS") + self.print_error("NOT RESTARTING, RESTART IN PROGRESS") return asyncio.run_coroutine_threadsafe(job(), self.loop) elif self.default_server != server: @@ -534,7 +534,7 @@ class Network(util.DaemonThread): try: await asyncio.wait_for(asyncio.shield(interface.boot_job), 6) # longer than any timeout while connecting except TimeoutError: - print("taking too long", interface.server) + self.print_error("taking too long", interface.server) raise interface.close() @@ -770,10 +770,6 @@ class Network(util.DaemonThread): interface.print_error(error or 'bad response') return index = params[0] - # Ignore unsolicited chunks - if index not in self.requested_chunks: - return - self.requested_chunks.remove(index) connect = interface.blockchain.connect_chunk(index, result) if not connect: await self.connection_down(interface.server, "could not connect chunk") @@ -940,7 +936,7 @@ class Network(util.DaemonThread): except: if not self.stopped: traceback.print_exc() - print("FATAL ERROR ^^^") + self.print_error("FATAL ERROR ^^^") return asyncio.ensure_future(job()) def make_process_responses_job(self, interface): @@ -951,11 +947,11 @@ class Network(util.DaemonThread): pass except OSError: await self.connection_down(interface.server, "OSError in process_responses") - print("OS error, connection downed") + self.print_error("OS error, connection downed") except BaseException: if not self.stopped: traceback.print_exc() - print("FATAL ERROR in process_responses") + self.print_error("FATAL ERROR in process_responses") return asyncio.ensure_future(job()) def make_process_pending_sends_job(self): @@ -971,7 +967,7 @@ class Network(util.DaemonThread): except BaseException as e: if not self.stopped: traceback.print_exc() - print("FATAL ERROR in process_pending_sends") + self.print_error("FATAL ERROR in process_pending_sends") return asyncio.ensure_future(job()) def init_headers_file(self): @@ -1023,12 +1019,12 @@ class Network(util.DaemonThread): return #self.notify('interfaces') except GeneratorExit: - print(interface.server, "GENERATOR EXIT") + self.print_error(interface.server, "GENERATOR EXIT") pass except BaseException as e: if not self.stopped: traceback.print_exc() - print("FATAL ERROR in boot_interface") + self.print_error("FATAL ERROR in boot_interface") raise e interface.boot_job = asyncio.ensure_future(job()) interface.boot_job.server = interface.server @@ -1037,7 +1033,7 @@ class Network(util.DaemonThread): fut.exception() except: traceback.print_exc() - print("Previous exception in boot_job") + self.print_error("Previous exception in boot_job") interface.boot_job.add_done_callback(boot_job_cb) def make_ping_job(self, interface): @@ -1048,7 +1044,7 @@ class Network(util.DaemonThread): # Send pings and shut down stale interfaces # must use copy of values if interface.has_timed_out(): - print(interface.server, "timed out") + self.print_error(interface.server, "timed out") await self.connection_down(interface.server, "time out in ping_job") return elif interface.ping_required(): @@ -1059,7 +1055,7 @@ class Network(util.DaemonThread): except: if not self.stopped: traceback.print_exc() - print("FATAL ERROR in ping_job") + self.print_error("FATAL ERROR in ping_job") return asyncio.ensure_future(job()) def all_server_locks(self, ctx): @@ -1122,7 +1118,7 @@ class Network(util.DaemonThread): self.process_pending_sends_job = self.make_process_pending_sends_job() except: traceback.print_exc() - print("Previous exception in start_network") + self.print_error("Previous exception in start_network") raise asyncio.ensure_future(job()) run_future = asyncio.Future() @@ -1138,7 +1134,7 @@ class Network(util.DaemonThread): async def run_async(self, future): try: while self.is_running(): - #print(len(asyncio.Task.all_tasks())) + #self.print_error(len(asyncio.Task.all_tasks())) await asyncio.sleep(1) await self.maintain_requests() await self.maintain_interfaces() @@ -1150,7 +1146,7 @@ class Network(util.DaemonThread): try: await asyncio.wait_for(asyncio.shield(i), 2) except TimeoutError: - print("TOO SLOW TO SHUT DOWN, CANCELLING", i) + self.print_error("TOO SLOW TO SHUT DOWN, CANCELLING", i) i.cancel() except CancelledError: pass diff --git a/lib/util.py b/lib/util.py index 62951ef1..29be30e6 100644 --- a/lib/util.py +++ b/lib/util.py @@ -169,7 +169,7 @@ class DaemonThread(threading.Thread, PrintError): async def getFromQueueAndStart(): jobs = await self.forever_coroutines_queue.get() await asyncio.gather(*[i.run(self.is_running) for i in jobs]) - print("FOREVER JOBS DONE") + self.print_error("FOREVER JOBS DONE") self.forever_coroutines_task = asyncio.ensure_future(getFromQueueAndStart()) return self.forever_coroutines_task