diff --git a/mining/DB_Mysql_Extended.py b/mining/DB_Mysql_Extended.py new file mode 100644 index 0000000..d6a9b06 --- /dev/null +++ b/mining/DB_Mysql_Extended.py @@ -0,0 +1,426 @@ +import time +import hashlib +import lib.settings as settings +import lib.logger +log = lib.logger.get_logger('DB_Mysql') + +import MySQLdb +import DB_Mysql + +class DB_Mysql_Extended(DB_Mysql.DB_Mysql): + def __init__(self): + DB_Mysql.DB_Mysql.__init__(self) + + def updateStats(self, averageOverTime): + log.debug("Updating Stats") + # Note: we are using transactions... so we can set the speed = 0 and it doesn't take affect until we are commited. + self.execute( + """ + UPDATE `pool_worker` + SET `speed` = 0, + `alive` = 0 + """ + ); + + stime = '%.0f' % (time.time() - averageOverTime); + + self.execute( + """ + UPDATE `pool_worker` pw + LEFT JOIN ( + SELECT `worker`, ROUND(ROUND(SUM(`difficulty`)) * 4294967296) / %(average)s AS 'speed' + FROM `shares` + WHERE `time` > FROM_UNIXTIME(%(time)s) + GROUP BY `worker` + ) AS leJoin + ON leJoin.`worker` = pw.`id` + SET pw.`alive` = 1, + pw.`speed` = leJoin.`speed` + WHERE pw.`id` = leJoin.`worker` + """, + { + "time": stime, + "average": int(averageOverTime) * 1000000 + } + ) + + self.execute( + """ + UPDATE `pool` + SET `value` = ( + SELECT IFNULL(SUM(`speed`), 0) + FROM `pool_worker` + WHERE `alive` = 1 + ) + WHERE `parameter` = 'pool_speed' + """ + ) + + self.dbh.commit() + + def archive_check(self): + # Check for found shares to archive + self.execute( + """ + SELECT `time` + FROM `shares` + WHERE `upstream_result` = 1 + ORDER BY `time` + LIMIT 1 + """ + ) + + data = self.dbc.fetchone() + + if data is None or (data[0] + getattr(settings, 'ARCHIVE_DELAY')) > time.time(): + return False + + return data[0] + + def archive_found(self, found_time): + self.execute( + """ + INSERT INTO `shares_archive_found` + SELECT s.`id`, s.`time`, s.`rem_host`, pw.`id`, s.`our_result`, + s.`upstream_result`, s.`reason`, s.`solution`, s.`block_num`, + s.`prev_block_hash`, s.`useragent`, s.`difficulty` + FROM `shares` s + LEFT JOIN `pool_worker` pw + ON s.`worker` = pw.`id` + WHERE `upstream_result` = 1 + AND `time` <= FROM_UNIXTIME(%(time)s) + """, + { + "time": found_time + } + ) + + self.dbh.commit() + + def archive_to_db(self, found_time): + self.execute( + """ + INSERT INTO `shares_archive` + SELECT s.`id`, s.`time`, s.`rem_host`, pw.`id`, s.`our_result`, + s.`upstream_result`, s.`reason`, s.`solution`, s.`block_num`, + s.`prev_block_hash`, s.`useragent`, s.`difficulty` + FROM `shares` s + LEFT JOIN `pool_worker` pw + ON s.`worker` = pw.`id` + WHERE `time` <= FROM_UNIXTIME(%(time)s) + """, + { + "time": found_time + } + ) + + self.dbh.commit() + + def archive_cleanup(self, found_time): + self.execute( + """ + DELETE FROM `shares` + WHERE `time` <= FROM_UNIXTIME(%(time)s) + """, + { + "time": found_time + } + ) + + self.dbh.commit() + + def archive_get_shares(self, found_time): + self.execute( + """ + SELECT * + FROM `shares` + WHERE `time` <= FROM_UNIXTIME(%(time)s) + """, + { + "time": found_time + } + ) + + return self.dbc + + def import_shares(self, data): + # Data layout + # 0: worker_name, + # 1: block_header, + # 2: block_hash, + # 3: difficulty, + # 4: timestamp, + # 5: is_valid, + # 6: ip, + # 7: self.block_height, + # 8: self.prev_hash, + # 9: invalid_reason, + # 10: share_diff + + log.debug("Importing Shares") + checkin_times = {} + total_shares = 0 + best_diff = 0 + + for k, v in enumerate(data): + total_shares += v[3] + + if v[0] in checkin_times: + if v[4] > checkin_times[v[0]]: + checkin_times[v[0]]["time"] = v[4] + else: + checkin_times[v[0]] = { + "time": v[4], + "shares": 0, + "rejects": 0 + } + + if v[5] == True: + checkin_times[v[0]]["shares"] += v[3] + else: + checkin_times[v[0]]["rejects"] += v[3] + + if v[10] > best_diff: + best_diff = v[10] + + self.execute( + """ + INSERT INTO `shares` + (time, rem_host, worker, our_result, upstream_result, + reason, solution, block_num, prev_block_hash, + useragent, difficulty) + VALUES + (FROM_UNIXTIME(%(time)s), %(host)s, + (SELECT `id` FROM `pool_worker` WHERE `username` = %(uname)s), + %(lres)s, 0, %(reason)s, %(solution)s, + %(blocknum)s, %(hash)s, '', %(difficulty)s) + """, + { + "time": v[4], + "host": v[6], + "uname": v[0], + "lres": v[5], + "reason": v[9], + "solution": v[2], + "blocknum": v[7], + "hash": v[8], + "difficulty": v[3] + } + ) + + self.execute( + """ + SELECT `parameter`, `value` + FROM `pool` + WHERE `parameter` = 'round_best_share' + OR `parameter` = 'round_shares' + OR `parameter` = 'bitcoin_difficulty' + OR `parameter` = 'round_progress' + """ + ) + + current_parameters = {} + + for data in self.dbc.fetchall(): + current_parameters[data[0]] = data[1] + + round_best_share = int(current_parameters['round_best_share']) + difficulty = float(current_parameters['bitcoin_difficulty']) + round_shares = int(current_parameters['round_shares']) + total_shares + + updates = [ + { + "param": "round_shares", + "value": round_shares + }, + { + "param": "round_progress", + "value": 0 if difficulty == 0 else (round_shares / difficulty) * 100 + } + ] + + if best_diff > round_best_share: + updates.append({ + "param": "round_best_share", + "value": best_diff + }) + + self.executemany( + """ + UPDATE `pool` + SET `value` = %(value)s + WHERE `parameter` = %(param)s + """, + updates + ) + + for k, v in checkin_times.items(): + self.execute( + """ + UPDATE `pool_worker` + SET `last_checkin` = FROM_UNIXTIME(%(time)s), + `total_shares` = `total_shares` + %(shares)s, + `total_rejects` = `total_rejects` + %(rejects)s + WHERE `username` = %(uname)s + """, + { + "time": v["time"], + "shares": v["shares"], + "rejects": v["rejects"], + "uname": k + } + ) + + self.dbh.commit() + + + def found_block(self, data): + # for database compatibility we are converting our_worker to Y/N format + #if data[5]: + # data[5] = 'Y' + #else: + # data[5] = 'N' + # Note: difficulty = -1 here + self.execute( + """ + UPDATE `shares` + SET `upstream_result` = %(result)s, + `solution` = %(solution)s + WHERE `time` = FROM_UNIXTIME(%(time)s) + AND `worker` = (SELECT id + FROM `pool_worker` + WHERE `username` = %(uname)s) + LIMIT 1 + """, + { + "result": data[5], + "solution": data[2], + "time": data[4], + "uname": data[0] + } + ) + + if data[5] == True: + self.execute( + """ + UPDATE `pool_worker` + SET `total_found` = `total_found` + 1 + WHERE `username` = %(uname)s + """, + { + "uname": data[0] + } + ) + self.execute( + """ + SELECT `value` + FROM `pool` + WHERE `parameter` = 'pool_total_found' + """ + ) + total_found = int(self.dbc.fetchone()[0]) + 1 + + self.executemany( + """ + UPDATE `pool` + SET `value` = %(value)s + WHERE `parameter` = %(param)s + """, + [ + { + "param": "round_shares", + "value": "0" + }, + { + "param": "round_progress", + "value": "0" + }, + { + "param": "round_best_share", + "value": "0" + }, + { + "param": "round_start", + "value": time.time() + }, + { + "param": "pool_total_found", + "value": total_found + } + ] + ) + + self.dbh.commit() + + def update_pool_info(self, pi): + self.executemany( + """ + UPDATE `pool` + SET `value` = %(value)s + WHERE `parameter` = %(param)s + """, + [ + { + "param": "bitcoin_blocks", + "value": pi['blocks'] + }, + { + "param": "bitcoin_balance", + "value": pi['balance'] + }, + { + "param": "bitcoin_connections", + "value": pi['connections'] + }, + { + "param": "bitcoin_difficulty", + "value": pi['difficulty'] + }, + { + "param": "bitcoin_infotime", + "value": time.time() + } + ] + ) + + self.dbh.commit() + + def get_pool_stats(self): + self.execute( + """ + SELECT * FROM `pool` + """ + ) + + ret = {} + + for data in self.dbc.fetchall(): + ret[data[0]] = data[1] + + return ret + + def get_workers_stats(self): + self.execute( + """ + SELECT `username`, `speed`, `last_checkin`, `total_shares`, + `total_rejects`, `total_found`, `alive`, `difficulty` + FROM `pool_worker` + WHERE `id` > 0 + """ + ) + + ret = {} + + for data in self.dbc.fetchall(): + ret[data[0]] = { + "username": data[0], + "speed": int(data[1]), + "last_checkin": time.mktime(data[2].timetuple()), + "total_shares": int(data[3]), + "total_rejects": int(data[4]), + "total_found": int(data[5]), + "alive": True if data[6] is 1 else False, + "difficulty": int(data[7]) + } + + return ret