Added Support for Other DB Types
This commit is contained in:
parent
60edae5681
commit
ec9aa5be5c
@ -88,12 +88,24 @@ PASSWORD_SALT = 'some_crazy_string'
|
||||
|
||||
# ******************** Database *********************

DATABASE_DRIVER = 'sqlite'  # Options: none, sqlite, postgresql or mysql
DATABASE_EXTEND = True  # False = pushpool db layout, True = pushpool + extra columns

# SQLite
DB_SQLITE_FILE = 'pooldb.sqlite'

# Postgresql
DB_PGSQL_HOST = 'localhost'
DB_PGSQL_DBNAME = 'pooldb'
DB_PGSQL_USER = 'pooldb'
DB_PGSQL_PASS = '**empty**'
DB_PGSQL_SCHEMA = 'public'

# MySQL
DB_MYSQL_HOST = 'localhost'
DB_MYSQL_DBNAME = 'pooldb'
DB_MYSQL_USER = 'pooldb'
DB_MYSQL_PASS = '**empty**'


# ******************** Adv. DB Settings *********************
# Don't change these unless you know what you are doing
||||
@ -43,10 +43,30 @@ class DBInterface():
|
||||
log.debug("DB_Mysql_Vardiff INIT")
|
||||
import DB_Mysql_Vardiff
|
||||
return DB_Mysql_Vardiff.DB_Mysql_Vardiff()
|
||||
else:
|
||||
elif settings.DATABASE_DRIVER == "sqlite":
|
||||
log.debug('DB_Sqlite INIT')
|
||||
import DB_Sqlite
|
||||
return DB_Sqlite.DB_Sqlite()
|
||||
elif settings.DATABASE_DRIVER == "mysql":
|
||||
log.debug('DB_Mysql INIT')
|
||||
import DB_Mysql
|
||||
return DB_Mysql.DB_Mysql()
|
||||
elif settings.DATABASE_DRIVER == "postgresql":
|
||||
log.debug('DB_Postgresql INIT')
|
||||
import DB_Postgresql
|
||||
return DB_Postgresql.DB_Postgresql()
|
||||
elif settings.DATABASE_DRIVER == "none":
|
||||
log.debug('DB_None INIT')
|
||||
import DB_None
|
||||
return DB_None.DB_None()
|
||||
else:
|
||||
log.error('Invalid DATABASE_DRIVER -- using NONE')
|
||||
log.debug('DB_None INIT')
|
||||
import DB_None
|
||||
return DB_None.DB_None()
|
||||
log.debug('DB_Mysql INIT')
|
||||
import DB_Mysql
|
||||
return DB_Mysql.DB_Mysql()
|
||||
|
||||
def clearusercache(self):
|
||||
log.debug("DBInterface.clearusercache called")
|
||||
@ -55,7 +75,9 @@ class DBInterface():
|
||||
|
||||
def scheduleImport(self):
|
||||
# This schedule's the Import
|
||||
use_thread = True
|
||||
if settings.DATABASE_DRIVER == "sqlite":
|
||||
use_thread = False
|
||||
else: use_thread = True
|
||||
|
||||
if use_thread:
|
||||
self.queueclock = reactor.callLater(settings.DB_LOADER_CHECKTIME , self.run_import_thread)
|
||||
|
||||
54
mining/DB_None.py
Normal file
54
mining/DB_None.py
Normal file
@ -0,0 +1,54 @@
|
||||
import stratum.logger
|
||||
log = stratum.logger.get_logger('None')
|
||||
|
||||
class DB_None():
    """No-op database backend.

    Used when DATABASE_DRIVER = 'none': every call is accepted, logged at
    debug level, and discarded.  check_password() always succeeds so the
    pool keeps running with no persistence at all.
    """

    def __init__(self):
        # Nothing to connect to; kept for interface parity with real drivers.
        log.debug("Connecting to DB")

    def updateStats(self, averageOverTime):
        # averageOverTime: window (seconds) real drivers use for speed stats.
        log.debug("Updating Stats")

    def import_shares(self, data):
        # data: batch of share rows; silently dropped.
        log.debug("Importing Shares")

    def found_block(self, data):
        log.debug("Found Block")

    def get_user(self, id_or_username):
        # Returns None implicitly -- there is no user store.
        log.debug("Get User")

    def list_users(self):
        log.debug("List Users")

    def delete_user(self, username):
        log.debug("Deleting Username")

    def insert_user(self, username, password):
        log.debug("Adding Username/Password")

    def update_user(self, username, password):
        log.debug("Updating Username/Password")

    def check_password(self, username, password):
        # Always authenticates -- any worker may connect when no DB is used.
        log.debug("Checking Username/Password")
        return True

    def update_pool_info(self, pi):
        log.debug("Update Pool Info")

    def get_pool_stats(self):
        # Empty dict: no stats are tracked.
        log.debug("Get Pool Stats")
        ret = {}
        return ret

    def get_workers_stats(self):
        # Empty dict: no per-worker stats are tracked.
        log.debug("Get Workers Stats")
        ret = {}
        return ret

    def check_tables(self):
        # No schema to create or migrate.
        log.debug("Checking Tables")

    def close(self):
        log.debug("Close Connection")
|
||||
|
||||
394
mining/DB_Postgresql.py
Normal file
394
mining/DB_Postgresql.py
Normal file
@ -0,0 +1,394 @@
|
||||
import time
|
||||
import hashlib
|
||||
from stratum import settings
|
||||
import stratum.logger
|
||||
log = stratum.logger.get_logger('DB_Postgresql')
|
||||
|
||||
import psycopg2
|
||||
from psycopg2 import extras
|
||||
|
||||
class DB_Postgresql():
    """PostgreSQL storage backend for the mining pool.

    Persists shares, per-worker stats and pool-wide counters, and manages
    its own schema versioning (see update_tables / update_version_*).
    Passwords are stored as sha1(password + PASSWORD_SALT) hex digests.
    """

    def __init__(self):
        # Connection parameters come from config.py (DB_PGSQL_*).
        log.debug("Connecting to DB")
        self.dbh = psycopg2.connect("host='"+settings.DB_PGSQL_HOST+"' dbname='"+settings.DB_PGSQL_DBNAME+"' user='"+settings.DB_PGSQL_USER+\
                "' password='"+settings.DB_PGSQL_PASS+"'")
        # TODO -- set the schema
        self.dbc = self.dbh.cursor()

        # The salt is mandatory: password hashing below depends on it.
        if hasattr(settings, 'PASSWORD_SALT'):
            self.salt = settings.PASSWORD_SALT
        else:
            raise ValueError("PASSWORD_SALT isn't set, please set in config.py")

    def updateStats(self, averageOverTime):
        """Recompute per-worker and pool hash speed over the last
        averageOverTime seconds of shares."""
        log.debug("Updating Stats")
        # Note: we are using transactions... so we can set the speed = 0 and it doesn't take effect until we are committed.
        self.dbc.execute("update pool_worker set speed = 0, alive = 'f'");
        stime = '%.2f' % ( time.time() - averageOverTime );
        self.dbc.execute("select username,SUM(difficulty) from shares where time > to_timestamp(%s) group by username", [stime])
        total_speed = 0
        for name, shares in self.dbc.fetchall():
            # shares * 2^32 hashes, spread over the window, scaled to MH/s.
            speed = int(int(shares) * pow(2, 32)) / ( int(averageOverTime) * 1000 * 1000)
            total_speed += speed
            self.dbc.execute("update pool_worker set speed = %s, alive = 't' where username = %s", (speed, name))
        self.dbc.execute("update pool set value = %s where parameter = 'pool_speed'", [total_speed])
        self.dbh.commit()

    def archive_check(self):
        """Return the timestamp of the oldest block-solving share that is old
        enough to archive (past ARCHIVE_DELAY), or False if none."""
        # Check for found shares to archive
        self.dbc.execute("select time from shares where upstream_result = true order by time limit 1")
        data = self.dbc.fetchone()
        if data is None or (data[0] + settings.ARCHIVE_DELAY) > time.time() :
            return False
        return data[0]

    def archive_found(self, found_time):
        # Copy block-solving shares up to found_time into shares_archive_found.
        self.dbc.execute("insert into shares_archive_found select * from shares where upstream_result = true and time <= to_timestamp(%s)", [found_time])
        self.dbh.commit()

    def archive_to_db(self, found_time):
        # Copy all shares up to found_time into shares_archive.
        self.dbc.execute("insert into shares_archive select * from shares where time <= to_timestamp(%s)", [found_time])
        self.dbh.commit()

    def archive_cleanup(self, found_time):
        # Drop the live shares that have been archived.
        self.dbc.execute("delete from shares where time <= to_timestamp(%s)", [found_time])
        self.dbh.commit()

    def archive_get_shares(self, found_time):
        # Returns the live cursor positioned over the shares to archive.
        self.dbc.execute("select * from shares where time <= to_timestamp(%s)", [found_time])
        return self.dbc

    def import_shares(self, data):
        """Bulk-insert a batch of shares and roll up round/worker counters.

        data rows:
        # 0           1            2          3          4         5        6  7            8         9              10
        # [worker_name,block_header,block_hash,difficulty,timestamp,is_valid,ip,block_height,prev_hash,invalid_reason,best_diff]
        """
        log.debug("Importing Shares")
        checkin_times = {}
        total_shares = 0
        best_diff = 0
        for k, v in enumerate(data):
            if settings.DATABASE_EXTEND :
                total_shares += v[3]
                # Track latest check-in plus accepted/rejected share counts per worker.
                if v[0] in checkin_times:
                    if v[4] > checkin_times[v[0]] :
                        checkin_times[v[0]]["time"] = v[4]
                else:
                    checkin_times[v[0]] = {"time": v[4], "shares": 0, "rejects": 0 }

                if v[5] == True :
                    checkin_times[v[0]]["shares"] += v[3]
                else :
                    checkin_times[v[0]]["rejects"] += v[3]

                if v[10] > best_diff:
                    best_diff = v[10]

                self.dbc.execute("insert into shares " +\
                        "(time,rem_host,username,our_result,upstream_result,reason,solution,block_num,prev_block_hash,useragent,difficulty) " +\
                        "VALUES (to_timestamp(%s),%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                        (v[4], v[6], v[0], bool(v[5]), False, v[9], '', v[7], v[8], '', v[3]) )
            else :
                # Plain pushpool-compatible layout.
                self.dbc.execute("insert into shares (time,rem_host,username,our_result,upstream_result,reason,solution) VALUES " +\
                        "(to_timestamp(%s),%s,%s,%s,%s,%s,%s)",
                        (v[4], v[6], v[0], bool(v[5]), False, v[9], '') )

        if settings.DATABASE_EXTEND :
            # Round share counter.
            self.dbc.execute("select value from pool where parameter = 'round_shares'")
            round_shares = int(self.dbc.fetchone()[0]) + total_shares
            self.dbc.execute("update pool set value = %s where parameter = 'round_shares'", [round_shares])

            # Best share this round.
            self.dbc.execute("select value from pool where parameter = 'round_best_share'")
            round_best_share = int(self.dbc.fetchone()[0])
            if best_diff > round_best_share:
                self.dbc.execute("update pool set value = %s where parameter = 'round_best_share'", [best_diff])

            # Round progress as a percentage of network difficulty.
            self.dbc.execute("select value from pool where parameter = 'bitcoin_difficulty'")
            difficulty = float(self.dbc.fetchone()[0])

            if difficulty == 0:
                progress = 0
            else:
                progress = (round_shares/difficulty)*100
            self.dbc.execute("update pool set value = %s where parameter = 'round_progress'", [progress])

            # Flush per-worker check-in/share/reject accumulators.
            for k, v in checkin_times.items():
                self.dbc.execute("update pool_worker set last_checkin = to_timestamp(%s), total_shares = total_shares + %s, total_rejects = total_rejects + %s where username = %s",
                        (v["time"], v["shares"], v["rejects"], k))

        self.dbh.commit()


    def found_block(self, data):
        """Mark the share that solved a block and reset round counters."""
        # Note: difficulty = -1 here
        self.dbc.execute("update shares set upstream_result = %s, solution = %s where id in (select id from shares where time = to_timestamp(%s) and username = %s limit 1)",
                (bool(data[5]), data[2], data[4], data[0]))
        if settings.DATABASE_EXTEND and data[5] == True :
            self.dbc.execute("update pool_worker set total_found = total_found + 1 where username = %s", (data[0],))
            self.dbc.execute("select value from pool where parameter = 'pool_total_found'")
            total_found = int(self.dbc.fetchone()[0]) + 1
            # New round: zero the round counters and stamp the start time.
            self.dbc.executemany("update pool set value = %s where parameter = %s", [(0, 'round_shares'),
                    (0, 'round_progress'),
                    (0, 'round_best_share'),
                    (time.time(), 'round_start'),
                    (total_found, 'pool_total_found')
                    ])
        self.dbh.commit()

    def get_user(self, id_or_username):
        """Fetch one pool_worker row by numeric id or by username."""
        log.debug("Finding user with id or username of %s", id_or_username)
        cursor = self.dbh.cursor(cursor_factory=extras.DictCursor)

        cursor.execute(
            """
            SELECT *
            FROM pool_worker
            WHERE id = %(id)s
              OR username = %(uname)s
            """,
            {
                # Non-numeric input can never match an id; -1 disables that arm.
                "id": id_or_username if id_or_username.isdigit() else -1,
                "uname": id_or_username
            }
        )

        user = cursor.fetchone()
        cursor.close()
        return user

    def list_users(self):
        """Yield all pool_worker rows (as dict-like rows), batch by batch."""
        cursor = self.dbh.cursor(cursor_factory=extras.DictCursor)
        cursor.execute(
            """
            SELECT *
            FROM pool_worker
            WHERE id > 0
            """
        )

        while True:
            results = cursor.fetchmany()
            if not results:
                break

            for result in results:
                yield result

    def delete_user(self, id_or_username):
        """Delete a worker by numeric id or by username."""
        log.debug("Deleting Username")
        self.dbc.execute(
            """
            delete from pool_worker where id = %(id)s or username = %(uname)s
            """,
            {
                "id": id_or_username if id_or_username.isdigit() else -1,
                "uname": id_or_username
            }
        )
        self.dbh.commit()

    def insert_user(self, username, password):
        """Create a worker; stores sha1(password + salt). Returns username."""
        log.debug("Adding Username/Password")
        m = hashlib.sha1()
        m.update(password)
        m.update(self.salt)
        self.dbc.execute("insert into pool_worker (username,password) VALUES (%s,%s)",
                (username, m.hexdigest() ))
        self.dbh.commit()

        return str(username)

    def update_user(self, id_or_username, password):
        """Set a new (salted, hashed) password for a worker."""
        log.debug("Updating Username/Password")
        m = hashlib.sha1()
        m.update(password)
        m.update(self.salt)
        self.dbc.execute(
            """
            update pool_worker set password = %(pass)s where id = %(id)s or username = %(uname)s
            """,
            {
                "id": id_or_username if id_or_username.isdigit() else -1,
                "uname": id_or_username,
                "pass": m.hexdigest()
            }
        )
        self.dbh.commit()

    def update_worker_diff(self, username, diff):
        # Persist the vardiff-assigned difficulty for a worker.
        self.dbc.execute("update pool_worker set difficulty = %s where username = %s", (diff, username))
        self.dbh.commit()

    def clear_worker_diff(self):
        # Reset all worker difficulties (extended layout only).
        if settings.DATABASE_EXTEND == True :
            self.dbc.execute("update pool_worker set difficulty = 0")
            self.dbh.commit()

    def check_password(self, username, password):
        """True if sha1(password + salt) matches the stored hash."""
        log.debug("Checking Username/Password")
        m = hashlib.sha1()
        m.update(password)
        m.update(self.salt)
        self.dbc.execute("select COUNT(*) from pool_worker where username = %s and password = %s",
                (username, m.hexdigest() ))
        data = self.dbc.fetchone()
        if data[0] > 0 :
            return True
        return False

    def update_pool_info(self, pi):
        """Store bitcoind status (blocks/balance/connections/difficulty)."""
        self.dbc.executemany("update pool set value = %s where parameter = %s", [(pi['blocks'], "bitcoin_blocks"),
                (pi['balance'], "bitcoin_balance"),
                (pi['connections'], "bitcoin_connections"),
                (pi['difficulty'], "bitcoin_difficulty"),
                (time.time(), "bitcoin_infotime")
                ])
        self.dbh.commit()

    def get_pool_stats(self):
        """Return the pool table as a {parameter: value} dict."""
        self.dbc.execute("select * from pool")
        ret = {}
        for data in self.dbc.fetchall():
            ret[data[0]] = data[1]
        return ret

    def get_workers_stats(self):
        """Return per-worker stats keyed by username."""
        self.dbc.execute("select username,speed,last_checkin,total_shares,total_rejects,total_found,alive,difficulty from pool_worker")
        ret = {}
        for data in self.dbc.fetchall():
            ret[data[0]] = { "username" : data[0],
                "speed" : data[1],
                # Convert the timestamp column to epoch seconds.
                "last_checkin" : time.mktime(data[2].timetuple()),
                "total_shares" : data[3],
                "total_rejects" : data[4],
                "total_found" : data[5],
                "alive" : data[6],
                "difficulty" : data[7] }
        return ret

    def close(self):
        self.dbh.close()

    def check_tables(self):
        """Create the schema on first run, then apply pending migrations."""
        log.debug("Checking Tables")

        shares_exist = False
        self.dbc.execute("select COUNT(*) from pg_catalog.pg_tables where schemaname = %(schema)s and tablename = 'shares'",
                {"schema": settings.DB_PGSQL_SCHEMA })
        data = self.dbc.fetchone()
        if data[0] <= 0 :
            self.update_version_1()

        if settings.DATABASE_EXTEND == True :
            self.update_tables()


    def update_tables(self):
        """Step 'DB Version' forward one migration at a time until current."""
        version = 0
        current_version = 7
        while version < current_version :
            self.dbc.execute("select value from pool where parameter = 'DB Version'")
            data = self.dbc.fetchone()
            version = int(data[0])
            if version < current_version :
                log.info("Updating Database from %i to %i" % (version, version +1))
                # Dispatch to update_version_<n>, which bumps 'DB Version'.
                getattr(self, 'update_version_' + str(version) )()

    def update_version_1(self):
        """Initial schema creation (extended or plain pushpool layout)."""
        if settings.DATABASE_EXTEND == True :
            self.dbc.execute("create table shares" +\
                    "(id serial primary key,time timestamp,rem_host TEXT, username TEXT, our_result BOOLEAN, upstream_result BOOLEAN, reason TEXT, solution TEXT, " +\
                    "block_num INTEGER, prev_block_hash TEXT, useragent TEXT, difficulty INTEGER)")
            self.dbc.execute("create index shares_username ON shares(username)")
            self.dbc.execute("create table pool_worker" +\
                    "(id serial primary key,username TEXT, password TEXT, speed INTEGER, last_checkin timestamp)")
            self.dbc.execute("create index pool_worker_username ON pool_worker(username)")
            self.dbc.execute("alter table pool_worker add total_shares INTEGER default 0")
            self.dbc.execute("alter table pool_worker add total_rejects INTEGER default 0")
            self.dbc.execute("alter table pool_worker add total_found INTEGER default 0")
            self.dbc.execute("create table pool(parameter TEXT, value TEXT)")
            # Fresh extended DBs start at version 2 (version-1 work is done).
            self.dbc.execute("insert into pool (parameter,value) VALUES ('DB Version',2)")
        else :
            self.dbc.execute("create table shares" + \
                    "(id serial,time timestamp,rem_host TEXT, username TEXT, our_result BOOLEAN, upstream_result BOOLEAN, reason TEXT, solution TEXT)")
            self.dbc.execute("create index shares_username ON shares(username)")
            self.dbc.execute("create table pool_worker(id serial,username TEXT, password TEXT)")
            self.dbc.execute("create index pool_worker_username ON pool_worker(username)")
        self.dbh.commit()

    def update_version_2(self):
        """Seed the pool stats parameters."""
        log.info("running update 2")
        self.dbc.executemany("insert into pool (parameter,value) VALUES (%s,%s)", [('bitcoin_blocks', 0),
                ('bitcoin_balance', 0),
                ('bitcoin_connections', 0),
                ('bitcoin_difficulty', 0),
                ('pool_speed', 0),
                ('pool_total_found', 0),
                ('round_shares', 0),
                ('round_progress', 0),
                ('round_start', time.time())
                ])
        self.dbc.execute("update pool set value = 3 where parameter = 'DB Version'")
        self.dbh.commit()

    def update_version_3(self):
        """Add round_best_share/bitcoin_infotime params and alive column."""
        log.info("running update 3")
        self.dbc.executemany("insert into pool (parameter,value) VALUES (%s,%s)", [
                ('round_best_share', 0),
                ('bitcoin_infotime', 0)
                ])
        self.dbc.execute("alter table pool_worker add alive BOOLEAN")
        self.dbc.execute("update pool set value = 4 where parameter = 'DB Version'")
        self.dbh.commit()

    def update_version_4(self):
        """Add per-worker difficulty and the two archive tables."""
        log.info("running update 4")
        self.dbc.execute("alter table pool_worker add difficulty INTEGER default 0")
        self.dbc.execute("create table shares_archive" +\
                "(id serial primary key,time timestamp,rem_host TEXT, username TEXT, our_result BOOLEAN, upstream_result BOOLEAN, reason TEXT, solution TEXT, " +\
                "block_num INTEGER, prev_block_hash TEXT, useragent TEXT, difficulty INTEGER)")
        self.dbc.execute("create table shares_archive_found" +\
                "(id serial primary key,time timestamp,rem_host TEXT, username TEXT, our_result BOOLEAN, upstream_result BOOLEAN, reason TEXT, solution TEXT, " +\
                "block_num INTEGER, prev_block_hash TEXT, useragent TEXT, difficulty INTEGER)")
        self.dbc.execute("update pool set value = 5 where parameter = 'DB Version'")
        self.dbh.commit()

    def update_version_5(self):
        """Add primary key on pool and rework the shares indices."""
        log.info("running update 5")
        # Adding Primary key to table: pool
        self.dbc.execute("alter table pool add primary key (parameter)")
        self.dbh.commit()
        # Adjusting indices on table: shares
        self.dbc.execute("DROP INDEX shares_username")
        self.dbc.execute("CREATE INDEX shares_time_username ON shares(time,username)")
        self.dbc.execute("CREATE INDEX shares_upstreamresult ON shares(upstream_result)")
        self.dbh.commit()

        self.dbc.execute("update pool set value = 6 where parameter = 'DB Version'")
        self.dbh.commit()

    def update_version_6(self):
        """Enable pgcrypto, resize columns, and hash legacy plaintext passwords."""
        log.info("running update 6")

        try:
            self.dbc.execute("CREATE EXTENSION pgcrypto")
        except psycopg2.ProgrammingError:
            # Extension already present.
            log.info("pgcrypto already added to database")
        except psycopg2.OperationalError:
            raise Exception("Could not add pgcrypto extension to database. Have you got it installed? Ubuntu is postgresql-contrib")
        self.dbh.commit()

        # Optimising table layout
        self.dbc.execute("ALTER TABLE pool " +\
                "ALTER COLUMN parameter TYPE character varying(128), ALTER COLUMN value TYPE character varying(512);")
        self.dbh.commit()

        # One-time conversion of stored passwords to salted sha1 digests.
        self.dbc.execute("UPDATE pool_worker SET password = encode(digest(concat(password, %s), 'sha1'), 'hex') WHERE id > 0", [self.salt])
        self.dbh.commit()

        self.dbc.execute("ALTER TABLE pool_worker " +\
                "ALTER COLUMN username TYPE character varying(512), ALTER COLUMN password TYPE character(40), " +\
                "ADD CONSTRAINT username UNIQUE (username)")
        self.dbh.commit()

        self.dbc.execute("update pool set value = 7 where parameter = 'DB Version'")
        self.dbh.commit()
|
||||
|
||||
299
mining/DB_Sqlite.py
Normal file
299
mining/DB_Sqlite.py
Normal file
@ -0,0 +1,299 @@
|
||||
import time
|
||||
from stratum import settings
|
||||
import stratum.logger
|
||||
log = stratum.logger.get_logger('DB_Sqlite')
|
||||
|
||||
import sqlite3
|
||||
|
||||
class DB_Sqlite():
    """SQLite storage backend for the mining pool.

    Mirrors the PostgreSQL driver but uses named-parameter SQL, integer
    booleans, and batched executemany() calls (SQLite has no server-side
    concurrency, so the importer runs un-threaded -- see DBInterface).
    Note: unlike the PostgreSQL driver, passwords here are stored as given.
    """

    def __init__(self):
        log.debug("Connecting to DB")
        self.dbh = sqlite3.connect(settings.DB_SQLITE_FILE)
        self.dbc = self.dbh.cursor()

    def updateStats(self, averageOverTime):
        """Recompute per-worker and pool hash speed over the last
        averageOverTime seconds of shares."""
        log.debug("Updating Stats")
        # Note: we are using transactions... so we can set the speed = 0 and it doesn't take effect until we are committed.
        self.dbc.execute("update pool_worker set speed = 0, alive = 0");
        stime = '%.2f' % ( time.time() - averageOverTime );
        self.dbc.execute("select username,SUM(difficulty) from shares where time > :time group by username", {'time':stime})
        total_speed = 0
        sqldata = []
        for name, shares in self.dbc.fetchall():
            # shares * 2^32 hashes, spread over the window, scaled to MH/s.
            speed = int(int(shares) * pow(2, 32)) / ( int(averageOverTime) * 1000 * 1000)
            total_speed += speed
            sqldata.append({'speed':speed, 'user':name})
        self.dbc.executemany("update pool_worker set speed = :speed, alive = 1 where username = :user", sqldata)
        self.dbc.execute("update pool set value = :val where parameter = 'pool_speed'", {'val':total_speed})
        self.dbh.commit()

    def archive_check(self):
        """Return the timestamp of the oldest block-solving share that is old
        enough to archive (past ARCHIVE_DELAY), or False if none."""
        # Check for found shares to archive
        self.dbc.execute("select time from shares where upstream_result = 1 order by time limit 1")
        data = self.dbc.fetchone()
        if data is None or (data[0] + settings.ARCHIVE_DELAY) > time.time() :
            return False
        return data[0]

    def archive_found(self, found_time):
        # Copy block-solving shares up to found_time into shares_archive_found.
        self.dbc.execute("insert into shares_archive_found select * from shares where upstream_result = 1 and time <= :time", {'time':found_time})
        self.dbh.commit()

    def archive_to_db(self, found_time):
        # Copy all shares up to found_time into shares_archive.
        self.dbc.execute("insert into shares_archive select * from shares where time <= :time", {'time':found_time})
        self.dbh.commit()

    def archive_cleanup(self, found_time):
        # Drop archived shares and compact the database file.
        self.dbc.execute("delete from shares where time <= :time", {'time':found_time})
        self.dbc.execute("vacuum")
        self.dbh.commit()

    def archive_get_shares(self, found_time):
        # Returns the live cursor positioned over the shares to archive.
        self.dbc.execute("select * from shares where time <= :time", {'time':found_time})
        return self.dbc

    def import_shares(self, data):
        """Bulk-insert a batch of shares and roll up round/worker counters.

        data rows:
        # 0           1            2          3          4         5        6  7            8         9              10
        # [worker_name,block_header,block_hash,difficulty,timestamp,is_valid,ip,block_height,prev_hash,invalid_reason,share_diff]
        """
        log.debug("Importing Shares")
        checkin_times = {}
        total_shares = 0
        best_diff = 0
        sqldata = []
        for k, v in enumerate(data):
            if settings.DATABASE_EXTEND :
                total_shares += v[3]
                # Track latest check-in plus accepted/rejected share counts per worker.
                if v[0] in checkin_times:
                    if v[4] > checkin_times[v[0]] :
                        checkin_times[v[0]]["time"] = v[4]
                else:
                    checkin_times[v[0]] = {"time": v[4], "shares": 0, "rejects": 0 }

                if v[5] == True :
                    checkin_times[v[0]]["shares"] += v[3]
                else :
                    checkin_times[v[0]]["rejects"] += v[3]

                if v[10] > best_diff:
                    best_diff = v[10]

                sqldata.append({'time':v[4], 'rem_host':v[6], 'username':v[0], 'our_result':v[5], 'upstream_result':0, 'reason':v[9], 'solution':'',
                        'block_num':v[7], 'prev_block_hash':v[8], 'ua':'', 'diff':v[3]} )
            else :
                # Plain pushpool-compatible layout.
                sqldata.append({'time':v[4], 'rem_host':v[6], 'username':v[0], 'our_result':v[5], 'upstream_result':0, 'reason':v[9], 'solution':''} )

        if settings.DATABASE_EXTEND :
            self.dbc.executemany("insert into shares " +\
                    "(time,rem_host,username,our_result,upstream_result,reason,solution,block_num,prev_block_hash,useragent,difficulty) " +\
                    "VALUES (:time,:rem_host,:username,:our_result,:upstream_result,:reason,:solution,:block_num,:prev_block_hash,:ua,:diff)", sqldata)


            # Round share counter.
            self.dbc.execute("select value from pool where parameter = 'round_shares'")
            round_shares = int(self.dbc.fetchone()[0]) + total_shares
            self.dbc.execute("update pool set value = :val where parameter = 'round_shares'", {'val':round_shares})

            # Best share this round.
            self.dbc.execute("select value from pool where parameter = 'round_best_share'")
            round_best_share = int(self.dbc.fetchone()[0])
            if best_diff > round_best_share:
                self.dbc.execute("update pool set value = :val where parameter = 'round_best_share'", {'val':best_diff})

            # Round progress as a percentage of network difficulty.
            self.dbc.execute("select value from pool where parameter = 'bitcoin_difficulty'")
            difficulty = float(self.dbc.fetchone()[0])

            if difficulty == 0:
                progress = 0
            else:
                progress = (round_shares/difficulty)*100
            self.dbc.execute("update pool set value = :val where parameter = 'round_progress'", {'val':progress})

            # Flush per-worker check-in/share/reject accumulators.
            sqldata = []
            for k, v in checkin_times.items():
                sqldata.append({'last_checkin':v["time"], 'addshares':v["shares"], 'addrejects':v["rejects"], 'user':k})
            self.dbc.executemany("update pool_worker set last_checkin = :last_checkin, total_shares = total_shares + :addshares, " +\
                    "total_rejects = total_rejects + :addrejects where username = :user", sqldata)
        else:
            self.dbc.executemany("insert into shares (time,rem_host,username,our_result,upstream_result,reason,solution) " +\
                    "VALUES (:time,:rem_host,:username,:our_result,:upstream_result,:reason,:solution)", sqldata)

        self.dbh.commit()

    def found_block(self, data):
        """Mark the share that solved a block and reset round counters."""
        # Note: difficulty = -1 here
        self.dbc.execute("update shares set upstream_result = :usr, solution = :sol where time = :time and username = :user",
                {'usr':data[5], 'sol':data[2], 'time':data[4], 'user':data[0]})
        if settings.DATABASE_EXTEND and data[5] == True :
            self.dbc.execute("update pool_worker set total_found = total_found + 1 where username = :user", {'user':data[0]})
            self.dbc.execute("select value from pool where parameter = 'pool_total_found'")
            total_found = int(self.dbc.fetchone()[0]) + 1
            # New round: zero the round counters and stamp the start time.
            self.dbc.executemany("update pool set value = :val where parameter = :parm", [{'val':0, 'parm':'round_shares'},
                    {'val':0, 'parm':'round_progress'},
                    {'val':0, 'parm':'round_best_share'},
                    {'val':time.time(), 'parm':'round_start'},
                    {'val':total_found, 'parm':'pool_total_found'}
                    ])
        self.dbh.commit()

    def get_user(self, id_or_username):
        # User management is not supported on the SQLite backend.
        raise NotImplementedError('Not implemented for SQLite')

    def list_users(self):
        raise NotImplementedError('Not implemented for SQLite')

    def delete_user(self, id_or_username):
        raise NotImplementedError('Not implemented for SQLite')

    def insert_user(self, username, password):
        # NOTE(review): stores the password as given (no salted hash here,
        # unlike the PostgreSQL driver) -- confirm this is intended.
        log.debug("Adding Username/Password")
        self.dbc.execute("insert into pool_worker (username,password) VALUES (:user,:pass)", {'user':username, 'pass':password})
        self.dbh.commit()

    def update_user(self, username, password):
        raise NotImplementedError('Not implemented for SQLite')

    def check_password(self, username, password):
        """True if a worker row matches the username/password pair exactly."""
        log.debug("Checking Username/Password")
        self.dbc.execute("select COUNT(*) from pool_worker where username = :user and password = :pass", {'user':username, 'pass':password})
        data = self.dbc.fetchone()
        if data[0] > 0 :
            return True
        return False

    def update_worker_diff(self, username, diff):
        # Persist the vardiff-assigned difficulty for a worker.
        self.dbc.execute("update pool_worker set difficulty = :diff where username = :user", {'diff':diff, 'user':username})
        self.dbh.commit()

    def clear_worker_diff(self):
        # Reset all worker difficulties (extended layout only).
        if settings.DATABASE_EXTEND == True :
            self.dbc.execute("update pool_worker set difficulty = 0")
            self.dbh.commit()

    def update_pool_info(self, pi):
        """Store bitcoind status (blocks/balance/connections/difficulty)."""
        self.dbc.executemany("update pool set value = :val where parameter = :parm", [{'val':pi['blocks'], 'parm':"bitcoin_blocks"},
                {'val':pi['balance'], 'parm':"bitcoin_balance"},
                {'val':pi['connections'], 'parm':"bitcoin_connections"},
                {'val':pi['difficulty'], 'parm':"bitcoin_difficulty"},
                {'val':time.time(), 'parm':"bitcoin_infotime"}
                ])
        self.dbh.commit()

    def get_pool_stats(self):
        """Return the pool table as a {parameter: value} dict."""
        self.dbc.execute("select * from pool")
        ret = {}
        for data in self.dbc.fetchall():
            ret[data[0]] = data[1]
        return ret

    def get_workers_stats(self):
        """Return per-worker stats keyed by username."""
        self.dbc.execute("select username,speed,last_checkin,total_shares,total_rejects,total_found,alive,difficulty from pool_worker")
        ret = {}
        for data in self.dbc.fetchall():
            ret[data[0]] = { "username" : data[0],
                "speed" : data[1],
                # Stored value is returned as-is (no timestamp conversion).
                "last_checkin" : data[2],
                "total_shares" : data[3],
                "total_rejects" : data[4],
                "total_found" : data[5],
                "alive" : data[6],
                "difficulty" : data[7] }
        return ret

    def close(self):
        self.dbh.close()

    def check_tables(self):
        """Create the schema on first run, then apply pending migrations."""
        log.debug("Checking Tables")
        if settings.DATABASE_EXTEND == True :
            self.dbc.execute("create table if not exists shares" +\
                    "(time DATETIME,rem_host TEXT, username TEXT, our_result INTEGER, upstream_result INTEGER, reason TEXT, solution TEXT, " +\
                    "block_num INTEGER, prev_block_hash TEXT, useragent TEXT, difficulty INTEGER)")
            self.dbc.execute("create table if not exists pool_worker" +\
                    "(username TEXT, password TEXT, speed INTEGER, last_checkin DATETIME)")
            self.dbc.execute("create table if not exists pool(parameter TEXT, value TEXT)")

            # Missing 'DB Version' row means a fresh DB: bootstrap to v2.
            self.dbc.execute("select COUNT(*) from pool where parameter = 'DB Version'")
            data = self.dbc.fetchone()
            if data[0] <= 0:
                self.dbc.execute("alter table pool_worker add total_shares INTEGER default 0")
                self.dbc.execute("alter table pool_worker add total_rejects INTEGER default 0")
                self.dbc.execute("alter table pool_worker add total_found INTEGER default 0")
                self.dbc.execute("insert into pool (parameter,value) VALUES ('DB Version',2)")
            self.update_tables()
        else :
            self.dbc.execute("create table if not exists shares" + \
                    "(time DATETIME,rem_host TEXT, username TEXT, our_result INTEGER, upstream_result INTEGER, reason TEXT, solution TEXT)")
            self.dbc.execute("create table if not exists pool_worker(username TEXT, password TEXT)")
            self.dbc.execute("create index if not exists pool_worker_username ON pool_worker(username)")

    def update_tables(self):
        """Step 'DB Version' forward one migration at a time until current."""
        version = 0
        current_version = 6
        while version < current_version :
            self.dbc.execute("select value from pool where parameter = 'DB Version'")
            data = self.dbc.fetchone()
            version = int(data[0])
            if version < current_version :
                log.info("Updating Database from %i to %i" % (version, version +1))
                # Dispatch to update_version_<n>, which bumps 'DB Version'.
                getattr(self, 'update_version_' + str(version) )()


    def update_version_2(self):
        """Seed the pool stats parameters and base indices."""
        log.info("running update 2")
        self.dbc.executemany("insert into pool (parameter,value) VALUES (?,?)", [('bitcoin_blocks', 0),
                ('bitcoin_balance', 0),
                ('bitcoin_connections', 0),
                ('bitcoin_difficulty', 0),
                ('pool_speed', 0),
                ('pool_total_found', 0),
                ('round_shares', 0),
                ('round_progress', 0),
                ('round_start', time.time())
                ])
        self.dbc.execute("create index if not exists shares_username ON shares(username)")
        self.dbc.execute("create index if not exists pool_worker_username ON pool_worker(username)")
        self.dbc.execute("update pool set value = 3 where parameter = 'DB Version'")
        self.dbh.commit()

    def update_version_3(self):
        """Add round_best_share/bitcoin_infotime params and alive column."""
        log.info("running update 3")
        self.dbc.executemany("insert into pool (parameter,value) VALUES (?,?)", [
                ('round_best_share', 0),
                ('bitcoin_infotime', 0),
                ])
        self.dbc.execute("alter table pool_worker add alive INTEGER default 0")
        self.dbc.execute("update pool set value = 4 where parameter = 'DB Version'")
        self.dbh.commit()

    def update_version_4(self):
        """Add per-worker difficulty and the two archive tables."""
        log.info("running update 4")
        self.dbc.execute("alter table pool_worker add difficulty INTEGER default 0")
        self.dbc.execute("create table if not exists shares_archive" +\
                "(time DATETIME,rem_host TEXT, username TEXT, our_result INTEGER, upstream_result INTEGER, reason TEXT, solution TEXT, " +\
                "block_num INTEGER, prev_block_hash TEXT, useragent TEXT, difficulty INTEGER)")
        self.dbc.execute("create table if not exists shares_archive_found" +\
                "(time DATETIME,rem_host TEXT, username TEXT, our_result INTEGER, upstream_result INTEGER, reason TEXT, solution TEXT, " +\
                "block_num INTEGER, prev_block_hash TEXT, useragent TEXT, difficulty INTEGER)")
        self.dbc.execute("update pool set value = 5 where parameter = 'DB Version'")
        self.dbh.commit()

    def update_version_5(self):
        """Add primary keys via table rebuilds and rework the shares indices.

        SQLite cannot ALTER ... ADD PRIMARY KEY, so each table is renamed,
        recreated with the key, re-filled, and the old copy dropped.
        """
        log.info("running update 5")
        # Adding Primary key to table: pool
        self.dbc.execute("alter table pool rename to pool_old")
        self.dbc.execute("create table if not exists pool(parameter TEXT, value TEXT, primary key(parameter))")
        self.dbc.execute("insert into pool select * from pool_old")
        self.dbc.execute("drop table pool_old")
        self.dbh.commit()
        # Adding Primary key to table: pool_worker
        self.dbc.execute("alter table pool_worker rename to pool_worker_old")
        self.dbc.execute("CREATE TABLE pool_worker(username TEXT, password TEXT, speed INTEGER, last_checkin DATETIME, total_shares INTEGER default 0, total_rejects INTEGER default 0, total_found INTEGER default 0, alive INTEGER default 0, difficulty INTEGER default 0, primary key(username))")
        self.dbc.execute("insert into pool_worker select * from pool_worker_old")
        self.dbc.execute("drop table pool_worker_old")
        self.dbh.commit()
        # Adjusting indices on table: shares
        self.dbc.execute("DROP INDEX shares_username")
        self.dbc.execute("CREATE INDEX shares_time_username ON shares(time,username)")
        self.dbc.execute("CREATE INDEX shares_upstreamresult ON shares(upstream_result)")
        self.dbh.commit()

        self.dbc.execute("update pool set value = 6 where parameter = 'DB Version'")
        self.dbh.commit()
|
||||
Loading…
Reference in New Issue
Block a user