diff --git a/conf/config_sample.py b/conf/config_sample.py index e02a2c2..6d00126 100644 --- a/conf/config_sample.py +++ b/conf/config_sample.py @@ -181,13 +181,15 @@ WORKER_BAN_TIME = 300 # How long we temporarily ban worker INVALID_SHARES_PERCENT = 50 # Allow average invalid shares vary this % before we ban # ******************** E-Mail Notification Settings ********************* -NOTIFY_EMAIL_TO = '' # Where to send Start/Found block notifications -NOTIFY_EMAIL_TO_DEADMINER = '' # Where to send dead miner notifications +NOTIFY_ADMIN = 'admin@domain.com' NOTIFY_EMAIL_FROM = 'root@localhost' # Sender address -NOTIFY_EMAIL_SERVER = 'localhost' # E-Mail Sender -NOTIFY_EMAIL_USERNAME = '' # E-Mail server SMTP Logon -NOTIFY_EMAIL_PASSWORD = '' -NOTIFY_EMAIL_USETLS = True +NOTIFY_EMAIL_SERVER = 'localhost' # SMTP Server +NOTIFY_EMAIL_SERVER_PORT = '587' # SMTP Port +NOTIFY_EMAIL_USERNAME = '' # SMTP Login +NOTIFY_EMAIL_PASSWORD = '' # SMTP Password +NOTIFY_EMAIL_USETLS = True # WIP +NOTIFY_DEADMINER_INTERVAL = 600 # Notify Dead Miners every x seconds +NOTIFY_MAX_EMAILS = 10 # Max emails sent per user #### Memcache #### # Memcahce is a requirement. Enter the settings below diff --git a/mining/DBInterface.py b/mining/DBInterface.py index 04fec8a..8faeb5b 100644 --- a/mining/DBInterface.py +++ b/mining/DBInterface.py @@ -4,6 +4,7 @@ from datetime import datetime import Queue import signal import Cache +import notify_email from sets import Set import lib.settings as settings @@ -23,11 +24,15 @@ class DBInterface(): self.cache = Cache.Cache() + self.email = notify_email.NOTIFY_EMAIL() + self.nextStatsUpdate = 0 self.scheduleImport() self.next_force_import_time = time.time() + settings.DB_LOADER_FORCE_TIME + self.next_force_notify_time = time.time() + settings.NOTIFY_DEADMINER_INTERVAL + signal.signal(signal.SIGINT, self.signal_handler) @@ -105,6 +110,122 @@ class DBInterface(): self.dbi.update_pool_info({ 'blocks' : data['blocks'], 'balance' : data['balance'], 'connections' : data['connections'], 'difficulty' : data['difficulty'] }) + def do_import(self, dbi, force): + log.debug("DBInterface.do_import called. force: %s, queue size: %s", 'yes' if force == True else 'no', self.q.qsize()) + + # Flush the whole queue on force + forcesize = 0 + if force == True: + forcesize = self.q.qsize() + + # Only run if we have data + while self.q.empty() == False and (force == True or self.q.qsize() >= settings.DB_LOADER_REC_MIN or time.time() >= self.next_force_import_time or forcesize > 0): +from twisted.internet import reactor, defer +import time +from datetime import datetime +import Queue +import signal +import Cache +from sets import Set +import notify_email +import lib.settings as settings + +import lib.logger +log = lib.logger.get_logger('DBInterface') + +class DBInterface(): + def __init__(self): + self.dbi = self.connectDB() + + def init_main(self): + self.dbi.check_tables() + self.q = Queue.Queue() + self.queueclock = None + + self.cache = Cache.Cache() + self.email = notify_email.NOTIFY_EMAIL() + self.nextStatsUpdate = 0 + + self.scheduleImport() + + self.next_force_import_time = time.time() + settings.DB_LOADER_FORCE_TIME + self.next_force_notify_time = time.time() + settings.NOTIFY_DEADMINER_INTERVAL + signal.signal(signal.SIGINT, self.signal_handler) + + def signal_handler(self, signal, frame): + log.warning("SIGINT Detected, shutting down") + self.do_import(self.dbi, True) + reactor.stop() + + def set_bitcoinrpc(self, bitcoinrpc): + self.bitcoinrpc = bitcoinrpc + + def connectDB(self): + if settings.DATABASE_DRIVER == "sqlite": + log.debug('DB_Sqlite INIT') + import DB_Sqlite + return DB_Sqlite.DB_Sqlite() + elif settings.DATABASE_DRIVER == "mysql": + if settings.VARIABLE_DIFF: + log.debug("DB_Mysql_Vardiff INIT") + import DB_Mysql_Vardiff + return DB_Mysql_Vardiff.DB_Mysql_Vardiff() + else: + log.debug('DB_Mysql INIT') + import DB_Mysql + return DB_Mysql.DB_Mysql() + elif settings.DATABASE_DRIVER == "postgresql": + log.debug('DB_Postgresql INIT') + import DB_Postgresql + return DB_Postgresql.DB_Postgresql() + elif settings.DATABASE_DRIVER == "none": + log.debug('DB_None INIT') + import DB_None + return DB_None.DB_None() + else: + log.error('Invalid DATABASE_DRIVER -- using NONE') + log.debug('DB_None INIT') + import DB_None + return DB_None.DB_None() + + def scheduleImport(self): + # This schedule's the Import + if settings.DATABASE_DRIVER == "sqlite": + use_thread = False + else: + use_thread = True + + if use_thread: + self.queueclock = reactor.callLater(settings.DB_LOADER_CHECKTIME , self.run_import_thread) + else: + self.queueclock = reactor.callLater(settings.DB_LOADER_CHECKTIME , self.run_import) + + def run_import_thread(self): + log.debug("run_import_thread current size: %d", self.q.qsize()) + + if self.q.qsize() >= settings.DB_LOADER_REC_MIN or time.time() >= self.next_force_import_time: # Don't incur thread overhead if we're not going to run + reactor.callInThread(self.import_thread) + + self.scheduleImport() + + def run_import(self): + log.debug("DBInterface.run_import called") + + self.do_import(self.dbi, False) + + self.scheduleImport() + + def import_thread(self): + # Here we are in the thread. + dbi = self.connectDB() + self.do_import(dbi, False) + + dbi.close() + + def _update_pool_info(self, data): + self.dbi.update_pool_info({ 'blocks' : data['blocks'], 'balance' : data['balance'], + 'connections' : data['connections'], 'difficulty' : data['difficulty'] }) + def do_import(self, dbi, force): log.debug("DBInterface.do_import called. force: %s, queue size: %s", 'yes' if force == True else 'no', self.q.qsize()) @@ -220,3 +341,225 @@ class DBInterface(): def clear_worker_diff(self): return self.dbi.clear_worker_diff() +from twisted.internet import reactor, defer +import time +from datetime import datetime +import Queue +import signal +import Cache +from sets import Set +import notify_email +import lib.settings as settings + +import lib.logger +log = lib.logger.get_logger('DBInterface') + +class DBInterface(): + def __init__(self): + self.dbi = self.connectDB() + + def init_main(self): + self.dbi.check_tables() + self.q = Queue.Queue() + self.queueclock = None + + self.cache = Cache.Cache() + self.email = notify_email.NOTIFY_EMAIL() + self.nextStatsUpdate = 0 + + self.scheduleImport() + + self.next_force_import_time = time.time() + settings.DB_LOADER_FORCE_TIME + self.next_force_notify_time = time.time() + settings.NOTIFY_DEADMINER_INTERVAL + signal.signal(signal.SIGINT, self.signal_handler) + + def signal_handler(self, signal, frame): + log.warning("SIGINT Detected, shutting down") + self.do_import(self.dbi, True) + reactor.stop() + + def set_bitcoinrpc(self, bitcoinrpc): + self.bitcoinrpc = bitcoinrpc + + def connectDB(self): + if settings.DATABASE_DRIVER == "sqlite": + log.debug('DB_Sqlite INIT') + import DB_Sqlite + return DB_Sqlite.DB_Sqlite() + elif settings.DATABASE_DRIVER == "mysql": + if settings.VARIABLE_DIFF: + log.debug("DB_Mysql_Vardiff INIT") + import DB_Mysql_Vardiff + return DB_Mysql_Vardiff.DB_Mysql_Vardiff() + else: + log.debug('DB_Mysql INIT') + import DB_Mysql + return DB_Mysql.DB_Mysql() + elif settings.DATABASE_DRIVER == "postgresql": + log.debug('DB_Postgresql INIT') + import DB_Postgresql + return DB_Postgresql.DB_Postgresql() + elif settings.DATABASE_DRIVER == "none": + log.debug('DB_None INIT') + import DB_None + return DB_None.DB_None() + else: + log.error('Invalid DATABASE_DRIVER -- using NONE') + log.debug('DB_None INIT') + import DB_None + return DB_None.DB_None() + + def scheduleImport(self): + # This schedule's the Import + if settings.DATABASE_DRIVER == "sqlite": + use_thread = False + else: + use_thread = True + + if use_thread: + self.queueclock = reactor.callLater(settings.DB_LOADER_CHECKTIME , self.run_import_thread) + else: + self.queueclock = reactor.callLater(settings.DB_LOADER_CHECKTIME , self.run_import) + + def run_import_thread(self): + log.debug("run_import_thread current size: %d", self.q.qsize()) + + if self.q.qsize() >= settings.DB_LOADER_REC_MIN or time.time() >= self.next_force_import_time: # Don't incur thread overhead if we're not going to run + reactor.callInThread(self.import_thread) + + self.scheduleImport() + + def run_import(self): + log.debug("DBInterface.run_import called") + + self.do_import(self.dbi, False) + + self.scheduleImport() + + def import_thread(self): + # Here we are in the thread. + dbi = self.connectDB() + self.do_import(dbi, False) + + dbi.close() + + def _update_pool_info(self, data): + self.dbi.update_pool_info({ 'blocks' : data['blocks'], 'balance' : data['balance'], + 'connections' : data['connections'], 'difficulty' : data['difficulty'] }) + + def do_import(self, dbi, force): + log.debug("DBInterface.do_import called. force: %s, queue size: %s", 'yes' if force == True else 'no', self.q.qsize()) + + # Flush the whole queue on force + forcesize = 0 + if force == True: + forcesize = self.q.qsize() + + # Only run if we have data + while self.q.empty() == False and (force == True or self.q.qsize() >= settings.DB_LOADER_REC_MIN or time.time() >= self.next_force_import_time or forcesize > 0): + self.next_force_import_time = time.time() + settings.DB_LOADER_FORCE_TIME + + force = False + # Put together the data we want to import + sqldata = [] + datacnt = 0 + + while self.q.empty() == False and datacnt < settings.DB_LOADER_REC_MAX: + datacnt += 1 + data = self.q.get() + sqldata.append(data) + self.q.task_done() + + forcesize -= datacnt + + # try to do the import, if we fail, log the error and put the data back in the queue + try: + log.info("Inserting %s Share Records", datacnt) + dbi.import_shares(sqldata) + except Exception as e: + log.error("Insert Share Records Failed: %s", e.args[0]) + for k, v in enumerate(sqldata): + self.q.put(v) + break # Allows us to sleep a little + + def queue_share(self, data): + self.q.put(data) + + def found_block(self, data): + try: + log.info("Updating Found Block Share Record") + self.do_import(self.dbi, True) # We can't Update if the record is not there. + self.dbi.found_block(data) + except Exception as e: + log.error("Update Found Block Share Record Failed: %s", e.args[0]) + + def check_password(self, username, password): + if username == "": + log.info("Rejected worker for blank username") + return False + allowed_chars = Set('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_-.') + if Set(username).issubset(allowed_chars) != True: + log.info("Username contains bad arguments") + return False + if username.count('.') > 1: + log.info("Username contains multiple . ") + return False + + # Force username and password to be strings + username = str(username) + password = str(password) + if not settings.USERS_CHECK_PASSWORD and self.user_exists(username): + return True + elif self.cache.get(username) == password: + return True + elif self.dbi.check_password(username, password): + self.cache.set(username, password) + return True + elif settings.USERS_AUTOADD == True: + if self.dbi.get_uid(username) != False: + uid = self.dbi.get_uid(username) + self.dbi.insert_worker(uid, username, password) + self.cache.set(username, password) + return True + + log.info("Authentication for %s failed" % username) + return False + + def list_users(self): + return self.dbi.list_users() + + def get_user(self, id): + return self.dbi.get_user(id) + + def user_exists(self, username): + if self.cache.get(username) is not None: + return True + user = self.dbi.get_user(username) + return user is not None + + def insert_user(self, username, password): + return self.dbi.insert_user(username, password) + + def delete_user(self, username): + self.mc.delete(username) + self.usercache = {} + return self.dbi.delete_user(username) + + def update_user(self, username, password): + self.mc.delete(username) + self.mc.set(username, password) + return self.dbi.update_user(username, password) + + def update_worker_diff(self, username, diff): + return self.dbi.update_worker_diff(username, diff) + + def get_pool_stats(self): + return self.dbi.get_pool_stats() + + def get_workers_stats(self): + return self.dbi.get_workers_stats() + + def clear_worker_diff(self): + return self.dbi.clear_worker_diff() + + diff --git a/mining/interfaces.py b/mining/interfaces.py index eb4513f..af17176 100644 --- a/mining/interfaces.py +++ b/mining/interfaces.py @@ -80,7 +80,11 @@ class ShareManagerInterface(object): dbi.queue_share([worker_name, block_header, block_hash, difficulty, timestamp, is_valid, ip, self.block_height, self.prev_hash, invalid_reason, share_diff ]) - def on_submit_block(self, is_accepted, worker_name, block_header, block_hash, timestamp, ip, share_diff): + def on_submit_block(self, on_submit, worker_name, block_header, block_hash, timestamp, ip, share_diff): + (is_accepted, valid_hash) = on_submit + if (settings.SOLUTION_BLOCK_HASH): + block_hash = valid_hash + log.info("Block %s %s" % (block_hash, 'ACCEPTED' if is_accepted else 'REJECTED')) dbi.found_block([worker_name, block_header, block_hash, -1, timestamp, is_accepted, ip, self.block_height, self.prev_hash, share_diff ]) diff --git a/mining/notify_email.py b/mining/notify_email.py new file mode 100644 index 0000000..32bffe3 --- /dev/null +++ b/mining/notify_email.py @@ -0,0 +1,55 @@ +import os +import smtplib +import time +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from stratum import settings + +import stratum.logger +log = stratum.logger.get_logger('Notify_Email') + +class NOTIFY_EMAIL(): + + def notify_start(self): + subject = (' Stratum ALERT: Stratum started!') + text = ('Stratum service has started!') + message = MIMEText(text, 'plain') + self.send_email(settings.NOTIFY_ADMIN,subject,message) + + def notify_found_block(self,worker_name): + subject = (' Stratum ALERT: Found Block by ' % worker_name) + text = ('%s on Stratum server found a block!' % worker_name) + message = MIMEText(text, 'plain') + self.send_email(settings.NOTIFY_ADMIN,subject,message) + + def notify_dead_coindaemon(self,worker_name): + subject = (' Stratum ALERT: Stratum down!') + text = ('Stratum is down!') + message = MIMEText(text, 'plain') + self.send_email(settings.NOTIFY_ADMIN,subject,message) + + def notify_dead_miner(self,username,email): + log.info("Attempting to send email to: %s" % username) + subject = (' Stratum ALERT: ' + username + ' not authenticating properly!') + text = (' Youre Miner is not authorising With Stratum correctly. please recheck youre worker details and retry') + message = MIMEText(text, 'html') + self.send_email(email,subject,message) + log.info("Sent to %s" % email) + + def send_email(self,to,subject,message): + log.info("Send attempt to %s" % to) + + msg = MIMEMultipart('alternative') + msg['Subject'] = subject + msg['From'] = settings.NOTIFY_EMAIL_FROM + msg['To'] = to + msg.attach(message) + try: + s = smtplib.SMTP(settings.NOTIFY_EMAIL_SERVER,settings.NOTIFY_EMAIL_SERVER_PORT) + s.login(settings.NOTIFY_EMAIL_USERNAME,settings.NOTIFY_EMAIL_PASSWORD) + s.sendmail(msg['From'], msg['To'], msg.as_string()) + s.quit() + except smtplib.SMTPAuthenticationError as e: + log.error('Error sending Email: %s' % e[1]) + except Exception as e: + log.error('Error sending Email: %s' % e[0])