diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES
index a5e120f..c9cd5b9 100644
--- a/docs/RELEASE-NOTES
+++ b/docs/RELEASE-NOTES
@@ -1,3 +1,9 @@
+Version 0.3.1
+-------------
+
+- fixes issue #9
+- save DB version in DB; warn on DB open if incompatible format
+
 Version 0.3
 -----------
 
diff --git a/server/block_processor.py b/server/block_processor.py
index 33d3936..d3ec0e0 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -241,7 +241,7 @@ class MemPool(LoggedClass):
                 await asyncio.sleep(0)
 
                 if initial and time.time() > next_log:
-                    next_log = time.time() + 10
+                    next_log = time.time() + 20
                     self.logger.info('{:,d} done ({:d}%)'
                                      .format(n, int(n / len(new_txs) * 100)))
 
@@ -518,17 +518,7 @@ class BlockProcessor(server.db.DB):
         self.wall_time += now - self.last_flush
         self.last_flush = now
         self.last_flush_tx_count = self.tx_count
-        state = {
-            'genesis': self.coin.GENESIS_HASH,
-            'height': self.db_height,
-            'tx_count': self.db_tx_count,
-            'tip': self.db_tip,
-            'flush_count': self.flush_count,
-            'utxo_flush_count': self.utxo_flush_count,
-            'wall_time': self.wall_time,
-            'first_sync': self.first_sync,
-        }
-        batch.put(b'state', repr(state).encode())
+        self.write_state(batch)
 
     def assert_flushed(self):
         '''Asserts state is fully flushed.'''
diff --git a/server/daemon.py b/server/daemon.py
index 7a886fe..4696d76 100644
--- a/server/daemon.py
+++ b/server/daemon.py
@@ -45,20 +45,56 @@ class Daemon(util.LoggedClass):
                              .format(height))
         self._height = height
 
-    async def post(self, data):
-        '''Send data to the daemon and handle the response.'''
-        async with self.workqueue_semaphore:
-            async with aiohttp.post(self.url, data=data) as resp:
-                result = await resp.json()
+    async def _send(self, payload, processor):
+        '''Send a payload to be converted to JSON.
 
-        if isinstance(result, list):
-            errs = [item['error'] for item in result]
-            if not any(errs):
-                return [item['result'] for item in result]
-            if any(err.get('code') == self.WARMING_UP for err in errs if err):
-                raise DaemonWarmingUpError
-            raise DaemonError(errs)
-        else:
+        Handles temporary connection issues. Daemon response errors
+        are raised through DaemonError.
+        '''
+        prior_msg = None
+        skip_count = None
+
+        def log_error(msg, skip_once=False):
+            nonlocal prior_msg, skip_count
+            if skip_once and skip_count is None:
+                skip_count = 1
+            if msg != prior_msg or skip_count == 0:
+                skip_count = 10
+                prior_msg = msg
+                self.logger.error('{}. Retrying between sleeps...'
+                                  .format(msg))
+            skip_count -= 1
+
+        data = json.dumps(payload)
+        secs = 1
+        while True:
+            try:
+                async with self.workqueue_semaphore:
+                    async with aiohttp.post(self.url, data=data) as resp:
+                        result = processor(await resp.json())
+                        if prior_msg:
+                            self.logger.info('connection restored')
+                        return result
+            except (asyncio.CancelledError, DaemonError):
+                raise
+            except asyncio.TimeoutError:
+                log_error('timeout error', skip_once=True)
+            except aiohttp.ClientHttpProcessingError:
+                log_error('HTTP error', skip_once=True)
+            except aiohttp.ServerDisconnectedError:
+                log_error('disconnected', skip_once=True)
+            except aiohttp.ClientConnectionError:
+                log_error('connection problem - is your daemon running?')
+            except DaemonWarmingUpError:
+                log_error('still starting up checking blocks...')
+            except Exception as e:
+                log_error('request gave unexpected error: {}'.format(e))
+            await asyncio.sleep(secs)
+            secs = min(16, secs * 2)
+
+    async def _send_single(self, method, params=None):
+        '''Send a single request to the daemon.'''
+        def processor(result):
             err = result['error']
             if not err:
                 return result['result']
@@ -66,73 +102,39 @@ class Daemon(util.LoggedClass):
                 raise DaemonWarmingUpError
             raise DaemonError(err)
 
-    async def send(self, payload):
-        '''Send a payload to be converted to JSON.
-
-        Handles temporary connection issues. Daemon reponse errors
-        are raise through DaemonError.
-        '''
-        data = json.dumps(payload)
-        secs = 1
-        prior_msg = None
-        while True:
-            try:
-                result = await self.post(data)
-                if prior_msg:
-                    self.logger.info('connection successfully restored')
-                return result
-            except asyncio.TimeoutError:
-                msg = 'timeout error'
-            except aiohttp.ClientHttpProcessingError:
-                msg = 'HTTP error'
-            except aiohttp.ServerDisconnectedError:
-                msg = 'disconnected'
-            except aiohttp.ClientConnectionError:
-                msg = 'connection problem - is your daemon running?'
-            except DaemonWarmingUpError:
-                msg = 'still starting up checking blocks...'
-            except (asyncio.CancelledError, DaemonError):
-                raise
-            except Exception as e:
-                msg = ('request gave unexpected error: {}'.format(e))
-
-            if msg != prior_msg or count == 10:
-                self.logger.error('{}. Retrying between sleeps...'
-                                  .format(msg))
-                prior_msg = msg
-                count = 0
-            await asyncio.sleep(secs)
-            count += 1
-            secs = min(16, secs * 2)
-
-    async def send_single(self, method, params=None):
-        '''Send a single request to the daemon.'''
         payload = {'method': method}
         if params:
             payload['params'] = params
-        return await self.send(payload)
+        return await self._send(payload, processor)
 
-    async def send_many(self, mp_iterable):
-        '''Send several requests at once.'''
-        payload = [{'method': m, 'params': p} for m, p in mp_iterable]
+    async def _send_vector(self, method, params_iterable, replace_errs=False):
+        '''Send several requests of the same method.
+
+        The result will be an array of the same length as params_iterable.
+        If replace_errs is true, any item with an error is returned as None,
+        otherwise an exception is raised.'''
+        def processor(result):
+            errs = [item['error'] for item in result if item['error']]
+            if not errs or replace_errs:
+                return [item['result'] for item in result]
+            if any(err.get('code') == self.WARMING_UP for err in errs):
+                raise DaemonWarmingUpError
+            raise DaemonError(errs)
+
+        payload = [{'method': method, 'params': p} for p in params_iterable]
         if payload:
-            return await self.send(payload)
+            return await self._send(payload, processor)
         return []
 
-    async def send_vector(self, method, params_iterable):
-        '''Send several requests of the same method.'''
-        return await self.send_many((method, params)
-                                    for params in params_iterable)
-
     async def block_hex_hashes(self, first, count):
         '''Return the hex hashes of count block starting at height first.'''
         params_iterable = ((h, ) for h in range(first, first + count))
-        return await self.send_vector('getblockhash', params_iterable)
+        return await self._send_vector('getblockhash', params_iterable)
 
     async def raw_blocks(self, hex_hashes):
         '''Return the raw binary blocks with the given hex hashes.'''
         params_iterable = ((h, False) for h in hex_hashes)
-        blocks = await self.send_vector('getblock', params_iterable)
+        blocks = await self._send_vector('getblock', params_iterable)
         # Convert hex string to bytes
         return [bytes.fromhex(block) for block in blocks]
 
@@ -140,39 +142,40 @@ class Daemon(util.LoggedClass):
         '''Return the hashes of the txs in the daemon's mempool.'''
         if self.debug_caught_up:
             return []
-        return await self.send_single('getrawmempool')
+        return await self._send_single('getrawmempool')
 
     async def estimatefee(self, params):
         '''Return the fee estimate for the given parameters.'''
-        return await self.send_single('estimatefee', params)
+        return await self._send_single('estimatefee', params)
 
     async def relayfee(self):
         '''The minimum fee a low-priority tx must pay in order to be accepted
         to the daemon's memory pool.'''
-        net_info = await self.send_single('getnetworkinfo')
+        net_info = await self._send_single('getnetworkinfo')
         return net_info['relayfee']
 
     async def getrawtransaction(self, hex_hash):
         '''Return the serialized raw transaction with the given hash.'''
-        return await self.send_single('getrawtransaction', (hex_hash, 0))
+        return await self._send_single('getrawtransaction', (hex_hash, 0))
 
-    async def getrawtransactions(self, hex_hashes):
+    async def getrawtransactions(self, hex_hashes, replace_errs=True):
         '''Return the serialized raw transactions with the given hashes.
 
-        Breaks large requests up. Yields after each sub request.'''
+        Replaces errors with None by default.'''
         params_iterable = ((hex_hash, 0) for hex_hash in hex_hashes)
-        txs = await self.send_vector('getrawtransaction', params_iterable)
+        txs = await self._send_vector('getrawtransaction', params_iterable,
+                                      replace_errs=replace_errs)
         # Convert hex strings to bytes
-        return [bytes.fromhex(tx) for tx in txs]
+        return [bytes.fromhex(tx) if tx else None for tx in txs]
 
     async def sendrawtransaction(self, params):
         '''Broadcast a transaction to the network.'''
-        return await self.send_single('sendrawtransaction', params)
+        return await self._send_single('sendrawtransaction', params)
 
     async def height(self):
         '''Query the daemon for its current height.'''
         if not self.debug_caught_up:
-            self._height = await self.send_single('getblockcount')
+            self._height = await self._send_single('getblockcount')
         return self._height
 
     def cached_height(self):
diff --git a/server/db.py b/server/db.py
index fc01c31..d0d14a0 100644
--- a/server/db.py
+++ b/server/db.py
@@ -29,6 +29,8 @@ class DB(LoggedClass):
     it was shutdown uncleanly.
     '''
 
+    VERSIONS = [0]
+
     class MissingUTXOError(Exception):
         '''Raised if a mempool tx input UTXO couldn't be found.'''
 
@@ -53,7 +55,7 @@ class DB(LoggedClass):
         else:
             self.logger.info('successfully opened {} database {}'
                              .format(env.db_engine, db_name))
-        self.init_state_from_db()
+        self.read_state()
 
         create = self.db_height == -1
         self.headers_file = self.open_file('headers', create)
@@ -70,7 +72,7 @@ class DB(LoggedClass):
         else:
             assert self.db_tx_count == 0
 
-    def init_state_from_db(self):
+    def read_state(self):
         if self.db.is_new:
             self.db_height = -1
             self.db_tx_count = 0
@@ -81,7 +83,15 @@ class DB(LoggedClass):
             self.first_sync = True
         else:
             state = self.db.get(b'state')
-            state = ast.literal_eval(state.decode())
+            if state:
+                state = ast.literal_eval(state.decode())
+            if not isinstance(state, dict):
+                raise self.DBError('failed reading state from DB')
+            db_version = state.get('db_version', 0)
+            if db_version not in self.VERSIONS:
+                raise self.DBError('your DB version is {} but this software '
+                                   'only handles versions {}'
+                                   .format(db_version, self.VERSIONS))
             if state['genesis'] != self.coin.GENESIS_HASH:
                 raise self.DBError('DB genesis hash {} does not match coin {}'
                                    .format(state['genesis_hash'],
@@ -92,7 +102,22 @@ class DB(LoggedClass):
             self.flush_count = state['flush_count']
             self.utxo_flush_count = state['utxo_flush_count']
             self.wall_time = state['wall_time']
-            self.first_sync = state.get('first_sync', True)
+            self.first_sync = state['first_sync']
+
+    def write_state(self, batch):
+        '''Write chain state to the batch.'''
+        state = {
+            'genesis': self.coin.GENESIS_HASH,
+            'height': self.db_height,
+            'tx_count': self.db_tx_count,
+            'tip': self.db_tip,
+            'flush_count': self.flush_count,
+            'utxo_flush_count': self.utxo_flush_count,
+            'wall_time': self.wall_time,
+            'first_sync': self.first_sync,
+            'db_version': max(self.VERSIONS),
+        }
+        batch.put(b'state', repr(state).encode())
 
     def open_file(self, filename, create=False):
         '''Open the file name. Return its handle.'''
diff --git a/server/version.py b/server/version.py
index 150707e..6e1d57d 100644
--- a/server/version.py
+++ b/server/version.py
@@ -1 +1 @@
-VERSION = "ElectrumX 0.3"
+VERSION = "ElectrumX 0.3.1"
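
The rewritten Daemon._send above folds the old post()/send() pair into a single coroutine: it retries forever with exponential backoff capped at 16 seconds, never swallows cancellation or DaemonError, and collapses repeated identical error messages so the log is not flooded. The sketch below is a minimal, standalone illustration of that pattern, assuming a hypothetical fetch() coroutine in place of the aiohttp request; it is not the ElectrumX code itself.

# Standalone sketch of the retry pattern: capped exponential backoff plus
# de-duplicated error logging.  fetch() is a hypothetical stand-in for the
# daemon request.
import asyncio
import logging
import random

logger = logging.getLogger('retry-sketch')


async def fetch():
    '''Pretend daemon call that fails most of the time.'''
    if random.random() < 0.8:
        raise ConnectionError('connection problem - is your daemon running?')
    return 'ok'


async def send_with_retry():
    prior_msg = None
    skip_count = 0

    def log_error(msg):
        nonlocal prior_msg, skip_count
        # Log a new message immediately, then only every 10th repeat.
        if msg != prior_msg or skip_count == 0:
            skip_count = 10
            prior_msg = msg
            logger.error('%s. Retrying between sleeps...', msg)
        skip_count -= 1

    secs = 1
    while True:
        try:
            result = await fetch()
            if prior_msg:
                logger.info('connection restored')
            return result
        except asyncio.CancelledError:
            raise                        # never swallow cancellation
        except Exception as e:
            log_error(str(e))
        await asyncio.sleep(secs)
        secs = min(16, secs * 2)         # 1, 2, 4, 8, 16, 16, ...


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    print(asyncio.run(send_with_retry()))

Re-raising CancelledError (and, in the real code, DaemonError) before the generic handler is what lets shutdown and genuine RPC failures propagate immediately while everything else is treated as transient.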
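
The server/db.py changes give the database an explicit format version: write_state() stamps the state dict with 'db_version' set to the newest entry in VERSIONS, and read_state() refuses to open a DB whose stored version it does not recognise, with databases written before 0.3.1 defaulting to version 0. Below is a minimal sketch of that round trip, assuming hypothetical encode_state()/decode_state() helpers and a plain RuntimeError in place of self.DBError.

# Sketch of the DB-version round trip added in 0.3.1.  Only the version logic
# mirrors server/db.py; the helpers and example values are illustrative.
import ast

VERSIONS = [0]                  # DB formats this software understands


def encode_state(height, tx_count):
    '''Serialize a state dict the way write_state() does.'''
    state = {
        'height': height,
        'tx_count': tx_count,
        'db_version': max(VERSIONS),
    }
    return repr(state).encode()


def decode_state(raw):
    '''Validate a stored state the way read_state() does.'''
    state = ast.literal_eval(raw.decode()) if raw else None
    if not isinstance(state, dict):
        raise RuntimeError('failed reading state from DB')
    # DBs written before 0.3.1 stored no version; they count as version 0.
    db_version = state.get('db_version', 0)
    if db_version not in VERSIONS:
        raise RuntimeError('your DB version is {} but this software only '
                           'handles versions {}'.format(db_version, VERSIONS))
    return state


if __name__ == '__main__':
    print(decode_state(encode_state(height=125000, tx_count=2000000)))
    try:
        decode_state(repr({'height': 0, 'db_version': 99}).encode())
    except RuntimeError as e:
        print(e)                # an unsupported version is rejected on open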