lightning: enable usage through daemon

This commit is contained in:
Janus 2018-02-01 16:57:52 +01:00
parent 0d26188498
commit 6ed9348d4b
6 changed files with 49 additions and 32 deletions

View File

@ -71,15 +71,14 @@ XPUB_HEADERS = {
class NetworkConstants:
@classmethod
def set_simnet():
def set_simnet(cls):
cls.TESTNET = True
cls.ADDRTYPE_P2PKH = 0x3f
cls.ADDRTYPE_P2SH = 0x7b
cls.SEGWIT_HRP = "sb"
cls.HEADERS_URL = None
cls.GENESIS = "683e86bd5c6d110d91b94b97137ba6bfe02dbbdb8e3dff722a669b5d69d77af6"
cls.DEFAULT_PORTS = {'t':'50001', 's':'50002'}
cls.DEFAULT_SERVERS = { '127.0.0.1': DEFAULT_PORTS, }
cls.DEFAULT_SERVERS = { '127.0.0.1': cls.DEFAULT_PORTS, }
@classmethod
def set_mainnet(cls):

View File

@ -23,6 +23,7 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import queue
import sys
import datetime
import copy
@ -41,6 +42,7 @@ from .i18n import _
from .transaction import Transaction, multisig_script
from .paymentrequest import PR_PAID, PR_UNPAID, PR_UNKNOWN, PR_EXPIRED
from .plugins import run_hook
from .import lightning
known_commands = {}
@ -691,6 +693,18 @@ class Commands:
# for the python console
return sorted(known_commands.keys())
@command("wn")
def lightning(self, lcmd, *args):
q = queue.Queue()
class FakeQtSignal:
def emit(self, data):
q.put(data)
class MyConsole:
newResult = FakeQtSignal()
self.wallet.lightning.setConsole(MyConsole())
lightning.lightningCall(self.wallet.lightning, lcmd)(*args)
return q.get()
param_descriptions = {
'privkey': 'Private key. Type \'?\' to get a prompt.',
'destination': 'Bitcoin address, contact or alias',

View File

@ -134,7 +134,9 @@ class Interface(util.PrintError):
await asyncio.sleep(1)
continue
except:
if self.is_running(): traceback.print_exc()
if self.is_running():
traceback.print_exc()
print("Previous exception from _save_certificate")
continue
break
if not self.is_running(): return
@ -145,7 +147,7 @@ class Interface(util.PrintError):
writer.close()
except OSError as e: # not ConnectionError because we need socket.gaierror too
if self.is_running():
print(self.server, "Exception in _save_certificate", type(e))
self.print_error(self.server, "Exception in _save_certificate", type(e))
return
except TimeoutError:
return
@ -194,13 +196,13 @@ class Interface(util.PrintError):
context = get_ssl_context(cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_certs) if self.use_ssl else None
self.reader, self.writer = await asyncio.wait_for(self.conn_coro(context), 5)
except TimeoutError:
print("TimeoutError after getting certificate successfully...")
self.print_error("TimeoutError after getting certificate successfully...")
raise
except BaseException as e:
if self.is_running():
if not isinstance(e, OSError):
traceback.print_exc()
print("Previous exception will now be reraised")
self.print_error("Previous exception will now be reraised")
raise e
if self.use_ssl and is_new:
self.print_error("saving new certificate for", self.host)
@ -214,7 +216,7 @@ class Interface(util.PrintError):
w.write(json.dumps(i).encode("ascii") + b"\n")
await w.drain()
if time.time() - starttime > 2.5:
print("send_all: sending is taking too long. Used existing connection: ", usedExisting)
self.print_error("send_all: sending is taking too long. Used existing connection: ", usedExisting)
raise ConnectionError("sending is taking too long")
def close(self):
@ -279,7 +281,7 @@ class Interface(util.PrintError):
await self.send_all([make_dict(*request)])
except (SocksError, OSError, TimeoutError) as e:
if type(e) is SocksError:
print(e)
self.print_error(e)
await self.unsent_requests.put((prio, request))
return False
if self.debug:

View File

@ -694,7 +694,13 @@ class LightningWorker(ForeverCoroutineJob):
self.wallet = wallet
self.network = network
self.config = config
privateKeyHash = bitcoin.Hash(self.wallet().keystore.get_private_key([152,152,152,152], None)[0])
ks = self.wallet().keystore
assert hasattr(ks, "xprv"), "Wallet must have xprv, can't be e.g. imported"
xprv, xpub = bitcoin.bip32_private_derivation(ks.xprv, "m/", "m/152/152/152/152")
tupl = bitcoin.deserialize_xprv(xprv)
privKey = tupl[-1]
assert type(privKey) is type(bytes([]))
privateKeyHash = bitcoin.Hash(privKey)
deser = bitcoin.deserialize_xpub(wallet().keystore.xpub)
assert deser[0] == "p2wpkh", deser

View File

@ -394,7 +394,7 @@ class Network(util.DaemonThread):
async def start_interfaces(self):
await self.start_interface(self.default_server)
print("started default server interface")
self.print_error("started default server interface")
for i in range(self.num_server - 1):
await self.start_random_interface()
@ -459,16 +459,16 @@ class Network(util.DaemonThread):
async with self.restartLock:
# Restart the network defaulting to the given server
await self.stop_network()
print("STOOOOOOOOOOOOOOOOOOOOOOOOOOPPED")
self.print_error("STOOOOOOOOOOOOOOOOOOOOOOOOOOPPED")
self.default_server = server
async with self.all_server_locks("restart job"):
self.disconnected_servers = {}
await self.start_network(protocol, proxy)
except BaseException as e:
traceback.print_exc()
print("exception from restart job")
self.print_error("exception from restart job")
if self.restartLock.locked():
print("NOT RESTARTING, RESTART IN PROGRESS")
self.print_error("NOT RESTARTING, RESTART IN PROGRESS")
return
asyncio.run_coroutine_threadsafe(job(), self.loop)
elif self.default_server != server:
@ -534,7 +534,7 @@ class Network(util.DaemonThread):
try:
await asyncio.wait_for(asyncio.shield(interface.boot_job), 6) # longer than any timeout while connecting
except TimeoutError:
print("taking too long", interface.server)
self.print_error("taking too long", interface.server)
raise
interface.close()
@ -770,10 +770,6 @@ class Network(util.DaemonThread):
interface.print_error(error or 'bad response')
return
index = params[0]
# Ignore unsolicited chunks
if index not in self.requested_chunks:
return
self.requested_chunks.remove(index)
connect = interface.blockchain.connect_chunk(index, result)
if not connect:
await self.connection_down(interface.server, "could not connect chunk")
@ -940,7 +936,7 @@ class Network(util.DaemonThread):
except:
if not self.stopped:
traceback.print_exc()
print("FATAL ERROR ^^^")
self.print_error("FATAL ERROR ^^^")
return asyncio.ensure_future(job())
def make_process_responses_job(self, interface):
@ -951,11 +947,11 @@ class Network(util.DaemonThread):
pass
except OSError:
await self.connection_down(interface.server, "OSError in process_responses")
print("OS error, connection downed")
self.print_error("OS error, connection downed")
except BaseException:
if not self.stopped:
traceback.print_exc()
print("FATAL ERROR in process_responses")
self.print_error("FATAL ERROR in process_responses")
return asyncio.ensure_future(job())
def make_process_pending_sends_job(self):
@ -971,7 +967,7 @@ class Network(util.DaemonThread):
except BaseException as e:
if not self.stopped:
traceback.print_exc()
print("FATAL ERROR in process_pending_sends")
self.print_error("FATAL ERROR in process_pending_sends")
return asyncio.ensure_future(job())
def init_headers_file(self):
@ -1023,12 +1019,12 @@ class Network(util.DaemonThread):
return
#self.notify('interfaces')
except GeneratorExit:
print(interface.server, "GENERATOR EXIT")
self.print_error(interface.server, "GENERATOR EXIT")
pass
except BaseException as e:
if not self.stopped:
traceback.print_exc()
print("FATAL ERROR in boot_interface")
self.print_error("FATAL ERROR in boot_interface")
raise e
interface.boot_job = asyncio.ensure_future(job())
interface.boot_job.server = interface.server
@ -1037,7 +1033,7 @@ class Network(util.DaemonThread):
fut.exception()
except:
traceback.print_exc()
print("Previous exception in boot_job")
self.print_error("Previous exception in boot_job")
interface.boot_job.add_done_callback(boot_job_cb)
def make_ping_job(self, interface):
@ -1048,7 +1044,7 @@ class Network(util.DaemonThread):
# Send pings and shut down stale interfaces
# must use copy of values
if interface.has_timed_out():
print(interface.server, "timed out")
self.print_error(interface.server, "timed out")
await self.connection_down(interface.server, "time out in ping_job")
return
elif interface.ping_required():
@ -1059,7 +1055,7 @@ class Network(util.DaemonThread):
except:
if not self.stopped:
traceback.print_exc()
print("FATAL ERROR in ping_job")
self.print_error("FATAL ERROR in ping_job")
return asyncio.ensure_future(job())
def all_server_locks(self, ctx):
@ -1122,7 +1118,7 @@ class Network(util.DaemonThread):
self.process_pending_sends_job = self.make_process_pending_sends_job()
except:
traceback.print_exc()
print("Previous exception in start_network")
self.print_error("Previous exception in start_network")
raise
asyncio.ensure_future(job())
run_future = asyncio.Future()
@ -1138,7 +1134,7 @@ class Network(util.DaemonThread):
async def run_async(self, future):
try:
while self.is_running():
#print(len(asyncio.Task.all_tasks()))
#self.print_error(len(asyncio.Task.all_tasks()))
await asyncio.sleep(1)
await self.maintain_requests()
await self.maintain_interfaces()
@ -1150,7 +1146,7 @@ class Network(util.DaemonThread):
try:
await asyncio.wait_for(asyncio.shield(i), 2)
except TimeoutError:
print("TOO SLOW TO SHUT DOWN, CANCELLING", i)
self.print_error("TOO SLOW TO SHUT DOWN, CANCELLING", i)
i.cancel()
except CancelledError:
pass

View File

@ -169,7 +169,7 @@ class DaemonThread(threading.Thread, PrintError):
async def getFromQueueAndStart():
jobs = await self.forever_coroutines_queue.get()
await asyncio.gather(*[i.run(self.is_running) for i in jobs])
print("FOREVER JOBS DONE")
self.print_error("FOREVER JOBS DONE")
self.forever_coroutines_task = asyncio.ensure_future(getFromQueueAndStart())
return self.forever_coroutines_task