From 91ca5fc14a69aa2ae2a139a2455a55563dcb8b2b Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 5 Jan 2017 22:43:57 +0900 Subject: [PATCH] Split out history into its own DB. --- docs/ENVIRONMENT.rst | 52 ++++--- query.py | 12 +- server/block_processor.py | 116 ++++++---------- server/db.py | 277 +++++++++++++++++++++++--------------- server/env.py | 2 +- server/storage.py | 16 +-- 6 files changed, 247 insertions(+), 228 deletions(-) diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst index 13c9227..8d9ff9c 100644 --- a/docs/ENVIRONMENT.rst +++ b/docs/ENVIRONMENT.rst @@ -260,34 +260,29 @@ Cache ----- If synchronizing from the Genesis block your performance might change -by tweaking the following cache variables. Cache size is only checked -roughly every minute, so the caches can grow beyond the specified -size. Also the Python process is often quite a bit fatter than the -combined cache size, because of Python overhead and also because -leveldb consumes a lot of memory during UTXO flushing. So I recommend -you set the sum of these to nothing over half your available physical -RAM: +by tweaking the cache size. Cache size is only checked roughly every +minute, so the cache can grow beyond the specified size. Moreover, +the Python process is often quite a bit fatter than the cache size, +because of Python overhead and also because leveldb consumes a lot of +memory when flushing. So I recommend you do not set this over 60% of +your available physical RAM: -* **HIST_MB** +* **CACHE_MB** - The amount of history cache, in MB, to retain before flushing to - disk. Default is 300; probably no benefit being much larger as - history is append-only and not searched. + The amount of cache, in MB, to use. The default is 1,200. - I do not recommend setting this above 500. + A portion of the cache is reserved for unflushed history, which is + written out frequently. The bulk is used to cache UTXOs. -* **UTXO_MB** + Larger caches probably increase performance a little as there is + significant searching of the UTXO cache during indexing. However, I + don't see much benefit in my tests pushing this too high, and in + fact performance begins to fall, probably because LevelDB already + caches, and also because of Python GC. - The amount of UTXO and history cache, in MB, to retain before - flushing to disk. Default is 1000. This may be too large for small - boxes or too small for machines with lots of RAM. Larger caches - generally perform better as there is significant searching of the - UTXO cache during indexing. However, I don't see much benefit in my - tests pushing this too high, and in fact performance begins to fall. - My machine has 24GB RAM; the slow down is probably because of - leveldb caching and Python GC effects. - - I do not recommend setting this above 2000. + I do not recommend raising this above 2000. If upgrading from prior + versions, a value of 90% of the sum of the old UTXO_MB and HIST_MB + variables is roughly equivalent. Debugging --------- @@ -297,9 +292,12 @@ The following are for debugging purposes: * **FORCE_REORG** If set to a positive integer, it will simulate a reorg of the - blockchain for that number of blocks on startup. Although it should - fail gracefully if set to a value greater than **REORG_LIMIT**, I do - not recommend it as I have not tried it and there is a chance your - DB might corrupt. + blockchain for that number of blocks on startup. You must have + synced before using this, otherwise there will be no undo + information. + + Although it should fail gracefully if set to a value greater than + **REORG_LIMIT**, I do not recommend it as I have not tried it and + there is a chance your DB might corrupt. .. _lib/coins.py: https://github.com/kyuupichan/electrumx/blob/master/lib/coins.py diff --git a/query.py b/query.py index 05ae277..3824d71 100755 --- a/query.py +++ b/query.py @@ -20,23 +20,23 @@ from server.db import DB from lib.hash import hash_to_str -def count_entries(db): +def count_entries(hist_db, utxo_db): utxos = 0 - for key in db.iterator(prefix=b'u', include_value=False): + for key in utxo_db.iterator(prefix=b'u', include_value=False): utxos += 1 print("UTXO count:", utxos) hashX = 0 - for key in db.iterator(prefix=b'h', include_value=False): + for key in utxo_db.iterator(prefix=b'h', include_value=False): hashX += 1 print("HashX count:", hashX) hist = 0 hist_len = 0 - for key, value in db.iterator(prefix=b'H'): + for key, value in hist_db.iterator(prefix=b'H'): hist += 1 hist_len += len(value) // 4 - print("History rows {:,d} entries {:,d}", hist, hist_len) + print("History rows {:,d} entries {:,d}".format(hist, hist_len)) def main(): @@ -44,7 +44,7 @@ def main(): bp = DB(env) coin = env.coin if len(sys.argv) == 1: - count_entries(bp.db) + count_entries(bp.hist_db, bp.utxo_db) return argc = 1 try: diff --git a/server/block_processor.py b/server/block_processor.py index b301c4f..0174bcf 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -12,7 +12,6 @@ import array import asyncio from struct import pack, unpack import time -from bisect import bisect_left from collections import defaultdict from functools import partial @@ -192,7 +191,7 @@ class BlockProcessor(server.db.DB): self.db_deletes = [] # Log state - if self.first_sync: + if self.utxo_db.for_sync: self.logger.info('flushing DB cache at {:,d} MB' .format(self.cache_MB)) @@ -250,13 +249,12 @@ class BlockProcessor(server.db.DB): def caught_up(self): '''Called when first caught up after starting.''' if not self.caught_up_event.is_set(): + self.first_sync = False self.flush(True) - if self.first_sync: + if self.utxo_db.for_sync: self.logger.info('{} synced to height {:,d}' .format(VERSION, self.height)) - self.first_sync = False - self.flush_state(self.db) - self.open_db(for_sync=False) + self.open_dbs() self.caught_up_event.set() async def handle_chain_reorg(self, touched, count=None): @@ -336,22 +334,34 @@ class BlockProcessor(server.db.DB): self.assert_flushed() return - self.flush_count += 1 flush_start = time.time() last_flush = self.last_flush tx_diff = self.tx_count - self.last_flush_tx_count - with self.db.write_batch() as batch: - # History first - fast and frees memory. Flush state last - # as it reads the wall time. - self.flush_history(batch) + # Flush to file system + self.fs_flush() + fs_end = time.time() + if self.utxo_db.for_sync: + self.logger.info('flushed to FS in {:.1f}s' + .format(fs_end - flush_start)) + + # History next - it's fast and frees memory + self.flush_history(self.history) + if self.utxo_db.for_sync: + self.logger.info('flushed history in {:.1f}s for {:,d} addrs' + .format(time.time() - fs_end, len(self.history))) + self.history = defaultdict(partial(array.array, 'I')) + self.history_size = 0 + + # Flush state last as it reads the wall time. + with self.utxo_db.write_batch() as batch: if flush_utxos: self.flush_utxos(batch) self.flush_state(batch) # Update and put the wall time again - otherwise we drop the # time it took to commit the batch - self.flush_state(self.db) + self.flush_state(self.utxo_db) self.logger.info('flush #{:,d} took {:.1f}s. Height {:,d} txs: {:,d}' .format(self.flush_count, @@ -359,7 +369,7 @@ class BlockProcessor(server.db.DB): self.height, self.tx_count)) # Catch-up stats - if self.first_sync: + if self.utxo_db.for_sync: daemon_height = self.daemon.cached_height() tx_per_sec = int(self.tx_count / self.wall_time) this_tx_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) @@ -381,32 +391,12 @@ class BlockProcessor(server.db.DB): .format(formatted_time(self.wall_time), formatted_time(tx_est / this_tx_per_sec))) - def flush_history(self, batch): - fs_start = time.time() - self.fs_flush() - fs_end = time.time() - - flush_id = pack('>H', self.flush_count) - - for hashX, hist in self.history.items(): - key = b'H' + hashX + flush_id - batch.put(key, hist.tobytes()) - - if self.first_sync: - self.logger.info('flushed to FS in {:.1f}s, history in {:.1f}s ' - 'for {:,d} addrs' - .format(fs_end - fs_start, time.time() - fs_end, - len(self.history))) - self.history = defaultdict(partial(array.array, 'I')) - self.history_size = 0 - def fs_flush(self): '''Flush the things stored on the filesystem.''' assert self.fs_height + len(self.headers) == self.height assert self.tx_count == self.tx_counts[-1] if self.tx_counts else 0 self.fs_update(self.fs_height, self.headers, self.tx_hashes) - self.fs_height = self.height self.fs_tx_count = self.tx_count self.tx_hashes = [] @@ -422,55 +412,30 @@ class BlockProcessor(server.db.DB): assert self.height < self.db_height assert not self.history - self.flush_count += 1 flush_start = time.time() - with self.db.write_batch() as batch: + # Backup FS (just move the pointers back) + self.fs_height = self.height + self.fs_tx_count = self.tx_count + assert not self.headers + assert not self.tx_hashes + + # Backup history + nremoves = self.backup_history(hashXs) + self.logger.info('backing up removed {:,d} history entries from ' + '{:,d} addresses'.format(nremoves, len(hashXs))) + + with self.utxo_db.write_batch() as batch: # Flush state last as it reads the wall time. - self.backup_history(batch, hashXs) self.flush_utxos(batch) self.flush_state(batch) - # Update and put the wall time again - otherwise we drop the - # time it took to commit the batch - self.flush_state(self.db) - self.logger.info('backup flush #{:,d} took {:.1f}s. ' 'Height {:,d} txs: {:,d}' .format(self.flush_count, self.last_flush - flush_start, self.height, self.tx_count)) - def backup_history(self, batch, hashXs): - nremoves = 0 - for hashX in sorted(hashXs): - prefix = b'H' + hashX - deletes = [] - puts = {} - for key, hist in self.db.iterator(prefix=prefix, reverse=True): - a = array.array('I') - a.frombytes(hist) - # Remove all history entries >= self.tx_count - idx = bisect_left(a, self.tx_count) - nremoves += len(a) - idx - if idx > 0: - puts[key] = a[:idx].tobytes() - break - deletes.append(key) - - for key in deletes: - batch.delete(key) - for key, value in puts.items(): - batch.put(key, value) - - self.fs_height = self.height - self.fs_tx_count = self.tx_count - assert not self.headers - assert not self.tx_hashes - - self.logger.info('backing up removed {:,d} history entries from ' - '{:,d} addresses'.format(nremoves, len(hashXs))) - def check_cache_size(self): '''Flush a cache if it gets too big.''' # Good average estimates based on traversal of subobjects and @@ -701,7 +666,7 @@ class BlockProcessor(server.db.DB): # Value: hashX prefix = b'h' + tx_hash[:4] + idx_packed candidates = {db_key: hashX for db_key, hashX - in self.db.iterator(prefix=prefix)} + in self.utxo_db.iterator(prefix=prefix)} for hdb_key, hashX in candidates.items(): tx_num_packed = hdb_key[-4:] @@ -716,7 +681,7 @@ class BlockProcessor(server.db.DB): # Key: b'u' + address_hashX + tx_idx + tx_num # Value: the UTXO value as a 64-bit unsigned integer udb_key = b'u' + hashX + hdb_key[-6:] - utxo_value_packed = self.db.get(udb_key) + utxo_value_packed = self.utxo_db.get(udb_key) if utxo_value_packed: # Remove both entries for this UTXO self.db_deletes.append(hdb_key) @@ -733,9 +698,10 @@ class BlockProcessor(server.db.DB): # may be in the DB already. flush_start = time.time() delete_count = len(self.db_deletes) // 2 + utxo_cache_len = len(self.utxo_cache) batch_delete = batch.delete - for key in self.db_deletes: + for key in sorted(self.db_deletes): batch_delete(key) self.db_deletes = [] @@ -747,12 +713,12 @@ class BlockProcessor(server.db.DB): batch_put(b'h' + cache_key[:4] + suffix, hashX) batch_put(b'u' + hashX + suffix, cache_value[-8:]) - if self.first_sync: + if self.utxo_db.for_sync: self.logger.info('flushed {:,d} blocks with {:,d} txs, {:,d} UTXO ' 'adds, {:,d} spends in {:.1f}s, committing...' .format(self.height - self.db_height, self.tx_count - self.db_tx_count, - len(self.utxo_cache), delete_count, + utxo_cache_len, delete_count, time.time() - flush_start)) self.utxo_cache = {} diff --git a/server/db.py b/server/db.py index 219a233..0c653e3 100644 --- a/server/db.py +++ b/server/db.py @@ -13,12 +13,12 @@ import ast import itertools import os from struct import pack, unpack -from bisect import bisect_right +from bisect import bisect_left, bisect_right from collections import namedtuple import lib.util as util from lib.hash import hash_to_str -from server.storage import open_db +from server.storage import db_class from server.version import VERSION @@ -31,7 +31,7 @@ class DB(util.LoggedClass): it was shutdown uncleanly. ''' - DB_VERSIONS = [4] + DB_VERSIONS = [5] class MissingUTXOError(Exception): '''Raised if a mempool tx input UTXO couldn't be found.''' @@ -48,8 +48,13 @@ class DB(util.LoggedClass): .format(env.db_dir)) os.chdir(env.db_dir) - self.db = None - self.open_db(for_sync=False) + self.db_class = db_class(self.env.db_engine) + self.logger.info('using {} for DB backend'.format(self.env.db_engine)) + + self.utxo_db = None + self.open_dbs() + self.clean_db() + self.logger.info('reorg limit is {:,d} blocks' .format(self.env.reorg_limit)) @@ -67,67 +72,68 @@ class DB(util.LoggedClass): assert self.db_tx_count == self.tx_counts[-1] else: assert self.db_tx_count == 0 - self.clean_db() - def open_db(self, for_sync): - '''Open the database. If the database is already open, it is - closed and re-opened. + def open_dbs(self): + '''Open the databases. If already open they are closed and re-opened. - If for_sync is True, it is opened for sync (high number of open - file, etc.) - Re-open to set the maximum number of open files appropriately. + When syncing we want to reserve a lot of open files for the + synchtonization. When serving clients we want the open files for + serving network connections. ''' def log_reason(message, is_for_sync): reason = 'sync' if is_for_sync else 'serving' self.logger.info('{} for {}'.format(message, reason)) - if self.db: - if self.db.for_sync == for_sync: - return - log_reason('closing DB to re-open', for_sync) - self.db.close() + # Assume we're serving until we find out otherwise + for for_sync in [False, True]: + if self.utxo_db: + if self.utxo_db.for_sync == for_sync: + return + log_reason('closing DB to re-open', for_sync) + self.utxo_db.close() + self.hist_db.close() - # Open DB and metadata files. Record some of its state. - self.db = open_db('db', self.env.db_engine, for_sync) - if self.db.is_new: - self.logger.info('created new {} database' - .format(self.env.db_engine)) - self.logger.info('creating metadata diretcory') - os.mkdir('meta') - with self.open_file('COIN', create=True) as f: - f.write('ElectrumX DB and metadata files for {} {}' - .format(self.coin.NAME, self.coin.NET).encode()) - else: - log_reason('opened {} database'.format(self.env.db_engine), - self.db.for_sync) + # Open DB and metadata files. Record some of its state. + self.utxo_db = self.db_class('utxo', for_sync) + self.hist_db = self.db_class('hist', for_sync) + if self.utxo_db.is_new: + self.logger.info('created new database') + self.logger.info('creating metadata diretcory') + os.mkdir('meta') + with self.open_file('COIN', create=True) as f: + f.write('ElectrumX databases and metadata for {} {}' + .format(self.coin.NAME, self.coin.NET).encode()) + else: + log_reason('opened DB', self.utxo_db.for_sync) - self.read_state() - if self.first_sync == self.db.for_sync: - self.logger.info('software version: {}'.format(VERSION)) - self.logger.info('DB version: {:d}'.format(self.db_version)) - self.logger.info('coin: {}'.format(self.coin.NAME)) - self.logger.info('network: {}'.format(self.coin.NET)) - self.logger.info('height: {:,d}'.format(self.db_height)) - self.logger.info('tip: {}'.format(hash_to_str(self.db_tip))) - self.logger.info('tx count: {:,d}'.format(self.db_tx_count)) - if self.first_sync: - self.logger.info('sync time so far: {}' - .format(util.formatted_time(self.wall_time))) - else: - self.open_db(self.first_sync) + self.read_utxo_state() + if self.first_sync == self.utxo_db.for_sync: + break - def read_state(self): - if self.db.is_new: + self.read_history_state() + + self.logger.info('software version: {}'.format(VERSION)) + self.logger.info('DB version: {:d}'.format(self.db_version)) + self.logger.info('coin: {}'.format(self.coin.NAME)) + self.logger.info('network: {}'.format(self.coin.NET)) + self.logger.info('height: {:,d}'.format(self.db_height)) + self.logger.info('tip: {}'.format(hash_to_str(self.db_tip))) + self.logger.info('tx count: {:,d}'.format(self.db_tx_count)) + if self.first_sync: + self.logger.info('sync time so far: {}' + .format(util.formatted_time(self.wall_time))) + + def read_utxo_state(self): + if self.utxo_db.is_new: self.db_height = -1 self.db_tx_count = 0 self.db_tip = b'\0' * 32 self.db_version = max(self.DB_VERSIONS) - self.flush_count = 0 self.utxo_flush_count = 0 self.wall_time = 0 self.first_sync = True else: - state = self.db.get(b'state') + state = self.utxo_db.get(b'state') if state: state = ast.literal_eval(state.decode()) if not isinstance(state, dict): @@ -144,22 +150,17 @@ class DB(util.LoggedClass): self.db_height = state['height'] self.db_tx_count = state['tx_count'] self.db_tip = state['tip'] - self.flush_count = state['flush_count'] self.utxo_flush_count = state['utxo_flush_count'] self.wall_time = state['wall_time'] self.first_sync = state['first_sync'] - if self.flush_count < self.utxo_flush_count: - raise self.DBError('DB corrupt: flush_count < utxo_flush_count') - def write_state(self, batch): - '''Write chain state to the batch.''' + '''Write (UTXO) state to the batch.''' state = { 'genesis': self.coin.GENESIS_HASH, 'height': self.db_height, 'tx_count': self.db_tx_count, 'tip': self.db_tip, - 'flush_count': self.flush_count, 'utxo_flush_count': self.utxo_flush_count, 'wall_time': self.wall_time, 'first_sync': self.first_sync, @@ -174,48 +175,28 @@ class DB(util.LoggedClass): recent UTXO flush (only happens on unclean shutdown), and aged undo information. ''' + if self.flush_count < self.utxo_flush_count: + raise self.DBError('DB corrupt: flush_count < utxo_flush_count') if self.flush_count > self.utxo_flush_count: - self.utxo_flush_count = self.flush_count - self.logger.info('DB shut down uncleanly. Scanning for ' - 'excess history flushes...') - history_keys = self.excess_history_keys() - self.logger.info('deleting {:,d} history entries' - .format(len(history_keys))) - else: - history_keys = [] + self.clear_excess_history(self.utxo_flush_count) - undo_keys = self.stale_undo_keys() - if undo_keys: - self.logger.info('deleting {:,d} stale undo entries' - .format(len(undo_keys))) - - with self.db.write_batch() as batch: - batch_delete = batch.delete - for key in history_keys: - batch_delete(key) - for key in undo_keys: - batch_delete(key) - self.write_state(batch) - - def excess_history_keys(self): - prefix = b'H' - keys = [] - for key, hist in self.db.iterator(prefix=prefix): - flush_id, = unpack('>H', key[-2:]) - if flush_id > self.utxo_flush_count: - keys.append(key) - return keys - - def stale_undo_keys(self): + # Remove stale undo information prefix = b'U' cutoff = self.db_height - self.env.reorg_limit keys = [] - for key, hist in self.db.iterator(prefix=prefix): + for key, hist in self.utxo_db.iterator(prefix=prefix): height, = unpack('>I', key[-4:]) if height > cutoff: break keys.append(key) - return keys + if keys: + self.logger.info('deleting {:,d} stale undo entries' + .format(len(keys))) + + with self.utxo_db.write_batch() as batch: + for key in keys: + batch.delete(key) + self.write_state(batch) def undo_key(self, height): '''DB key for undo information at the given height.''' @@ -223,11 +204,11 @@ class DB(util.LoggedClass): def write_undo_info(self, height, undo_info): '''Write out undo information for the current height.''' - self.db.put(self.undo_key(height), undo_info) + self.utxo_db.put(self.undo_key(height), undo_info) def read_undo_info(self, height): '''Read undo information from a file for the current height.''' - return self.db.get(self.undo_key(height)) + return self.utxo_db.get(self.undo_key(height)) def open_file(self, filename, create=False): '''Open the file name. Return its handle.''' @@ -308,24 +289,6 @@ class DB(util.LoggedClass): assert isinstance(limit, int) and limit >= 0 return limit - def get_history(self, hashX, limit=1000): - '''Generator that returns an unpruned, sorted list of (tx_hash, - height) tuples of confirmed transactions that touched the address, - earliest in the blockchain first. Includes both spending and - receiving transactions. By default yields at most 1000 entries. - Set limit to None to get them all. - ''' - limit = self._resolve_limit(limit) - prefix = b'H' + hashX - for key, hist in self.db.iterator(prefix=prefix): - a = array.array('I') - a.frombytes(hist) - for tx_num in a: - if limit == 0: - return - yield self.fs_tx_hash(tx_num) - limit -= 1 - def get_balance(self, hashX): '''Returns the confirmed balance of an address.''' return sum(utxo.value for utxo in self.get_utxos(hashX, limit=None)) @@ -340,7 +303,7 @@ class DB(util.LoggedClass): # Key: b'u' + address_hashX + tx_idx + tx_num # Value: the UTXO value as a 64-bit unsigned integer prefix = b'u' + hashX - for db_key, db_value in self.db.iterator(prefix=prefix): + for db_key, db_value in self.utxo_db.iterator(prefix=prefix): if limit == 0: return limit -= 1 @@ -358,7 +321,7 @@ class DB(util.LoggedClass): prefix = b'h' + tx_hash[:4] + idx_packed # Find which entry, if any, the TX_HASH matches. - for db_key, hashX in self.db.iterator(prefix=prefix): + for db_key, hashX in self.utxo_db.iterator(prefix=prefix): tx_num_packed = db_key[-4:] tx_num, = unpack('H', key[-2:]) + if flush_id > flush_count: + keys.append(key) + + self.logger.info('deleting {:,d} history entries'.format(len(keys))) + + self.flush_count = flush_count + with self.hist_db.write_batch() as batch: + for key in keys: + batch.delete(key) + self.write_history_state(batch) + + self.logger.info('deleted excess history entries') + + def write_history_state(self, batch): + state = {'flush_count': self.flush_count} + # History entries are not prefixed; the suffix \0\0 ensures we + # look similar to other entries and aren't interfered with + batch.put(b'state\0\0', repr(state).encode()) + + def read_history_state(self): + state = self.hist_db.get(b'state\0\0') + if state: + state = ast.literal_eval(state.decode()) + if not isinstance(state, dict): + raise self.DBError('failed reading state from history DB') + self.flush_count = state['flush_count'] + else: + self.flush_count = 0 + + def flush_history(self, history): + self.flush_count += 1 + flush_id = pack('>H', self.flush_count) + + with self.hist_db.write_batch() as batch: + for hashX in sorted(history): + key = hashX + flush_id + batch.put(key, history[hashX].tobytes()) + self.write_history_state(batch) + + def backup_history(self, hashXs): + # Not certain this is needed, but it doesn't hurt + self.flush_count += 1 + nremoves = 0 + + with self.hist_db.write_batch() as batch: + for hashX in sorted(hashXs): + deletes = [] + puts = {} + for key, hist in self.hist_db.iterator(prefix=hashX, + reverse=True): + a = array.array('I') + a.frombytes(hist) + # Remove all history entries >= self.tx_count + idx = bisect_left(a, self.tx_count) + nremoves += len(a) - idx + if idx > 0: + puts[key] = a[:idx].tobytes() + break + deletes.append(key) + + for key in deletes: + batch.delete(key) + for key, value in puts.items(): + batch.put(key, value) + self.write_history_state(batch) + + return nremoves + + def get_history(self, hashX, limit=1000): + '''Generator that returns an unpruned, sorted list of (tx_hash, + height) tuples of confirmed transactions that touched the address, + earliest in the blockchain first. Includes both spending and + receiving transactions. By default yields at most 1000 entries. + Set limit to None to get them all. + ''' + limit = self._resolve_limit(limit) + for key, hist in self.hist_db.iterator(prefix=hashX): + a = array.array('I') + a.frombytes(hist) + for tx_num in a: + if limit == 0: + return + yield self.fs_tx_hash(tx_num) + limit -= 1 diff --git a/server/env.py b/server/env.py index 15d8137..4ce5c14 100644 --- a/server/env.py +++ b/server/env.py @@ -27,7 +27,7 @@ class Env(LoggedClass): network = self.default('NETWORK', 'mainnet') self.coin = Coin.lookup_coin_class(coin_name, network) self.db_dir = self.required('DB_DIRECTORY') - self.cache_MB = self.integer('CACHE_MB', 1250) + self.cache_MB = self.integer('CACHE_MB', 1200) self.host = self.default('HOST', 'localhost') self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) self.daemon_url = self.required('DAEMON_URL') diff --git a/server/storage.py b/server/storage.py index edf62d0..a2b5d7f 100644 --- a/server/storage.py +++ b/server/storage.py @@ -15,15 +15,13 @@ from functools import partial from lib.util import subclasses, increment_byte_string - -def open_db(name, db_engine, for_sync): - '''Returns a database handle.''' +def db_class(name): + '''Returns a DB engine class.''' for db_class in subclasses(Storage): - if db_class.__name__.lower() == db_engine.lower(): + if db_class.__name__.lower() == name.lower(): db_class.import_module() - return db_class(name, for_sync) - - raise RuntimeError('unrecognised DB engine "{}"'.format(db_engine)) + return db_class + raise RuntimeError('unrecognised DB engine "{}"'.format(name)) class Storage(object): @@ -81,7 +79,7 @@ class LevelDB(Storage): cls.module = plyvel def open(self, name, create): - mof = 1024 if self.for_sync else 256 + mof = 512 if self.for_sync else 128 self.db = self.module.DB(name, create_if_missing=create, max_open_files=mof, compression=None) self.close = self.db.close @@ -101,7 +99,7 @@ class RocksDB(Storage): cls.module = rocksdb def open(self, name, create): - mof = 1024 if self.for_sync else 256 + mof = 512 if self.for_sync else 128 compression = "no" compression = getattr(self.module.CompressionType, compression + "_compression")