70 lines
3.2 KiB
Python
70 lines
3.2 KiB
Python
from stratum.pubsub import Pubsub, Subscription
|
|
from mining.interfaces import Interfaces
|
|
|
|
import lib.settings as settings
|
|
import lib.logger
|
|
log = lib.logger.get_logger('subscription')
|
|
|
|
class MiningSubscription(Subscription):
|
|
'''This subscription object implements
|
|
logic for broadcasting new jobs to the clients.'''
|
|
|
|
event = 'mining.notify'
|
|
|
|
@classmethod
|
|
def on_template(cls, is_new_block):
|
|
'''This is called when TemplateRegistry registers
|
|
new block which we have to broadcast clients.'''
|
|
|
|
start = Interfaces.timestamper.time()
|
|
clean_jobs = is_new_block
|
|
|
|
(job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, _) = \
|
|
Interfaces.template_registry.get_last_broadcast_args()
|
|
|
|
# Push new job to subscribed clients
|
|
for subscription in Pubsub.iterate_subscribers(cls.event):
|
|
try:
|
|
if subscription != None:
|
|
session = subscription.connection_ref().get_session()
|
|
session.setdefault('authorized', {})
|
|
if session['authorized'].keys():
|
|
worker_name = session['authorized'].keys()[0]
|
|
difficulty = session['difficulty']
|
|
work_id = Interfaces.worker_manager.register_work(worker_name, job_id, difficulty)
|
|
subscription.emit_single(work_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, clean_jobs)
|
|
else:
|
|
subscription.emit_single(job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, clean_jobs)
|
|
except Exception as e:
|
|
log.exception("Error broadcasting work to client %s" % str(e))
|
|
pass
|
|
|
|
cnt = Pubsub.get_subscription_count(cls.event)
|
|
log.info("BROADCASTED to %d connections in %.03f sec" % (cnt, (Interfaces.timestamper.time() - start)))
|
|
|
|
def _finish_after_subscribe(self, result):
|
|
'''Send new job to newly subscribed client'''
|
|
try:
|
|
(job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, _) = \
|
|
Interfaces.template_registry.get_last_broadcast_args()
|
|
except Exception:
|
|
log.error("Template not ready yet")
|
|
return result
|
|
|
|
# Force set higher difficulty
|
|
self.connection_ref().rpc('mining.set_difficulty', [settings.POOL_TARGET, ], is_notification=True)
|
|
# self.connection_ref().rpc('client.get_version', [])
|
|
|
|
# Force client to remove previous jobs if any (eg. from previous connection)
|
|
clean_jobs = True
|
|
self.emit_single(job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, True)
|
|
|
|
return result
|
|
|
|
def after_subscribe(self, *args):
|
|
'''This will send new job to the client *after* he receive subscription details.
|
|
on_finish callback solve the issue that job is broadcasted *during*
|
|
the subscription request and client receive messages in wrong order.'''
|
|
self.connection_ref().on_finish.addCallback(self._finish_after_subscribe)
|
|
|