stratum-mining/mining/DB_Mysql_Extended.py

import time
import hashlib
import lib.settings as settings
import lib.logger
log = lib.logger.get_logger('DB_Mysql')
import MySQLdb
import DB_Mysql
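
# DB_Mysql_Extended layers live pool statistics, share import, and share
# archiving on top of DB_Mysql; every method below goes through the base
# class's execute()/executemany() helpers and its dbh (connection) and
# dbc (cursor) handles.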


class DB_Mysql_Extended(DB_Mysql.DB_Mysql):
    def __init__(self):
        DB_Mysql.DB_Mysql.__init__(self)

    def updateStats(self, averageOverTime):
        log.debug("Updating Stats")
        # Note: we are using transactions, so we can set speed = 0 here and
        # it doesn't take effect until we commit.
        self.execute(
            """
            UPDATE `pool_worker`
            SET `speed` = 0,
                `alive` = 0
            """
        )
        stime = '%.0f' % (time.time() - averageOverTime)
        self.execute(
            """
            UPDATE `pool_worker` pw
            LEFT JOIN (
                SELECT `worker`,
                       ROUND(ROUND(SUM(`difficulty`)) * 4294967296) / %(average)s AS 'speed'
                FROM `shares`
                WHERE `time` > FROM_UNIXTIME(%(time)s)
                GROUP BY `worker`
            ) AS leJoin
            ON leJoin.`worker` = pw.`id`
            SET pw.`alive` = 1,
                pw.`speed` = leJoin.`speed`
            WHERE pw.`id` = leJoin.`worker`
            """,
            {
                "time": stime,
                "average": int(averageOverTime) * 1000000
            }
        )
        self.execute(
            """
            UPDATE `pool`
            SET `value` = (
                SELECT IFNULL(SUM(`speed`), 0)
                FROM `pool_worker`
                WHERE `alive` = 1
            )
            WHERE `parameter` = 'pool_speed'
            """
        )
        self.dbh.commit()
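
    # Speed math, spelled out: each share at difficulty d represents on
    # average d * 2**32 hashes, so summing difficulty over the window and
    # multiplying by 4294967296 gives total hashes.  Dividing by %(average)s,
    # which is averageOverTime * 1000000, yields a rate scaled to MH/s.  For
    # example, 150 difficulty-1 shares over a 600-second window:
    #   150 * 4294967296 / (600 * 1000000) ~= 1073.7 MH/s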

    def archive_check(self):
        # Check for found shares to archive
        self.execute(
            """
            SELECT `time`
            FROM `shares`
            WHERE `upstream_result` = 1
            ORDER BY `time`
            LIMIT 1
            """
        )
        data = self.dbc.fetchone()
        if data is None or (data[0] + getattr(settings, 'ARCHIVE_DELAY')) > time.time():
            return False
        return data[0]
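
    # The intended archive flow (a sketch inferred from the method names and
    # their shared `found_time` cutoff, not documented upstream):
    #   cutoff = db.archive_check()
    #   if cutoff is not False:
    #       db.archive_found(cutoff)    # copy winning shares aside
    #       db.archive_to_db(cutoff)    # copy everything up to the cutoff
    #       db.archive_cleanup(cutoff)  # then delete the originals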

    def archive_found(self, found_time):
        self.execute(
            """
            INSERT INTO `shares_archive_found`
            SELECT s.`id`, s.`time`, s.`rem_host`, pw.`id`, s.`our_result`,
                   s.`upstream_result`, s.`reason`, s.`solution`, s.`block_num`,
                   s.`prev_block_hash`, s.`useragent`, s.`difficulty`
            FROM `shares` s
            LEFT JOIN `pool_worker` pw
              ON s.`worker` = pw.`id`
            WHERE `upstream_result` = 1
              AND `time` <= FROM_UNIXTIME(%(time)s)
            """,
            {
                "time": found_time
            }
        )
        self.dbh.commit()

    def archive_to_db(self, found_time):
        self.execute(
            """
            INSERT INTO `shares_archive`
            SELECT s.`id`, s.`time`, s.`rem_host`, pw.`id`, s.`our_result`,
                   s.`upstream_result`, s.`reason`, s.`solution`, s.`block_num`,
                   s.`prev_block_hash`, s.`useragent`, s.`difficulty`
            FROM `shares` s
            LEFT JOIN `pool_worker` pw
              ON s.`worker` = pw.`id`
            WHERE `time` <= FROM_UNIXTIME(%(time)s)
            """,
            {
                "time": found_time
            }
        )
        self.dbh.commit()

    def archive_cleanup(self, found_time):
        self.execute(
            """
            DELETE FROM `shares`
            WHERE `time` <= FROM_UNIXTIME(%(time)s)
            """,
            {
                "time": found_time
            }
        )
        self.dbh.commit()

    def archive_get_shares(self, found_time):
        self.execute(
            """
            SELECT *
            FROM `shares`
            WHERE `time` <= FROM_UNIXTIME(%(time)s)
            """,
            {
                "time": found_time
            }
        )
        return self.dbc
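
    # archive_get_shares hands back the shared cursor rather than fetched
    # rows, so a caller would consume it immediately, e.g. (assumed usage,
    # with `export` a hypothetical sink):
    #   for row in db.archive_get_shares(cutoff):
    #       export(row)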

    def import_shares(self, data):
        # Data layout per entry:
        # 0: worker_name,
        # 1: block_header,
        # 2: block_hash,
        # 3: difficulty,
        # 4: timestamp,
        # 5: is_valid,
        # 6: ip,
        # 7: self.block_height,
        # 8: self.prev_hash,
        # 9: invalid_reason,
        # 10: share_diff
        log.debug("Importing Shares")
        checkin_times = {}
        total_shares = 0
        best_diff = 0
        for v in data:
            total_shares += v[3]
            if v[0] in checkin_times:
                # Keep the most recent check-in time per worker
                if v[4] > checkin_times[v[0]]["time"]:
                    checkin_times[v[0]]["time"] = v[4]
            else:
                checkin_times[v[0]] = {
                    "time": v[4],
                    "shares": 0,
                    "rejects": 0
                }
            if v[5]:
                checkin_times[v[0]]["shares"] += v[3]
            else:
                checkin_times[v[0]]["rejects"] += v[3]
            if v[10] > best_diff:
                best_diff = v[10]
            self.execute(
                """
                INSERT INTO `shares`
                (time, rem_host, worker, our_result, upstream_result,
                 reason, solution, block_num, prev_block_hash,
                 useragent, difficulty)
                VALUES
                (FROM_UNIXTIME(%(time)s), %(host)s,
                 (SELECT `id` FROM `pool_worker` WHERE `username` = %(uname)s),
                 %(lres)s, 0, %(reason)s, %(solution)s,
                 %(blocknum)s, %(hash)s, '', %(difficulty)s)
                """,
                {
                    "time": v[4],
                    "host": v[6],
                    "uname": v[0],
                    "lres": v[5],
                    "reason": v[9],
                    "solution": v[2],
                    "blocknum": v[7],
                    "hash": v[8],
                    "difficulty": v[3]
                }
            )
        self.execute(
            """
            SELECT `parameter`, `value`
            FROM `pool`
            WHERE `parameter` = 'round_best_share'
               OR `parameter` = 'round_shares'
               OR `parameter` = 'bitcoin_difficulty'
               OR `parameter` = 'round_progress'
            """
        )
        current_parameters = {}
        for row in self.dbc.fetchall():
            current_parameters[row[0]] = row[1]
        round_best_share = int(current_parameters['round_best_share'])
        difficulty = float(current_parameters['bitcoin_difficulty'])
        round_shares = int(current_parameters['round_shares']) + total_shares
        updates = [
            {
                "param": "round_shares",
                "value": round_shares
            },
            {
                "param": "round_progress",
                "value": 0 if difficulty == 0 else (round_shares / difficulty) * 100
            }
        ]
        if best_diff > round_best_share:
            updates.append({
                "param": "round_best_share",
                "value": best_diff
            })
        self.executemany(
            """
            UPDATE `pool`
            SET `value` = %(value)s
            WHERE `parameter` = %(param)s
            """,
            updates
        )
        for k, v in checkin_times.items():
            self.execute(
                """
                UPDATE `pool_worker`
                SET `last_checkin` = FROM_UNIXTIME(%(time)s),
                    `total_shares` = `total_shares` + %(shares)s,
                    `total_rejects` = `total_rejects` + %(rejects)s
                WHERE `username` = %(uname)s
                """,
                {
                    "time": v["time"],
                    "shares": v["shares"],
                    "rejects": v["rejects"],
                    "uname": k
                }
            )
        self.dbh.commit()
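
    # A representative input entry for import_shares, matching the layout
    # comment above (values are illustrative only):
    #   ("worker1", "<80-byte header hex>", "<block hash hex>", 1, 1389265790,
    #    True, "203.0.113.7", 278000, "<prev hash hex>", None, 2.5)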

    def found_block(self, data):
        # For database compatibility we are converting our_worker to Y/N format:
        # if data[5]:
        #     data[5] = 'Y'
        # else:
        #     data[5] = 'N'
        # Note: difficulty = -1 here
        self.execute(
            """
            UPDATE `shares`
            SET `upstream_result` = %(result)s,
                `solution` = %(solution)s
            WHERE `time` = FROM_UNIXTIME(%(time)s)
              AND `worker` = (SELECT `id`
                              FROM `pool_worker`
                              WHERE `username` = %(uname)s)
            LIMIT 1
            """,
            {
                "result": data[5],
                "solution": data[2],
                "time": data[4],
                "uname": data[0]
            }
        )
        if data[5]:
            self.execute(
                """
                UPDATE `pool_worker`
                SET `total_found` = `total_found` + 1
                WHERE `username` = %(uname)s
                """,
                {
                    "uname": data[0]
                }
            )
            self.execute(
                """
                SELECT `value`
                FROM `pool`
                WHERE `parameter` = 'pool_total_found'
                """
            )
            total_found = int(self.dbc.fetchone()[0]) + 1
            # Reset the round counters and bump the found total
            self.executemany(
                """
                UPDATE `pool`
                SET `value` = %(value)s
                WHERE `parameter` = %(param)s
                """,
                [
                    {
                        "param": "round_shares",
                        "value": "0"
                    },
                    {
                        "param": "round_progress",
                        "value": "0"
                    },
                    {
                        "param": "round_best_share",
                        "value": "0"
                    },
                    {
                        "param": "round_start",
                        "value": time.time()
                    },
                    {
                        "param": "pool_total_found",
                        "value": total_found
                    }
                ]
            )
        self.dbh.commit()
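
    # found_block appears to receive the same tuple layout as import_shares
    # (an inference from the shared indices, not stated upstream):
    # data[0] = worker_name, data[2] = block_hash, data[4] = timestamp,
    # data[5] = is_valid.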

    def update_pool_info(self, pi):
        self.executemany(
            """
            UPDATE `pool`
            SET `value` = %(value)s
            WHERE `parameter` = %(param)s
            """,
            [
                {
                    "param": "bitcoin_blocks",
                    "value": pi['blocks']
                },
                {
                    "param": "bitcoin_balance",
                    "value": pi['balance']
                },
                {
                    "param": "bitcoin_connections",
                    "value": pi['connections']
                },
                {
                    "param": "bitcoin_difficulty",
                    "value": pi['difficulty']
                },
                {
                    "param": "bitcoin_infotime",
                    "value": time.time()
                }
            ]
        )
        self.dbh.commit()
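
    # update_pool_info expects a dict with at least the four keys read above,
    # e.g. (illustrative values, presumably fed from the coin daemon):
    #   db.update_pool_info({'blocks': 278000, 'balance': 12.5,
    #                        'connections': 8, 'difficulty': 1418481395.26})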

    def get_pool_stats(self):
        self.execute(
            """
            SELECT * FROM `pool`
            """
        )
        ret = {}
        for data in self.dbc.fetchall():
            ret[data[0]] = data[1]
        return ret

    def get_workers_stats(self):
        self.execute(
            """
            SELECT `username`, `speed`, `last_checkin`, `total_shares`,
                   `total_rejects`, `total_found`, `alive`, `difficulty`
            FROM `pool_worker`
            WHERE `id` > 0
            """
        )
        ret = {}
        for data in self.dbc.fetchall():
            ret[data[0]] = {
                "username": data[0],
                "speed": int(data[1]),
                "last_checkin": time.mktime(data[2].timetuple()),
                "total_shares": int(data[3]),
                "total_rejects": int(data[4]),
                "total_found": int(data[5]),
                # `is 1` tested object identity; compare by value instead
                "alive": True if data[6] == 1 else False,
                "difficulty": int(data[7])
            }
        return ret
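

# A minimal end-to-end sketch (assumed driver code, not part of the original
# module); it presumes lib.settings carries valid MySQL credentials and the
# pool schema from the upstream project.
if __name__ == '__main__':
    db = DB_Mysql_Extended()
    db.updateStats(600)                  # refresh speeds over a 10-minute window
    stats = db.get_pool_stats()          # e.g. {'pool_speed': ..., 'round_shares': ...}
    log.info("pool stats: %s" % stats)
    for name, w in db.get_workers_stats().items():
        log.info("%s: speed=%s alive=%s" % (name, w["speed"], w["alive"]))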