diff --git a/contrib/build-osx/make_osx b/contrib/build-osx/make_osx index f02f9f5d..599480e2 100755 --- a/contrib/build-osx/make_osx +++ b/contrib/build-osx/make_osx @@ -30,7 +30,7 @@ fail "Unable to use Python $PYTHON_VERSION" info "Installing pyinstaller" -python3 -m pip install git+https://github.com/ecdsa/pyinstaller@fix_2952 -I --user || fail "Could not install pyinstaller" +python3 -m pip install -I --user pyinstaller==3.4 || fail "Could not install pyinstaller" info "Using these versions for building $PACKAGE:" sw_vers diff --git a/contrib/build-wine/build-electrum-git.sh b/contrib/build-wine/build-electrum-git.sh index 0a059a5f..19a42f41 100755 --- a/contrib/build-wine/build-electrum-git.sh +++ b/contrib/build-wine/build-electrum-git.sh @@ -34,7 +34,7 @@ if ! which msgfmt > /dev/null 2>&1; then exit 1 fi for i in ./locale/*; do - dir=$WINEPREFIX/drive_c/electrum/electrum/locale/$i/LC_MESSAGES + dir=$WINEPREFIX/drive_c/electrum/electrum/$i/LC_MESSAGES mkdir -p $dir msgfmt --output-file=$dir/electrum.mo $i/electrum.po || true done diff --git a/contrib/build-wine/prepare-wine.sh b/contrib/build-wine/prepare-wine.sh index ffa31e62..3b5c49be 100755 --- a/contrib/build-wine/prepare-wine.sh +++ b/contrib/build-wine/prepare-wine.sh @@ -111,16 +111,13 @@ done # upgrade pip $PYTHON -m pip install pip --upgrade -# Install pywin32-ctypes (needed by pyinstaller) -$PYTHON -m pip install pywin32-ctypes==0.1.2 - # install PySocks $PYTHON -m pip install win_inet_pton==1.0.1 $PYTHON -m pip install -r $here/../deterministic-build/requirements-binaries.txt # Install PyInstaller -$PYTHON -m pip install https://github.com/ecdsa/pyinstaller/archive/fix_2952.zip +$PYTHON -m pip install pyinstaller==3.4 # Install ZBar download_if_not_exist $ZBAR_FILENAME "$ZBAR_URL" @@ -141,9 +138,6 @@ verify_hash $LIBUSB_FILENAME "$LIBUSB_SHA256" cp libusb/MS32/dll/libusb-1.0.dll $WINEPREFIX/drive_c/python$PYTHON_VERSION/ -# add dlls needed for pyinstaller: -cp $WINEPREFIX/drive_c/python$PYTHON_VERSION/Lib/site-packages/PyQt5/Qt/bin/* $WINEPREFIX/drive_c/python$PYTHON_VERSION/ - mkdir -p $WINEPREFIX/drive_c/tmp cp secp256k1/libsecp256k1.dll $WINEPREFIX/drive_c/tmp/ diff --git a/contrib/make_tgz b/contrib/make_tgz index 9e53dd28..6e5e173a 100755 --- a/contrib/make_tgz +++ b/contrib/make_tgz @@ -1 +1,11 @@ +#!/bin/bash + +contrib=$(dirname "$0") +packages="$contrib"/../packages/ + +if [ ! -d "$packages" ]; then + echo "Run make_packages first!" + exit 1 +fi + python3 setup.py sdist --format=zip,gztar diff --git a/contrib/requirements/requirements.txt b/contrib/requirements/requirements.txt index 3fdf9eac..59fb22b4 100644 --- a/contrib/requirements/requirements.txt +++ b/contrib/requirements/requirements.txt @@ -6,6 +6,6 @@ protobuf dnspython jsonrpclib-pelix qdarkstyle<3.0 -aiorpcx>=0.7.1,<0.8 +aiorpcx>=0.8.1,<0.9 aiohttp aiohttp_socks diff --git a/electrum/address_synchronizer.py b/electrum/address_synchronizer.py index 0fbf4c3e..d4471de9 100644 --- a/electrum/address_synchronizer.py +++ b/electrum/address_synchronizer.py @@ -28,7 +28,7 @@ from collections import defaultdict from . import bitcoin from .bitcoin import COINBASE_MATURITY, TYPE_ADDRESS, TYPE_PUBKEY -from .util import PrintError, profiler, bfh, VerifiedTxInfo, TxMinedStatus, aiosafe, CustomTaskGroup +from .util import PrintError, profiler, bfh, VerifiedTxInfo, TxMinedStatus, aiosafe, SilentTaskGroup from .transaction import Transaction, TxOutput from .synchronizer import Synchronizer from .verifier import SPV @@ -80,6 +80,12 @@ class AddressSynchronizer(PrintError): self.load_and_cleanup() + def with_transaction_lock(func): + def func_wrapper(self, *args, **kwargs): + with self.transaction_lock: + return func(self, *args, **kwargs) + return func_wrapper + def load_and_cleanup(self): self.load_transactions() self.load_local_history() @@ -140,7 +146,7 @@ class AddressSynchronizer(PrintError): @aiosafe async def on_default_server_changed(self, event): async with self.sync_restart_lock: - self.stop_threads() + self.stop_threads(write_to_disk=False) await self._start_threads() def start_network(self, network): @@ -157,7 +163,7 @@ class AddressSynchronizer(PrintError): self.verifier = SPV(self.network, self) self.synchronizer = synchronizer = Synchronizer(self) assert self.group is None, 'group already exists' - self.group = CustomTaskGroup() + self.group = SilentTaskGroup() async def job(): async with self.group as group: @@ -169,7 +175,7 @@ class AddressSynchronizer(PrintError): interface.session.unsubscribe(synchronizer.status_queue) await interface.group.spawn(job) - def stop_threads(self): + def stop_threads(self, write_to_disk=True): if self.network: self.synchronizer = None self.verifier = None @@ -177,9 +183,10 @@ class AddressSynchronizer(PrintError): asyncio.run_coroutine_threadsafe(self.group.cancel_remaining(), self.network.asyncio_loop) self.group = None self.storage.put('stored_height', self.get_local_height()) - self.save_transactions() - self.save_verified_tx() - self.storage.write() + if write_to_disk: + self.save_transactions() + self.save_verified_tx() + self.storage.write() def add_address(self, address): if address not in self.history: @@ -188,7 +195,7 @@ class AddressSynchronizer(PrintError): if self.synchronizer: self.synchronizer.add(address) - def get_conflicting_transactions(self, tx): + def get_conflicting_transactions(self, tx_hash, tx): """Returns a set of transaction hashes from the wallet history that are directly conflicting with tx, i.e. they have common outpoints being spent with tx. If the tx is already in wallet history, that will not be @@ -207,18 +214,18 @@ class AddressSynchronizer(PrintError): # this outpoint has already been spent, by spending_tx assert spending_tx_hash in self.transactions conflicting_txns |= {spending_tx_hash} - txid = tx.txid() - if txid in conflicting_txns: + if tx_hash in conflicting_txns: # this tx is already in history, so it conflicts with itself if len(conflicting_txns) > 1: raise Exception('Found conflicting transactions already in wallet history.') - conflicting_txns -= {txid} + conflicting_txns -= {tx_hash} return conflicting_txns def add_transaction(self, tx_hash, tx, allow_unrelated=False): assert tx_hash, tx_hash assert tx, tx assert tx.is_complete() + # assert tx_hash == tx.txid() # disabled as expensive; test done by Synchronizer. # we need self.transaction_lock but get_tx_height will take self.lock # so we need to take that too here, to enforce order of locks with self.lock, self.transaction_lock: @@ -243,7 +250,7 @@ class AddressSynchronizer(PrintError): # When this method exits, there must NOT be any conflict, so # either keep this txn and remove all conflicting (along with dependencies) # or drop this txn - conflicting_txns = self.get_conflicting_transactions(tx) + conflicting_txns = self.get_conflicting_transactions(tx_hash, tx) if conflicting_txns: existing_mempool_txn = any( self.get_tx_height(tx_hash2).height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT) @@ -521,8 +528,7 @@ class AddressSynchronizer(PrintError): delta = tx_deltas[tx_hash] tx_mined_status = self.get_tx_height(tx_hash) history.append((tx_hash, tx_mined_status, delta)) - history.sort(key = lambda x: self.get_txpos(x[0])) - history.reverse() + history.sort(key = lambda x: self.get_txpos(x[0]), reverse=True) # 3. add balance c, u, x = self.get_balance(domain) balance = c + u + x @@ -570,9 +576,12 @@ class AddressSynchronizer(PrintError): with self.lock: # tx will be verified only if height > 0 self.unverified_tx[tx_hash] = tx_height - # to remove pending proof requests: - if self.verifier: - self.verifier.remove_spv_proof_for_tx(tx_hash) + + def remove_unverified_tx(self, tx_hash, tx_height): + with self.lock: + new_height = self.unverified_tx.get(tx_hash) + if new_height == tx_height: + self.unverified_tx.pop(tx_hash, None) def add_verified_tx(self, tx_hash: str, info: VerifiedTxInfo): # Remove from the unverified map and add to the verified map @@ -580,7 +589,7 @@ class AddressSynchronizer(PrintError): self.unverified_tx.pop(tx_hash, None) self.verified_tx[tx_hash] = info tx_mined_status = self.get_tx_height(tx_hash) - self.network.trigger_callback('verified', tx_hash, tx_mined_status) + self.network.trigger_callback('verified', self, tx_hash, tx_mined_status) def get_unverified_txs(self): '''Returns a map from tx hash to transaction height''' @@ -651,6 +660,8 @@ class AddressSynchronizer(PrintError): def set_up_to_date(self, up_to_date): with self.lock: self.up_to_date = up_to_date + if self.network: + self.network.notify('status') if up_to_date: self.save_transactions(write=True) # if the verifier is also up to date, persist that too; @@ -661,8 +672,9 @@ class AddressSynchronizer(PrintError): def is_up_to_date(self): with self.lock: return self.up_to_date + @with_transaction_lock def get_tx_delta(self, tx_hash, address): - "effect of tx on address" + """effect of tx on address""" delta = 0 # substract the value of coins sent from address d = self.txi.get(tx_hash, {}).get(address, []) @@ -674,8 +686,9 @@ class AddressSynchronizer(PrintError): delta += v return delta + @with_transaction_lock def get_tx_value(self, txid): - " effect of tx on the entire domain" + """effect of tx on the entire domain""" delta = 0 for addr, d in self.txi.get(txid, {}).items(): for n, v in d: @@ -738,17 +751,18 @@ class AddressSynchronizer(PrintError): return is_relevant, is_mine, v, fee def get_addr_io(self, address): - h = self.get_address_history(address) - received = {} - sent = {} - for tx_hash, height in h: - l = self.txo.get(tx_hash, {}).get(address, []) - for n, v, is_cb in l: - received[tx_hash + ':%d'%n] = (height, v, is_cb) - for tx_hash, height in h: - l = self.txi.get(tx_hash, {}).get(address, []) - for txi, v in l: - sent[txi] = height + with self.lock, self.transaction_lock: + h = self.get_address_history(address) + received = {} + sent = {} + for tx_hash, height in h: + l = self.txo.get(tx_hash, {}).get(address, []) + for n, v, is_cb in l: + received[tx_hash + ':%d'%n] = (height, v, is_cb) + for tx_hash, height in h: + l = self.txi.get(tx_hash, {}).get(address, []) + for txi, v in l: + sent[txi] = height return received, sent def get_addr_utxo(self, address): diff --git a/electrum/blockchain.py b/electrum/blockchain.py index d1397318..b55a1522 100644 --- a/electrum/blockchain.py +++ b/electrum/blockchain.py @@ -22,6 +22,7 @@ # SOFTWARE. import os import threading +from typing import Optional from . import util from .bitcoin import Hash, hash_encode, int_to_hex, rev_hex @@ -36,29 +37,32 @@ except ImportError: util.print_msg("Warning: package scrypt not available; synchronization could be very slow") from .scrypt import scrypt_1024_1_1_80 as getPoWHash +HEADER_SIZE = 80 # bytes MAX_TARGET = 0x00000fffffffffffffffffffffffffffffffffffffffffffffffffffffffffff - class MissingHeader(Exception): pass + class InvalidHeader(Exception): pass -def serialize_header(res): - s = int_to_hex(res.get('version'), 4) \ - + rev_hex(res.get('prev_block_hash')) \ - + rev_hex(res.get('merkle_root')) \ - + int_to_hex(int(res.get('timestamp')), 4) \ - + int_to_hex(int(res.get('bits')), 4) \ - + int_to_hex(int(res.get('nonce')), 4) + +def serialize_header(header_dict: dict) -> str: + s = int_to_hex(header_dict['version'], 4) \ + + rev_hex(header_dict['prev_block_hash']) \ + + rev_hex(header_dict['merkle_root']) \ + + int_to_hex(int(header_dict['timestamp']), 4) \ + + int_to_hex(int(header_dict['bits']), 4) \ + + int_to_hex(int(header_dict['nonce']), 4) return s -def deserialize_header(s, height): + +def deserialize_header(s: bytes, height: int) -> dict: if not s: raise InvalidHeader('Invalid header: {}'.format(s)) - if len(s) != 80: + if len(s) != HEADER_SIZE: raise InvalidHeader('Invalid header length: {}'.format(len(s))) hex_to_int = lambda s: int('0x' + bh2u(s[::-1]), 16) h = {} @@ -71,24 +75,29 @@ def deserialize_header(s, height): h['block_height'] = height return h -def hash_header(header): + +def hash_header(header: dict) -> str: if header is None: return '0' * 64 if header.get('prev_block_hash') is None: - header['prev_block_hash'] = '00'*32 + header['prev_block_hash'] = '00' * 32 return hash_encode(Hash(bfh(serialize_header(header)))) + def pow_hash_header(header): return hash_encode(getPoWHash(bfh(serialize_header(header)))) + blockchains = {} +blockchains_lock = threading.Lock() + def read_blockchains(config): blockchains[0] = Blockchain(config, 0, None) fdir = os.path.join(util.get_headers_dir(config), 'forks') util.make_dir(fdir) l = filter(lambda x: x.startswith('fork_'), os.listdir(fdir)) - l = sorted(l, key = lambda x: int(x.split('_')[1])) + l = sorted(l, key=lambda x: int(x.split('_')[1])) for filename in l: forkpoint = int(filename.split('_')[2]) parent_id = int(filename.split('_')[1]) @@ -100,29 +109,14 @@ def read_blockchains(config): util.print_error("cannot connect", filename) return blockchains -def check_header(header): - if type(header) is not dict: - return False - for b in blockchains.values(): - if b.check_header(header): - return b - return False - -def can_connect(header): - for b in blockchains.values(): - if b.can_connect(header): - return b - return False - class Blockchain(util.PrintError): """ Manages blockchain headers and their verification """ - def __init__(self, config, forkpoint, parent_id): + def __init__(self, config, forkpoint: int, parent_id: int): self.config = config - self.catch_up = None # interface catching up self.forkpoint = forkpoint self.checkpoints = constants.net.CHECKPOINTS self.parent_id = parent_id @@ -137,24 +131,25 @@ class Blockchain(util.PrintError): return func(self, *args, **kwargs) return func_wrapper - def parent(self): + def parent(self) -> 'Blockchain': return blockchains[self.parent_id] - def get_max_child(self): - children = list(filter(lambda y: y.parent_id==self.forkpoint, blockchains.values())) + def get_max_child(self) -> Optional[int]: + with blockchains_lock: chains = list(blockchains.values()) + children = list(filter(lambda y: y.parent_id == self.forkpoint, chains)) return max([x.forkpoint for x in children]) if children else None - def get_forkpoint(self): + def get_forkpoint(self) -> int: mc = self.get_max_child() return mc if mc is not None else self.forkpoint - def get_branch_size(self): + def get_branch_size(self) -> int: return self.height() - self.get_forkpoint() + 1 - def get_name(self): + def get_name(self) -> str: return self.get_hash(self.get_forkpoint()).lstrip('00')[0:10] - def check_header(self, header): + def check_header(self, header: dict) -> bool: header_hash = hash_header(header) height = header.get('block_height') try: @@ -162,25 +157,25 @@ class Blockchain(util.PrintError): except MissingHeader: return False - def fork(parent, header): + def fork(parent, header: dict) -> 'Blockchain': forkpoint = header.get('block_height') self = Blockchain(parent.config, forkpoint, parent.forkpoint) open(self.path(), 'w+').close() self.save_header(header) return self - def height(self): + def height(self) -> int: return self.forkpoint + self.size() - 1 - def size(self): + def size(self) -> int: with self.lock: return self._size - def update_size(self): + def update_size(self) -> None: p = self.path() - self._size = os.path.getsize(p)//80 if os.path.exists(p) else 0 + self._size = os.path.getsize(p) // HEADER_SIZE if os.path.exists(p) else 0 - def verify_header(self, header, prev_hash, target, expected_header_hash=None): + def verify_header(self, header: dict, prev_hash: str, target: int, expected_header_hash: str=None) -> None: _hash = hash_header(header) _powhash = pow_hash_header(header) if expected_header_hash and expected_header_hash != _hash: @@ -189,8 +184,8 @@ class Blockchain(util.PrintError): raise Exception("prev hash mismatch: %s vs %s" % (prev_hash, header.get('prev_block_hash'))) if constants.net.TESTNET: return - #print("I'm inside verify_header") - #bits = self.target_to_bits(target) + # print("I'm inside verify_header") + # bits = self.target_to_bits(target) bits = target if bits != header.get('bits'): raise Exception("bits mismatch: %s vs %s" % (bits, header.get('bits'))) @@ -198,13 +193,14 @@ class Blockchain(util.PrintError): target_val = self.bits_to_target(bits) if int('0x' + _powhash, 16) > target_val: raise Exception("insufficient proof of work: %s vs target %s" % (int('0x' + _hash, 16), target_val)) - #print("I passed verify_header(). Calc target values have been matched") + # print("I passed verify_header(). Calc target values have been matched") + def verify_chunk(self, index, data): - num = len(data) // 80 + num = len(data) // HEADER_SIZE current_header = (index * 2016) # last = (index * 2016 + 2015) - print(index*2016) + print(index * 2016) prev_hash = self.get_hash(current_header - 1) for i in range(num): target = self.get_target(current_header - 1) @@ -212,7 +208,8 @@ class Blockchain(util.PrintError): expected_header_hash = self.get_hash(current_header) except MissingHeader: expected_header_hash = None - raw_header = data[i*80:(i+1) * 80] + + raw_header = data[i * HEADER_SIZE: (i + 1) * HEADER_SIZE] header = deserialize_header(raw_header, current_header) print(i) self.verify_header(header, prev_hash, target, expected_header_hash) @@ -222,31 +219,37 @@ class Blockchain(util.PrintError): def path(self): d = util.get_headers_dir(self.config) - filename = 'blockchain_headers' if self.parent_id is None else os.path.join('forks', 'fork_%d_%d'%(self.parent_id, self.forkpoint)) + if self.parent_id is None: + filename = 'blockchain_headers' + else: + basename = 'fork_%d_%d' % (self.parent_id, self.forkpoint) + filename = os.path.join('forks', basename) return os.path.join(d, filename) @with_lock - def save_chunk(self, index, chunk): + def save_chunk(self, index: int, chunk: bytes): #chunk_within_checkpoint_region = index < len(self.checkpoints) + # chunks in checkpoint region are the responsibility of the 'main chain' - #if chunk_within_checkpoint_region and self.parent_id is not None: + # if chunk_within_checkpoint_region and self.parent_id is not None: # main_chain = blockchains[0] # main_chain.save_chunk(index, chunk) # return #delta_height = (index * 2016 - self.forkpoint) - #delta_bytes = delta_height * 80 + #delta_bytes = delta_height * HEADER_SIZE + # if this chunk contains our forkpoint, only save the part after forkpoint # (the part before is the responsibility of the parent) - #if delta_bytes < 0: + # if delta_bytes < 0: # chunk = chunk[-delta_bytes:] # delta_bytes = 0 - #truncate = not chunk_within_checkpoint_region - #self.write(chunk, delta_bytes, truncate) + # truncate = not chunk_within_checkpoint_region + # self.write(chunk, delta_bytes, truncate) self.swap_with_parent() @with_lock - def swap_with_parent(self): + def swap_with_parent(self) -> None: if self.parent_id is None: return parent_branch_size = self.parent().height() - self.forkpoint + 1 @@ -261,26 +264,28 @@ class Blockchain(util.PrintError): my_data = f.read() self.assert_headers_file_available(parent.path()) with open(parent.path(), 'rb') as f: - f.seek((forkpoint - parent.forkpoint)*80) - parent_data = f.read(parent_branch_size*80) + f.seek((forkpoint - parent.forkpoint)*HEADER_SIZE) + parent_data = f.read(parent_branch_size*HEADER_SIZE) self.write(parent_data, 0) - parent.write(my_data, (forkpoint - parent.forkpoint)*80) + parent.write(my_data, (forkpoint - parent.forkpoint)*HEADER_SIZE) # store file path - for b in blockchains.values(): + with blockchains_lock: chains = list(blockchains.values()) + for b in chains: b.old_path = b.path() # swap parameters self.parent_id = parent.parent_id; parent.parent_id = parent_id self.forkpoint = parent.forkpoint; parent.forkpoint = forkpoint self._size = parent._size; parent._size = parent_branch_size # move files - for b in blockchains.values(): + for b in chains: if b in [self, parent]: continue if b.old_path != b.path(): self.print_error("renaming", b.old_path, b.path()) os.rename(b.old_path, b.path()) # update pointers - blockchains[self.forkpoint] = self - blockchains[parent.forkpoint] = parent + with blockchains_lock: + blockchains[self.forkpoint] = self + blockchains[parent.forkpoint] = parent def assert_headers_file_available(self, path): if os.path.exists(path): @@ -290,12 +295,12 @@ class Blockchain(util.PrintError): else: raise FileNotFoundError('Cannot find headers file but headers_dir is there. Should be at {}'.format(path)) - def write(self, data, offset, truncate=True): + def write(self, data: bytes, offset: int, truncate: bool=True) -> None: filename = self.path() with self.lock: self.assert_headers_file_available(filename) with open(filename, 'rb+') as f: - if truncate and offset != self._size*80: + if truncate and offset != self._size * HEADER_SIZE: f.seek(offset) f.truncate() f.seek(offset) @@ -305,16 +310,16 @@ class Blockchain(util.PrintError): self.update_size() @with_lock - def save_header(self, header): + def save_header(self, header: dict) -> None: delta = header.get('block_height') - self.forkpoint data = bfh(serialize_header(header)) # headers are only _appended_ to the end: assert delta == self.size() - assert len(data) == 80 - self.write(data, delta*80) + assert len(data) == HEADER_SIZE + self.write(data, delta*HEADER_SIZE) self.swap_with_parent() - def read_header(self, height): + def read_header(self, height: int) -> Optional[dict]: assert self.parent_id != self.forkpoint if height < 0: return @@ -326,15 +331,15 @@ class Blockchain(util.PrintError): name = self.path() self.assert_headers_file_available(name) with open(name, 'rb') as f: - f.seek(delta * 80) - h = f.read(80) - if len(h) < 80: + f.seek(delta * HEADER_SIZE) + h = f.read(HEADER_SIZE) + if len(h) < HEADER_SIZE: raise Exception('Expected to read a full header. This was only {} bytes'.format(len(h))) - if h == bytes([0])*80: + if h == bytes([0])*HEADER_SIZE: return None return deserialize_header(h, height) - def get_hash(self, height): + def get_hash(self, height: int) -> str: def is_height_checkpoint(): within_cp_range = height <= constants.net.max_checkpoint() at_chunk_boundary = (height+1) % 2016 == 0 @@ -354,7 +359,7 @@ class Blockchain(util.PrintError): raise MissingHeader(height) return hash_header(header) - def get_target(self, index): + def get_target(self, index: int) -> int: # compute target from chunk x, used in chunk x+1 if constants.net.TESTNET: return 0 @@ -406,7 +411,7 @@ class Blockchain(util.PrintError): bnNew = self.target_to_bits(int(bnNew)) return bnNew - def bits_to_target(self, bits): + def bits_to_target(self, bits: int) -> int: bitsN = (bits >> 24) & 0xff if not (bitsN >= 0x03 and bitsN <= 0x1e): raise BaseException("First part of bits should be in [0x03, 0x1e]") @@ -415,7 +420,7 @@ class Blockchain(util.PrintError): raise Exception("Second part of bits should be in [0x8000, 0x7fffff]") return bitsBase << (8 * (bitsN-3)) - def target_to_bits(self, target): + def target_to_bits(self, target: int) -> int: c = ("%064x" % target)[2:] while c[:2] == '00' and len(c) > 6: c = c[2:] @@ -425,12 +430,12 @@ class Blockchain(util.PrintError): bitsBase >>= 8 return bitsN << 24 | bitsBase - def can_connect(self, header, check_height=True): + def can_connect(self, header: dict, check_height: bool=True) -> bool: if header is None: return False height = header['block_height'] if check_height and self.height() != height - 1: - #self.print_error("cannot connect at height", height) + # self.print_error("cannot connect at height", height) return False if height == 0: return hash_header(header) == constants.net.GENESIS @@ -450,11 +455,11 @@ class Blockchain(util.PrintError): return False return True - def connect_chunk(self, idx, hexdata): + def connect_chunk(self, idx: int, hexdata: str) -> bool: try: data = bfh(hexdata) self.verify_chunk(idx, data) - #self.print_error("validated chunk %d" % idx) + # self.print_error("validated chunk %d" % idx) self.save_chunk(idx, data) return True except BaseException as e: @@ -471,6 +476,7 @@ class Blockchain(util.PrintError): cp.append((h, target)) return cp + def AveragingInterval(self, height): # V1 if height < constants.net.nHeight_Difficulty_Version2: @@ -534,3 +540,21 @@ class Blockchain(util.PrintError): assert len(data) == 80 self.write(data, delta * 80) # self.swap_with_parent() + + +def check_header(header: dict) -> Optional[Blockchain]: + if type(header) is not dict: + return None + with blockchains_lock: chains = list(blockchains.values()) + for b in chains: + if b.check_header(header): + return b + return None + + +def can_connect(header: dict) -> Optional[Blockchain]: + with blockchains_lock: chains = list(blockchains.values()) + for b in chains: + if b.can_connect(header): + return b + return None diff --git a/electrum/commands.py b/electrum/commands.py index 3730f88b..4807856e 100644 --- a/electrum/commands.py +++ b/electrum/commands.py @@ -853,6 +853,7 @@ def get_parser(): parser_gui.add_argument("-o", "--offline", action="store_true", dest="offline", default=False, help="Run offline") parser_gui.add_argument("-m", action="store_true", dest="hide_gui", default=False, help="hide GUI on startup") parser_gui.add_argument("-L", "--lang", dest="language", default=None, help="default language used in GUI") + parser_gui.add_argument("--daemon", action="store_true", dest="daemon", default=False, help="keep daemon running after GUI is closed") add_network_options(parser_gui) add_global_options(parser_gui) # daemon diff --git a/electrum/daemon.py b/electrum/daemon.py index c939d109..e867882b 100644 --- a/electrum/daemon.py +++ b/electrum/daemon.py @@ -120,7 +120,7 @@ def get_rpc_credentials(config): class Daemon(DaemonThread): - def __init__(self, config, fd, is_gui): + def __init__(self, config, fd): DaemonThread.__init__(self) self.config = config if config.get('offline'): @@ -133,12 +133,11 @@ class Daemon(DaemonThread): self.gui = None self.wallets = {} # Setup JSONRPC server - self.init_server(config, fd, is_gui) + self.init_server(config, fd) - def init_server(self, config, fd, is_gui): + def init_server(self, config, fd): host = config.get('rpchost', '127.0.0.1') port = config.get('rpcport', 0) - rpc_user, rpc_password = get_rpc_credentials(config) try: server = VerifyingJSONRPCServer((host, port), logRequests=False, @@ -153,14 +152,12 @@ class Daemon(DaemonThread): self.server = server server.timeout = 0.1 server.register_function(self.ping, 'ping') - if is_gui: - server.register_function(self.run_gui, 'gui') - else: - server.register_function(self.run_daemon, 'daemon') - self.cmd_runner = Commands(self.config, None, self.network) - for cmdname in known_commands: - server.register_function(getattr(self.cmd_runner, cmdname), cmdname) - server.register_function(self.run_cmdline, 'run_cmdline') + server.register_function(self.run_gui, 'gui') + server.register_function(self.run_daemon, 'daemon') + self.cmd_runner = Commands(self.config, None, self.network) + for cmdname in known_commands: + server.register_function(getattr(self.cmd_runner, cmdname), cmdname) + server.register_function(self.run_cmdline, 'run_cmdline') def ping(self): return True @@ -215,13 +212,12 @@ class Daemon(DaemonThread): def run_gui(self, config_options): config = SimpleConfig(config_options) if self.gui: - #if hasattr(self.gui, 'new_window'): - # path = config.get_wallet_path() - # self.gui.new_window(path, config.get('url')) - # response = "ok" - #else: - # response = "error: current GUI does not support multiple windows" - response = "error: Electrum GUI already running" + if hasattr(self.gui, 'new_window'): + path = config.get_wallet_path() + self.gui.new_window(path, config.get('url')) + response = "ok" + else: + response = "error: current GUI does not support multiple windows" else: response = "Error: Electrum is running in daemon mode. Please stop the daemon first." return response @@ -255,7 +251,8 @@ class Daemon(DaemonThread): return self.wallets.get(path) def stop_wallet(self, path): - wallet = self.wallets.pop(path) + wallet = self.wallets.pop(path, None) + if not wallet: return wallet.stop_threads() def run_cmdline(self, config_options): @@ -299,6 +296,8 @@ class Daemon(DaemonThread): self.on_stop() def stop(self): + if self.gui: + self.gui.stop() self.print_error("stopping, removing lockfile") remove_lockfile(get_lockfile(self.config)) DaemonThread.stop(self) diff --git a/electrum/ecc.py b/electrum/ecc.py index 0fd0f248..1470618f 100644 --- a/electrum/ecc.py +++ b/electrum/ecc.py @@ -38,6 +38,7 @@ from ecdsa.util import string_to_number, number_to_string from .util import bfh, bh2u, assert_bytes, print_error, to_bytes, InvalidPassword, profiler from .crypto import (Hash, aes_encrypt_with_iv, aes_decrypt_with_iv, hmac_oneshot) from .ecc_fast import do_monkey_patching_of_python_ecdsa_internals_with_libsecp256k1 +from . import msqr do_monkey_patching_of_python_ecdsa_internals_with_libsecp256k1() @@ -94,20 +95,19 @@ def point_to_ser(P, compressed=True) -> bytes: return bfh('04'+('%064x' % x)+('%064x' % y)) -def get_y_coord_from_x(x, odd=True): +def get_y_coord_from_x(x: int, odd: bool=True) -> int: curve = curve_secp256k1 _p = curve.p() _a = curve.a() _b = curve.b() - for offset in range(128): - Mx = x + offset - My2 = pow(Mx, 3, _p) + _a * pow(Mx, 2, _p) + _b % _p - My = pow(My2, (_p + 1) // 4, _p) - if curve.contains_point(Mx, My): - if odd == bool(My & 1): - return My - return _p - My - raise Exception('ECC_YfromX: No Y found') + x = x % _p + y2 = (pow(x, 3, _p) + _a * x + _b) % _p + y = msqr.modular_sqrt(y2, _p) + if curve.contains_point(x, y): + if odd == bool(y & 1): + return y + return _p - y + raise InvalidECPointException() def ser_to_point(ser: bytes) -> (int, int): diff --git a/electrum/exchange_rate.py b/electrum/exchange_rate.py index 2e9dcd1a..8ebbc229 100644 --- a/electrum/exchange_rate.py +++ b/electrum/exchange_rate.py @@ -47,7 +47,8 @@ class ExchangeBase(PrintError): url = ''.join(['https://', site, get_string]) async with make_aiohttp_session(Network.get_instance().proxy) as session: async with session.get(url) as response: - return await response.json() + # set content_type to None to disable checking MIME type + return await response.json(content_type=None) async def get_csv(self, site, get_string): raw = await self.get_raw(site, get_string) @@ -445,13 +446,13 @@ class FxThread(ThreadJob): self.ccy_combo = None self.hist_checkbox = None self.cache_dir = os.path.join(config.path, 'cache') - self.trigger = asyncio.Event() - self.trigger.set() + self._trigger = asyncio.Event() + self._trigger.set() self.set_exchange(self.config_exchange()) make_dir(self.cache_dir) def set_proxy(self, trigger_name, *args): - self.trigger.set() + self._trigger.set() def get_currencies(self, h): d = get_exchanges_by_ccy(h) @@ -473,11 +474,11 @@ class FxThread(ThreadJob): async def run(self): while True: try: - await asyncio.wait_for(self.trigger.wait(), 150) + await asyncio.wait_for(self._trigger.wait(), 150) except concurrent.futures.TimeoutError: pass else: - self.trigger.clear() + self._trigger.clear() if self.is_enabled(): if self.show_history(): self.exchange.get_historical_rates(self.ccy, self.cache_dir) @@ -489,7 +490,7 @@ class FxThread(ThreadJob): def set_enabled(self, b): self.config.set_key('use_exchange_rate', bool(b)) - self.trigger.set() + self.trigger_update() def get_history_config(self): return bool(self.config.get('history_rates')) @@ -522,9 +523,13 @@ class FxThread(ThreadJob): def set_currency(self, ccy): self.ccy = ccy self.config.set_key('currency', ccy, True) - self.trigger.set() # Because self.ccy changes + self.trigger_update() self.on_quotes() + def trigger_update(self): + if self.network: + self.network.asyncio_loop.call_soon_threadsafe(self._trigger.set) + def set_exchange(self, name): class_ = globals().get(name, BitcoinAverage) self.print_error("using exchange", name) @@ -533,7 +538,7 @@ class FxThread(ThreadJob): self.exchange = class_(self.on_quotes, self.on_history) # A new exchange means new fx quotes, initially empty. Force # a quote refresh - self.trigger.set() + self.trigger_update() self.exchange.read_historical_rates(self.ccy, self.cache_dir) def on_quotes(self): diff --git a/electrum/gui/kivy/main_window.py b/electrum/gui/kivy/main_window.py index 0dbb5280..ec3320f7 100644 --- a/electrum/gui/kivy/main_window.py +++ b/electrum/gui/kivy/main_window.py @@ -490,7 +490,8 @@ class ElectrumWindow(App): activity.bind(on_new_intent=self.on_new_intent) # connect callbacks if self.network: - interests = ['updated', 'status', 'new_transaction', 'verified', 'interfaces'] + interests = ['wallet_updated', 'network_updated', 'blockchain_updated', + 'status', 'new_transaction', 'verified'] self.network.register_callback(self.on_network_event, interests) self.network.register_callback(self.on_fee, ['fee']) self.network.register_callback(self.on_fee_histogram, ['fee_histogram']) @@ -669,11 +670,15 @@ class ElectrumWindow(App): def on_network_event(self, event, *args): Logger.info('network event: '+ event) - if event == 'interfaces': + if event == 'network_updated': self._trigger_update_interfaces() - elif event == 'updated': + self._trigger_update_status() + elif event == 'wallet_updated': self._trigger_update_wallet() self._trigger_update_status() + elif event == 'blockchain_updated': + # to update number of confirmations in history + self._trigger_update_wallet() elif event == 'status': self._trigger_update_status() elif event == 'new_transaction': diff --git a/electrum/gui/kivy/nfc_scanner/__init__.py b/electrum/gui/kivy/nfc_scanner/__init__.py index 81084a64..d9935d23 100644 --- a/electrum/gui/kivy/nfc_scanner/__init__.py +++ b/electrum/gui/kivy/nfc_scanner/__init__.py @@ -1,3 +1,7 @@ +from kivy.uix.widget import Widget +from kivy.properties import ObjectProperty +from kivy.core import core_select_lib + __all__ = ('NFCBase', 'NFCScanner') class NFCBase(Widget): diff --git a/electrum/gui/kivy/nfc_scanner/scanner_android.py b/electrum/gui/kivy/nfc_scanner/scanner_android.py index 32ffda16..84e6d418 100644 --- a/electrum/gui/kivy/nfc_scanner/scanner_android.py +++ b/electrum/gui/kivy/nfc_scanner/scanner_android.py @@ -117,8 +117,8 @@ class ScannerAndroid(NFCBase): recTypes = [] for record in ndefrecords: recTypes.append({ - 'type': ''.join(map(unichr, record.getType())), - 'payload': ''.join(map(unichr, record.getPayload())) + 'type': ''.join(map(chr, record.getType())), + 'payload': ''.join(map(chr, record.getPayload())) }) details['recTypes'] = recTypes diff --git a/electrum/gui/kivy/nfc_scanner/scanner_dummy.py b/electrum/gui/kivy/nfc_scanner/scanner_dummy.py index a0d3e264..3b3faf1a 100644 --- a/electrum/gui/kivy/nfc_scanner/scanner_dummy.py +++ b/electrum/gui/kivy/nfc_scanner/scanner_dummy.py @@ -3,6 +3,7 @@ from . import NFCBase from kivy.clock import Clock from kivy.logger import Logger +from kivy.app import App class ScannerDummy(NFCBase): '''This is the dummy interface that gets selected in case any other diff --git a/electrum/gui/kivy/uix/dialogs/nfc_transaction.py b/electrum/gui/kivy/uix/dialogs/nfc_transaction.py index f6dfd579..981f208c 100644 --- a/electrum/gui/kivy/uix/dialogs/nfc_transaction.py +++ b/electrum/gui/kivy/uix/dialogs/nfc_transaction.py @@ -1,4 +1,8 @@ -class NFCTransactionDialog(AnimatedPopup): +from kivy.properties import ObjectProperty, OptionProperty +from kivy.factory import Factory + + +class NFCTransactionDialog(Factory.AnimatedPopup): mode = OptionProperty('send', options=('send','receive')) @@ -19,14 +23,14 @@ class NFCTransactionDialog(AnimatedPopup): sctr = self.ids.sctr if value: def _cmp(*l): - anim = Animation(rotation=2, scale=1, opacity=1) + anim = Factory.Animation(rotation=2, scale=1, opacity=1) anim.start(sctr) anim.bind(on_complete=_start) def _start(*l): - anim = Animation(rotation=350, scale=2, opacity=0) + anim = Factory.Animation(rotation=350, scale=2, opacity=0) anim.start(sctr) anim.bind(on_complete=_cmp) _start() return - Animation.cancel_all(sctr) \ No newline at end of file + Factory.Animation.cancel_all(sctr) diff --git a/electrum/gui/kivy/uix/drawer.py b/electrum/gui/kivy/uix/drawer.py index 49a9c399..ade3bd10 100644 --- a/electrum/gui/kivy/uix/drawer.py +++ b/electrum/gui/kivy/uix/drawer.py @@ -10,6 +10,7 @@ from kivy.factory import Factory from kivy.properties import OptionProperty, NumericProperty, ObjectProperty from kivy.clock import Clock from kivy.lang import Builder +from kivy.logger import Logger import gc diff --git a/electrum/gui/kivy/uix/menus.py b/electrum/gui/kivy/uix/menus.py deleted file mode 100644 index a7cdaefe..00000000 --- a/electrum/gui/kivy/uix/menus.py +++ /dev/null @@ -1,95 +0,0 @@ -from functools import partial - -from kivy.animation import Animation -from kivy.core.window import Window -from kivy.clock import Clock -from kivy.uix.bubble import Bubble, BubbleButton -from kivy.properties import ListProperty -from kivy.uix.widget import Widget - -from ..i18n import _ - -class ContextMenuItem(Widget): - '''abstract class - ''' - -class ContextButton(ContextMenuItem, BubbleButton): - pass - -class ContextMenu(Bubble): - - buttons = ListProperty([_('ok'), _('cancel')]) - '''List of Buttons to be displayed at the bottom''' - - __events__ = ('on_press', 'on_release') - - def __init__(self, **kwargs): - self._old_buttons = self.buttons - super(ContextMenu, self).__init__(**kwargs) - self.on_buttons(self, self.buttons) - - def on_touch_down(self, touch): - if not self.collide_point(*touch.pos): - self.hide() - return - return super(ContextMenu, self).on_touch_down(touch) - - def on_buttons(self, _menu, value): - if 'menu_content' not in self.ids.keys(): - return - if value == self._old_buttons: - return - blayout = self.ids.menu_content - blayout.clear_widgets() - for btn in value: - ib = ContextButton(text=btn) - ib.bind(on_press=partial(self.dispatch, 'on_press')) - ib.bind(on_release=partial(self.dispatch, 'on_release')) - blayout.add_widget(ib) - self._old_buttons = value - - def on_press(self, instance): - pass - - def on_release(self, instance): - pass - - def show(self, pos, duration=0): - Window.add_widget(self) - # wait for the bubble to adjust it's size according to text then animate - Clock.schedule_once(lambda dt: self._show(pos, duration)) - - def _show(self, pos, duration): - def on_stop(*l): - if duration: - Clock.schedule_once(self.hide, duration + .5) - - self.opacity = 0 - arrow_pos = self.arrow_pos - if arrow_pos[0] in ('l', 'r'): - pos = pos[0], pos[1] - (self.height/2) - else: - pos = pos[0] - (self.width/2), pos[1] - - self.limit_to = Window - - anim = Animation(opacity=1, pos=pos, d=.32) - anim.bind(on_complete=on_stop) - anim.cancel_all(self) - anim.start(self) - - - def hide(self, *dt): - - def on_stop(*l): - Window.remove_widget(self) - anim = Animation(opacity=0, d=.25) - anim.bind(on_complete=on_stop) - anim.cancel_all(self) - anim.start(self) - - def add_widget(self, widget, index=0): - if not isinstance(widget, ContextMenuItem): - super(ContextMenu, self).add_widget(widget, index) - return - menu_content.add_widget(widget, index) diff --git a/electrum/gui/qt/__init__.py b/electrum/gui/qt/__init__.py index 3fd204d7..80adeaa9 100644 --- a/electrum/gui/qt/__init__.py +++ b/electrum/gui/qt/__init__.py @@ -45,7 +45,7 @@ from electrum.base_wizard import GoBack # from electrum.synchronizer import Synchronizer # from electrum.verifier import SPV # from electrum.util import DebugMem -from electrum.util import (UserCancelled, print_error, +from electrum.util import (UserCancelled, PrintError, WalletFileException, BitcoinException) # from electrum.wallet import Abstract_Wallet @@ -86,7 +86,7 @@ class QNetworkUpdatedSignalObject(QObject): network_updated_signal = pyqtSignal(str, object) -class ElectrumGui: +class ElectrumGui(PrintError): def __init__(self, config, daemon, plugins): set_language(config.get('language')) @@ -128,7 +128,7 @@ class ElectrumGui: self.app.setStyleSheet(qdarkstyle.load_stylesheet_pyqt5()) except BaseException as e: use_dark_theme = False - print_error('Error setting dark theme: {}'.format(e)) + self.print_error('Error setting dark theme: {}'.format(e)) # Even if we ourselves don't set the dark theme, # the OS/window manager/etc might set *a dark theme*. # Hence, try to choose colors accordingly: @@ -222,7 +222,7 @@ class ElectrumGui: except UserCancelled: pass except GoBack as e: - print_error('[start_new_window] Exception caught (GoBack)', e) + self.print_error('[start_new_window] Exception caught (GoBack)', e) except (WalletFileException, BitcoinException) as e: traceback.print_exc(file=sys.stderr) d = QMessageBox(QMessageBox.Warning, _('Error'), @@ -267,6 +267,7 @@ class ElectrumGui: if not self.windows: self.config.save_last_wallet(window.wallet) run_hook('on_close_window', window) + self.daemon.stop_wallet(window.wallet.storage.path) def init_network(self): # Show network dialog if config does not exist @@ -309,6 +310,14 @@ class ElectrumGui: self.tray.hide() self.app.aboutToQuit.connect(clean_up) + # keep daemon running after close + if self.config.get('daemon'): + self.app.setQuitOnLastWindowClosed(False) + # main loop self.app.exec_() # on some platforms the exec_ call may not return, so use clean_up() + + def stop(self): + self.print_error('closing GUI') + self.app.quit() diff --git a/electrum/gui/qt/history_list.py b/electrum/gui/qt/history_list.py index 50e9bea9..0566a1b5 100644 --- a/electrum/gui/qt/history_list.py +++ b/electrum/gui/qt/history_list.py @@ -25,6 +25,7 @@ import webbrowser import datetime +from datetime import date from electrum.address_synchronizer import TX_HEIGHT_LOCAL from .util import * @@ -220,7 +221,6 @@ class HistoryList(MyTreeWidget, AcceptFileDragDrop): self.transactions = r['transactions'] self.summary = r['summary'] if not self.years and self.transactions: - from datetime import date start_date = self.transactions[0].get('date') or date.today() end_date = self.transactions[-1].get('date') or date.today() self.years = [str(i) for i in range(start_date.year, end_date.year + 1)] @@ -317,7 +317,7 @@ class HistoryList(MyTreeWidget, AcceptFileDragDrop): conf = tx_mined_status.conf status, status_str = self.wallet.get_tx_status(tx_hash, tx_mined_status) icon = self.icon_cache.get(":icons/" + TX_ICONS[status]) - items = self.findItems(tx_hash, Qt.UserRole|Qt.MatchContains|Qt.MatchRecursive, column=1) + items = self.findItems(tx_hash, Qt.MatchExactly, column=1) if items: item = items[0] item.setIcon(0, icon) diff --git a/electrum/gui/qt/main_window.py b/electrum/gui/qt/main_window.py index 5c3c3847..c71e128b 100644 --- a/electrum/gui/qt/main_window.py +++ b/electrum/gui/qt/main_window.py @@ -107,6 +107,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): self.setup_exception_hook() self.network = gui_object.daemon.network + self.wallet = wallet self.fx = gui_object.daemon.fx self.invoices = wallet.invoices self.contacts = wallet.contacts @@ -188,8 +189,9 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): # network callbacks if self.network: self.network_signal.connect(self.on_network_qt) - interests = ['updated', 'new_transaction', 'status', - 'banner', 'verified', 'fee'] + interests = ['wallet_updated', 'network_updated', 'blockchain_updated', + 'new_transaction', 'status', + 'banner', 'verified', 'fee', 'fee_histogram'] # To avoid leaking references to "self" that prevent the # window from being GC-ed when closed, callbacks should be # methods of this class only, and specifically not be @@ -295,15 +297,22 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): self.show_error(str(exc_info[1])) def on_network(self, event, *args): - if event == 'updated': - self.need_update.set() + if event == 'wallet_updated': + wallet = args[0] + if wallet == self.wallet: + self.need_update.set() + elif event == 'network_updated': self.gui_object.network_updated_signal_obj.network_updated_signal \ .emit(event, args) + self.network_signal.emit('status', None) + elif event == 'blockchain_updated': + # to update number of confirmations in history + self.need_update.set() elif event == 'new_transaction': - # FIXME maybe this event should also include which wallet - # the tx is for. now all wallets get this. - self.tx_notification_queue.put(args[0]) - elif event in ['status', 'banner', 'verified', 'fee']: + wallet, tx = args + if wallet == self.wallet: + self.tx_notification_queue.put(tx) + elif event in ['status', 'banner', 'verified', 'fee', 'fee_histogram']: # Handle in GUI thread self.network_signal.emit(event, args) else: @@ -316,7 +325,9 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): elif event == 'banner': self.console.showMessage(args[0]) elif event == 'verified': - self.history_list.update_item(*args) + wallet, tx_hash, tx_mined_status = args + if wallet == self.wallet: + self.history_list.update_item(tx_hash, tx_mined_status) elif event == 'fee': if self.config.is_dynfee(): self.fee_slider.update() @@ -350,12 +361,10 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): @profiler def load_wallet(self, wallet): wallet.thread = TaskThread(self, self.on_error) - self.wallet = wallet self.update_recently_visited(wallet.storage.path) - # address used to create a dummy transaction and estimate transaction fee - self.history_list.update() - self.address_list.update() - self.utxo_list.update() + # update(==init) all tabs; expensive for large wallets.. + # so delay it somewhat, hence __init__ can finish and the window can appear sooner + QTimer.singleShot(50, self.update_tabs) self.need_update.set() # Once GUI has been initialized check if we want to announce something since the callback has been called before the GUI was initialized # update menus @@ -443,6 +452,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): if filename in recent: recent.remove(filename) recent.insert(0, filename) + recent = [path for path in recent if os.path.exists(path)] recent = recent[:5] self.config.set_key('recently_open', recent) self.recently_visited_menu.clear() @@ -588,13 +598,13 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): self.show_message(msg, title="Electrum - " + _("Reporting Bugs")) def notify_transactions(self): - # note: during initial history sync for a wallet, many txns will be - # received multiple times. hence the "total amount received" will be - # a lot higher than should be. this is expected though not intended if self.tx_notification_queue.qsize() == 0: return + if not self.wallet.up_to_date: + return # no notifications while syncing now = time.time() - if self.tx_notification_last_time + 5 > now: + rate_limit = 20 # seconds + if self.tx_notification_last_time + rate_limit > now: return self.tx_notification_last_time = now self.print_error("Notifying GUI about new transactions") @@ -609,14 +619,14 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): total_amount = 0 for tx in txns: is_relevant, is_mine, v, fee = self.wallet.get_wallet_delta(tx) - if v > 0: + if is_relevant: total_amount += v self.notify(_("{} new transactions received: Total amount received in the new transactions {}") .format(len(txns), self.format_amount_and_units(total_amount))) else: for tx in txns: is_relevant, is_mine, v, fee = self.wallet.get_wallet_delta(tx) - if v > 0: + if is_relevant: self.notify(_("New transaction received: {}").format(self.format_amount_and_units(v))) def notify(self, message): @@ -661,7 +671,6 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): self.do_update_fee() self.require_fee_update = False self.notify_transactions() - def format_amount(self, x, is_diff=False, whitespaces=False): return format_satoshis(x, self.num_zeros, self.decimal_point, is_diff=is_diff, whitespaces=whitespaces) @@ -766,7 +775,6 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): self.balance_label.setText(text) self.status_button.setIcon( icon ) - def update_wallet(self): self.update_status() if self.wallet.up_to_date or not self.network or not self.network.is_connected(): @@ -2280,8 +2288,8 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): try: public_key = ecc.ECPubkey(bfh(pubkey_e.text())) except BaseException as e: - traceback.print_exc(file=sys.stdout) - self.show_warning(_('Invalid Public key')) + traceback.print_exc(file=sys.stdout) + self.show_warning(_('Invalid Public key')) return encrypted = public_key.encrypt_message(message) encrypted_e.setText(encrypted.decode('ascii')) @@ -2943,9 +2951,11 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): exchanges = self.fx.get_exchanges_by_ccy(c, h) else: exchanges = self.fx.get_exchanges_by_ccy('USD', False) + ex_combo.blockSignals(True) ex_combo.clear() ex_combo.addItems(sorted(exchanges)) ex_combo.setCurrentIndex(ex_combo.findText(self.fx.config_exchange())) + ex_combo.blockSignals(False) def on_currency(hh): if not self.fx: return @@ -2969,8 +2979,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): update_exchanges() self.history_list.refresh_headers() if self.fx.is_enabled() and checked: - # reset timeout to get historical rates - self.fx.timeout = 0 + self.fx.trigger_update() update_history_capgains_cb() def on_history_capgains(checked): @@ -3032,7 +3041,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): d.exec_() if self.fx: - self.fx.timeout = 0 + self.fx.trigger_update() self.alias_received_signal.disconnect(set_alias_color) @@ -3191,9 +3200,9 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): tx_size = tx.estimated_size() d = WindowModalDialog(self, _('Bump Fee')) vbox = QVBoxLayout(d) + vbox.addWidget(WWLabel(_("Increase your transaction's fee to improve its position in mempool."))) vbox.addWidget(QLabel(_('Current fee') + ': %s'% self.format_amount(fee) + ' ' + self.base_unit())) vbox.addWidget(QLabel(_('New fee' + ':'))) - fee_e = BTCAmountEdit(self.get_decimal_point) fee_e.setAmount(fee * 1.5) vbox.addWidget(fee_e) diff --git a/electrum/gui/qt/network_dialog.py b/electrum/gui/qt/network_dialog.py index b9b185e8..e58148ac 100644 --- a/electrum/gui/qt/network_dialog.py +++ b/electrum/gui/qt/network_dialog.py @@ -52,7 +52,7 @@ class NetworkDialog(QDialog): vbox.addLayout(Buttons(CloseButton(self))) self.network_updated_signal_obj.network_updated_signal.connect( self.on_update) - network.register_callback(self.on_network, ['updated', 'interfaces']) + network.register_callback(self.on_network, ['network_updated']) def on_network(self, event, *args): self.network_updated_signal_obj.network_updated_signal.emit(event, args) @@ -126,6 +126,8 @@ class NodesListWidget(QTreeWidget): h.setSectionResizeMode(0, QHeaderView.Stretch) h.setSectionResizeMode(1, QHeaderView.ResizeToContents) + super().update() + class ServerListWidget(QTreeWidget): @@ -180,6 +182,8 @@ class ServerListWidget(QTreeWidget): h.setSectionResizeMode(0, QHeaderView.Stretch) h.setSectionResizeMode(1, QHeaderView.ResizeToContents) + super().update() + class NetworkChoiceLayout(object): diff --git a/electrum/gui/qt/paytoedit.py b/electrum/gui/qt/paytoedit.py index 376303b9..d97b4d94 100644 --- a/electrum/gui/qt/paytoedit.py +++ b/electrum/gui/qt/paytoedit.py @@ -237,7 +237,7 @@ class PayToEdit(CompletionTextEdit, ScanQRTextEdit): #if self.win.config.get('openalias_autoadd') == 'checked': self.win.contacts[key] = ('openalias', name) - self.win.contact_list.on_update() + self.win.contact_list.update() self.setFrozen(True) if data.get('type') == 'openalias': diff --git a/electrum/gui/stdio.py b/electrum/gui/stdio.py index cbb0d340..dc547765 100644 --- a/electrum/gui/stdio.py +++ b/electrum/gui/stdio.py @@ -37,7 +37,7 @@ class ElectrumGui: self.wallet.start_network(self.network) self.contacts = self.wallet.contacts - self.network.register_callback(self.on_network, ['updated', 'banner']) + self.network.register_callback(self.on_network, ['wallet_updated', 'network_updated', 'banner']) self.commands = [_("[h] - displays this help text"), \ _("[i] - display transaction history"), \ _("[o] - enter payment order"), \ @@ -50,7 +50,7 @@ class ElectrumGui: self.num_commands = len(self.commands) def on_network(self, event, *args): - if event == 'updated': + if event in ['wallet_updated', 'network_updated']: self.updated() elif event == 'banner': self.print_banner() @@ -88,7 +88,7 @@ class ElectrumGui: + "%d"%(width[2]+delta)+"s"+"%"+"%d"%(width[3]+delta)+"s" messages = [] - for tx_hash, tx_mined_status, delta, balance in self.wallet.get_history(): + for tx_hash, tx_mined_status, delta, balance in reversed(self.wallet.get_history()): if tx_mined_status.conf: timestamp = tx_mined_status.timestamp try: diff --git a/electrum/gui/text.py b/electrum/gui/text.py index 98134013..1bfcc4ef 100644 --- a/electrum/gui/text.py +++ b/electrum/gui/text.py @@ -62,7 +62,7 @@ class ElectrumGui: self.history = None if self.network: - self.network.register_callback(self.update, ['updated']) + self.network.register_callback(self.update, ['wallet_updated', 'network_updated']) self.tab_names = [_("History"), _("Send"), _("Receive"), _("Addresses"), _("Contacts"), _("Banner")] self.num_tabs = len(self.tab_names) @@ -182,8 +182,10 @@ class ElectrumGui: self.maxpos = 6 def print_banner(self): - if self.network: - self.print_list( self.network.banner.split('\n')) + if self.network and self.network.banner: + banner = self.network.banner + banner = banner.replace('\r', '') + self.print_list(banner.split('\n')) def print_qr(self, data): import qrcode @@ -198,9 +200,15 @@ class ElectrumGui: self.qr.print_ascii(out=s, invert=False) msg = s.getvalue() lines = msg.split('\n') - for i, l in enumerate(lines): - l = l.encode("utf-8") - self.stdscr.addstr(i+5, 5, l, curses.color_pair(3)) + try: + for i, l in enumerate(lines): + l = l.encode("utf-8") + self.stdscr.addstr(i+5, 5, l, curses.color_pair(3)) + except curses.error: + m = 'error. screen too small?' + m = m.encode(self.encoding) + self.stdscr.addstr(5, 1, m, 0) + def print_list(self, lst, firstline = None): lst = list(lst) @@ -301,19 +309,22 @@ class ElectrumGui: def main(self): tty.setraw(sys.stdin) - while self.tab != -1: - self.run_tab(0, self.print_history, self.run_history_tab) - self.run_tab(1, self.print_send_tab, self.run_send_tab) - self.run_tab(2, self.print_receive, self.run_receive_tab) - self.run_tab(3, self.print_addresses, self.run_banner_tab) - self.run_tab(4, self.print_contacts, self.run_contacts_tab) - self.run_tab(5, self.print_banner, self.run_banner_tab) - - tty.setcbreak(sys.stdin) - curses.nocbreak() - self.stdscr.keypad(0) - curses.echo() - curses.endwin() + try: + while self.tab != -1: + self.run_tab(0, self.print_history, self.run_history_tab) + self.run_tab(1, self.print_send_tab, self.run_send_tab) + self.run_tab(2, self.print_receive, self.run_receive_tab) + self.run_tab(3, self.print_addresses, self.run_banner_tab) + self.run_tab(4, self.print_contacts, self.run_contacts_tab) + self.run_tab(5, self.print_banner, self.run_banner_tab) + except curses.error as e: + raise Exception("Error with curses. Is your screen too small?") from e + finally: + tty.setcbreak(sys.stdin) + curses.nocbreak() + self.stdscr.keypad(0) + curses.echo() + curses.endwin() def do_clear(self): diff --git a/electrum/interface.py b/electrum/interface.py index 47fdf308..aa2c2ce7 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -29,16 +29,18 @@ import sys import traceback import asyncio from typing import Tuple, Union +from collections import defaultdict import aiorpcx from aiorpcx import ClientSession, Notification -from .util import PrintError, aiosafe, bfh, AIOSafeSilentException, CustomTaskGroup +from .util import PrintError, aiosafe, bfh, AIOSafeSilentException, SilentTaskGroup from . import util from . import x509 from . import pem from .version import ELECTRUM_VERSION, PROTOCOL_VERSION from . import blockchain +from .blockchain import Blockchain from . import constants @@ -46,8 +48,11 @@ class NotificationSession(ClientSession): def __init__(self, *args, **kwargs): super(NotificationSession, self).__init__(*args, **kwargs) - self.subscriptions = {} + self.subscriptions = defaultdict(list) self.cache = {} + self.in_flight_requests_semaphore = asyncio.Semaphore(100) + # disable bandwidth limiting (used by superclass): + self.bw_limit = 0 async def handle_request(self, request): # note: if server sends malformed request and we raise, the superclass @@ -63,19 +68,26 @@ class NotificationSession(ClientSession): assert False, request.method async def send_request(self, *args, timeout=-1, **kwargs): + # note: the timeout starts after the request touches the wire! if timeout == -1: timeout = 20 if not self.proxy else 30 - return await asyncio.wait_for( - super().send_request(*args, **kwargs), - timeout) + # note: the semaphore implementation guarantees no starvation + async with self.in_flight_requests_semaphore: + try: + return await asyncio.wait_for( + super().send_request(*args, **kwargs), + timeout) + except asyncio.TimeoutError as e: + raise GracefulDisconnect('request timed out: {}'.format(args)) from e async def subscribe(self, method, params, queue): + # note: until the cache is written for the first time, + # each 'subscribe' call might make a request on the network. key = self.get_index(method, params) - if key in self.subscriptions: - self.subscriptions[key].append(queue) + self.subscriptions[key].append(queue) + if key in self.cache: result = self.cache[key] else: - self.subscriptions[key] = [queue] result = await self.send_request(method, params) self.cache[key] = result await queue.put(params + [result]) @@ -94,8 +106,7 @@ class NotificationSession(ClientSession): return str(method) + repr(params) -# FIXME this is often raised inside a TaskGroup, but then it's not silent :( -class GracefulDisconnect(AIOSafeSilentException): pass +class GracefulDisconnect(Exception): pass class ErrorParsingSSLCert(Exception): pass @@ -104,10 +115,11 @@ class ErrorParsingSSLCert(Exception): pass class ErrorGettingSSLCertFromServer(Exception): pass - def deserialize_server(server_str: str) -> Tuple[str, str, str]: # host might be IPv6 address, hence do rsplit: host, port, protocol = str(server_str).rsplit(':', 2) + if not host: + raise ValueError('host must not be empty') if protocol not in ('s', 't'): raise ValueError('invalid network protocol: {}'.format(protocol)) int(port) # Throw if cannot be converted to int @@ -131,15 +143,21 @@ class Interface(PrintError): self.config_path = config_path self.cert_path = os.path.join(self.config_path, 'certs', self.host) self.blockchain = None + self._requested_chunks = set() self.network = network + self._set_proxy(proxy) self.tip_header = None self.tip = 0 # TODO combine? self.fut = asyncio.get_event_loop().create_task(self.run()) - self.group = CustomTaskGroup() + self.group = SilentTaskGroup() + def diagnostic_name(self): + return self.host + + def _set_proxy(self, proxy: dict): if proxy: username, pw = proxy.get('user'), proxy.get('password') if not username or not pw: @@ -151,13 +169,10 @@ class Interface(PrintError): elif proxy['mode'] == "socks5": self.proxy = aiorpcx.socks.SOCKSProxy((proxy['host'], int(proxy['port'])), aiorpcx.socks.SOCKS5, auth) else: - raise NotImplementedError # http proxy not available with aiorpcx + raise NotImplementedError # http proxy not available with aiorpcx else: self.proxy = None - def diagnostic_name(self): - return self.host - async def is_server_ca_signed(self, sslc): try: await self.open_session(sslc, exit_early=True) @@ -224,7 +239,17 @@ class Interface(PrintError): sslc.check_hostname = 0 return sslc + def handle_graceful_disconnect(func): + async def wrapper_func(self, *args, **kwargs): + try: + return await func(self, *args, **kwargs) + except GracefulDisconnect as e: + self.print_error("disconnecting gracefully. {}".format(e)) + self.exception = e + return wrapper_func + @aiosafe + @handle_graceful_disconnect async def run(self): try: ssl_context = await self._get_ssl_context() @@ -241,17 +266,22 @@ class Interface(PrintError): assert False def mark_ready(self): + if self.ready.cancelled(): + raise GracefulDisconnect('conn establishment was too slow; *ready* future was cancelled') + if self.ready.done(): + return + assert self.tip_header chain = blockchain.check_header(self.tip_header) if not chain: self.blockchain = blockchain.blockchains[0] else: self.blockchain = chain + assert self.blockchain is not None self.print_error("set blockchain with height", self.blockchain.height()) - if not self.ready.done(): - self.ready.set_result(1) + self.ready.set_result(1) async def save_certificate(self): if not os.path.exists(self.cert_path): @@ -285,15 +315,32 @@ class Interface(PrintError): async def get_block_header(self, height, assert_mode): # use lower timeout as we usually have network.bhi_lock here + self.print_error('requesting block header {} in mode {}'.format(height, assert_mode)) timeout = 5 if not self.proxy else 10 res = await self.session.send_request('blockchain.block.header', [height], timeout=timeout) return blockchain.deserialize_header(bytes.fromhex(res), height) - async def request_chunk(self, idx, tip): - return await self.network.request_chunk(idx, tip, self.session) + async def request_chunk(self, height, tip=None, *, can_return_early=False): + index = height // 2016 + if can_return_early and index in self._requested_chunks: + return + self.print_error("requesting chunk from height {}".format(height)) + size = 2016 + if tip is not None: + size = min(size, tip - index * 2016 + 1) + size = max(size, 0) + try: + self._requested_chunks.add(index) + res = await self.session.send_request('blockchain.block.headers', [index * 2016, size]) + finally: + try: self._requested_chunks.remove(index) + except KeyError: pass + conn = self.blockchain.connect_chunk(index, res['hex']) + if not conn: + return conn, 0 + return conn, res['count'] async def open_session(self, sslc, exit_early): - header_queue = asyncio.Queue() self.session = NotificationSession(self.host, self.port, ssl=sslc, proxy=self.proxy) async with self.session as session: try: @@ -302,11 +349,11 @@ class Interface(PrintError): raise GracefulDisconnect(e) # probably 'unsupported protocol version' if exit_early: return - self.print_error(ver, self.host) - await session.subscribe('blockchain.headers.subscribe', [], header_queue) + self.print_error("connection established. version: {}".format(ver)) + async with self.group as group: await group.spawn(self.ping()) - await group.spawn(self.run_fetch_blocks(header_queue)) + await group.spawn(self.run_fetch_blocks()) await group.spawn(self.monitor_connection()) # NOTE: group.__aexit__ will be called here; this is needed to notice exceptions in the group! @@ -322,199 +369,221 @@ class Interface(PrintError): await self.session.send_request('server.ping') def close(self): - self.fut.cancel() - asyncio.get_event_loop().create_task(self.group.cancel_remaining()) + async def job(): + self.fut.cancel() + await self.group.cancel_remaining() + asyncio.run_coroutine_threadsafe(job(), self.network.asyncio_loop) - async def run_fetch_blocks(self, header_queue): + async def run_fetch_blocks(self): + header_queue = asyncio.Queue() + await self.session.subscribe('blockchain.headers.subscribe', [], header_queue) while True: - self.network.notify('updated') item = await header_queue.get() - item = item[0] - height = item['height'] - item = blockchain.deserialize_header(bfh(item['hex']), item['height']) - self.tip_header = item + raw_header = item[0] + height = raw_header['height'] + header = blockchain.deserialize_header(bfh(raw_header['hex']), height) + self.tip_header = header self.tip = height if self.tip < constants.net.max_checkpoint(): raise GracefulDisconnect('server tip below max checkpoint') - if not self.ready.done(): - self.mark_ready() - async with self.network.bhi_lock: - if self.blockchain.height() < item['block_height']-1: - _, height = await self.sync_until(height, None) - if self.blockchain.height() >= height and self.blockchain.check_header(item): - # another interface amended the blockchain - self.print_error("skipping header", height) - continue - if self.tip < height: - height = self.tip - _, height = await self.step(height, item) + self.mark_ready() + await self._process_header_at_tip() + self.network.trigger_callback('network_updated') + self.network.switch_lagging_interface() + + async def _process_header_at_tip(self): + height, header = self.tip, self.tip_header + async with self.network.bhi_lock: + if self.blockchain.height() >= height and self.blockchain.check_header(header): + # another interface amended the blockchain + self.print_error("skipping header", height) + return + _, height = await self.step(height, header) + # in the simple case, height == self.tip+1 + if height <= self.tip: + await self.sync_until(height) + self.network.trigger_callback('blockchain_updated') async def sync_until(self, height, next_height=None): if next_height is None: next_height = self.tip last = None - while last is None or height < next_height: + while last is None or height <= next_height: + prev_last, prev_height = last, height if next_height > height + 10: - self.print_error("requesting chunk from height {}".format(height)) could_connect, num_headers = await self.request_chunk(height, next_height) if not could_connect: if height <= constants.net.max_checkpoint(): - raise Exception('server chain conflicts with checkpoints or genesis') + raise GracefulDisconnect('server chain conflicts with checkpoints or genesis') last, height = await self.step(height) continue - self.network.notify('updated') + self.network.trigger_callback('network_updated') height = (height // 2016 * 2016) + num_headers - if height > next_height: - assert False, (height, self.tip) + assert height <= next_height+1, (height, self.tip) last = 'catchup' else: last, height = await self.step(height) + assert (prev_last, prev_height) != (last, height), 'had to prevent infinite loop in interface.sync_until' return last, height async def step(self, height, header=None): assert height != 0 + assert height <= self.tip, (height, self.tip) if header is None: header = await self.get_block_header(height, 'catchup') - chain = self.blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) - if chain: return 'catchup', height - can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height) - bad_header = None + chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + if chain: + self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain + return 'catchup', height+1 + + can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height) if not can_connect: self.print_error("can't connect", height) - #backward - bad = height - bad_header = header - height -= 1 + height, header, bad, bad_header = await self._search_headers_backwards(height, header) + chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height) + assert chain or can_connect + if can_connect: + self.print_error("could connect", height) + height += 1 + if isinstance(can_connect, Blockchain): # not when mocking + self.blockchain = can_connect + self.blockchain.save_header(header) + return 'catchup', height + + good, bad, bad_header = await self._search_headers_binary(height, bad, bad_header, chain) + return await self._resolve_potential_chain_fork_given_forkpoint(good, bad, bad_header) + + async def _search_headers_binary(self, height, bad, bad_header, chain): + assert bad == bad_header['block_height'] + _assert_header_does_not_check_against_any_chain(bad_header) + + self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain + good = height + while True: + assert good < bad, (good, bad) + height = (good + bad) // 2 + self.print_error("binary step. good {}, bad {}, height {}".format(good, bad, height)) + header = await self.get_block_header(height, 'binary') + chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + if chain: + self.blockchain = chain if isinstance(chain, Blockchain) else self.blockchain + good = height + else: + bad = height + bad_header = header + if good + 1 == bad: + break + + mock = 'mock' in bad_header and bad_header['mock']['connect'](height) + real = not mock and self.blockchain.can_connect(bad_header, check_height=False) + if not real and not mock: + raise Exception('unexpected bad header during binary: {}'.format(bad_header)) + _assert_header_does_not_check_against_any_chain(bad_header) + + self.print_error("binary search exited. good {}, bad {}".format(good, bad)) + return good, bad, bad_header + + async def _resolve_potential_chain_fork_given_forkpoint(self, good, bad, bad_header): + assert good + 1 == bad + assert bad == bad_header['block_height'] + _assert_header_does_not_check_against_any_chain(bad_header) + # 'good' is the height of a block 'good_header', somewhere in self.blockchain. + # bad_header connects to good_header; bad_header itself is NOT in self.blockchain. + + bh = self.blockchain.height() + assert bh >= good + if bh == good: + height = good + 1 + self.print_error("catching up from {}".format(height)) + return 'no_fork', height + + # this is a new fork we don't yet have + height = bad + 1 + branch = blockchain.blockchains.get(bad) + if branch is not None: + # Conflict!! As our fork handling is not completely general, + # we need to delete another fork to save this one. + # Note: This could be a potential DOS vector against Electrum. + # However, mining blocks that satisfy the difficulty requirements + # is assumed to be expensive; especially as forks below the max + # checkpoint are ignored. + self.print_error("new fork at bad height {}. conflict!!".format(bad)) + assert self.blockchain != branch + ismocking = type(branch) is dict + if ismocking: + self.print_error("TODO replace blockchain") + return 'fork_conflict', height + self.print_error('forkpoint conflicts with existing fork', branch.path()) + self._raise_if_fork_conflicts_with_default_server(branch) + self._disconnect_from_interfaces_on_conflicting_blockchain(branch) + branch.write(b'', 0) + branch.save_header(bad_header) + self.blockchain = branch + return 'fork_conflict', height + else: + # No conflict. Just save the new fork. + self.print_error("new fork at bad height {}. NO conflict.".format(bad)) + forkfun = self.blockchain.fork if 'mock' not in bad_header else bad_header['mock']['fork'] + b = forkfun(bad_header) + with blockchain.blockchains_lock: + assert bad not in blockchain.blockchains, (bad, list(blockchain.blockchains)) + blockchain.blockchains[bad] = b + self.blockchain = b + assert b.forkpoint == bad + return 'fork_noconflict', height + + def _raise_if_fork_conflicts_with_default_server(self, chain_to_delete: Blockchain) -> None: + main_interface = self.network.interface + if not main_interface: return + if main_interface == self: return + chain_of_default_server = main_interface.blockchain + if not chain_of_default_server: return + if chain_to_delete == chain_of_default_server: + raise GracefulDisconnect('refusing to overwrite blockchain of default server') + + def _disconnect_from_interfaces_on_conflicting_blockchain(self, chain: Blockchain) -> None: + ifaces = self.network.disconnect_from_interfaces_on_given_blockchain(chain) + if not ifaces: return + servers = [interface.server for interface in ifaces] + self.print_error("forcing disconnect of other interfaces: {}".format(servers)) + + async def _search_headers_backwards(self, height, header): + async def iterate(): + nonlocal height, header checkp = False if height <= constants.net.max_checkpoint(): height = constants.net.max_checkpoint() checkp = True - header = await self.get_block_header(height, 'backward') chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height) - if checkp and not (can_connect or chain): - raise Exception("server chain conflicts with checkpoints. {} {}".format(can_connect, chain)) - while not chain and not can_connect: - bad = height - bad_header = header - delta = self.tip - height - next_height = self.tip - 2 * delta - checkp = False - if next_height <= constants.net.max_checkpoint(): - next_height = constants.net.max_checkpoint() - checkp = True - height = next_height + if chain or can_connect: + return False + if checkp: + raise GracefulDisconnect("server chain conflicts with checkpoints") + return True - header = await self.get_block_header(height, 'backward') - chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) - can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height) - if checkp and not (can_connect or chain): - raise Exception("server chain conflicts with checkpoints. {} {}".format(can_connect, chain)) - self.print_error("exiting backward mode at", height) - if can_connect: - self.print_error("could connect", height) - chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + bad, bad_header = height, header + _assert_header_does_not_check_against_any_chain(bad_header) + with blockchain.blockchains_lock: chains = list(blockchain.blockchains.values()) + local_max = max([0] + [x.height() for x in chains]) if 'mock' not in header else float('inf') + height = min(local_max + 1, height - 1) + while await iterate(): + bad, bad_header = height, header + delta = self.tip - height + height = self.tip - 2 * delta - if type(can_connect) is bool: - # mock - height += 1 - if height > self.tip: - assert False - return 'catchup', height - self.blockchain = can_connect - height += 1 - self.blockchain.save_header(header) - return 'catchup', height + _assert_header_does_not_check_against_any_chain(bad_header) + self.print_error("exiting backward mode at", height) + return height, header, bad, bad_header - if not chain: - raise Exception("not chain") # line 931 in 8e69174374aee87d73cd2f8005fbbe87c93eee9c's network.py - # binary - if type(chain) in [int, bool]: - pass # mock - else: - self.blockchain = chain - good = height - height = (bad + good) // 2 - header = await self.get_block_header(height, 'binary') - while True: - self.print_error("binary step") - chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) - if chain: - assert bad != height, (bad, height) - good = height - self.blockchain = self.blockchain if type(chain) in [bool, int] else chain - else: - bad = height - assert good != height - bad_header = header - if bad != good + 1: - height = (bad + good) // 2 - header = await self.get_block_header(height, 'binary') - continue - mock = bad_header and 'mock' in bad_header and bad_header['mock']['connect'](height) - real = not mock and self.blockchain.can_connect(bad_header, check_height=False) - if not real and not mock: - raise Exception('unexpected bad header during binary' + str(bad_header)) # line 948 in 8e69174374aee87d73cd2f8005fbbe87c93eee9c's network.py - branch = blockchain.blockchains.get(bad) - if branch is not None: - ismocking = False - if type(branch) is dict: - ismocking = True - # FIXME: it does not seem sufficient to check that the branch - # contains the bad_header. what if self.blockchain doesn't? - # the chains shouldn't be joined then. observe the incorrect - # joining on regtest with a server that has a fork of height - # one. the problem is observed only if forking is not during - # electrum runtime - if not ismocking and branch.check_header(bad_header) \ - or ismocking and branch['check'](bad_header): - self.print_error('joining chain', bad) - height += 1 - return 'join', height - else: - if not ismocking and branch.parent().check_header(header) \ - or ismocking and branch['parent']['check'](header): - self.print_error('reorg', bad, self.tip) - self.blockchain = branch.parent() if not ismocking else branch['parent'] - height = bad - header = await self.get_block_header(height, 'binary') - else: - if ismocking: - height = bad + 1 - self.print_error("TODO replace blockchain") - return 'conflict', height - self.print_error('forkpoint conflicts with existing fork', branch.path()) - branch.write(b'', 0) - branch.save_header(bad_header) - self.blockchain = branch - height = bad + 1 - return 'conflict', height - else: - bh = self.blockchain.height() - if bh > good: - forkfun = self.blockchain.fork - if 'mock' in bad_header: - chain = bad_header['mock']['check'](bad_header) - forkfun = bad_header['mock']['fork'] if 'fork' in bad_header['mock'] else forkfun - else: - chain = self.blockchain.check_header(bad_header) - if not chain: - b = forkfun(bad_header) - assert bad not in blockchain.blockchains, (bad, list(blockchain.blockchains.keys())) - blockchain.blockchains[bad] = b - self.blockchain = b - height = b.forkpoint + 1 - assert b.forkpoint == bad - return 'fork', height - else: - assert bh == good - if bh < self.tip: - self.print_error("catching up from %d"% (bh + 1)) - height = bh + 1 - return 'no_fork', height +def _assert_header_does_not_check_against_any_chain(header: dict) -> None: + chain_bad = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + if chain_bad: + raise Exception('bad_header must not check!') def check_cert(host, cert): diff --git a/electrum/keystore.py b/electrum/keystore.py index c822be13..6769eefc 100644 --- a/electrum/keystore.py +++ b/electrum/keystore.py @@ -253,12 +253,17 @@ class Xpub: @classmethod def parse_xpubkey(self, pubkey): + # type + xpub + derivation assert pubkey[0:2] == 'ff' pk = bfh(pubkey) + # xpub: pk = pk[1:] xkey = bitcoin.EncodeBase58Check(pk[0:78]) + # derivation: dd = pk[78:] s = [] + # FIXME: due to an oversight, levels in the derivation are only + # allocated 2 bytes, instead of 4 (in bip32) while dd: n = int(bitcoin.rev_hex(bh2u(dd[0:2])), 16) dd = dd[2:] diff --git a/electrum/network.py b/electrum/network.py index 06d33935..687b7322 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -32,7 +32,7 @@ import json import sys import ipaddress import asyncio -from typing import NamedTuple, Optional +from typing import NamedTuple, Optional, Sequence import dns import dns.resolver @@ -43,6 +43,7 @@ from .util import PrintError, print_error, aiosafe, bfh from .bitcoin import COIN from . import constants from . import blockchain +from .blockchain import Blockchain from .interface import Interface, serialize_server, deserialize_server from .version import PROTOCOL_VERSION from .simple_config import SimpleConfig @@ -61,13 +62,13 @@ def parse_servers(result): pruning_level = '-' if len(item) > 2: for v in item[2]: - if re.match("[st]\d*", v): + if re.match(r"[st]\d*", v): protocol, port = v[0], v[1:] if port == '': port = constants.net.DEFAULT_PORTS[protocol] out[protocol] = port elif re.match("v(.?)+", v): version = v[1:] - elif re.match("p\d*", v): + elif re.match(r"p\d*", v): pruning_level = v[1:] if pruning_level == '': pruning_level = '0' if out: @@ -177,7 +178,7 @@ class Network(PrintError): config = {} # Do not use mutables as default values! self.config = SimpleConfig(config) if isinstance(config, dict) else config self.num_server = 10 if not self.config.get('oneserver') else 0 - blockchain.blockchains = blockchain.read_blockchains(self.config) # note: needs self.blockchains_lock + blockchain.blockchains = blockchain.read_blockchains(self.config) self.print_error("blockchains", list(blockchain.blockchains.keys())) self.blockchain_index = config.get('blockchain_index', 0) if self.blockchain_index not in blockchain.blockchains.keys(): @@ -198,14 +199,9 @@ class Network(PrintError): self.bhi_lock = asyncio.Lock() self.interface_lock = threading.RLock() # <- re-entrant self.callback_lock = threading.Lock() - self.pending_sends_lock = threading.Lock() self.recent_servers_lock = threading.RLock() # <- re-entrant - self.blockchains_lock = threading.Lock() - self.pending_sends = [] - self.message_id = 0 - self.debug = False - self.irc_servers = {} # returned by interface (list from irc) + self.server_peers = {} # returned by interface (servers that the main interface knows about) self.recent_servers = self.read_recent_servers() # note: needs self.recent_servers_lock self.banner = '' @@ -217,10 +213,6 @@ class Network(PrintError): dir_path = os.path.join(self.config.path, 'certs') util.make_dir(dir_path) - # subscriptions and requests - self.h2addr = {} - # Requests from client we've not seen a response to - self.unanswered_requests = {} # retry times self.server_retry_time = time.time() self.nodes_retry_time = time.time() @@ -231,11 +223,11 @@ class Network(PrintError): self.interfaces = {} # note: needs self.interface_lock self.auto_connect = self.config.get('auto_connect', True) self.connecting = set() - self.requested_chunks = set() - self.socket_queue = queue.Queue() + self.server_queue = None + self.server_queue_group = None + self.asyncio_loop = asyncio.get_event_loop() self.start_network(deserialize_server(self.default_server)[2], deserialize_proxy(self.config.get('proxy'))) - self.asyncio_loop = asyncio.get_event_loop() @staticmethod def get_instance(): @@ -268,11 +260,11 @@ class Network(PrintError): with self.callback_lock: callbacks = self.callbacks[event][:] for callback in callbacks: + # FIXME: if callback throws, we will lose the traceback if asyncio.iscoroutinefunction(callback): - # FIXME: if callback throws, we will lose the traceback asyncio.run_coroutine_threadsafe(callback(event, *args), self.asyncio_loop) else: - callback(event, *args) + self.asyncio_loop.call_soon_threadsafe(callback, event, *args) def read_recent_servers(self): if not self.config.path: @@ -317,7 +309,8 @@ class Network(PrintError): self.notify('status') def is_connected(self): - return self.interface is not None and self.interface.ready.done() + interface = self.interface + return interface is not None and interface.ready.done() def is_connecting(self): return self.connection_status == 'connecting' @@ -325,14 +318,29 @@ class Network(PrintError): async def request_server_info(self, interface): await interface.ready session = interface.session - self.banner = await session.send_request('server.banner') - self.notify('banner') - self.donation_address = await session.send_request('server.donation_address') - self.irc_servers = parse_servers(await session.send_request('server.peers.subscribe')) - self.notify('servers') - await self.request_fee_estimates(interface) - relayfee = await session.send_request('blockchain.relayfee') - self.relay_fee = int(relayfee * COIN) if relayfee is not None else None + + async def get_banner(): + self.banner = await session.send_request('server.banner') + self.notify('banner') + async def get_donation_address(): + self.donation_address = await session.send_request('server.donation_address') + async def get_server_peers(): + self.server_peers = parse_servers(await session.send_request('server.peers.subscribe')) + self.notify('servers') + async def get_relay_fee(): + relayfee = await session.send_request('blockchain.relayfee') + if relayfee is None: + self.relay_fee = None + else: + relayfee = int(relayfee * COIN) + self.relay_fee = max(0, relayfee) + + async with TaskGroup() as group: + await group.spawn(get_banner) + await group.spawn(get_donation_address) + await group.spawn(get_server_peers) + await group.spawn(get_relay_fee) + await group.spawn(self.request_fee_estimates(interface)) async def request_fee_estimates(self, interface): session = interface.session @@ -343,7 +351,8 @@ class Network(PrintError): fee_tasks = [] for i in FEE_ETA_TARGETS: fee_tasks.append((i, await group.spawn(session.send_request('blockchain.estimatefee', [i])))) - self.config.mempool_fees = histogram_task.result() + self.config.mempool_fees = histogram = histogram_task.result() + self.print_error('fee_histogram', histogram) self.notify('fee_histogram') for i, task in fee_tasks: fee = int(task.result() * COIN) @@ -360,12 +369,10 @@ class Network(PrintError): value = self.config.fee_estimates elif key == 'fee_histogram': value = self.config.mempool_fees - elif key == 'updated': - value = (self.get_local_height(), self.get_server_height()) elif key == 'servers': value = self.get_servers() - elif key == 'interfaces': - value = self.get_interfaces() + else: + raise Exception('unexpected trigger key {}'.format(key)) return value def notify(self, key): @@ -389,33 +396,36 @@ class Network(PrintError): @with_recent_servers_lock def get_servers(self): + # start with hardcoded servers out = constants.net.DEFAULT_SERVERS - if self.irc_servers: - out.update(filter_version(self.irc_servers.copy())) - else: - for s in self.recent_servers: - try: - host, port, protocol = deserialize_server(s) - except: - continue - if host not in out: - out[host] = {protocol: port} + # add recent servers + for s in self.recent_servers: + try: + host, port, protocol = deserialize_server(s) + except: + continue + if host not in out: + out[host] = {protocol: port} + # add servers received from main interface + if self.server_peers: + out.update(filter_version(self.server_peers.copy())) + # potentially filter out some if self.config.get('noonion'): out = filter_noonion(out) return out @with_interface_lock def start_interface(self, server): - if (not server in self.interfaces and not server in self.connecting): + if server not in self.interfaces and server not in self.connecting: if server == self.default_server: self.print_error("connecting to %s as new interface" % server) self.set_status('connecting') self.connecting.add(server) - self.socket_queue.put(server) + self.server_queue.put(server) def start_random_interface(self): with self.interface_lock: - exclude_set = self.disconnected_servers.union(set(self.interfaces)) + exclude_set = self.disconnected_servers | set(self.interfaces) | self.connecting server = pick_random_server(self.get_servers(), self.protocol, exclude_set) if server: self.start_interface(server) @@ -471,12 +481,24 @@ class Network(PrintError): @with_interface_lock def start_network(self, protocol: str, proxy: Optional[dict]): assert not self.interface and not self.interfaces - assert not self.connecting and self.socket_queue.empty() + assert not self.connecting and not self.server_queue + assert not self.server_queue_group self.print_error('starting network') self.disconnected_servers = set([]) # note: needs self.interface_lock self.protocol = protocol + self._init_server_queue() self.set_proxy(proxy) self.start_interface(self.default_server) + self.trigger_callback('network_updated') + + def _init_server_queue(self): + self.server_queue = queue.Queue() + self.server_queue_group = server_queue_group = TaskGroup() + async def job(): + forever = asyncio.Event() + async with server_queue_group as group: + await group.spawn(forever.wait()) + asyncio.run_coroutine_threadsafe(job(), self.asyncio_loop) @with_interface_lock def stop_network(self): @@ -488,8 +510,14 @@ class Network(PrintError): assert self.interface is None assert not self.interfaces self.connecting.clear() + self._stop_server_queue() + self.trigger_callback('network_updated') + + def _stop_server_queue(self): # Get a new queue - no old pending connections thanks! - self.socket_queue = queue.Queue() + self.server_queue = None + asyncio.run_coroutine_threadsafe(self.server_queue_group.cancel_remaining(), self.asyncio_loop) + self.server_queue_group = None def set_parameters(self, net_params: NetworkParameters): proxy = net_params.proxy @@ -521,7 +549,6 @@ class Network(PrintError): self.switch_to_interface(server_str) else: self.switch_lagging_interface() - self.notify('updated') def switch_to_random_interface(self): '''Switch to a random connected server other than the current one''' @@ -562,23 +589,25 @@ class Network(PrintError): i = self.interfaces[server] if self.interface != i: self.print_error("switching to", server) + blockchain_updated = False if self.interface is not None: + blockchain_updated = i.blockchain != self.interface.blockchain # Stop any current interface in order to terminate subscriptions, # and to cancel tasks in interface.group. # However, for headers sub, give preference to this interface # over unknown ones, i.e. start it again right away. old_server = self.interface.server self.close_interface(self.interface) - if len(self.interfaces) <= self.num_server: + if old_server != server and len(self.interfaces) <= self.num_server: self.start_interface(old_server) self.interface = i - asyncio.get_event_loop().create_task( - i.group.spawn(self.request_server_info(i))) + asyncio.run_coroutine_threadsafe( + i.group.spawn(self.request_server_info(i)), self.asyncio_loop) self.trigger_callback('default_server_changed') self.set_status('connected') - self.notify('updated') - self.notify('interfaces') + self.trigger_callback('network_updated') + if blockchain_updated: self.trigger_callback('blockchain_updated') @with_interface_lock def close_interface(self, interface): @@ -607,17 +636,10 @@ class Network(PrintError): self.set_status('disconnected') if server in self.interfaces: self.close_interface(self.interfaces[server]) - self.notify('interfaces') - with self.blockchains_lock: - for b in blockchain.blockchains.values(): - if b.catch_up == server: - b.catch_up = None + self.trigger_callback('network_updated') @aiosafe async def new_interface(self, server): - # todo: get tip first, then decide which checkpoint to use. - self.add_recent_server(server) - interface = Interface(self, server, self.config.path, self.proxy) timeout = 10 if not self.proxy else 20 try: @@ -625,20 +647,28 @@ class Network(PrintError): except BaseException as e: #import traceback #traceback.print_exc() - self.print_error(interface.server, "couldn't launch because", str(e), str(type(e))) - self.connection_down(interface.server) + self.print_error(server, "couldn't launch because", str(e), str(type(e))) + # note: connection_down will not call interface.close() as + # interface is not yet in self.interfaces. OTOH, calling + # interface.close() here will sometimes raise deep inside the + # asyncio internal select.select... instead, interface will close + # itself when it detects the cancellation of interface.ready; + # however this might take several seconds... + self.connection_down(server) return + else: + with self.interface_lock: + self.interfaces[server] = interface finally: - try: self.connecting.remove(server) - except KeyError: pass - - with self.interface_lock: - self.interfaces[server] = interface + with self.interface_lock: + try: self.connecting.remove(server) + except KeyError: pass if server == self.default_server: self.switch_to_interface(server) - self.notify('interfaces') + self.add_recent_server(server) + self.trigger_callback('network_updated') def init_headers_file(self): b = blockchain.blockchains[0] @@ -646,9 +676,10 @@ class Network(PrintError): length = 80 * len(constants.net.CHECKPOINTS) * 2016 if not os.path.exists(filename) or os.path.getsize(filename) < length: with open(filename, 'wb') as f: - if length>0: + if length > 0: f.seek(length-1) f.write(b'\x00') + util.ensure_sparse_file(filename) with b.lock: b.update_size() @@ -672,25 +703,8 @@ class Network(PrintError): return False, "error: " + out return True, out - async def request_chunk(self, height, tip, session=None, can_return_early=False): - if session is None: session = self.interface.session - index = height // 2016 - if can_return_early and index in self.requested_chunks: - return - size = 2016 - if tip is not None: - size = min(size, tip - index * 2016) - size = max(size, 0) - try: - self.requested_chunks.add(index) - res = await session.send_request('blockchain.block.headers', [index * 2016, size]) - finally: - try: self.requested_chunks.remove(index) - except KeyError: pass - conn = self.blockchain().connect_chunk(index, res['hex']) - if not conn: - return conn, 0 - return conn, res['count'] + async def request_chunk(self, height, tip=None, *, can_return_early=False): + return await self.interface.request_chunk(height, tip=tip, can_return_early=can_return_early) @with_interface_lock def blockchain(self): @@ -700,15 +714,22 @@ class Network(PrintError): @with_interface_lock def get_blockchains(self): - out = {} - with self.blockchains_lock: - blockchain_items = list(blockchain.blockchains.items()) - for k, b in blockchain_items: - r = list(filter(lambda i: i.blockchain==b, list(self.interfaces.values()))) + out = {} # blockchain_id -> list(interfaces) + with blockchain.blockchains_lock: blockchain_items = list(blockchain.blockchains.items()) + for chain_id, bc in blockchain_items: + r = list(filter(lambda i: i.blockchain==bc, list(self.interfaces.values()))) if r: - out[k] = r + out[chain_id] = r return out + @with_interface_lock + def disconnect_from_interfaces_on_given_blockchain(self, chain: Blockchain) -> Sequence[Interface]: + chain_id = chain.forkpoint + ifaces = self.get_blockchains().get(chain_id) or [] + for interface in ifaces: + self.connection_down(interface.server) + return ifaces + def follow_chain(self, index): bc = blockchain.blockchains.get(index) if bc: @@ -757,9 +778,9 @@ class Network(PrintError): async def maintain_sessions(self): while True: - while self.socket_queue.qsize() > 0: - server = self.socket_queue.get() - asyncio.get_event_loop().create_task(self.new_interface(server)) + while self.server_queue.qsize() > 0: + server = self.server_queue.get() + await self.server_queue_group.spawn(self.new_interface(server)) remove = [] for k, i in self.interfaces.items(): if i.fut.done() and not i.exception: @@ -799,9 +820,6 @@ class Network(PrintError): self.switch_to_interface(self.default_server) else: if self.config.is_fee_estimates_update_required(): - await self.interface.group.spawn(self.attempt_fee_estimate_update()) + await self.interface.group.spawn(self.request_fee_estimates, self.interface) await asyncio.sleep(0.1) - - async def attempt_fee_estimate_update(self): - await self.request_fee_estimates(self.interface) diff --git a/electrum/paymentrequest.py b/electrum/paymentrequest.py index 78df7566..0cb15e47 100644 --- a/electrum/paymentrequest.py +++ b/electrum/paymentrequest.py @@ -370,8 +370,7 @@ def verify_cert_chain(chain): hashBytes = bytearray(hashlib.sha512(data).digest()) verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA512 + hashBytes) else: - raise Exception("Algorithm not supported") - util.print_error(self.error, algo.getComponentByName('algorithm')) + raise Exception("Algorithm not supported: {}".format(algo)) if not verify: raise Exception("Certificate not Signed by Provided CA Certificate Chain") diff --git a/electrum/plugins/coldcard/qt.py b/electrum/plugins/coldcard/qt.py index 583a845b..90df1053 100644 --- a/electrum/plugins/coldcard/qt.py +++ b/electrum/plugins/coldcard/qt.py @@ -7,6 +7,7 @@ from electrum.gui.qt.util import * from .coldcard import ColdcardPlugin from ..hw_wallet.qt import QtHandlerBase, QtPluginBase +from ..hw_wallet.plugin import only_hook_if_libraries_available class Plugin(ColdcardPlugin, QtPluginBase): @@ -17,6 +18,7 @@ class Plugin(ColdcardPlugin, QtPluginBase): return Coldcard_Handler(window) @hook + @only_hook_if_libraries_available def receive_menu(self, menu, addrs, wallet): if type(wallet) is not Standard_Wallet: return @@ -27,6 +29,7 @@ class Plugin(ColdcardPlugin, QtPluginBase): menu.addAction(_("Show on Coldcard"), show_address) @hook + @only_hook_if_libraries_available def transaction_dialog(self, dia): # see gui/qt/transaction_dialog.py diff --git a/electrum/plugins/digitalbitbox/qt.py b/electrum/plugins/digitalbitbox/qt.py index 59475696..451bff0c 100644 --- a/electrum/plugins/digitalbitbox/qt.py +++ b/electrum/plugins/digitalbitbox/qt.py @@ -1,12 +1,13 @@ from functools import partial -from ..hw_wallet.qt import QtHandlerBase, QtPluginBase -from .digitalbitbox import DigitalBitboxPlugin - from electrum.i18n import _ from electrum.plugin import hook from electrum.wallet import Standard_Wallet +from ..hw_wallet.qt import QtHandlerBase, QtPluginBase +from ..hw_wallet.plugin import only_hook_if_libraries_available +from .digitalbitbox import DigitalBitboxPlugin + class Plugin(DigitalBitboxPlugin, QtPluginBase): icon_unpaired = ":icons/digitalbitbox_unpaired.png" @@ -16,6 +17,7 @@ class Plugin(DigitalBitboxPlugin, QtPluginBase): return DigitalBitbox_Handler(window) @hook + @only_hook_if_libraries_available def receive_menu(self, menu, addrs, wallet): if type(wallet) is not Standard_Wallet: return diff --git a/electrum/plugins/hw_wallet/plugin.py b/electrum/plugins/hw_wallet/plugin.py index 24861494..ff77ba1c 100644 --- a/electrum/plugins/hw_wallet/plugin.py +++ b/electrum/plugins/hw_wallet/plugin.py @@ -135,3 +135,10 @@ def trezor_validate_op_return_output_and_get_data(output: TxOutput) -> bytes: if output.value != 0: raise Exception(_("Amount for OP_RETURN output must be zero.")) return script[2:] + + +def only_hook_if_libraries_available(func): + def wrapper(self, *args, **kwargs): + if not self.libraries_available: return None + return func(self, *args, **kwargs) + return wrapper diff --git a/electrum/plugins/keepkey/qt.py b/electrum/plugins/keepkey/qt.py index cd61d180..f879b2b9 100644 --- a/electrum/plugins/keepkey/qt.py +++ b/electrum/plugins/keepkey/qt.py @@ -12,6 +12,7 @@ from electrum.util import PrintError, UserCancelled, bh2u from electrum.wallet import Wallet, Standard_Wallet from ..hw_wallet.qt import QtHandlerBase, QtPluginBase +from ..hw_wallet.plugin import only_hook_if_libraries_available from .keepkey import KeepKeyPlugin, TIM_NEW, TIM_RECOVER, TIM_MNEMONIC @@ -195,6 +196,7 @@ class QtPlugin(QtPluginBase): return QtHandler(window, self.pin_matrix_widget_class(), self.device) @hook + @only_hook_if_libraries_available def receive_menu(self, menu, addrs, wallet): if type(wallet) is not Standard_Wallet: return diff --git a/electrum/plugins/ledger/qt.py b/electrum/plugins/ledger/qt.py index d49bcb8d..cfd33991 100644 --- a/electrum/plugins/ledger/qt.py +++ b/electrum/plugins/ledger/qt.py @@ -7,6 +7,7 @@ from electrum.gui.qt.util import * from .ledger import LedgerPlugin from ..hw_wallet.qt import QtHandlerBase, QtPluginBase +from ..hw_wallet.plugin import only_hook_if_libraries_available class Plugin(LedgerPlugin, QtPluginBase): @@ -17,6 +18,7 @@ class Plugin(LedgerPlugin, QtPluginBase): return Ledger_Handler(window) @hook + @only_hook_if_libraries_available def receive_menu(self, menu, addrs, wallet): if type(wallet) is not Standard_Wallet: return diff --git a/electrum/plugins/safe_t/qt.py b/electrum/plugins/safe_t/qt.py index a9114f2d..408df9df 100644 --- a/electrum/plugins/safe_t/qt.py +++ b/electrum/plugins/safe_t/qt.py @@ -12,6 +12,7 @@ from electrum.util import PrintError, UserCancelled, bh2u from electrum.wallet import Wallet, Standard_Wallet from ..hw_wallet.qt import QtHandlerBase, QtPluginBase +from ..hw_wallet.plugin import only_hook_if_libraries_available from .safe_t import SafeTPlugin, TIM_NEW, TIM_RECOVER, TIM_MNEMONIC @@ -71,6 +72,7 @@ class QtPlugin(QtPluginBase): return QtHandler(window, self.pin_matrix_widget_class(), self.device) @hook + @only_hook_if_libraries_available def receive_menu(self, menu, addrs, wallet): if len(addrs) != 1: return diff --git a/electrum/plugins/safe_t/safe_t.py b/electrum/plugins/safe_t/safe_t.py index f6ef634c..320d2ad4 100644 --- a/electrum/plugins/safe_t/safe_t.py +++ b/electrum/plugins/safe_t/safe_t.py @@ -401,7 +401,7 @@ class SafeTPlugin(HW_PluginBase): def tx_outputs(self, derivation, tx): def create_output_by_derivation(): - script_type = self.get_trezor_output_script_type(info.script_type) + script_type = self.get_safet_output_script_type(info.script_type) if len(xpubs) == 1: address_n = self.client_class.expand_path(derivation + "/%d/%d" % index) txoutputtype = self.types.TxOutputType( diff --git a/electrum/plugins/safe_t/transport.py b/electrum/plugins/safe_t/transport.py index 3753434d..e34916ad 100644 --- a/electrum/plugins/safe_t/transport.py +++ b/electrum/plugins/safe_t/transport.py @@ -8,6 +8,8 @@ class SafeTTransport(PrintError): """Reimplemented safetlib.transport.all_transports so that we can enable/disable specific transports. """ + # NOTE: the bridge and UDP transports are disabled as they are using + # the same ports as trezor try: # only to detect safetlib version from safetlib.transport import all_transports diff --git a/electrum/plugins/trezor/qt.py b/electrum/plugins/trezor/qt.py index 04632817..95f68130 100644 --- a/electrum/plugins/trezor/qt.py +++ b/electrum/plugins/trezor/qt.py @@ -12,6 +12,7 @@ from electrum.util import PrintError, UserCancelled, bh2u from electrum.wallet import Wallet, Standard_Wallet from ..hw_wallet.qt import QtHandlerBase, QtPluginBase +from ..hw_wallet.plugin import only_hook_if_libraries_available from .trezor import (TrezorPlugin, TIM_NEW, TIM_RECOVER, TIM_MNEMONIC, RECOVERY_TYPE_SCRAMBLED_WORDS, RECOVERY_TYPE_MATRIX) @@ -166,6 +167,7 @@ class QtPlugin(QtPluginBase): return QtHandler(window, self.pin_matrix_widget_class(), self.device) @hook + @only_hook_if_libraries_available def receive_menu(self, menu, addrs, wallet): if len(addrs) != 1: return diff --git a/electrum/plugins/trezor/transport.py b/electrum/plugins/trezor/transport.py index 5ce686c6..78c1dd20 100644 --- a/electrum/plugins/trezor/transport.py +++ b/electrum/plugins/trezor/transport.py @@ -14,11 +14,11 @@ class TrezorTransport(PrintError): except ImportError: # old trezorlib. compat for trezorlib < 0.9.2 transports = [] - #try: - # from trezorlib.transport_bridge import BridgeTransport - # transports.append(BridgeTransport) - #except BaseException: - # pass + try: + from trezorlib.transport_bridge import BridgeTransport + transports.append(BridgeTransport) + except BaseException: + pass try: from trezorlib.transport_hid import HidTransport transports.append(HidTransport) @@ -37,11 +37,11 @@ class TrezorTransport(PrintError): else: # new trezorlib. transports = [] - #try: - # from trezorlib.transport.bridge import BridgeTransport - # transports.append(BridgeTransport) - #except BaseException: - # pass + try: + from trezorlib.transport.bridge import BridgeTransport + transports.append(BridgeTransport) + except BaseException: + pass try: from trezorlib.transport.hid import HidTransport transports.append(HidTransport) diff --git a/electrum/rsakey.py b/electrum/rsakey.py index 97efd65d..18a0d733 100644 --- a/electrum/rsakey.py +++ b/electrum/rsakey.py @@ -125,7 +125,6 @@ def numBits(n): '8':4, '9':4, 'a':4, 'b':4, 'c':4, 'd':4, 'e':4, 'f':4, }[s[0]] - return int(math.floor(math.log(n, 2))+1) def numBytes(n): if n==0: diff --git a/electrum/scripts/block_headers.py b/electrum/scripts/block_headers.py index d3ecb7fd..649a0493 100755 --- a/electrum/scripts/block_headers.py +++ b/electrum/scripts/block_headers.py @@ -3,6 +3,8 @@ # A simple script that connects to a server and displays block headers import time +import sys + from .. import SimpleConfig, Network from electrum.util import print_msg, json_encode diff --git a/electrum/storage.py b/electrum/storage.py index 156df968..ac46b059 100644 --- a/electrum/storage.py +++ b/electrum/storage.py @@ -29,7 +29,7 @@ import json import copy import re import stat -import hmac, hashlib +import hashlib import base64 import zlib from collections import defaultdict @@ -54,7 +54,7 @@ def multisig_type(wallet_type): otherwise return None.''' if not wallet_type: return None - match = re.match('(\d+)of(\d+)', wallet_type) + match = re.match(r'(\d+)of(\d+)', wallet_type) if match: match = [int(x) for x in match.group(1, 2)] return match @@ -73,7 +73,7 @@ class JsonDB(PrintError): def __init__(self, path): self.db_lock = threading.RLock() self.data = {} - self.path = path + self.path = os.path.normcase(os.path.abspath(path)) self.modified = False def get(self, key, default=None): @@ -142,8 +142,8 @@ class JsonDB(PrintError): class WalletStorage(JsonDB): def __init__(self, path, manual_upgrades=False): - self.print_error("wallet path", path) JsonDB.__init__(self, path) + self.print_error("wallet path", path) self.manual_upgrades = manual_upgrades self.pubkey = None if self.file_exists(): diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index a74961a3..d74d80d8 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -25,7 +25,7 @@ import asyncio import hashlib -from aiorpcx import TaskGroup +from aiorpcx import TaskGroup, run_in_thread from .transaction import Transaction from .util import bh2u, PrintError @@ -51,30 +51,35 @@ class Synchronizer(PrintError): ''' def __init__(self, wallet): self.wallet = wallet + self.asyncio_loop = wallet.network.asyncio_loop self.requested_tx = {} self.requested_histories = {} self.requested_addrs = set() self.scripthash_to_address = {} + self._processed_some_notifications = False # so that we don't miss them # Queues self.add_queue = asyncio.Queue() self.status_queue = asyncio.Queue() + def diagnostic_name(self): + return '{}:{}'.format(self.__class__.__name__, self.wallet.diagnostic_name()) + def is_up_to_date(self): return (not self.requested_addrs and not self.requested_histories and not self.requested_tx) def add(self, addr): - self.requested_addrs.add(addr) - self.add_queue.put_nowait(addr) + asyncio.run_coroutine_threadsafe(self._add(addr), self.asyncio_loop) - async def on_address_status(self, addr, status): + async def _add(self, addr): + self.requested_addrs.add(addr) + await self.add_queue.put(addr) + + async def _on_address_status(self, addr, status): history = self.wallet.history.get(addr, []) if history_status(history) == status: return - # note that at this point 'result' can be None; - # if we had a history for addr but now the server is telling us - # there is no history if addr in self.requested_histories: return # request address history @@ -97,14 +102,12 @@ class Synchronizer(PrintError): # Store received history self.wallet.receive_history_callback(addr, hist, tx_fees) # Request transactions we don't have - await self.request_missing_txs(hist) + await self._request_missing_txs(hist) # Remove request; this allows up_to_date to be True self.requested_histories.pop(addr) - if self.wallet.network: self.wallet.network.notify('updated') - - async def request_missing_txs(self, hist): + async def _request_missing_txs(self, hist): # "hist" is a list of [tx_hash, tx_height] lists transaction_hashes = [] for tx_hash, tx_height in hist: @@ -115,11 +118,12 @@ class Synchronizer(PrintError): transaction_hashes.append(tx_hash) self.requested_tx[tx_hash] = tx_height + if not transaction_hashes: return async with TaskGroup() as group: for tx_hash in transaction_hashes: - await group.spawn(self.get_transaction, tx_hash) + await group.spawn(self._get_transaction, tx_hash) - async def get_transaction(self, tx_hash): + async def _get_transaction(self, tx_hash): result = await self.session.send_request('blockchain.transaction.get', [tx_hash]) tx = Transaction(result) try: @@ -136,24 +140,25 @@ class Synchronizer(PrintError): self.print_error("received tx %s height: %d bytes: %d" % (tx_hash, tx_height, len(tx.raw))) # callbacks - self.wallet.network.trigger_callback('new_transaction', tx) - - async def subscribe_to_address(self, addr): - h = address_to_scripthash(addr) - self.scripthash_to_address[h] = addr - await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue) - self.requested_addrs.remove(addr) + self.wallet.network.trigger_callback('new_transaction', self.wallet, tx) async def send_subscriptions(self, group: TaskGroup): + async def subscribe_to_address(addr): + h = address_to_scripthash(addr) + self.scripthash_to_address[h] = addr + await self.session.subscribe('blockchain.scripthash.subscribe', [h], self.status_queue) + self.requested_addrs.remove(addr) + while True: addr = await self.add_queue.get() - await group.spawn(self.subscribe_to_address, addr) + await group.spawn(subscribe_to_address, addr) async def handle_status(self, group: TaskGroup): while True: h, status = await self.status_queue.get() addr = self.scripthash_to_address[h] - await group.spawn(self.on_address_status, addr, status) + await group.spawn(self._on_address_status, addr, status) + self._processed_some_notifications = True @property def session(self): @@ -162,21 +167,23 @@ class Synchronizer(PrintError): return s async def main(self): + self.wallet.set_up_to_date(False) # request missing txns, if any - async with TaskGroup() as group: - for history in self.wallet.history.values(): - # Old electrum servers returned ['*'] when all history for the address - # was pruned. This no longer happens but may remain in old wallets. - if history == ['*']: continue - await group.spawn(self.request_missing_txs, history) + for history in self.wallet.history.values(): + # Old electrum servers returned ['*'] when all history for the address + # was pruned. This no longer happens but may remain in old wallets. + if history == ['*']: continue + await self._request_missing_txs(history) # add addresses to bootstrap for addr in self.wallet.get_addresses(): - self.add(addr) + await self._add(addr) # main loop while True: await asyncio.sleep(0.1) - self.wallet.synchronize() + await run_in_thread(self.wallet.synchronize) up_to_date = self.is_up_to_date() - if up_to_date != self.wallet.is_up_to_date(): + if (up_to_date != self.wallet.is_up_to_date() + or up_to_date and self._processed_some_notifications): + self._processed_some_notifications = False self.wallet.set_up_to_date(up_to_date) - self.wallet.network.trigger_callback('updated') + self.wallet.network.trigger_callback('wallet_updated', self.wallet) diff --git a/electrum/tests/test_network.py b/electrum/tests/test_network.py index b28dff77..dc5ba440 100644 --- a/electrum/tests/test_network.py +++ b/electrum/tests/test_network.py @@ -38,7 +38,7 @@ class TestNetwork(unittest.TestCase): self.config = SimpleConfig({'electrum_path': tempfile.mkdtemp(prefix="test_network")}) self.interface = MockInterface(self.config) - def test_new_fork(self): + def test_fork_noconflict(self): blockchain.blockchains = {} self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) def mock_connect(height): @@ -49,10 +49,24 @@ class TestNetwork(unittest.TestCase): self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) ifa = self.interface - self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=8))) + self.assertEqual(('fork_noconflict', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7))) self.assertEqual(self.interface.q.qsize(), 0) - def test_new_can_connect_during_backward(self): + def test_fork_conflict(self): + blockchain.blockchains = {7: {'check': lambda bad_header: False}} + self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + def mock_connect(height): + return height == 6 + self.interface.q.put_nowait({'block_height': 7, 'mock': {'backward':1,'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}}) + self.interface.q.put_nowait({'block_height': 2, 'mock': {'backward':1,'check':lambda x: True, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 4, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) + self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) + self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) + ifa = self.interface + self.assertEqual(('fork_conflict', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7))) + self.assertEqual(self.interface.q.qsize(), 0) + + def test_can_connect_during_backward(self): blockchain.blockchains = {} self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) def mock_connect(height): @@ -62,13 +76,13 @@ class TestNetwork(unittest.TestCase): self.interface.q.put_nowait({'block_height': 3, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) self.interface.q.put_nowait({'block_height': 4, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) ifa = self.interface - self.assertEqual(('catchup', 5), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=5))) + self.assertEqual(('catchup', 5), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=4))) self.assertEqual(self.interface.q.qsize(), 0) def mock_fork(self, bad_header): return blockchain.Blockchain(self.config, bad_header['block_height'], None) - def test_new_chain_false_during_binary(self): + def test_chain_false_during_binary(self): blockchain.blockchains = {} self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) mock_connect = lambda height: height == 3 @@ -79,40 +93,9 @@ class TestNetwork(unittest.TestCase): self.interface.q.put_nowait({'block_height': 5, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) self.interface.q.put_nowait({'block_height': 6, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) ifa = self.interface - self.assertEqual(('catchup', 7), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7))) + self.assertEqual(('catchup', 7), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=6))) self.assertEqual(self.interface.q.qsize(), 0) - def test_new_join(self): - blockchain.blockchains = {7: {'check': lambda bad_header: True}} - self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) - self.interface.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': lambda height: height == 6}}) - self.interface.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: True, 'connect': lambda x: False}}) - self.interface.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: False}}) - self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: False}}) - self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: True}}) - ifa = self.interface - self.assertEqual(('join', 7), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7))) - self.assertEqual(self.interface.q.qsize(), 0) - - def test_new_reorg(self): - times = 0 - def check(header): - nonlocal times - self.assertEqual(header['block_height'], 7) - times += 1 - return times != 1 - blockchain.blockchains = {7: {'check': check, 'parent': {'check': lambda x: True}}} - self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) - self.interface.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': lambda height: height == 6}}) - self.interface.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: 1, 'connect': lambda x: False}}) - self.interface.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: False}}) - self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: False}}) - self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: True}}) - self.interface.q.put_nowait({'block_height': 7, 'mock': {'binary':1, 'check': lambda x: False, 'connect': lambda x: True}}) - ifa = self.interface - self.assertEqual(('join', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=8))) - self.assertEqual(self.interface.q.qsize(), 0) - self.assertEqual(times, 2) if __name__=="__main__": constants.set_regtest() diff --git a/electrum/util.py b/electrum/util.py index 027c2d33..247e3827 100644 --- a/electrum/util.py +++ b/electrum/util.py @@ -23,7 +23,7 @@ import binascii import os, sys, re, json from collections import defaultdict -from typing import NamedTuple +from typing import NamedTuple, Union from datetime import datetime import decimal from decimal import Decimal @@ -36,7 +36,9 @@ import inspect from locale import localeconv import asyncio import urllib.request, urllib.parse, urllib.error -import queue +import builtins +import json +import time import aiohttp from aiohttp_socks import SocksConnector, SocksVer @@ -280,9 +282,12 @@ class DaemonThread(threading.Thread, PrintError): verbosity = '*' -def set_verbosity(b): +def set_verbosity(filters: Union[str, bool]): global verbosity - verbosity = b + if type(filters) is bool: # backwards compat + verbosity = '*' if filters else '' + return + verbosity = filters def print_error(*args): @@ -378,6 +383,16 @@ def android_check_data_dir(): return data_dir +def ensure_sparse_file(filename): + # On modern Linux, no need to do anything. + # On Windows, need to explicitly mark file. + if os.name == "nt": + try: + os.system('fsutil sparse setflag "{}" 1'.format(filename)) + except Exception as e: + print_error('error marking file {} as sparse: {}'.format(filename, e)) + + def get_headers_dir(config): return android_headers_dir() if 'ANDROID_DATA' in os.environ else config.path @@ -717,7 +732,6 @@ def raw_input(prompt=None): sys.stdout.write(prompt) return builtin_raw_input() -import builtins builtin_raw_input = builtins.input builtins.input = raw_input @@ -734,114 +748,6 @@ def parse_json(message): return j, message[n+1:] -class timeout(Exception): - pass - -import socket -import json -import ssl -import time - - -class SocketPipe: - def __init__(self, socket): - self.socket = socket - self.message = b'' - self.set_timeout(0.1) - self.recv_time = time.time() - - def set_timeout(self, t): - self.socket.settimeout(t) - - def idle_time(self): - return time.time() - self.recv_time - - def get(self): - while True: - response, self.message = parse_json(self.message) - if response is not None: - return response - try: - data = self.socket.recv(1024) - except socket.timeout: - raise timeout - except ssl.SSLError: - raise timeout - except socket.error as err: - if err.errno == 60: - raise timeout - elif err.errno in [11, 35, 10035]: - print_error("socket errno %d (resource temporarily unavailable)"% err.errno) - time.sleep(0.2) - raise timeout - else: - print_error("pipe: socket error", err) - data = b'' - except: - traceback.print_exc(file=sys.stderr) - data = b'' - - if not data: # Connection closed remotely - return None - self.message += data - self.recv_time = time.time() - - def send(self, request): - out = json.dumps(request) + '\n' - out = out.encode('utf8') - self._send(out) - - def send_all(self, requests): - out = b''.join(map(lambda x: (json.dumps(x) + '\n').encode('utf8'), requests)) - self._send(out) - - def _send(self, out): - while out: - try: - sent = self.socket.send(out) - out = out[sent:] - except ssl.SSLError as e: - print_error("SSLError:", e) - time.sleep(0.1) - continue - - -class QueuePipe: - - def __init__(self, send_queue=None, get_queue=None): - self.send_queue = send_queue if send_queue else queue.Queue() - self.get_queue = get_queue if get_queue else queue.Queue() - self.set_timeout(0.1) - - def get(self): - try: - return self.get_queue.get(timeout=self.timeout) - except queue.Empty: - raise timeout - - def get_all(self): - responses = [] - while True: - try: - r = self.get_queue.get_nowait() - responses.append(r) - except queue.Empty: - break - return responses - - def set_timeout(self, t): - self.timeout = t - - def send(self, request): - self.send_queue.put(request) - - def send_all(self, requests): - for request in requests: - self.send(request) - - - - def setup_thread_excepthook(): """ Workaround for `sys.excepthook` thread bug from: @@ -920,9 +826,12 @@ def aiosafe(f): except asyncio.CancelledError as e: self.exception = e except BaseException as e: - self.print_error("Exception in", f.__name__, ":", e.__class__.__name__, str(e)) - traceback.print_exc(file=sys.stderr) self.exception = e + self.print_error("Exception in", f.__name__, ":", e.__class__.__name__, str(e)) + try: + traceback.print_exc(file=sys.stderr) + except BaseException as e2: + self.print_error("aiosafe:traceback.print_exc raised: {}... original exc: {}".format(e2, e)) return f2 TxMinedStatus = NamedTuple("TxMinedStatus", [("height", int), @@ -950,7 +859,7 @@ def make_aiohttp_session(proxy): return aiohttp.ClientSession(headers={'User-Agent' : 'Electrum'}, timeout=aiohttp.ClientTimeout(total=10)) -class CustomTaskGroup(TaskGroup): +class SilentTaskGroup(TaskGroup): def spawn(self, *args, **kwargs): # don't complain if group is already closed. diff --git a/electrum/verifier.py b/electrum/verifier.py index 26b8857e..2127bc2a 100644 --- a/electrum/verifier.py +++ b/electrum/verifier.py @@ -24,13 +24,15 @@ import asyncio from typing import Sequence, Optional +import aiorpcx from aiorpcx import TaskGroup -from .util import ThreadJob, bh2u, VerifiedTxInfo +from .util import PrintError, bh2u, VerifiedTxInfo from .bitcoin import Hash, hash_decode, hash_encode from .transaction import Transaction from .blockchain import hash_header from .interface import GracefulDisconnect +from . import constants class MerkleVerificationFailure(Exception): pass @@ -39,7 +41,7 @@ class MerkleRootMismatch(MerkleVerificationFailure): pass class InnerNodeOfSpvProofIsValidTx(MerkleVerificationFailure): pass -class SPV(ThreadJob): +class SPV(PrintError): """ Simple Payment Verification """ def __init__(self, network, wallet): @@ -49,8 +51,12 @@ class SPV(ThreadJob): self.merkle_roots = {} # txid -> merkle root (once it has been verified) self.requested_merkle = set() # txid set of pending requests + def diagnostic_name(self): + return '{}:{}'.format(self.__class__.__name__, self.wallet.diagnostic_name()) + async def main(self, group: TaskGroup): while True: + await self._maybe_undo_verifications() await self._request_proofs(group) await asyncio.sleep(0.1) @@ -64,33 +70,43 @@ class SPV(ThreadJob): unverified = self.wallet.get_unverified_txs() for tx_hash, tx_height in unverified.items(): - # do not request merkle branch before headers are available + # do not request merkle branch if we already requested it + if tx_hash in self.requested_merkle or tx_hash in self.merkle_roots: + continue + # or before headers are available if tx_height <= 0 or tx_height > local_height: continue - + # if it's in the checkpoint region, we still might not have the header header = blockchain.read_header(tx_height) if header is None: - index = tx_height // 2016 - if index < len(blockchain.checkpoints): + if tx_height < constants.net.max_checkpoint(): await group.spawn(self.network.request_chunk(tx_height, None, can_return_early=True)) - elif (tx_hash not in self.requested_merkle - and tx_hash not in self.merkle_roots): - self.print_error('requested merkle', tx_hash) - self.requested_merkle.add(tx_hash) - await group.spawn(self._request_and_verify_single_proof, tx_hash, tx_height) - - if self.network.blockchain() != self.blockchain: - self.blockchain = self.network.blockchain() - self._undo_verifications() + continue + # request now + self.print_error('requested merkle', tx_hash) + self.requested_merkle.add(tx_hash) + await group.spawn(self._request_and_verify_single_proof, tx_hash, tx_height) async def _request_and_verify_single_proof(self, tx_hash, tx_height): - merkle = await self.network.get_merkle_for_transaction(tx_hash, tx_height) + try: + merkle = await self.network.get_merkle_for_transaction(tx_hash, tx_height) + except aiorpcx.jsonrpc.RPCError as e: + self.print_error('tx {} not at height {}'.format(tx_hash, tx_height)) + self.wallet.remove_unverified_tx(tx_hash, tx_height) + try: self.requested_merkle.remove(tx_hash) + except KeyError: pass + return # Verify the hash of the server-provided merkle branch to a # transaction matches the merkle root of its block + if tx_height != merkle.get('block_height'): + self.print_error('requested tx_height {} differs from received tx_height {} for txid {}' + .format(tx_height, merkle.get('block_height'), tx_hash)) tx_height = merkle.get('block_height') pos = merkle.get('pos') merkle_branch = merkle.get('merkle') - header = self.network.blockchain().read_header(tx_height) + # we need to wait if header sync/reorg is still ongoing, hence lock: + async with self.network.bhi_lock: + header = self.network.blockchain().read_header(tx_height) try: verify_tx_is_in_block(tx_hash, merkle_branch, pos, header, tx_height) except MerkleVerificationFailure as e: @@ -98,8 +114,7 @@ class SPV(ThreadJob): raise GracefulDisconnect(e) # we passed all the tests self.merkle_roots[tx_hash] = header.get('merkle_root') - try: - self.requested_merkle.remove(tx_hash) + try: self.requested_merkle.remove(tx_hash) except KeyError: pass self.print_error("verified %s" % tx_hash) header_hash = hash_header(header) @@ -138,12 +153,18 @@ class SPV(ThreadJob): else: raise InnerNodeOfSpvProofIsValidTx() - def _undo_verifications(self): - height = self.blockchain.get_forkpoint() - tx_hashes = self.wallet.undo_verifications(self.blockchain, height) - for tx_hash in tx_hashes: - self.print_error("redoing", tx_hash) - self.remove_spv_proof_for_tx(tx_hash) + async def _maybe_undo_verifications(self): + def undo_verifications(): + height = self.blockchain.get_forkpoint() + self.print_error("undoing verifications back to height {}".format(height)) + tx_hashes = self.wallet.undo_verifications(self.blockchain, height) + for tx_hash in tx_hashes: + self.print_error("redoing", tx_hash) + self.remove_spv_proof_for_tx(tx_hash) + + if self.network.blockchain() != self.blockchain: + self.blockchain = self.network.blockchain() + undo_verifications() def remove_spv_proof_for_tx(self, tx_hash): self.merkle_roots.pop(tx_hash, None) diff --git a/electrum/wallet.py b/electrum/wallet.py index b299ee74..b9855e66 100644 --- a/electrum/wallet.py +++ b/electrum/wallet.py @@ -43,19 +43,17 @@ from .i18n import _ from .util import (NotEnoughFunds, PrintError, UserCancelled, profiler, format_satoshis, format_fee_satoshis, NoDynamicFeeEstimates, TimeoutException, WalletFileException, BitcoinException, - InvalidPassword, format_time) - + InvalidPassword, format_time, timestamp_to_datetime, Satoshis, + Fiat) from .bitcoin import * from .version import * from .keystore import load_keystore, Hardware_KeyStore from .storage import multisig_type, STO_EV_PLAINTEXT, STO_EV_USER_PW, STO_EV_XPUB_PW - from . import transaction, bitcoin, coinchooser, paymentrequest, contacts from .transaction import Transaction, TxOutput, TxOutputHwInfo from .plugin import run_hook from .address_synchronizer import (AddressSynchronizer, TX_HEIGHT_LOCAL, TX_HEIGHT_UNCONF_PARENT, TX_HEIGHT_UNCONFIRMED) - from .paymentrequest import PR_PAID, PR_UNPAID, PR_UNKNOWN, PR_EXPIRED from .paymentrequest import InvoiceStore from .contacts import Contacts @@ -388,7 +386,6 @@ class Abstract_Wallet(AddressSynchronizer): @profiler def get_full_history(self, domain=None, from_timestamp=None, to_timestamp=None, fx=None, show_addresses=False): - from .util import timestamp_to_datetime, Satoshis, Fiat out = [] income = 0 expenditures = 0 @@ -1453,10 +1450,10 @@ class Deterministic_Wallet(Abstract_Wallet): if len(addresses) < limit: self.create_new_address(for_change) continue - if list(map(lambda a: self.address_is_old(a), addresses[-limit:] )) == limit*[False]: - break - else: + if any(map(self.address_is_old, addresses[-limit:])): self.create_new_address(for_change) + else: + break def synchronize(self): with self.lock: diff --git a/run_electrum b/run_electrum index 0a43adbc..a254fabb 100755 --- a/run_electrum +++ b/run_electrum @@ -415,7 +415,7 @@ if __name__ == '__main__': fd, server = daemon.get_fd_or_server(config) if fd is not None: plugins = init_plugins(config, config.get('gui', 'qt')) - d = daemon.Daemon(config, fd, True) + d = daemon.Daemon(config, fd) d.start() d.init_gui(config, plugins) sys.exit(0) @@ -436,7 +436,7 @@ if __name__ == '__main__': print_stderr("starting daemon (PID %d)" % pid) sys.exit(0) init_plugins(config, 'cmdline') - d = daemon.Daemon(config, fd, False) + d = daemon.Daemon(config, fd) d.start() if config.get('websocket_server'): from electrum import websockets diff --git a/tox.ini b/tox.ini index 895f302a..fa202c7e 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py36 +envlist = py36, py37 [testenv] deps=