Compare commits

...

10 Commits

7 changed files with 765 additions and 541 deletions


@@ -7,7 +7,7 @@ jsonrpclib-pelix==0.3.1
 pbkdf2==1.3
 protobuf==3.5.1
 pyaes==1.6.1
-PySocks==1.6.8
+aiosocks==0.2.6
 qrcode==5.3
 requests==2.18.4
 six==1.11.0


@@ -6,4 +6,4 @@ qrcode
 protobuf
 dnspython
 jsonrpclib-pelix
-PySocks>=1.6.6
+aiosocks
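The dependency swap above replaces the synchronous PySocks with aiosocks, whose SOCKS connections are plain asyncio coroutines. A minimal sketch of what that provides, assuming a local SOCKS5 proxy on 127.0.0.1:9050 (proxy address and destination are placeholders):

import asyncio
import aiosocks

async def fetch_via_socks():
    # open a TCP connection through the (assumed) local SOCKS5 proxy
    reader, writer = await aiosocks.open_connection(
        proxy=aiosocks.Socks5Addr('127.0.0.1', 9050),
        proxy_auth=None,
        dst=('example.com', 80))
    writer.write(b'HEAD / HTTP/1.0\r\nHost: example.com\r\n\r\n')
    await writer.drain()
    print(await reader.read(1024))
    writer.close()

asyncio.get_event_loop().run_until_complete(fetch_via_socks())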


@@ -4,7 +4,7 @@ from .wallet import Synchronizer, Wallet
 from .storage import WalletStorage
 from .coinchooser import COIN_CHOOSERS
 from .network import Network, pick_random_server
-from .interface import Connection, Interface
+from .interface import Interface
 from .simple_config import SimpleConfig, get_config, set_config
 from . import bitcoin
 from . import transaction
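Since the package no longer re-exports Connection, package-level imports have to be updated as well; a hedged illustration (the electrum package name is assumed from the lib/ layout):

# before this branch:
#   from electrum import Connection, Interface
# after it, only Interface is available at package level:
from electrum import Interface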


@@ -22,288 +22,257 @@
 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
+import aiosocks
 import os
+import stat
 import re
-import socket
 import ssl
 import sys
 import threading
 import time
 import traceback
+import asyncio
+import json
+import asyncio.streams
+from asyncio.sslproto import SSLProtocol
+import io
 import requests
-from .util import print_error
+from aiosocks.errors import SocksError
+from concurrent.futures import TimeoutError

 ca_path = requests.certs.where()

+from .util import print_error
+from .ssl_in_socks import sslInSocksReaderWriter
 from . import util
 from . import x509
 from . import pem

-def Connection(server, queue, config_path):
-    """Makes asynchronous connections to a remote electrum server.
-    Returns the running thread that is making the connection.
-
-    Once the thread has connected, it finishes, placing a tuple on the
-    queue of the form (server, socket), where socket is None if
-    connection failed.
-    """
-    host, port, protocol = server.rsplit(':', 2)
-    if not protocol in 'st':
-        raise Exception('Unknown protocol: %s' % protocol)
-    c = TcpConnection(server, queue, config_path)
-    c.start()
-    return c
-
-
-class TcpConnection(threading.Thread, util.PrintError):
-
-    def __init__(self, server, queue, config_path):
-        threading.Thread.__init__(self)
-        self.config_path = config_path
-        self.queue = queue
-        self.server = server
-        self.host, self.port, self.protocol = self.server.rsplit(':', 2)
-        self.host = str(self.host)
-        self.port = int(self.port)
-        self.use_ssl = (self.protocol == 's')
-        self.daemon = True
-
-    def diagnostic_name(self):
-        return self.host
-
-    def check_host_name(self, peercert, name):
-        """Simple certificate/host name checker. Returns True if the
-        certificate matches, False otherwise. Does not support
-        wildcards."""
-        # Check that the peer has supplied a certificate.
-        # None/{} is not acceptable.
-        if not peercert:
-            return False
-        if 'subjectAltName' in peercert:
-            for typ, val in peercert["subjectAltName"]:
-                if typ == "DNS" and val == name:
-                    return True
-        else:
-            # Only check the subject DN if there is no subject alternative
-            # name.
-            cn = None
-            for attr, val in peercert["subject"]:
-                # Use most-specific (last) commonName attribute.
-                if attr == "commonName":
-                    cn = val
-            if cn is not None:
-                return cn == name
-        return False
-
-    def get_simple_socket(self):
-        try:
-            l = socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM)
-        except socket.gaierror:
-            self.print_error("cannot resolve hostname")
-            return
-        e = None
-        for res in l:
-            try:
-                s = socket.socket(res[0], socket.SOCK_STREAM)
-                s.settimeout(10)
-                s.connect(res[4])
-                s.settimeout(2)
-                s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-                return s
-            except BaseException as _e:
-                e = _e
-                continue
-        else:
-            self.print_error("failed to connect", str(e))
-
-    @staticmethod
-    def get_ssl_context(cert_reqs, ca_certs):
-        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_certs)
-        context.check_hostname = False
-        context.verify_mode = cert_reqs
-        context.options |= ssl.OP_NO_SSLv2
-        context.options |= ssl.OP_NO_SSLv3
-        context.options |= ssl.OP_NO_TLSv1
-        return context
-
-    def get_socket(self):
-        if self.use_ssl:
-            cert_path = os.path.join(self.config_path, 'certs', self.host)
-            if not os.path.exists(cert_path):
-                is_new = True
-                s = self.get_simple_socket()
-                if s is None:
-                    return
-                # try with CA first
-                try:
-                    context = self.get_ssl_context(cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path)
-                    s = context.wrap_socket(s, do_handshake_on_connect=True)
-                except ssl.SSLError as e:
-                    print_error(e)
-                    s = None
-                except:
-                    return
-                if s and self.check_host_name(s.getpeercert(), self.host):
-                    self.print_error("SSL certificate signed by CA")
-                    return s
-                # get server certificate.
-                # Do not use ssl.get_server_certificate because it does not work with proxy
-                s = self.get_simple_socket()
-                if s is None:
-                    return
-                try:
-                    context = self.get_ssl_context(cert_reqs=ssl.CERT_NONE, ca_certs=None)
-                    s = context.wrap_socket(s)
-                except ssl.SSLError as e:
-                    self.print_error("SSL error retrieving SSL certificate:", e)
-                    return
-                except:
-                    return
-                dercert = s.getpeercert(True)
-                s.close()
-                cert = ssl.DER_cert_to_PEM_cert(dercert)
-                # workaround android bug
-                cert = re.sub("([^\n])-----END CERTIFICATE-----","\\1\n-----END CERTIFICATE-----",cert)
-                temporary_path = cert_path + '.temp'
-                with open(temporary_path,"w") as f:
-                    f.write(cert)
-            else:
-                is_new = False
-
-        s = self.get_simple_socket()
-        if s is None:
-            return
-
-        if self.use_ssl:
-            try:
-                context = self.get_ssl_context(cert_reqs=ssl.CERT_REQUIRED,
-                                               ca_certs=(temporary_path if is_new else cert_path))
-                s = context.wrap_socket(s, do_handshake_on_connect=True)
-            except socket.timeout:
-                self.print_error('timeout')
-                return
-            except ssl.SSLError as e:
-                self.print_error("SSL error:", e)
-                if e.errno != 1:
-                    return
-                if is_new:
-                    rej = cert_path + '.rej'
-                    if os.path.exists(rej):
-                        os.unlink(rej)
-                    os.rename(temporary_path, rej)
-                else:
-                    with open(cert_path) as f:
-                        cert = f.read()
-                    try:
-                        b = pem.dePem(cert, 'CERTIFICATE')
-                        x = x509.X509(b)
-                    except:
-                        traceback.print_exc(file=sys.stderr)
-                        self.print_error("wrong certificate")
-                        return
-                    try:
-                        x.check_date()
-                    except:
-                        self.print_error("certificate has expired:", cert_path)
-                        os.unlink(cert_path)
-                        return
-                    self.print_error("wrong certificate")
-                if e.errno == 104:
-                    return
-                return
-            except BaseException as e:
-                self.print_error(e)
-                traceback.print_exc(file=sys.stderr)
-                return
-            if is_new:
-                self.print_error("saving certificate")
-                os.rename(temporary_path, cert_path)
-        return s
-
-    def run(self):
-        socket = self.get_socket()
-        if socket:
-            self.print_error("connected")
-            self.queue.put((self.server, socket))
+
+def get_ssl_context(cert_reqs, ca_certs):
+    context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_certs)
+    context.check_hostname = False
+    context.verify_mode = cert_reqs
+    context.options |= ssl.OP_NO_SSLv2
+    context.options |= ssl.OP_NO_SSLv3
+    context.options |= ssl.OP_NO_TLSv1
+    return context

 class Interface(util.PrintError):
     """The Interface class handles a socket connected to a single remote
     electrum server. It's exposed API is:
-    - Member functions close(), fileno(), get_responses(), has_timed_out(),
-      ping_required(), queue_request(), send_requests()
+    - Member functions close(), fileno(), get_response(), has_timed_out(),
+      ping_required(), queue_request(), send_request()
     - Member variable server.
     """

-    def __init__(self, server, socket):
-        self.server = server
-        self.host, _, _ = server.rsplit(':', 2)
-        self.socket = socket
-        self.pipe = util.SocketPipe(socket)
-        self.pipe.set_timeout(0.0)  # Don't wait for data
+    def __init__(self, server, config_path, proxy_config, is_running, exception_handler):
+        self.error_future = asyncio.Future()
+        self.error_future.add_done_callback(exception_handler)
+        self.is_running = lambda: is_running() and not self.error_future.done()
+        self.addr = self.auth = None
+        if proxy_config is not None:
+            if proxy_config["mode"] == "socks5":
+                self.addr = aiosocks.Socks5Addr(proxy_config["host"], proxy_config["port"])
+                self.auth = aiosocks.Socks5Auth(proxy_config["user"], proxy_config["password"]) if proxy_config["user"] != "" else None
+            elif proxy_config["mode"] == "socks4":
+                self.addr = aiosocks.Socks4Addr(proxy_config["host"], proxy_config["port"])
+                self.auth = aiosocks.Socks4Auth(proxy_config["password"]) if proxy_config["password"] != "" else None
+            else:
+                raise Exception("proxy mode not supported")
+        self.server = server
+        self.config_path = config_path
+        host, port, protocol = self.server.split(':')
+        self.host = host
+        self.port = int(port)
+        self.use_ssl = (protocol=='s')
+        self.reader = self.writer = None
+        self.lock = asyncio.Lock()
         # Dump network messages. Set at runtime from the console.
         self.debug = False
-        self.unsent_requests = []
+        self.unsent_requests = asyncio.PriorityQueue()
         self.unanswered_requests = {}
-        # Set last ping to zero to ensure immediate ping
+        self.last_request = time.time()
         self.last_ping = 0
         self.closed_remotely = False
+        self.buf = bytes()
+
+    def conn_coro(self, context):
+        return asyncio.open_connection(self.host, self.port, ssl=context)
+
+    async def _save_certificate(self, cert_path, require_ca):
+        dercert = None
+        if require_ca:
+            context = get_ssl_context(cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path)
+        else:
+            context = get_ssl_context(cert_reqs=ssl.CERT_NONE, ca_certs=None)
+        if self.addr is not None:
+            self.print_error("can't save certificate through socks!")
+            # just save the empty file to force use of PKI
+            # this will break all self-signed servers, of course
+            cert = ""
+        else:
+            reader, writer = await asyncio.wait_for(self.conn_coro(context), 3)
+            dercert = writer.get_extra_info('ssl_object').getpeercert(True)
+            # an exception will be thrown by now if require_ca is True (e.g. a certificate was supplied)
+            writer.close()
+            if not require_ca:
+                cert = ssl.DER_cert_to_PEM_cert(dercert)
+            else:
+                # Don't pin a CA signed certificate
+                cert = ""
+        temporary_path = cert_path + '.temp'
+        with open(temporary_path, "w") as f:
+            f.write(cert)
+        return temporary_path
+
+    async def _get_read_write(self):
+        async with self.lock:
+            if self.reader is not None and self.writer is not None:
+                return self.reader, self.writer, True
+            if self.use_ssl:
+                cert_path = os.path.join(self.config_path, 'certs', self.host)
+                if not os.path.exists(cert_path):
+                    temporary_path = None
+                    # first, we try to save a certificate signed through the PKI
+                    try:
+                        temporary_path = await self._save_certificate(cert_path, True)
+                    except ssl.SSLError:
+                        pass
+                    except (TimeoutError, OSError) as e:
+                        if not self.error_future.done(): self.error_future.set_result(e)
+                        raise
+                    # if the certificate verification failed, we try to save a self-signed certificate
+                    if not temporary_path:
+                        try:
+                            temporary_path = await self._save_certificate(cert_path, False)
+                        # we also catch SSLError here, but it shouldn't matter since no certificate is required,
+                        # so the SSLError wouldn't mean certificate validation failed
+                        except (TimeoutError, OSError) as e:
+                            if not self.error_future.done(): self.error_future.set_result(e)
+                            raise
+                    if not temporary_path:
+                        if not self.error_future.done(): self.error_future.set_result(ConnectionError("Could not get certificate"))
+                        raise ConnectionError("Could not get certificate on second try")
+                    is_new = True
+                else:
+                    is_new = False
+                ca_certs = temporary_path if is_new else cert_path
+                size = os.stat(ca_certs)[stat.ST_SIZE]
+                self_signed = size != 0
+                if not self_signed:
+                    ca_certs = ca_path
+            try:
+                if self.addr is not None:
+                    if not self.use_ssl:
+                        open_coro = aiosocks.open_connection(proxy=self.addr, proxy_auth=self.auth, dst=(self.host, self.port))
+                        self.reader, self.writer = await asyncio.wait_for(open_coro, 5)
+                    else:
+                        ssl_in_socks_coro = sslInSocksReaderWriter(self.addr, self.auth, self.host, self.port, ca_certs)
+                        self.reader, self.writer = await asyncio.wait_for(ssl_in_socks_coro, 5)
+                else:
+                    context = get_ssl_context(cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_certs) if self.use_ssl else None
+                    self.reader, self.writer = await asyncio.wait_for(self.conn_coro(context), 5)
+            except TimeoutError:
+                self.print_error("TimeoutError after getting certificate successfully...")
+                raise
+            except ssl.SSLError:
+                # FIXME TODO
+                assert not self_signed, "we shouldn't reject self-signed here since the certificate has been saved (has size {})".format(size)
+                raise
+            except BaseException as e:
+                if self.is_running():
+                    if not isinstance(e, OSError):
+                        traceback.print_exc()
+                    self.print_error("Previous exception will now be reraised")
+                raise e
+            if self.use_ssl and is_new:
+                self.print_error("saving new certificate for", self.host)
+                os.rename(temporary_path, cert_path)
+            return self.reader, self.writer, False
+
+    async def send_all(self, list_of_requests):
+        _, w, usedExisting = await self._get_read_write()
+        starttime = time.time()
+        for i in list_of_requests:
+            w.write(json.dumps(i).encode("ascii") + b"\n")
+            await w.drain()
+            if time.time() - starttime > 2.5:
+                self.print_error("send_all: sending is taking too long. Used existing connection: ", usedExisting)
+                raise ConnectionError("sending is taking too long")
+
+    def close(self):
+        if self.writer:
+            self.writer.close()
+
+    def _try_extract(self):
+        try:
+            pos = self.buf.index(b"\n")
+        except ValueError:
+            return
+        obj = self.buf[:pos]
+        try:
+            obj = json.loads(obj.decode("ascii"))
+        except ValueError:
+            return
+        else:
+            self.buf = self.buf[pos+1:]
+            self.last_action = time.time()
+            return obj
+
+    async def get(self):
+        reader, _, _ = await self._get_read_write()
+        while self.is_running():
+            tried = self._try_extract()
+            if tried: return tried
+            temp = io.BytesIO()
+            try:
+                data = await asyncio.wait_for(reader.read(2**10), 1)
+                temp.write(data)
+            except asyncio.TimeoutError:
+                continue
+            self.buf += temp.getvalue()
+
+    def idle_time(self):
+        return time.time() - self.last_action

     def diagnostic_name(self):
         return self.host

-    def fileno(self):
-        # Needed for select
-        return self.socket.fileno()
-
-    def close(self):
-        if not self.closed_remotely:
-            try:
-                self.socket.shutdown(socket.SHUT_RDWR)
-            except socket.error:
-                pass
-        self.socket.close()
-
-    def queue_request(self, *args):  # method, params, _id
+    async def queue_request(self, *args):  # method, params, _id
         '''Queue a request, later to be send with send_requests when the
         socket is available for writing.
         '''
         self.request_time = time.time()
-        self.unsent_requests.append(args)
+        await self.unsent_requests.put((self.request_time, args))
+        await self.unsent_requests.join()

     def num_requests(self):
         '''Keep unanswered requests below 100'''
         n = 100 - len(self.unanswered_requests)
-        return min(n, len(self.unsent_requests))
+        return min(n, self.unsent_requests.qsize())

-    def send_requests(self):
-        '''Sends queued requests. Returns False on failure.'''
+    async def send_request(self):
+        '''Sends a queued request.'''
         make_dict = lambda m, p, i: {'method': m, 'params': p, 'id': i}
         n = self.num_requests()
-        wire_requests = self.unsent_requests[0:n]
+        prio, request = await self.unsent_requests.get()
         try:
-            self.pipe.send_all([make_dict(*r) for r in wire_requests])
-        except socket.error as e:
-            self.print_error("socket error:", e)
-            return False
-        self.unsent_requests = self.unsent_requests[n:]
-        for request in wire_requests:
-            if self.debug:
-                self.print_error("-->", request)
-            self.unanswered_requests[request[2]] = request
-        return True
+            await self.send_all([make_dict(*request)])
+        except (SocksError, OSError, TimeoutError) as e:
+            if type(e) is SocksError:
+                self.print_error(e)
+            await self.unsent_requests.put((prio, request))
+            return
+        self.unsent_requests.task_done()
+        if self.debug:
+            self.print_error("-->", request)
+        self.unanswered_requests[request[2]] = request
+        self.last_action = time.time()

     def ping_required(self):
         '''Maintains time since last ping. Returns True if a ping should
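For reference, the framing spoken by the new send_all()/_try_extract() pair appears unchanged from the old SocketPipe: one JSON-RPC object per newline-terminated ASCII line. A small illustrative round trip (the method, id and server reply are made up):

import json

request = {'method': 'server.version', 'params': ['3.0.5', '1.1'], 'id': 0}
wire = json.dumps(request).encode('ascii') + b'\n'      # what send_all() writes

raw = b'{"id": 0, "result": "ElectrumX 1.2"}\n'         # hypothetical reply from the server
response = json.loads(raw[:raw.index(b'\n')].decode('ascii'))  # what _try_extract() recovers
assert response['id'] == request['id']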
@@ -317,14 +286,14 @@ class Interface(util.PrintError):

     def has_timed_out(self):
         '''Returns True if the interface has timed out.'''
+        if self.error_future.done(): return True
         if (self.unanswered_requests and time.time() - self.request_time > 10
-            and self.pipe.idle_time() > 10):
+            and self.idle_time() > 10):
             self.print_error("timeout", len(self.unanswered_requests))
             return True
         return False

-    def get_responses(self):
+    async def get_response(self):
         '''Call if there is data available on the socket. Returns a list of
         (request, response) pairs. Notifications are singleton
         unsolicited responses presumably as a result of prior
@@ -333,34 +302,25 @@ class Interface(util.PrintError):
         corresponding request. If the connection was closed remotely
         or the remote server is misbehaving, a (None, None) will appear.
         '''
-        responses = []
-        while True:
-            try:
-                response = self.pipe.get()
-            except util.timeout:
-                break
-            if not type(response) is dict:
-                responses.append((None, None))
-                if response is None:
-                    self.closed_remotely = True
-                    self.print_error("connection closed remotely")
-                break
-            if self.debug:
-                self.print_error("<--", response)
-            wire_id = response.get('id', None)
-            if wire_id is None:  # Notification
-                responses.append((None, response))
-            else:
-                request = self.unanswered_requests.pop(wire_id, None)
-                if request:
-                    responses.append((request, response))
-                else:
-                    self.print_error("unknown wire ID", wire_id)
-                    responses.append((None, None))  # Signal
-                    break
-        return responses
+        response = await self.get()
+        if not type(response) is dict:
+            if response is None:
+                self.closed_remotely = True
+                if self.is_running():
+                    self.print_error("connection closed remotely")
+            return None, None
+        if self.debug:
+            self.print_error("<--", response)
+        wire_id = response.get('id', None)
+        if wire_id is None:  # Notification
+            return None, response
+        else:
+            request = self.unanswered_requests.pop(wire_id, None)
+            if request:
+                return request, response
+            else:
+                self.print_error("unknown wire ID", wire_id)
+                return None, None  # Signal


 def check_cert(host, cert):
     try:
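Putting the new constructor together: server is still a 'host:port:protocol' string, and proxy_config (when not None) is read with the keys mode, host, port, user and password. A rough, hypothetical wiring of a single interface from an event loop; the server name, config path and the driving code are placeholders, since the real driver lives in the suppressed network diff below:

import asyncio

def on_error(fut):
    # done-callback attached to Interface.error_future
    print("interface failed:", fut.result())

async def drive(iface):
    # queue_request() waits on the queue's join(), so let it run as a task
    asyncio.ensure_future(iface.queue_request('server.version', ['3.0.5', '1.1'], 0))
    await iface.send_request()              # pops the queued request and writes it out
    request, response = await iface.get_response()
    print(request, response)
    iface.close()

running = True
proxy = {"mode": "socks5", "host": "127.0.0.1", "port": 9050, "user": "", "password": ""}
iface = Interface('electrum.example.org:50002:s', '/path/to/electrum/config/dir',
                  proxy, lambda: running, on_error)
asyncio.get_event_loop().run_until_complete(drive(iface))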

File diff suppressed because it is too large.

lib/ssl_in_socks.py (new file, 85 lines)

@@ -0,0 +1,85 @@
import traceback
import ssl
from asyncio.sslproto import SSLProtocol
import aiosocks
import asyncio

from . import interface


class AppProto(asyncio.Protocol):
    def __init__(self, receivedQueue, connUpLock):
        self.buf = bytearray()
        self.receivedQueue = receivedQueue
        self.connUpLock = connUpLock

    def connection_made(self, transport):
        self.connUpLock.release()

    def data_received(self, data):
        self.buf.extend(data)
        NEWLINE = b"\n"[0]
        for idx, val in enumerate(self.buf):
            if NEWLINE == val:
                asyncio.ensure_future(self.receivedQueue.put(bytes(self.buf[:idx+1])))
                self.buf = self.buf[idx+1:]


def makeProtocolFactory(receivedQueue, connUpLock, ca_certs):
    class MySSLProtocol(SSLProtocol):
        def __init__(self):
            context = interface.get_ssl_context(\
                cert_reqs=ssl.CERT_REQUIRED if ca_certs is not None else ssl.CERT_NONE,\
                ca_certs=ca_certs)
            proto = AppProto(receivedQueue, connUpLock)
            super().__init__(asyncio.get_event_loop(), proto, context, None)
    return MySSLProtocol


class ReaderEmulator:
    def __init__(self, receivedQueue):
        self.receivedQueue = receivedQueue

    async def read(self, _bufferSize):
        return await self.receivedQueue.get()


class WriterEmulator:
    def __init__(self, transport):
        self.transport = transport

    def write(self, data):
        self.transport.write(data)

    async def drain(self):
        pass

    def close(self):
        self.transport.close()


async def sslInSocksReaderWriter(socksAddr, socksAuth, host, port, ca_certs):
    receivedQueue = asyncio.Queue()
    connUpLock = asyncio.Lock()
    await connUpLock.acquire()
    transport, protocol = await aiosocks.create_connection(\
        makeProtocolFactory(receivedQueue, connUpLock, ca_certs),\
        proxy=socksAddr,\
        proxy_auth=socksAuth, dst=(host, port))
    await connUpLock.acquire()
    return ReaderEmulator(receivedQueue), WriterEmulator(protocol._app_transport)


if __name__ == "__main__":
    async def l(fut):
        try:
            # aiosocks.Socks4Addr("127.0.0.1", 9050), None, "songbird.bauerj.eu", 50002, None)
            args = aiosocks.Socks4Addr("127.0.0.1", 9050), None, "electrum.akinbo.org", 51002, None
            reader, writer = await sslInSocksReaderWriter(*args)
            writer.write(b'{"id":0,"method":"server.version","args":["3.0.2", "1.1"]}\n')
            await writer.drain()
            print(await reader.read(4096))
            writer.write(b'{"id":0,"method":"server.version","args":["3.0.2", "1.1"]}\n')
            await writer.drain()
            print(await reader.read(4096))
            writer.close()
            fut.set_result("finished")
        except BaseException as e:
            fut.set_exception(e)

    def f():
        loop = asyncio.get_event_loop()
        fut = asyncio.Future()
        asyncio.ensure_future(l(fut))
        loop.run_until_complete(fut)
        print(fut.result())
        loop.close()

    f()
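The new module presumably exists because asyncio's open_connection() cannot layer TLS on top of an aiosocks transport directly, so it drives SSLProtocol by hand and hides the difference behind ReaderEmulator/WriterEmulator. The only surface Interface relies on is read()/write()/drain()/close(), so plain asyncio streams and the SOCKS-wrapped pair look the same from its point of view; a tiny sketch of that shared surface (the helper name is illustrative):

async def send_line(reader, writer, payload: bytes):
    # works with asyncio streams and with ReaderEmulator/WriterEmulator alike
    writer.write(payload + b"\n")
    await writer.drain()
    return await reader.read(2**10)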


@@ -29,6 +29,7 @@ import traceback
 import urllib
 import threading
 import hmac
+import asyncio

 from .i18n import _
@@ -148,6 +149,24 @@ class PrintError(object):
     def print_msg(self, *msg):
         print_msg("[%s]" % self.diagnostic_name(), *msg)

+
+class ForeverCoroutineJob(PrintError):
+    """A job that is run from a thread's main loop. run() is
+    called from that thread's context.
+    """
+    async def run(self, is_running):
+        """Called once from the thread"""
+        pass
+
+
+class CoroutineJob(PrintError):
+    """A job that is run periodically from a thread's main loop. run() is
+    called from that thread's context.
+    """
+    async def run(self):
+        """Called periodically from the thread"""
+        pass
+
+
 class ThreadJob(PrintError):
     """A job that is run periodically from a thread's main loop. run() is
     called from that thread's context.
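The two new base classes mirror ThreadJob for the asyncio side: CoroutineJob.run() is awaited periodically, while ForeverCoroutineJob.run() is awaited once and is expected to loop on its own. Hypothetical subclasses, only to show the intended shape (names and bodies are made up):

class HeartbeatJob(CoroutineJob):
    """Awaited on every pass of the owning thread's loop."""
    async def run(self):
        self.print_error("tick")

class MainLoopJob(ForeverCoroutineJob):
    """Awaited once; keeps itself alive until the owning thread stops."""
    async def run(self, is_running):
        while is_running():
            await asyncio.sleep(1)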
@@ -192,6 +211,44 @@ class DaemonThread(threading.Thread, PrintError):
         self.running_lock = threading.Lock()
         self.job_lock = threading.Lock()
         self.jobs = []
+        self.coroutines = []
+        self.forever_coroutines_task = None
+
+    def add_coroutines(self, jobs):
+        for i in jobs: assert isinstance(i, CoroutineJob), i.__class__.__name__ + " does not inherit from CoroutineJob"
+        self.coroutines.extend(jobs)
+
+    def set_forever_coroutines(self, jobs):
+        for i in jobs: assert isinstance(i, ForeverCoroutineJob), i.__class__.__name__ + " does not inherit from ForeverCoroutineJob"
+        async def put():
+            await self.forever_coroutines_queue.put(jobs)
+        asyncio.run_coroutine_threadsafe(put(), self.loop)
+
+    def run_forever_coroutines(self):
+        self.forever_coroutines_queue = asyncio.Queue()  # making queue here because __init__ is called from non-network thread
+        self.loop = asyncio.get_event_loop()
+        async def getFromQueueAndStart():
+            jobs = []
+            while True:
+                try:
+                    jobs = await asyncio.wait_for(self.forever_coroutines_queue.get(), 1)
+                    break
+                except asyncio.TimeoutError:
+                    if not self.is_running(): break
+                    continue
+            await asyncio.gather(*[i.run(self.is_running) for i in jobs])
+        self.forever_coroutines_task = asyncio.ensure_future(getFromQueueAndStart())
+        return self.forever_coroutines_task
+
+    async def run_coroutines(self):
+        for coroutine in self.coroutines:
+            assert isinstance(coroutine, CoroutineJob)
+            await coroutine.run()
+
+    def remove_coroutines(self, jobs):
+        for i in jobs: assert isinstance(i, CoroutineJob)
+        for job in jobs:
+            self.coroutines.remove(job)

     def add_jobs(self, jobs):
         with self.job_lock:
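How these hooks are driven is not shown in this compare (the network-thread diff is suppressed), but the shape implied by the methods is: the thread that owns the event loop calls run_forever_coroutines() once, other threads hand it long-lived jobs via set_forever_coroutines(), and run_coroutines() is awaited periodically. A rough, hypothetical sketch under those assumptions:

# hypothetical run() body of a DaemonThread subclass that owns the event loop
def run(self):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    forever_task = self.run_forever_coroutines()   # creates self.loop and the jobs queue

    async def periodic():
        while self.is_running():
            await self.run_coroutines()
            await asyncio.sleep(0.1)

    loop.run_until_complete(asyncio.gather(forever_task, periodic()))
    loop.close()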
@@ -203,6 +260,7 @@ class DaemonThread(threading.Thread, PrintError):
         # malformed or malicious server responses
         with self.job_lock:
             for job in self.jobs:
+                assert isinstance(job, ThreadJob)
                 try:
                     job.run()
                 except Exception as e: