From e2b06d28391ed255c4531cc90495d596252e8131 Mon Sep 17 00:00:00 2001 From: obigal Date: Sun, 15 Dec 2013 14:06:36 -0500 Subject: [PATCH] vardiff-bug proper fix / send unique work ids to clients --- conf/config_sample.py | 2 -- lib/template_registry.py | 13 +++---------- mining/__init__.py | 6 ++++++ mining/basic_share_limiter.py | 7 +++++-- mining/interfaces.py | 18 ++++++++++++++++++ mining/service.py | 24 ++++++++++++++---------- mining/subscription.py | 13 +++++++++++-- mining/work_log_pruner.py | 23 +++++++++++++++++++++++ 8 files changed, 80 insertions(+), 26 deletions(-) create mode 100644 mining/work_log_pruner.py diff --git a/conf/config_sample.py b/conf/config_sample.py index fe40bf0..23b34f8 100644 --- a/conf/config_sample.py +++ b/conf/config_sample.py @@ -158,8 +158,6 @@ VDIFF_MAX_TARGET = 1024 # Maximum Target difficulty VDIFF_TARGET_TIME = 15 # Target time per share (i.e. try to get 1 share per this many seconds) VDIFF_RETARGET_TIME = 120 # Check to see if we should retarget this often VDIFF_VARIANCE_PERCENT = 30 # Allow average time to very this % from target without retarget -VDIFF_RETARGET_DELAY = 25 # Wait this many seconds before applying new variable difficulty target -VDIFF_RETARGET_REJECT_TIME = 60 # Wait this many seconds before rejecting old difficulty shares # Allow external setting of worker difficulty, checks pool_worker table datarow[6] position for target difficulty # if present or else defaults to pool target, over rides all other difficulty settings, no checks are made diff --git a/lib/template_registry.py b/lib/template_registry.py index 7aa4370..4edd58d 100644 --- a/lib/template_registry.py +++ b/lib/template_registry.py @@ -176,7 +176,7 @@ class TemplateRegistry(object): return j def submit_share(self, job_id, worker_name, session, extranonce1_bin, extranonce2, ntime, nonce, - difficulty, submit_time): + difficulty): '''Check parameters and finalize block template. If it leads to valid block candidate, asynchronously submits the block back to the bitcoin network. @@ -249,16 +249,10 @@ class TemplateRegistry(object): header_hex = header_hex+"000000800000000000000000000000000000000000000000000000000000000000000000000000000000000080020000" else: pass - target_user = self.diff_to_target(difficulty) - if hash_int > target_user and \ - ( 'prev_jobid' not in session or session['prev_jobid'] < job_id \ - or 'prev_diff' not in session or hash_int > self.diff_to_target(session['prev_diff']) ): + target_user = self.diff_to_target(difficulty) + if hash_int > target_user: raise SubmitException("Share is above target") - if hash_int > target_user and 'prev_ts' in session \ - and (submit_time - session['prev_ts']) > settings.VDIFF_RETARGET_REJECT_TIME: - raise SubmitException("Stale-share above target") - # Mostly for debugging purposes target_info = self.diff_to_target(100000) if hash_int <= target_info: @@ -267,7 +261,6 @@ class TemplateRegistry(object): # Algebra tells us the diff_to_target is the same as hash_to_diff share_diff = int(self.diff_to_target(hash_int)) - # 5. Compare hash with target of the network if hash_int <= job.target: # Yay! It is block candidate! diff --git a/mining/__init__.py b/mining/__init__.py index 65c6082..2047cb6 100644 --- a/mining/__init__.py +++ b/mining/__init__.py @@ -5,6 +5,8 @@ from twisted.internet.error import ConnectionRefusedError import time import simplejson as json from twisted.internet import reactor +import threading +from mining.work_log_pruner import WorkLogPruner @defer.inlineCallbacks def setup(on_startup): @@ -86,6 +88,10 @@ def setup(on_startup): # This is just failsafe solution when -blocknotify # mechanism is not working properly BlockUpdater(registry, bitcoin_rpc) + + prune_thr = threading.Thread(target=WorkLogPruner, args=(Interfaces.worker_manager.job_log,)) + prune_thr.daemon = True + prune_thr.start() log.info("MINING SERVICE IS READY") on_startup.callback(True) diff --git a/mining/basic_share_limiter.py b/mining/basic_share_limiter.py index 06b2fb6..94c052d 100644 --- a/mining/basic_share_limiter.py +++ b/mining/basic_share_limiter.py @@ -167,10 +167,13 @@ class BasicShareLimiter(object): self.worker_stats[worker_name]['buffer'].clear() session = connection_ref().get_session() + + (job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, _) = \ + Interfaces.template_registry.get_last_broadcast_args() + work_id = Interfaces.worker_manager.register_work(worker_name, job_id, new_diff) - session['prev_diff'] = session['difficulty'] - session['prev_jobid'] = job_id session['difficulty'] = new_diff connection_ref().rpc('mining.set_difficulty', [new_diff, ], is_notification=True) + connection_ref().rpc('mining.notify', [work_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, False, ], is_notification=True) dbi.update_worker_diff(worker_name, new_diff) diff --git a/mining/interfaces.py b/mining/interfaces.py index cf13cd6..b3d44f5 100644 --- a/mining/interfaces.py +++ b/mining/interfaces.py @@ -19,6 +19,8 @@ class WorkerManagerInterface(object): def __init__(self): self.worker_log = {} self.worker_log.setdefault('authorized', {}) + self.job_log = {} + self.job_log.setdefault('None', {}) return def authorize(self, worker_name, worker_password): @@ -33,6 +35,22 @@ class WorkerManagerInterface(object): else: return (False, settings.POOL_TARGET) + def register_work(self, worker_name, job_id, difficulty): + now = Interfaces.timestamper.time() + work_id = WorkIdGenerator.get_new_id() + self.job_log.setdefault(worker_name, {})[work_id] = (job_id, difficulty, now) + return work_id + +class WorkIdGenerator(object): + counter = 1000 + + @classmethod + def get_new_id(cls): + cls.counter += 1 + if cls.counter % 0xffff == 0: + cls.counter = 1 + return "%x" % cls.counter + class ShareLimiterInterface(object): '''Implement difficulty adjustments here''' diff --git a/mining/service.py b/mining/service.py index e1a11f6..4dbc9bd 100644 --- a/mining/service.py +++ b/mining/service.py @@ -80,7 +80,7 @@ class MiningService(GenericService): session['difficulty'] = settings.POOL_TARGET # Following protocol specs, default diff is 1 return Pubsub.subscribe(self.connection_ref(), MiningSubscription()) + (extranonce1_hex, extranonce2_size) - def submit(self, worker_name, job_id, extranonce2, ntime, nonce): + def submit(self, worker_name, work_id, extranonce2, ntime, nonce): '''Try to solve block candidate using given parameters.''' session = self.connection_ref().get_session() @@ -96,8 +96,16 @@ class MiningService(GenericService): if not extranonce1_bin: raise SubmitException("Connection is not subscribed for mining") + # Get current block job_id difficulty = session['difficulty'] - s_difficulty = difficulty + if worker_name in Interfaces.worker_manager.job_log and work_id in Interfaces.worker_manager.job_log[worker_name]: + (job_id, difficulty, job_ts) = Interfaces.worker_manager.job_log[worker_name][work_id] + else: + job_ts = Interfaces.timestamper.time() + Interfaces.worker_manager.job_log.setdefault(worker_name, {})[work_id] = (work_id, difficulty, job_ts) + job_id = work_id + #log.debug("worker_job_log: %s" % repr(Interfaces.worker_manager.job_log)) + submit_time = Interfaces.timestamper.time() ip = self.connection_ref()._get_ip() (valid, invalid, is_banned, diff, is_ext_diff, last_ts) = Interfaces.worker_manager.worker_log['authorized'][worker_name] @@ -119,11 +127,7 @@ class MiningService(GenericService): log.debug("Clearing worker stats for: %s" % worker_name) (valid, invalid, is_banned, last_ts) = (0, 0, is_banned, Interfaces.timestamper.time()) - if 'prev_ts' in session and (submit_time - session['prev_ts']) < settings.VDIFF_RETARGET_DELAY \ - and not is_ext_diff: - difficulty = session['prev_diff'] or session['difficulty'] or settings.POOL_TARGET - diff = difficulty - log.debug("%s (%d, %d, %s, %s, %d) %0.2f%% diff(%f)" % (worker_name, valid, invalid, is_banned, is_ext_diff, last_ts, percent, diff)) + log.debug("%s (%d, %d, %s, %s, %d) %0.2f%% work_id(%s) job_id(%s) diff(%f)" % (worker_name, valid, invalid, is_banned, is_ext_diff, last_ts, percent, work_id, job_id, difficulty)) if not is_ext_diff: Interfaces.share_limiter.submit(self.connection_ref, job_id, difficulty, submit_time, worker_name) @@ -131,11 +135,11 @@ class MiningService(GenericService): # and it is valid proof of work. try: (block_header, block_hash, share_diff, on_submit) = Interfaces.template_registry.submit_share(job_id, - worker_name, session, extranonce1_bin, extranonce2, ntime, nonce, s_difficulty, submit_time) + worker_name, session, extranonce1_bin, extranonce2, ntime, nonce, difficulty) except SubmitException as e: # block_header and block_hash are None when submitted data are corrupted invalid += 1 - Interfaces.worker_manager.worker_log['authorized'][worker_name] = (valid, invalid, is_banned, diff, is_ext_diff, last_ts) + Interfaces.worker_manager.worker_log['authorized'][worker_name] = (valid, invalid, is_banned, difficulty, is_ext_diff, last_ts) if is_banned: raise SubmitException("Worker is temporarily banned") @@ -145,7 +149,7 @@ class MiningService(GenericService): raise valid += 1 - Interfaces.worker_manager.worker_log['authorized'][worker_name] = (valid, invalid, is_banned, diff, is_ext_diff, last_ts) + Interfaces.worker_manager.worker_log['authorized'][worker_name] = (valid, invalid, is_banned, difficulty, is_ext_diff, last_ts) if is_banned: raise SubmitException("Worker is temporarily banned") diff --git a/mining/subscription.py b/mining/subscription.py index 67d2c41..3cac5d6 100644 --- a/mining/subscription.py +++ b/mining/subscription.py @@ -21,9 +21,18 @@ class MiningSubscription(Subscription): (job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, _) = \ Interfaces.template_registry.get_last_broadcast_args() - + # Push new job to subscribed clients - cls.emit(job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, clean_jobs) + for subscription in Pubsub.iterate_subscribers(cls.event): + session = subscription.connection_ref().get_session() + session.setdefault('authorized', {}) + if session['authorized'].keys(): + worker_name = session['authorized'].keys()[0] + difficulty = session['difficulty'] + work_id = Interfaces.worker_manager.register_work(worker_name, job_id, difficulty) + subscription.emit_single(work_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, clean_jobs) + else: + subscription.emit_single(job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, clean_jobs) cnt = Pubsub.get_subscription_count(cls.event) log.info("BROADCASTED to %d connections in %.03f sec" % (cnt, (Interfaces.timestamper.time() - start))) diff --git a/mining/work_log_pruner.py b/mining/work_log_pruner.py new file mode 100644 index 0000000..c4b86a3 --- /dev/null +++ b/mining/work_log_pruner.py @@ -0,0 +1,23 @@ +from time import sleep, time +import traceback +import lib.logger +log = lib.logger.get_logger('work_log_pruner') + +def _WorkLogPruner_I(wl): + now = time() + pruned = 0 + for username in wl: + userwork = wl[username] + for wli in tuple(userwork.keys()): + if now > userwork[wli][2] + 120: + del userwork[wli] + pruned += 1 + log.debug('Pruned %d jobs' % (pruned,)) + +def WorkLogPruner(wl): + while True: + try: + sleep(60) + _WorkLogPruner_I(wl) + except: + log.debug(traceback.format_exc())