PEP8 formating

This commit is contained in:
4tochka 2018-05-29 11:33:47 +04:00
parent fa00b7ff21
commit a0a7c8ed08
6 changed files with 152 additions and 125 deletions

View File

@ -19,7 +19,7 @@ class PrivateKey():
self.testnet = testnet self.testnet = testnet
return return
assert type(key) == str assert type(key) == str
self.raw_key = WIF_to_private_key(key) self.raw_key = wif_to_private_key(key)
if key[0] in (MAINNET_PRIVATE_KEY_UNCOMPRESSED_PREFIX, if key[0] in (MAINNET_PRIVATE_KEY_UNCOMPRESSED_PREFIX,
TESTNET_PRIVATE_KEY_UNCOMPRESSED_PREFIX): TESTNET_PRIVATE_KEY_UNCOMPRESSED_PREFIX):
self.compressed = False self.compressed = False
@ -39,7 +39,7 @@ class PrivateKey():
compressed = self.compressed compressed = self.compressed
if testnet is None: if testnet is None:
testnet = self.testnet testnet = self.testnet
return private_key_to_WIF(self.raw_key, compressed, testnet) return private_key_to_wif(self.raw_key, compressed, testnet)
class PublicKey(): class PublicKey():
@ -95,7 +95,7 @@ class Address():
if address_type == "P2SH_P2WPKH": if address_type == "P2SH_P2WPKH":
self.script_hash = True self.script_hash = True
self.redeem_script = public_key_to_P2SH_P2WPKH_script(self.public_key.raw_key) self.redeem_script = public_key_to_p2sh_p2wpkh_script(self.public_key.raw_key)
self.redeem_script_hex = hexlify(self.redeem_script).decode() self.redeem_script_hex = hexlify(self.redeem_script).decode()
self.hash = hash160(self.redeem_script) self.hash = hash160(self.redeem_script)
else: else:

View File

@ -196,7 +196,7 @@ class OLDTransaction():
def sign_P2SHP2WPKH_input(self, sighash_type, input_index, private_key = None, amount = None): def sign_P2SHP2WPKH_input(self, sighash_type, input_index, private_key = None, amount = None):
if type(private_key) == str: if type(private_key) == str:
private_key = WIF_to_private_key(private_key) private_key = wif_to_private_key(private_key)
if amount is not None: if amount is not None:
self.tx_in[input_index].amount = amount self.tx_in[input_index].amount = amount
else: else:

View File

@ -1,5 +1,4 @@
from secp256k1 import lib as secp256k1 from secp256k1 import lib as secp256k1
from secp256k1 import ffi
import random import random
SIGHASH_ALL = 0x00000001 SIGHASH_ALL = 0x00000001

View File

@ -1,5 +1,6 @@
import time import time
import struct import struct
from secp256k1 import ffi
from .constants import * from .constants import *
from .opcodes import * from .opcodes import *
from .hash import * from .hash import *
@ -14,22 +15,24 @@ def create_private_key(hex=False):
""" """
:return: 32 bytes private key :return: 32 bytes private key
""" """
a = random.SystemRandom().randint(0,MAX_INT_PRIVATE_KEY) a = random.SystemRandom().randint(0, MAX_INT_PRIVATE_KEY)
i = int((time.time()%0.01)*100000) i = int((time.time() % 0.01)*100000)
h = a.to_bytes(32,byteorder="big") h = a.to_bytes(32, byteorder="big")
while True: while True:
h = hashlib.sha256(h).digest() h = hashlib.sha256(h).digest()
if i>1: i -= 1 if i > 1:
i -= 1
else: else:
if int.from_bytes(h,byteorder="big")<MAX_INT_PRIVATE_KEY: if int.from_bytes(h, byteorder="big") < MAX_INT_PRIVATE_KEY:
break break
if hex: if hex:
return hexlify(h).decode() return hexlify(h).decode()
return h return h
def private_key_to_WIF(h, compressed = True, testnet = False):
#uncompressed: 0x80 + [32-byte secret] + [4 bytes of Hash() of previous 33 bytes], base58 encoded def private_key_to_wif(h, compressed=True, testnet=False):
#compressed: 0x80 + [32-byte secret] + 0x01 + [4 bytes of Hash() previous 34 bytes], base58 encoded # uncompressed: 0x80 + [32-byte secret] + [4 bytes of Hash() of previous 33 bytes], base58 encoded
# compressed: 0x80 + [32-byte secret] + 0x01 + [4 bytes of Hash() previous 34 bytes], base58 encoded
if type(h) == str: if type(h) == str:
h = unhexlify(h) h = unhexlify(h)
assert len(h) == 32 assert len(h) == 32
@ -37,18 +40,21 @@ def private_key_to_WIF(h, compressed = True, testnet = False):
h = TESTNET_PRIVATE_KEY_BYTE_PREFIX + h h = TESTNET_PRIVATE_KEY_BYTE_PREFIX + h
else: else:
h = MAINNET_PRIVATE_KEY_BYTE_PREFIX + h h = MAINNET_PRIVATE_KEY_BYTE_PREFIX + h
if compressed: h += b'\x01' if compressed:
h += b'\x01'
h += double_sha256(h)[:4] h += double_sha256(h)[:4]
return encode_base58(h) return encode_base58(h)
def WIF_to_private_key(h, hex = False):
assert is_WIF_valid(h) def wif_to_private_key(h, hex=False):
assert is_wif_valid(h)
h = decode_base58(h) h = decode_base58(h)
if hex: if hex:
return hexlify(h[1:33]).decode() return hexlify(h[1:33]).decode()
return h[1:33] return h[1:33]
def is_WIF_valid(wif):
def is_wif_valid(wif):
assert type(wif) == str assert type(wif) == str
if wif[0] not in PRIVATE_KEY_PREFIX_LIST: if wif[0] not in PRIVATE_KEY_PREFIX_LIST:
return False return False
@ -67,18 +73,19 @@ def is_WIF_valid(wif):
return False return False
return True return True
def private_to_public_key(private_key, compressed = True, hex = False):
def private_to_public_key(private_key, compressed=True, hex=False):
if type(private_key)!= bytes: if type(private_key)!= bytes:
if type(private_key) == bytearray: if type(private_key) == bytearray:
private_key = bytes(private_key) private_key = bytes(private_key)
elif type(private_key) == str: elif type(private_key) == str:
if not is_WIF_valid(private_key): if not is_wif_valid(private_key):
private_key = unhexlify(private_key) private_key = unhexlify(private_key)
else: else:
if private_key[0] in (MAINNET_PRIVATE_KEY_UNCOMPRESSED_PREFIX, if private_key[0] in (MAINNET_PRIVATE_KEY_UNCOMPRESSED_PREFIX,
TESTNET_PRIVATE_KEY_UNCOMPRESSED_PREFIX): TESTNET_PRIVATE_KEY_UNCOMPRESSED_PREFIX):
compressed = False compressed = False
private_key = WIF_to_private_key(private_key) private_key = wif_to_private_key(private_key)
else: else:
raise TypeError("private key must be a bytes or WIF or hex encoded string") raise TypeError("private key must be a bytes or WIF or hex encoded string")
pubkey_ptr = ffi.new('secp256k1_pubkey *') pubkey_ptr = ffi.new('secp256k1_pubkey *')
@ -93,6 +100,7 @@ def private_to_public_key(private_key, compressed = True, hex = False):
pub = bytes(ffi.buffer(pubkey, len_key)) pub = bytes(ffi.buffer(pubkey, len_key))
return hexlify(pub).decode() if hex else pub return hexlify(pub).decode() if hex else pub
def is_valid_public_key(key): def is_valid_public_key(key):
if len(key) < 33: if len(key) < 33:
return False return False
@ -109,8 +117,8 @@ def is_valid_public_key(key):
# #
def hash_to_address(address_hash, testnet = False, def hash_to_address(address_hash, testnet=False,
script_hash = False, witness_version = 0): script_hash=False, witness_version=0):
if type(address_hash) == str: if type(address_hash) == str:
address_hash = unhexlify(address_hash) address_hash = unhexlify(address_hash)
if not script_hash: if not script_hash:
@ -120,7 +128,7 @@ def hash_to_address(address_hash, testnet = False,
prefix = TESTNET_ADDRESS_BYTE_PREFIX prefix = TESTNET_ADDRESS_BYTE_PREFIX
else: else:
prefix = MAINNET_ADDRESS_BYTE_PREFIX prefix = MAINNET_ADDRESS_BYTE_PREFIX
address_hash = prefix + address_hash address_hash = prefix + address_hash
address_hash += double_sha256(address_hash)[:4] address_hash += double_sha256(address_hash)[:4]
return encode_base58(address_hash) return encode_base58(address_hash)
else: else:
@ -130,7 +138,7 @@ def hash_to_address(address_hash, testnet = False,
prefix = TESTNET_SCRIPT_ADDRESS_BYTE_PREFIX prefix = TESTNET_SCRIPT_ADDRESS_BYTE_PREFIX
else: else:
prefix = MAINNET_SCRIPT_ADDRESS_BYTE_PREFIX prefix = MAINNET_SCRIPT_ADDRESS_BYTE_PREFIX
address_hash = prefix + address_hash address_hash = prefix + address_hash
address_hash += double_sha256(address_hash)[:4] address_hash += double_sha256(address_hash)[:4]
return encode_base58(address_hash) return encode_base58(address_hash)
if testnet: if testnet:
@ -139,15 +147,15 @@ def hash_to_address(address_hash, testnet = False,
else: else:
prefix = MAINNET_SEGWIT_ADDRESS_BYTE_PREFIX prefix = MAINNET_SEGWIT_ADDRESS_BYTE_PREFIX
hrp = MAINNET_SEGWIT_ADDRESS_PREFIX hrp = MAINNET_SEGWIT_ADDRESS_PREFIX
address_hash = witness_version.to_bytes(1, "big") + rebase_8_to_5( address_hash) address_hash = witness_version.to_bytes(1, "big") + rebase_8_to_5(address_hash)
checksum = bech32_polymod(prefix + address_hash + b"\x00" * 6) checksum = bech32_polymod(prefix + address_hash + b"\x00" * 6)
checksum = rebase_8_to_5(checksum.to_bytes(5, "big"))[2:] checksum = rebase_8_to_5(checksum.to_bytes(5, "big"))[2:]
return "%s1%s" % (hrp, rebase_5_to_32(address_hash + checksum).decode()) return "%s1%s" % (hrp, rebase_5_to_32(address_hash + checksum).decode())
def address_to_hash(address, hex = False): def address_to_hash(address, hex=False):
if address[0] in ADDRESS_PREFIX_LIST: if address[0] in ADDRESS_PREFIX_LIST:
h = decode_base58(address)[1:-4] h = decode_base58(address)[1:-4]
elif address[:2] in (MAINNET_SEGWIT_ADDRESS_PREFIX, elif address[:2] in (MAINNET_SEGWIT_ADDRESS_PREFIX,
TESTNET_SEGWIT_ADDRESS_PREFIX): TESTNET_SEGWIT_ADDRESS_PREFIX):
address = address.split("1")[1] address = address.split("1")[1]
@ -159,18 +167,20 @@ def address_to_hash(address, hex = False):
else: else:
return h return h
def get_witness_version(address): def get_witness_version(address):
address = address.split("1")[1] address = address.split("1")[1]
h = rebase_32_to_5(address) h = rebase_32_to_5(address)
return h[0] return h[0]
def address_type(address, num = False):
def address_type(address, num=False):
if address[0] in (TESTNET_SCRIPT_ADDRESS_PREFIX, if address[0] in (TESTNET_SCRIPT_ADDRESS_PREFIX,
MAINNET_SCRIPT_ADDRESS_PREFIX): MAINNET_SCRIPT_ADDRESS_PREFIX):
t = 'P2SH' t = 'P2SH'
elif address[0] in (MAINNET_ADDRESS_PREFIX, elif address[0] in (MAINNET_ADDRESS_PREFIX,
TESTNET_ADDRESS_PREFIX, TESTNET_ADDRESS_PREFIX,
TESTNET_ADDRESS_PREFIX_2): TESTNET_ADDRESS_PREFIX_2):
t = 'P2PKH' t = 'P2PKH'
elif address[:2] in (MAINNET_SEGWIT_ADDRESS_PREFIX, elif address[:2] in (MAINNET_SEGWIT_ADDRESS_PREFIX,
TESTNET_SEGWIT_ADDRESS_PREFIX): TESTNET_SEGWIT_ADDRESS_PREFIX):
@ -184,7 +194,8 @@ def address_type(address, num = False):
return SCRIPT_TYPES['NON_STANDARD'] if num else 'UNKNOWN' return SCRIPT_TYPES['NON_STANDARD'] if num else 'UNKNOWN'
return SCRIPT_TYPES[t] if num else t return SCRIPT_TYPES[t] if num else t
def script_to_hash(s, witness = False, hex = False):
def script_to_hash(s, witness=False, hex=False):
if type(s) == str: if type(s) == str:
s = unhexlify(s) s = unhexlify(s)
if witness: if witness:
@ -192,7 +203,8 @@ def script_to_hash(s, witness = False, hex = False):
else: else:
return hash160(s, hex) return hash160(s, hex)
def address_to_script(address, hex = False):
def address_to_script(address, hex=False):
if address[0] in (TESTNET_SCRIPT_ADDRESS_PREFIX, if address[0] in (TESTNET_SCRIPT_ADDRESS_PREFIX,
MAINNET_SCRIPT_ADDRESS_PREFIX): MAINNET_SCRIPT_ADDRESS_PREFIX):
s = [BYTE_OPCODE["OP_HASH160"], s = [BYTE_OPCODE["OP_HASH160"],
@ -200,33 +212,34 @@ def address_to_script(address, hex = False):
address_to_hash(address), address_to_hash(address),
BYTE_OPCODE["OP_EQUAL"]] BYTE_OPCODE["OP_EQUAL"]]
elif address[0] in (MAINNET_ADDRESS_PREFIX, elif address[0] in (MAINNET_ADDRESS_PREFIX,
TESTNET_ADDRESS_PREFIX, TESTNET_ADDRESS_PREFIX,
TESTNET_ADDRESS_PREFIX_2): TESTNET_ADDRESS_PREFIX_2):
s = [BYTE_OPCODE["OP_DUP"], s = [BYTE_OPCODE["OP_DUP"],
BYTE_OPCODE["OP_HASH160"], BYTE_OPCODE["OP_HASH160"],
b'\x14', b'\x14',
address_to_hash(address), address_to_hash(address),
BYTE_OPCODE["OP_EQUALVERIFY"], BYTE_OPCODE["OP_EQUALVERIFY"],
BYTE_OPCODE["OP_CHECKSIG"]] BYTE_OPCODE["OP_CHECKSIG"]]
elif address[:2] in (TESTNET_SEGWIT_ADDRESS_PREFIX, elif address[:2] in (TESTNET_SEGWIT_ADDRESS_PREFIX,
MAINNET_SEGWIT_ADDRESS_PREFIX): MAINNET_SEGWIT_ADDRESS_PREFIX):
h = address_to_hash(address) h = address_to_hash(address)
s = [BYTE_OPCODE["OP_0"], s = [BYTE_OPCODE["OP_0"],
bytes([len(h)]), bytes([len(h)]),
h] h]
else: else:
assert False assert False
s = b''.join(s) s = b''.join(s)
return hexlify(s).decode() if hex else s return hexlify(s).decode() if hex else s
def public_key_to_P2SH_P2WPKH_script(pubkey):
def public_key_to_p2sh_p2wpkh_script(pubkey):
assert len(pubkey) == 33 assert len(pubkey) == 33
return b'\x00\x14' + hash160(pubkey) return b'\x00\x14' + hash160(pubkey)
def public_key_to_address(pubkey, testnet = False, def public_key_to_address(pubkey, testnet=False,
p2sh_p2wpkh = False, p2sh_p2wpkh=False,
witness_version = 0): witness_version=0):
if type(pubkey) == str: if type(pubkey) == str:
pubkey = unhexlify(pubkey) pubkey = unhexlify(pubkey)
if p2sh_p2wpkh: if p2sh_p2wpkh:
@ -237,9 +250,10 @@ def public_key_to_address(pubkey, testnet = False,
if witness_version is not None: if witness_version is not None:
assert len(pubkey) == 33 assert len(pubkey) == 33
h = hash160(pubkey) h = hash160(pubkey)
return hash_to_address(h, testnet = testnet, return hash_to_address(h, testnet=testnet,
script_hash = p2sh_p2wpkh, script_hash=p2sh_p2wpkh,
witness_version = witness_version) witness_version=witness_version)
def parse_script(script, segwit=True): def parse_script(script, segwit=True):
if type(script) == str: if type(script) == str:
@ -324,7 +338,8 @@ def parse_script(script, segwit=True):
s += 1 s += 1
return {"nType": 7, "type": "NON_STANDARD", "reqSigs": req_sigs, "script": script} return {"nType": 7, "type": "NON_STANDARD", "reqSigs": req_sigs, "script": script}
def decode_script(script, asm = False):
def decode_script(script, asm=False):
if type(script) == str: if type(script) == str:
try: try:
script = unhexlify(script) script = unhexlify(script)
@ -353,9 +368,7 @@ def decode_script(script, asm = False):
return ' '.join(result) return ' '.join(result)
def is_address_valid(address, testnet=False):
def is_address_valid(address, testnet = False):
if not address or type(address) != str: if not address or type(address) != str:
return False return False
if address[0] in (MAINNET_ADDRESS_PREFIX, if address[0] in (MAINNET_ADDRESS_PREFIX,
@ -373,7 +386,8 @@ def is_address_valid(address, testnet = False):
MAINNET_SCRIPT_ADDRESS_PREFIX): MAINNET_SCRIPT_ADDRESS_PREFIX):
return False return False
h = decode_base58(address) h = decode_base58(address)
if len(h) != 25: return False if len(h) != 25:
return False
checksum = h[-4:] checksum = h[-4:]
if double_sha256(h[:-4])[:4] != checksum: if double_sha256(h[:-4])[:4] != checksum:
return False return False
@ -392,10 +406,10 @@ def is_address_valid(address, testnet = False):
if not i.isupper() or i not in base32charset_upcase: if not i.isupper() or i not in base32charset_upcase:
return False return False
else: else:
if i.isupper() or i not in base32charset: if i.isupper() or i not in base32charset:
return False return False
payload = payload.lower() payload = payload.lower()
prefix = prefix.lower() prefix = prefix.lower()
if testnet: if testnet:
if prefix != TESTNET_SEGWIT_ADDRESS_PREFIX: if prefix != TESTNET_SEGWIT_ADDRESS_PREFIX:
return False return False
@ -414,30 +428,25 @@ def is_address_valid(address, testnet = False):
return True return True
# #
# ECDSA # ECDSA
# #
def verify_signature(sig, pubKey, msg): def verify_signature(sig, pub_key, msg):
if type(sig) != bytes: if type(sig) != bytes:
if type(sig) == bytearray: if type(sig) == bytearray:
sig = bytes(sig) sig = bytes(sig)
elif type(sig) == str: elif type(sig) == str:
sig = unhexlify(sig) sig = unhexlify(sig)
else : else:
raise TypeError("signature must be a bytes or hex encoded string") raise TypeError("signature must be a bytes or hex encoded string")
if type(pubKey) != bytes: if type(pub_key) != bytes:
if type(pubKey) == bytearray: if type(pub_key) == bytearray:
pubKey = bytes(pubKey) pub_key = bytes(pub_key)
elif type(pub_key) == str:
elif type(pubKey) == str: pub_key = unhexlify(pub_key)
pubKey = unhexlify(pubKey) else:
else :
raise TypeError("public key must be a bytes or hex encoded string") raise TypeError("public key must be a bytes or hex encoded string")
if type(msg) != bytes: if type(msg) != bytes:
if type(msg) == bytearray: if type(msg) == bytearray:
msg = bytes(msg) msg = bytes(msg)
@ -445,17 +454,17 @@ def verify_signature(sig, pubKey, msg):
msg = unhexlify(msg) msg = unhexlify(msg)
else: else:
raise TypeError("message must be a bytes or hex encoded string") raise TypeError("message must be a bytes or hex encoded string")
raw_sig = ffi.new('secp256k1_ecdsa_signature *') raw_sig = ffi.new('secp256k1_ecdsa_signature *')
raw_pubkey = ffi.new('secp256k1_pubkey *') raw_pubkey = ffi.new('secp256k1_pubkey *')
if not secp256k1.secp256k1_ecdsa_signature_parse_der(ECDSA_CONTEXT_VERIFY , raw_sig, sig, len(sig)): if not secp256k1.secp256k1_ecdsa_signature_parse_der(ECDSA_CONTEXT_VERIFY, raw_sig, sig, len(sig)):
raise TypeError("signature must be DER encoded") raise TypeError("signature must be DER encoded")
if not secp256k1.secp256k1_ec_pubkey_parse(ECDSA_CONTEXT_VERIFY, raw_pubkey, pubKey, len(pubKey)): if not secp256k1.secp256k1_ec_pubkey_parse(ECDSA_CONTEXT_VERIFY, raw_pubkey, pub_key, len(pub_key)):
raise TypeError("public key format error") raise TypeError("public key format error")
result = secp256k1.secp256k1_ecdsa_verify(ECDSA_CONTEXT_VERIFY, raw_sig, msg, raw_pubkey) result = secp256k1.secp256k1_ecdsa_verify(ECDSA_CONTEXT_VERIFY, raw_sig, msg, raw_pubkey)
return True if result else False return True if result else False
def sign_message(msg, private_key, hex = False):
def sign_message(msg, private_key, hex=False):
""" """
:param msg: message to sign :param msg: message to sign
:param private_key: private key (bytes, hex encoded string) :param private_key: private key (bytes, hex encoded string)
@ -470,7 +479,7 @@ def sign_message(msg, private_key, hex = False):
msg = unhexlify(msg) msg = unhexlify(msg)
else : else :
raise TypeError("message must be a bytes or hex encoded string") raise TypeError("message must be a bytes or hex encoded string")
if type(private_key)!= bytes: if type(private_key) != bytes:
if type(private_key) == bytearray: if type(private_key) == bytearray:
private_key = bytes(private_key) private_key = bytes(private_key)
elif type(private_key) == str: elif type(private_key) == str:
@ -478,16 +487,19 @@ def sign_message(msg, private_key, hex = False):
else: else:
raise TypeError("private key must be a bytes or hex encoded string") raise TypeError("private key must be a bytes or hex encoded string")
raw_sig = ffi.new('secp256k1_ecdsa_signature *') raw_sig = ffi.new('secp256k1_ecdsa_signature *')
signed = secp256k1.secp256k1_ecdsa_sign(ECDSA_CONTEXT_SIGN, raw_sig, msg, private_key, ffi.NULL, ffi.NULL) signed = secp256k1.secp256k1_ecdsa_sign(ECDSA_CONTEXT_SIGN, raw_sig, msg,
private_key, ffi.NULL, ffi.NULL)
assert signed == 1 assert signed == 1
len_sig = 74 len_sig = 74
output = ffi.new('unsigned char[%d]' % len_sig) output = ffi.new('unsigned char[%d]' % len_sig)
outputlen = ffi.new('size_t *', len_sig) outputlen = ffi.new('size_t *', len_sig)
res = secp256k1.secp256k1_ecdsa_signature_serialize_der(ECDSA_CONTEXT_SIGN, output, outputlen, raw_sig) res = secp256k1.secp256k1_ecdsa_signature_serialize_der(ECDSA_CONTEXT_SIGN,
output, outputlen, raw_sig)
assert res == 1 assert res == 1
signature = bytes(ffi.buffer(output, outputlen[0])) signature = bytes(ffi.buffer(output, outputlen[0]))
return hexlify(signature).decode() if hex else signature return hexlify(signature).decode() if hex else signature
def is_valid_signature_encoding(sig): def is_valid_signature_encoding(sig):
# Format: 0x30 [total-length] 0x02 [R-length] [R] 0x02 [S-length] [S] [sighash] # Format: 0x30 [total-length] 0x02 [R-length] [R] 0x02 [S-length] [S] [sighash]
# * total-length: 1-byte length descriptor of everything that follows, # * total-length: 1-byte length descriptor of everything that follows,
@ -500,7 +512,6 @@ def is_valid_signature_encoding(sig):
# * S: arbitrary-length big-endian encoded S value. The same rules apply. # * S: arbitrary-length big-endian encoded S value. The same rules apply.
# * sighash: 1-byte value indicating what data is hashed (not part of the DER # * sighash: 1-byte value indicating what data is hashed (not part of the DER
# signature) # signature)
length = len(sig) length = len(sig)
# Minimum and maximum size constraints. # Minimum and maximum size constraints.
if (length < 9) or (length > 73): if (length < 9) or (length > 73):
@ -512,41 +523,41 @@ def is_valid_signature_encoding(sig):
if sig[1] != (length - 3): if sig[1] != (length - 3):
return False return False
# Extract the length of the R element. # Extract the length of the R element.
lenR = sig[3] len_r = sig[3]
# Make sure the length of the S element is still inside the signature. # Make sure the length of the S element is still inside the signature.
if (5 + lenR) >= length: if (5 + len_r) >= length:
return False return False
# Extract the length of the S element. # Extract the length of the S element.
lenS = sig[5 + lenR] len_s = sig[5 + len_r]
# Verify that the length of the signature matches the sum of the length # Verify that the length of the signature matches the sum of the length
# of the elements. # of the elements.
if (lenR + lenS + 7) != length: if (len_r + len_s + 7) != length:
return False return False
# Check whether the R element is an integer. # Check whether the R element is an integer.
if sig[2] != 0x02: if sig[2] != 0x02:
return False return False
# Zero-length integers are not allowed for R. # Zero-length integers are not allowed for R.
if lenR == 0: if len_r == 0:
return False return False
# Negative numbers are not allowed for R. # Negative numbers are not allowed for R.
if sig[4] & 0x80: if sig[4] & 0x80:
return False return False
# Null bytes at the start of R are not allowed, unless R would # Null bytes at the start of R are not allowed, unless R would
# otherwise be interpreted as a negative number. # otherwise be interpreted as a negative number.
if (lenR > 1) and (sig[4] == 0x00) and (not sig[5] & 0x80): if (len_r > 1) and (sig[4] == 0x00) and (not sig[5] & 0x80):
return False return False
# Check whether the S element is an integer. # Check whether the S element is an integer.
if sig[lenR + 4] != 0x02: if sig[len_r + 4] != 0x02:
return False return False
# Zero-length integers are not allowed for S. # Zero-length integers are not allowed for S.
if lenS == 0: if len_s == 0:
return False return False
# Negative numbers are not allowed for S. # Negative numbers are not allowed for S.
if sig[lenR + 6] & 0x80: if sig[len_r + 6] & 0x80:
return False return False
# Null bytes at the start of S are not allowed, unless S would otherwise be # Null bytes at the start of S are not allowed, unless S would otherwise be
# interpreted as a negative number. # interpreted as a negative number.
if (lenS > 1) and (sig[lenR + 6] == 0x00) and (not sig[lenR + 7] & 0x80): if (len_s > 1) and (sig[len_r + 6] == 0x00) and (not sig[len_r + 7] & 0x80):
return False return False
return True return True
@ -556,15 +567,20 @@ def is_valid_signature_encoding(sig):
# #
def rh2s(tthash): def rh2s(tthash):
# raw hash to string
return hexlify(tthash[::-1]).decode() return hexlify(tthash[::-1]).decode()
def s2rh(hash_string): def s2rh(hash_string):
# string to raw hash
return unhexlify(hash_string)[::-1] return unhexlify(hash_string)[::-1]
def s2rh_step4(hash_string): def s2rh_step4(hash_string):
h = unhexlify(hash_string) h = unhexlify(hash_string)
return reverse_hash(h) return reverse_hash(h)
def reverse_hash(h): def reverse_hash(h):
return struct.pack('>IIIIIIII', *struct.unpack('>IIIIIIII', h)[::-1])[::-1] return struct.pack('>IIIIIIII', *struct.unpack('>IIIIIIII', h)[::-1])[::-1]
@ -572,6 +588,7 @@ def reverse_hash(h):
# #
# #
def merkleroot(tx_hash_list): def merkleroot(tx_hash_list):
tx_hash_list = list(tx_hash_list) tx_hash_list = list(tx_hash_list)
if len(tx_hash_list) == 1: if len(tx_hash_list) == 1:
@ -590,6 +607,7 @@ def merkleroot(tx_hash_list):
else: else:
return new_hash_list[0] return new_hash_list[0]
def merkle_branches(tx_hash_list): def merkle_branches(tx_hash_list):
tx_hash_list = list(tx_hash_list) tx_hash_list = list(tx_hash_list)
branches = [] branches = []
@ -613,6 +631,7 @@ def merkle_branches(tx_hash_list):
branches.append(new_hash_list.pop(0)) branches.append(new_hash_list.pop(0))
return branches return branches
def merkleroot_from_branches(merkle_branches, coinbase_hash_bin): def merkleroot_from_branches(merkle_branches, coinbase_hash_bin):
merkle_root = coinbase_hash_bin merkle_root = coinbase_hash_bin
for h in merkle_branches: for h in merkle_branches:
@ -621,6 +640,7 @@ def merkleroot_from_branches(merkle_branches, coinbase_hash_bin):
merkle_root = double_sha256(merkle_root + h) merkle_root = double_sha256(merkle_root + h)
return merkle_root return merkle_root
def bits_to_target(bits): def bits_to_target(bits):
if type(bits) == str: if type(bits) == str:
bits = unhexlify(bits) bits = unhexlify(bits)
@ -631,12 +651,15 @@ def bits_to_target(bits):
target = (bits & 0xffffff) * (1 << (8 * (shift - 3))) target = (bits & 0xffffff) * (1 << (8 * (shift - 3)))
return target return target
def target_to_difficulty(target): def target_to_difficulty(target):
return 0x00000000FFFF0000000000000000000000000000000000000000000000000000 / target return 0x00000000FFFF0000000000000000000000000000000000000000000000000000 / target
def bits_to_difficulty(bits): def bits_to_difficulty(bits):
return target_to_difficulty(bits_to_target(bits)) return target_to_difficulty(bits_to_target(bits))
def difficulty_to_target(difficulty): def difficulty_to_target(difficulty):
return int(0x00000000FFFF0000000000000000000000000000000000000000000000000000 / difficulty) return int(0x00000000FFFF0000000000000000000000000000000000000000000000000000 / difficulty)
@ -647,12 +670,14 @@ def difficulty_to_target(difficulty):
def bytes_needed(n): def bytes_needed(n):
if n == 0: if n == 0:
return 1 return 1
return math.ceil(n.bit_length()/8) return math.ceil(n.bit_length()/8)
def int_to_bytes(i, byteorder='big'): def int_to_bytes(i, byteorder='big'):
return i.to_bytes(bytes_needed(i), byteorder=byteorder, signed=False) return i.to_bytes(bytes_needed(i), byteorder=byteorder, signed=False)
def bytes_to_int(i, byteorder='big'): def bytes_to_int(i, byteorder='big'):
return int.from_bytes(i, byteorder=byteorder, signed=False) return int.from_bytes(i, byteorder=byteorder, signed=False)
@ -668,6 +693,7 @@ def int_to_var_int(i):
return b'\xfe' + struct.pack('<L', i) return b'\xfe' + struct.pack('<L', i)
return b'\xff' + struct.pack('<Q', i) return b'\xff' + struct.pack('<Q', i)
def var_int_to_int(data): def var_int_to_int(data):
if data[0] == 0xfd: if data[0] == 0xfd:
return struct.unpack('<H', data[1:3])[0] return struct.unpack('<H', data[1:3])[0]
@ -677,6 +703,7 @@ def var_int_to_int(data):
return struct.unpack('<Q', data[1:9])[0] return struct.unpack('<Q', data[1:9])[0]
return data[0] return data[0]
def var_int_len(n): def var_int_len(n):
if n <= 0xfc: if n <= 0xfc:
return 1 return 1
@ -686,26 +713,30 @@ def var_int_len(n):
return 5 return 5
return 9 return 9
def get_var_int_len(byte): def get_var_int_len(byte):
if byte[0] == 253: if byte[0] == 253:
return 3 return 3
elif byte[0] == 254: elif byte[0] == 254:
return 5 return 5
elif byte[0] == 255: elif byte[0] == 255:
return 9 return 9
return 1 return 1
def read_var_int(stream): def read_var_int(stream):
l = stream.read(1) l = stream.read(1)
bytes_length = get_var_int_len(l) bytes_length = get_var_int_len(l)
return l + stream.read(bytes_length - 1) return l + stream.read(bytes_length - 1)
def read_var_list(stream, data_type): def read_var_list(stream, data_type):
count = var_int_to_int(read_var_int(stream)) count = var_int_to_int(read_var_int(stream))
return [data_type.deserialize(stream) for i in range(count)] return [data_type.deserialize(stream) for i in range(count)]
# compressed integer # compressed integer
def int_to_c_int(n, base_bytes=1): def int_to_c_int(n, base_bytes=1):
if n == 0: if n == 0:
return b'\x00' return b'\x00'
@ -722,11 +753,12 @@ def int_to_c_int(n, base_bytes=1):
if l < base_bytes * 8: if l < base_bytes * 8:
l = base_bytes * 8 l = base_bytes * 8
prefix = prefix << l prefix = prefix << l
if prefix.bit_length() % 8: if prefix.bit_length() % 8:
prefix = prefix << 8 - prefix.bit_length() % 8 prefix = prefix << 8 - prefix.bit_length() % 8
n ^= prefix n ^= prefix
return n.to_bytes(math.ceil(n.bit_length()/8), byteorder="big") return n.to_bytes(math.ceil(n.bit_length()/8), byteorder="big")
def c_int_to_int(b, base_bytes=1): def c_int_to_int(b, base_bytes=1):
byte_length = 0 byte_length = 0
f = 0 f = 0
@ -745,6 +777,7 @@ def c_int_to_int(b, base_bytes=1):
return n & ((1 << (byte_length+base_bytes) * 8 - byte_length) - 1) return n & ((1 << (byte_length+base_bytes) * 8 - byte_length) - 1)
return n return n
def c_int_len(n, base_bytes=1): def c_int_len(n, base_bytes=1):
if n == 0: if n == 0:
return 1 return 1
@ -753,11 +786,7 @@ def c_int_len(n, base_bytes=1):
if l <= min_bits + 1: if l <= min_bits + 1:
return 1 return 1
payload_bytes = math.ceil((l)/8) - base_bytes payload_bytes = math.ceil((l)/8) - base_bytes
return int(math.ceil((l+payload_bytes)/8)) return int(math.ceil((l+payload_bytes)/8))
# generic big endian MPI format # generic big endian MPI format
@ -788,12 +817,10 @@ def bn2mpi(v):
have_ext = False have_ext = False
if v.bit_length() > 0: if v.bit_length() > 0:
have_ext = (v.bit_length() & 0x07) == 0 have_ext = (v.bit_length() & 0x07) == 0
neg = False neg = False
if v < 0: if v < 0:
neg = True neg = True
v = -v v = -v
s = struct.pack(b">I", bn_bytes(v, have_ext)) s = struct.pack(b">I", bn_bytes(v, have_ext))
ext = bytearray() ext = bytearray()
if have_ext: if have_ext:
@ -816,7 +843,6 @@ def mpi2bn(s):
return None return None
if v_len == 0: if v_len == 0:
return 0 return 0
v_str = bytearray(s[4:]) v_str = bytearray(s[4:])
neg = False neg = False
i = v_str[0] i = v_str[0]
@ -824,7 +850,6 @@ def mpi2bn(s):
neg = True neg = True
i &= ~0x80 i &= ~0x80
v_str[0] = i v_str[0] = i
v = bin2bn(v_str) v = bin2bn(v_str)
if neg: if neg:
@ -861,6 +886,7 @@ def i2b(i): return bn2vch(i)
def b2i(b): return vch2bn(b) def b2i(b): return vch2bn(b)
def get_stream(stream): def get_stream(stream):
if type(stream) != io.BytesIO: if type(stream) != io.BytesIO:
if type(stream) == str: if type(stream) == str:

View File

@ -20,36 +20,36 @@ class AddressFunctionsTests(unittest.TestCase):
pum = "5KPPLXhtga99qqMceRo4Z6LXV3Kx6a9hRx3ez2U7EwP5KZfy2Wf" pum = "5KPPLXhtga99qqMceRo4Z6LXV3Kx6a9hRx3ez2U7EwP5KZfy2Wf"
put = "93A1vGXSGoDHotruGmgyRgtV8hgfFjgtmtuc4epcag886W9d44L" put = "93A1vGXSGoDHotruGmgyRgtV8hgfFjgtmtuc4epcag886W9d44L"
pct = "cUWo47XLYiyFByuFicFS3y4FAza3r3R5XA7Bm7wA3dgSKDYox7h6" pct = "cUWo47XLYiyFByuFicFS3y4FAza3r3R5XA7Bm7wA3dgSKDYox7h6"
self.assertEqual(tools.private_key_to_WIF(p, compressed=1, testnet=0), pcm) self.assertEqual(tools.private_key_to_wif(p, compressed=1, testnet=0), pcm)
self.assertEqual(tools.private_key_to_WIF(p, compressed=0, testnet=0), pum) self.assertEqual(tools.private_key_to_wif(p, compressed=0, testnet=0), pum)
self.assertEqual(tools.private_key_to_WIF(p, compressed=1, testnet=1), pct) self.assertEqual(tools.private_key_to_wif(p, compressed=1, testnet=1), pct)
self.assertEqual(tools.private_key_to_WIF(p, compressed=0, testnet=1), put) self.assertEqual(tools.private_key_to_wif(p, compressed=0, testnet=1), put)
def test_is_WIF_valid(self): def test_is_WIF_valid(self):
self.assertEqual(tools.is_WIF_valid("L49obCXV7fGz2YRzLCSJgeZBYmGeBbKPT7xiehUeYX2S4URkPFZX"),1) self.assertEqual(tools.is_wif_valid("L49obCXV7fGz2YRzLCSJgeZBYmGeBbKPT7xiehUeYX2S4URkPFZX"), 1)
self.assertEqual(tools.is_WIF_valid("5KPPLXhtga99qqMceRo4Z6LXV3Kx6a9hRx3ez2U7EwP5KZfy2Wf"),1) self.assertEqual(tools.is_wif_valid("5KPPLXhtga99qqMceRo4Z6LXV3Kx6a9hRx3ez2U7EwP5KZfy2Wf"), 1)
self.assertEqual(tools.is_WIF_valid("5KPPLXhtga99qqMcWRo4Z6LXV3Kx6a9hRx3ez2U7EwP5KZfy2Wf"),0) self.assertEqual(tools.is_wif_valid("5KPPLXhtga99qqMcWRo4Z6LXV3Kx6a9hRx3ez2U7EwP5KZfy2Wf"), 0)
self.assertEqual(tools.is_WIF_valid("93A1vGXSGoDHotruGmgyRgtV8hgfFjgtmtuc4epcag886W9d44L"),1) self.assertEqual(tools.is_wif_valid("93A1vGXSGoDHotruGmgyRgtV8hgfFjgtmtuc4epcag886W9d44L"), 1)
self.assertEqual(tools.is_WIF_valid("cUWo47XLYiyFByuFicFS3y4FAza3r3R5XA7Bm7wA3dgSKDYox7h6"),1) self.assertEqual(tools.is_wif_valid("cUWo47XLYiyFByuFicFS3y4FAza3r3R5XA7Bm7wA3dgSKDYox7h6"), 1)
self.assertEqual(tools.is_WIF_valid("cUWo47XLYiyByuFicFS3y4FAza3r3R5XA7Bm7wA3dgSKDYox7h6"),0) self.assertEqual(tools.is_wif_valid("cUWo47XLYiyByuFicFS3y4FAza3r3R5XA7Bm7wA3dgSKDYox7h6"), 0)
def test_WIF_to_private_key(self): def test_WIF_to_private_key(self):
p = "ceda1ae4286015d45ec5147fe3f63e9377ccd6d4e98bcf0847df9937da1944a4" p = "ceda1ae4286015d45ec5147fe3f63e9377ccd6d4e98bcf0847df9937da1944a4"
self.assertEqual(tools.WIF_to_private_key("L49obCXV7fGz2YRzLCSJgeZBYmGeBbKPT7xiehUeYX2S4URkPFZX", self.assertEqual(tools.wif_to_private_key("L49obCXV7fGz2YRzLCSJgeZBYmGeBbKPT7xiehUeYX2S4URkPFZX",
hex=1),p) hex=1),p)
self.assertEqual(tools.WIF_to_private_key("L49obCXV7fGz2YRzLCSJgeZBYmGeBbKPT7xiehUeYX2S4URkPFZX", self.assertEqual(tools.wif_to_private_key("L49obCXV7fGz2YRzLCSJgeZBYmGeBbKPT7xiehUeYX2S4URkPFZX",
hex=0),unhexlify(p)) hex=0),unhexlify(p))
self.assertEqual(tools.WIF_to_private_key("5KPPLXhtga99qqMceRo4Z6LXV3Kx6a9hRx3ez2U7EwP5KZfy2Wf", self.assertEqual(tools.wif_to_private_key("5KPPLXhtga99qqMceRo4Z6LXV3Kx6a9hRx3ez2U7EwP5KZfy2Wf",
hex=1),p) hex=1),p)
self.assertEqual(tools.WIF_to_private_key("93A1vGXSGoDHotruGmgyRgtV8hgfFjgtmtuc4epcag886W9d44L", self.assertEqual(tools.wif_to_private_key("93A1vGXSGoDHotruGmgyRgtV8hgfFjgtmtuc4epcag886W9d44L",
hex=1),p) hex=1),p)
self.assertEqual(tools.WIF_to_private_key("cUWo47XLYiyFByuFicFS3y4FAza3r3R5XA7Bm7wA3dgSKDYox7h6", self.assertEqual(tools.wif_to_private_key("cUWo47XLYiyFByuFicFS3y4FAza3r3R5XA7Bm7wA3dgSKDYox7h6",
hex=1),p) hex=1),p)
def test_create_private_key(self): def test_create_private_key(self):
p = tools.create_private_key() p = tools.create_private_key()
pw = tools.private_key_to_WIF(p) pw = tools.private_key_to_wif(p)
self.assertEqual(tools.is_WIF_valid(pw), True) self.assertEqual(tools.is_wif_valid(pw), True)

View File

@ -1,14 +1,16 @@
import unittest import unittest
import os, sys import os
import sys
import time import time
parentPath = os.path.abspath("..") parentPath = os.path.abspath("..")
if parentPath not in sys.path: if parentPath not in sys.path:
sys.path.insert(0, parentPath) sys.path.insert(0, parentPath)
from pybtc.tools import * from pybtc.tools import *
from pybtc.hash import * from pybtc.hash import *
from pybtc.transaction import * from pybtc.transaction import *
from binascii import unhexlify from binascii import unhexlify
from pybtc import address_to_hash as address2hash160 from pybtc import address_to_hash as address2hash160
def decode_block_tx(block): def decode_block_tx(block):