bip32 refactoring

This commit is contained in:
4tochka 2018-07-03 13:56:40 +04:00
parent e9add2530d
commit 19eb1e12d2
15 changed files with 492 additions and 475 deletions

View File

@ -9,5 +9,5 @@ Reference
address.rst
transaction.rst
block.rst
wallet.rst

View File

@ -7,13 +7,12 @@ Base function primitives implemented in functional programming paradigm.
Mnemonic
============
Mnemonic(BIP39)
===============
.. autofunction:: pybtc.create_passphrase
.. autofunction:: pybtc.create_mnemonic
.. autofunction:: pybtc.create_wordlist
.. autofunction:: pybtc.add_checksum_ent
.. autofunction:: pybtc.generate_entropy
.. autofunction:: pybtc.load_word_list
.. autofunction:: pybtc.entropy_to_mnemonic
.. autofunction:: pybtc.mnemonic_to_entropy
.. autofunction:: pybtc.mnemonic_to_seed
@ -42,6 +41,19 @@ Public keys
.. autofunction:: pybtc.private_to_public_key
.. autofunction:: pybtc.is_public_key_valid
Extended keys(BIP32)
====================
.. autofunction:: pybtc.create_master_xprivate_key
.. autofunction:: pybtc.xprivate_to_xpublic_key
.. autofunction:: pybtc.derive_xkey
.. autofunction:: pybtc.public_from_xpublic_key
.. autofunction:: pybtc.private_from_xprivate_key
Addresses
=========
@ -53,25 +65,6 @@ Addresses
.. autofunction:: pybtc.is_address_valid
HD Wallets
=========
.. autofunction:: pybtc.create_xmaster_key
.. autofunction:: pybtc.create_xpublic_key
.. autofunction:: pybtc.derive_xkey
.. autofunction:: pybtc.xprivate_to_xpublic_key
.. autofunction:: pybtc.xkey_to_private_key
.. autofunction:: pybtc.xkey_to_public_key
.. autofunction:: pybtc.create_child_privkey
.. autofunction:: pybtc.create_child_pubkey
.. autofunction:: pybtc.create_expanded_key
.. autofunction:: pybtc.create_expanded_hard_key
.. autofunction:: pybtc.is_xprivate_key_valid
.. autofunction:: pybtc.is_xpublic_key_valid
.. autofunction:: pybtc.is_validate_path_level
.. autofunction:: pybtc.serialize_xkey
.. autofunction:: pybtc.deserialize_xkey
Script
======

9
docs/source/wallet.rst Normal file
View File

@ -0,0 +1,9 @@
======
Blocks
======
The class for creating transaction.
.. autoclass:: pybtc.Wallet

View File

@ -4,5 +4,5 @@ from .consensus import *
from .transaction import *
from .block import *
from .address import *
from .hdwallet import *
from .wallet import *
version = "2.0.1"

View File

@ -10,7 +10,7 @@ SIGHASH_ALL = 0x00000001
SIGHASH_NONE = 0x00000002
SIGHASH_SINGLE = 0x00000003
SIGHASH_ANYONECANPAY = 0x00000080
MAX_INT_PRIVATE_KEY = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141
ECDSA_SEC256K1_ORDER = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141
MAINNET_ADDRESS_BYTE_PREFIX = b'\x00'
TESTNET_ADDRESS_BYTE_PREFIX = b'\x6f'
@ -66,7 +66,7 @@ ECDSA_CONTEXT_SIGN = secp256k1.secp256k1_context_create(FLAG_SIGN)
ECDSA_CONTEXT_VERIFY = secp256k1.secp256k1_context_create(FLAG_VERIFY)
ECDSA_CONTEXT_ALL = secp256k1.secp256k1_context_create(ALL_FLAGS)
secp256k1.secp256k1_context_randomize(ECDSA_CONTEXT_SIGN,
random.SystemRandom().randint(0, MAX_INT_PRIVATE_KEY).to_bytes(32,byteorder="big"))
random.SystemRandom().randint(0, ECDSA_SEC256K1_ORDER).to_bytes(32, byteorder="big"))
SCRIPT_TYPES = {"P2PKH": 0,
"P2SH": 1,
@ -80,10 +80,11 @@ SCRIPT_TYPES = {"P2PKH": 0,
# CONSTANTS hierarchical deterministic wallets (HD Wallets)
MAINNET_PRIVATE_WALLET_VERSION = b'\x04\x88\xAD\xE4'
MAINNET_PUBLIC_WALLET_VERSION = b'\x04\x88\xB2\x1E'
TESTNET_PRIVATE_WALLET_VERSION = b'\x04\x35\x83\x94'
TESTNET_PUBLIC_WALLET_VERSION = b'\x04\x35\x87\xCF'
MAINNET_XPRIVATE_KEY_PREFIX = b'\x04\x88\xAD\xE4'
MAINNET_XPUBLIC_KEY_PREFIX = b'\x04\x88\xB2\x1E'
TESTNET_XPRIVATE_KEY_PREFIX = b'\x04\x35\x83\x94'
TESTNET_XPUBLIC_KEY_PREFIX = b'\x04\x35\x87\xCF'
HARDENED_KEY = 0x80000000
FIRST_HARDENED_CHILD = 0x80000000
PATH_LEVEL_BIP0044 = [0x8000002C, 0x80000000, 0x80000000, 0, 0]
TESTNET_PATH_LEVEL_BIP0044 = [0x8000002C, 0x80000001, 0x80000000, 0, 0]

View File

@ -5,4 +5,5 @@ from .tools import *
from .hash import *
from .block import *
from .encode import *
from .bip39_mnemonic import *
from .bip39_mnemonic import *
from .bip32 import *

View File

@ -0,0 +1,269 @@
import os
import hmac
import struct
from secp256k1 import ffi
from struct import pack, unpack
from hashlib import pbkdf2_hmac
from binascii import hexlify, unhexlify
from pybtc.constants import *
from .encode import *
from .hash import *
from .key import *
def create_master_xprivate_key(seed, testnet=False, base58=True, hex=False):
"""
Create extended private key from seed
:param str,bytes key: seed HEX or bytes string.
:param boolean base58: (optional) return result as base58 encoded string, by default True.
:param boolean hex: (optional) return result as HEX encoded string, by default False.
In case True base58 flag value will be ignored.
:return: extended private key in base58, HEX or bytes string format.
"""
if isinstance(seed, str):
seed = bytes.fromhex(seed)
if not isinstance(seed, bytes):
raise TypeError("seed should be bytes or hex encoded string")
i = hmac_sha512(b"Bitcoin seed", seed)
m, c = i[:32], i[32:]
m_int = int.from_bytes(m, byteorder="big")
if m_int <= 0 or m_int > ECDSA_SEC256K1_ORDER:
return None
prefix = TESTNET_XPRIVATE_KEY_PREFIX if testnet else MAINNET_XPRIVATE_KEY_PREFIX
key = b''.join([prefix,
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00',
c, b'\x00', m])
if base58:
key = b"".join([key, double_sha256(key)[:4]])
return encode_base58(key)
else:
return key
def xprivate_to_xpublic_key(xprivate_key, base58=True, hex=False):
"""
Get extended public key from extended private key using ECDSA secp256k1
:param str,bytes key: extended private key in base58, HEX or bytes string.
:param boolean base58: (optional) return result as base58 encoded string, by default True.
:param boolean hex: (optional) return result as HEX encoded string, by default False.
In case True base58 flag value will be ignored.
:return: extended public key in base58, HEX or bytes string format.
"""
if isinstance(xprivate_key, str):
try:
if len(xprivate_key) == 156:
xprivate_key = bytes.fromhex(xprivate_key)
else:
xprivate_key = decode_base58_with_checksum(xprivate_key)
except:
raise ValueError("invalid extended private key")
if not isinstance(xprivate_key, bytes):
raise TypeError("extended private key should be base58 string or bytes")
if xprivate_key[:4] == TESTNET_XPRIVATE_KEY_PREFIX:
prefix = TESTNET_XPUBLIC_KEY_PREFIX
elif xprivate_key[:4] == MAINNET_XPRIVATE_KEY_PREFIX:
prefix = MAINNET_XPUBLIC_KEY_PREFIX
else:
raise ValueError("invalid extended private key")
key = b"".join([prefix,
xprivate_key[4:45],
private_to_public_key(xprivate_key[46:], hex=False)])
if hex:
return key.hex()
elif base58:
key = b"".join([key, double_sha256(key)[:4]])
return encode_base58(key)
else:
return key
def derive_xkey(xkey, *path_level, base58=True, hex=False):
"""
Child Key derivation for extended private/public keys
:param bytes xkey: extended private/public in base58, HEX or bytes string format.
:param list path_level: list of derivation path levels. For hardened derivation use HARDENED_KEY flag.
:param boolean base58: (optional) return result as base58 encoded string, by default True.
:param boolean hex: (optional) return result as HEX encoded string, by default False.
In case True base58 flag value will be ignored.
:return: extended child private/public key in base58, HEX or bytes string format.
"""
xkey = decode_base58_with_checksum(xkey)
if xkey[:4] in [MAINNET_XPRIVATE_KEY_PREFIX, TESTNET_XPRIVATE_KEY_PREFIX]:
for i in path_level:
xkey = derive_child_xprivate_key(xkey, i)
elif xkey[:4] in [MAINNET_XPUBLIC_KEY_PREFIX, TESTNET_XPUBLIC_KEY_PREFIX]:
for i in path_level:
xkey = derive_child_xpublic_key(xkey, i)
else:
raise ValueError("invalid extended key")
if hex:
return xkey.hex()
elif base58:
return encode_base58_with_checksum(xkey)
else:
return xkey
def derive_child_xprivate_key(xprivate_key, i):
c = xprivate_key[13:45]
k = xprivate_key[45:]
depth = xprivate_key[4] + 1
if depth > 255:
raise ValueError("path depth should be <= 255")
pub = private_to_public_key(k[1:], hex=False)
fingerprint = hash160(pub)[:4]
s = hmac_sha512(c, b"%s%s" % (k if i >= HARDENED_KEY else pub, struct.pack(">L", i)))
p_int = int.from_bytes(s[:32],byteorder='big')
if p_int >= ECDSA_SEC256K1_ORDER:
return None
k_int = (int.from_bytes(k[1:], byteorder='big') + p_int) % ECDSA_SEC256K1_ORDER
if not k_int:
return None
key = int.to_bytes(k_int, byteorder = "big", length=32)
return b"".join([xprivate_key[:4],
bytes([depth]),
fingerprint,
struct.pack(">L", i),
s[32:],
b'\x00',
key])
def derive_child_xpublic_key(xpublic_key, i):
c = xpublic_key[13:45]
k = xpublic_key[45:]
fingerprint = hash160(k)[:4]
depth = xpublic_key[4] + 1
if depth > 255:
raise ValueError("path depth should be <= 255")
if i >= HARDENED_KEY:
raise ValueError("derivation from extended public key impossible")
s = hmac_sha512(c, k + struct.pack(">L", i))
if int.from_bytes(s[:32], byteorder='big') >= ECDSA_SEC256K1_ORDER:
return None
pubkey_ptr = ffi.new('secp256k1_pubkey *')
if not secp256k1.secp256k1_ec_pubkey_parse(ECDSA_CONTEXT_VERIFY, pubkey_ptr, k, len(k)):
raise RuntimeError("secp256k1 parse public key operation failed")
if not secp256k1.secp256k1_ec_pubkey_tweak_add(ECDSA_CONTEXT_ALL, pubkey_ptr, s[:32]):
raise RuntimeError("secp256k1 parse tweak addition operation failed")
pubkey = ffi.new('char [%d]' % 33)
outlen = ffi.new('size_t *', 33)
if not secp256k1.secp256k1_ec_pubkey_serialize(ECDSA_CONTEXT_VERIFY, pubkey, outlen, pubkey_ptr, EC_COMPRESSED):
raise RuntimeError("secp256k1 serialize public key operation failed")
pk = bytes(ffi.buffer(pubkey, 33))
print(len(pk))
return b"".join([xpublic_key[:4],
bytes([depth]),
fingerprint,
struct.pack(">L", i),
s[32:],
pk])
def public_from_xpublic_key(xpublic_key, hex=True):
"""
Get public key from extended public key
:param bytes xpublic_key: extended public in base58, HEX or bytes string format.
:param boolean base58: (optional) return result as base58 encoded string, by default True.
:param boolean hex: (optional) return result as HEX encoded string, by default False.
In case True base58 flag value will be ignored.
:return: public key in HEX or bytes string format.
"""
if isinstance(xpublic_key, str):
if len(xpublic_key) == 156:
xpublic_key = bytes.fromhex(xpublic_key)
else:
xpublic_key = decode_base58_with_checksum(xpublic_key)
if not isinstance(xpublic_key, bytes):
raise TypeError("xpublic_key should be HEX, Base58 or bytes string")
if xpublic_key[:4] not in [MAINNET_XPUBLIC_KEY_PREFIX,
TESTNET_XPUBLIC_KEY_PREFIX]:
raise ValueError("invalid extended public key")
return xpublic_key[45:].hex() if hex else xpublic_key[45:]
def private_from_xprivate_key(xprivate_key, wif=True, hex=False):
"""
Get private key from extended private key
:param bytes xprivate_key: extended public in base58, HEX or bytes string format.
:param boolean wif: (optional) return result as WIF format, by default True.
:param boolean hex: (optional) return result as HEX encoded string, by default False.
In case True WIF flag value will be ignored.
:return: private key in HEX or bytes string format.
"""
if isinstance(xprivate_key, str):
if len(xprivate_key) == 156:
xprivate_key = bytes.fromhex(xprivate_key)
else:
xprivate_key = decode_base58_with_checksum(xprivate_key)
if not isinstance(xprivate_key, bytes):
raise TypeError("xprivate_key should be HEX, Base58 or bytes string")
if xprivate_key[:4] not in [MAINNET_XPRIVATE_KEY_PREFIX,
TESTNET_XPRIVATE_KEY_PREFIX]:
raise ValueError("invalid extended private key")
if hex:
return xprivate_key[46:].hex()
elif wif:
if xprivate_key[:4] == MAINNET_XPRIVATE_KEY_PREFIX:
testnet = False
else:
testnet = True
return private_key_to_wif(xprivate_key[46:], testnet=testnet)
return xprivate_key[46:].hex() if hex else xprivate_key[46:]
def is_xprivate_key_valid(key):
"""
Check the extended private key is valid according to BIP-0032.
:param key: extended private key in BASE58, HEX or bytes string format.
:return: boolean.
"""
if isinstance(key, str):
try:
key = decode_base58_with_checksum(key)
except:
try:
key = bytes.fromhex(key)
except:
pass
if not isinstance(key, bytes) or len(key)!=78:
return False
if key[:4] not in [MAINNET_XPRIVATE_KEY_PREFIX,
TESTNET_XPRIVATE_KEY_PREFIX]:
return False
return True
def is_xpublic_key_valid(key):
"""
Check the extended private key is valid according to BIP-0032.
:param key: extended private key in BASE58, HEX or bytes string format.
:return: boolean.
"""
if isinstance(key, str):
try:
key = decode_base58_with_checksum(key)
except:
try:
key = bytes.fromhex(key)
except:
pass
if not isinstance(key, bytes) or len(key)!=78:
return False
if key[:4] not in [MAINNET_XPUBLIC_KEY_PREFIX,
TESTNET_XPUBLIC_KEY_PREFIX]:
return False
return True

View File

@ -22,14 +22,14 @@ def generate_entropy(strength=256, hex=True):
"""
if strength not in [128, 160, 192, 224, 256]:
raise ValueError('strength should be one of the following [128, 160, 192, 224, 256]')
a = random.SystemRandom().randint(0, MAX_INT_PRIVATE_KEY)
a = random.SystemRandom().randint(0, ECDSA_SEC256K1_ORDER)
i = int((time.time() % 0.01 ) * 100000)
h = a.to_bytes(32, byteorder="big")
# more entropy from system timer and sha256 derivation
while i:
h = hashlib.sha256(h).digest()
i -= 1
if not i and int.from_bytes(h, byteorder="big") > MAX_INT_PRIVATE_KEY:
if not i and int.from_bytes(h, byteorder="big") > ECDSA_SEC256K1_ORDER:
i += 1
return h[:int(strength/8)] if not hex else h[:int(strength/8)].hex()
@ -75,29 +75,15 @@ def entropy_to_mnemonic(entropy, language='english', word_list_dir=None, word_li
if len(entropy) not in [16, 20, 24, 28, 32]:
raise ValueError(
'entropy length should be one of the following: [16, 20, 24, 28, 32]')
if word_list is None:
word_list = load_word_list(language, word_list_dir)
elif not isinstance(word_list, list) or len(word_list) != 2048:
raise TypeError("invalid wordl ist type")
raise TypeError("invalid word list type")
# checksum
mask = 0b10000000
data_int = int.from_bytes(entropy, byteorder="big")
data_bit_len = len(entropy) * 8 // 32
fbyte_hash = sha256(entropy)[0]
while data_bit_len:
data_bit_len -= 1
data_int = (data_int << 1) | 1 if fbyte_hash & mask else data_int << 1
mask = mask >> 1
mnemonic = []
while data_int:
mnemonic.append(word_list[data_int & 0b11111111111])
data_int = data_int >> 11
return " ".join(mnemonic[::-1])
i = int.from_bytes(entropy, byteorder="big")
# append checksum
i = (i << len(entropy) * 8 // 32) | sha256(entropy)[0]
return " ".join([word_list[i.__rshift__(((d - 1) * 11)) & 2047] for d in range(int(i.bit_length() // 11), 0, -1)])
def mnemonic_to_entropy(mnemonic, language='english', word_list_dir=None,
@ -133,8 +119,8 @@ def mnemonic_to_entropy(mnemonic, language='english', word_list_dir=None,
chk_sum = entropy_int & (2 ** chk_sum_bit_len - 1)
entropy_int = entropy_int >> chk_sum_bit_len
entropy = entropy_int.to_bytes((bit_size - chk_sum_bit_len) // 8, byteorder="big")
fb = sha256(entropy)[0]
assert (fb >> (8 - chk_sum_bit_len)) == chk_sum
if (sha256(entropy)[0] >> (8 - chk_sum_bit_len)) != chk_sum:
raise ValueError("invalid mnemonic checksum")
return entropy if not hex else entropy.hex()

View File

@ -1,3 +1,5 @@
from .hash import double_sha256
b58_digits = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
base32charset = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"
base32charset_upcase = "QPZRY9X8GF2TVDW0S3JN54KHCE6MUA7L"
@ -122,3 +124,13 @@ def decode_base58(s):
else:
break
return b'\x00' * pad + res
def encode_base58_with_checksum(b):
return encode_base58(b"%s%s" % (b, double_sha256(b)[:4]))
def decode_base58_with_checksum(s):
b = decode_base58(s)
assert double_sha256(b[:-4])[:4] == b[-4:]
return b[:-4]

View File

@ -1,404 +0,0 @@
import os
import hmac
from secp256k1 import ffi
from struct import pack, unpack
from hashlib import pbkdf2_hmac
from binascii import hexlify, unhexlify
from .constants import *
from .functions import *
# Hierarchical Deterministic Wallets (HD Wallets)
# BIP-0032/0044
def create_xmaster_key(seed, testnet=False):
"""
Creating master private key from seed
:param bytes seed: cryptographically secure seed.
:param bool testnet: if True, the check will be executed for TESTNET wallets.
:return: extended private key (xprivate key) in dict format (fields: version, key, chain_code, depth, child, finger_print, is_private).
"""
if testnet:
version = TESTNET_PRIVATE_WALLET_VERSION
else:
version = MAINNET_PRIVATE_WALLET_VERSION
key = b'Bitcoin seed'
intermediary = hmac_sha512(key, seed)
mkey = intermediary[:32]
chain_code = intermediary[32:]
if is_xprivate_key_valid(mkey) and is_xprivate_key_valid(chain_code):
return dict(version=version,
key=mkey,
depth=0,
child=0,
finger_print=b'\x00\x00\x00\x00',
chain_code=chain_code,
is_private=True)
else:
return None
def create_xpublic_key(key):
"""
Creating parent xpublic key from xprivate key
:param dict key: xprivate key.
:return: extended public key (xpublic key) in dict format (fields: version, key, chain_code, depth, child, finger_print, is_private).
"""
if key['is_private']:
if key['version'] == TESTNET_PRIVATE_WALLET_VERSION:
version = TESTNET_PUBLIC_WALLET_VERSION
else:
version = MAINNET_PUBLIC_WALLET_VERSION
pubkey = private_to_public_key(key['key'], hex=False)
return dict(version=version,
key=pubkey,
depth=key['depth'],
child=key['child'],
finger_print=key['finger_print'],
chain_code=key['chain_code'],
is_private=False)
return None
def derive_xkey(seed, *path_level, bip44=True, testnet=True, wif=True):
"""
Key derivation
:param bytes seed: cryptographically secure seed.
:param list path_level: list of levels in BIP32 path. For BIP-0044 of 5 levels. For bip44 is True can be None or empty list.
:param bool bip44: define specification BIP-0044, by default True.
:param bool testnet: if True, the derivation will be executed for TESTNET wallets.
:param bool wif: define xkey wallet import format, by default True.
:return: string (serialized xprivate key).
"""
if not bip44:
if not len(path_level):
raise TypeError("not specified path levels")
mkey = create_xmaster_key(seed, testnet)
xkey = create_child_privkey(mkey, path_level[0])
for idx in path_level[1:]:
xkey = create_child_privkey(xkey, idx)
# сериализация и кодирование ключа
if wif:
result = encode_base58(serialize_xkey(xkey))
else:
result = serialize_xkey(xkey)
return result
else:
if not is_validate_path_level(path_level, testnet):
raise TypeError("path level does not match BIP-0044 - https://github.com/bitcoin/bips/blob/master/bip-0044.mediawiki")
elif not len(path_level):
if testnet:
path_level = TESTNET_PATH_LEVEL_BIP0044
else:
path_level = PATH_LEVEL_BIP0044
mkey = create_xmaster_key(seed, testnet)
xkey = create_child_privkey(mkey, path_level[0])
for idx in path_level[1:]:
xkey = create_child_privkey(xkey, idx)
# сериализация и кодирование ключа
if wif:
result = encode_base58(serialize_xkey(xkey))
else:
result = serialize_xkey(xkey)
return result
def xprivate_to_xpublic_key(xprv, encode_b58=True):
"""
Get xpublic key from xprivate key
:param str xprv: extended private in base58 format (serialized).
:param bool wif: define return format (encoded base58 or bytes string), by default True is encode base58.
:return: string (serialized xpublic key).
"""
if is_xprivate_key_valid(xprv):
xprivkey = deserialize_xkey(xprv)
xpubkey = create_xpublic_key(xprivkey)
if encode_b58:
return encode_base58(serialize_xkey(xpubkey))
return serialize_xkey(xpubkey)
else:
raise TypeError("Private key must be serialized according to BIP-0032 - " \
"https://github.com/bitcoin/bips/blob/master/bip-0032.mediawiki#serialization-format")
# получение из расширенного приватного ключа обычный приватный ключ
def xkey_to_private_key(xkey, wif=True, hex=False):
"""
Get private key from xprivate key
:param str xkey: extended private key in base58 format (serialized).
:param bool wif: define xkey return, by default wallet import format. If wif up then hex ignore.
:param bool hex: define xkey return format (hex or bytes string).
:return: string (wif or hex) or bytes string.
"""
if is_xprivate_key_valid(xkey):
xprivkey = deserialize_xkey(xkey)
privkey = xprivkey['key']
if xprivkey['version'] in TESTNET_PRIVATE_WALLET_VERSION:
testnet = True
else:
testnet = False
if wif:
return private_key_to_wif(privkey, testnet=testnet)
elif hex:
return hexlify(privkey).decode()
return privkey
else:
raise TypeError("Private key must be serialized according to BIP-0032 - " \
"https://github.com/bitcoin/bips/blob/master/bip-0032.mediawiki#serialization-format")
# получение из расширенного приватного/публичного ключа обычный публичный ключ
def xkey_to_public_key(xkey, hex=False):
"""
Get public key from xprivate/xpublic key
:param str xkey: extended private or extended public key in base58 format (serialized).
:param bool hex: define xkey return format (hex or bytes string).
:return: string or bytes string.
"""
if is_xprivate_key_valid(xkey):
xkey = xprivate_to_xpublic_key(xkey)
if is_xpublic_key_valid(xkey):
xpubkey = deserialize_xkey(xkey)
pubkey = xpubkey['key']
if xpubkey['version'] in TESTNET_PUBLIC_WALLET_VERSION:
testnet = True
else:
testnet = False
if hex:
return hexlify(pubkey).decode()
return pubkey
else:
raise TypeError("Private or public key must be serialized according to BIP-0032 - " \
"https://github.com/bitcoin/bips/blob/master/bip-0032.mediawiki#serialization-format")
# Создание дочернего приватного ключа
def create_child_privkey(key, child_idx):
"""
Get child xprivate key from parent xprivate key
:param dict key: extended private key in dict format.
:param int8 child_idx: chidl index.
:return: dict (xprivate key).
"""
if key['is_private']:
if child_idx < FIRST_HARDENED_CHILD:
expanded_privkey = create_expanded_key(key, child_idx)
else:
expanded_privkey = create_expanded_hard_key(key, child_idx)
if expanded_privkey:
child_chain_code = expanded_privkey[32:]
child_privkey = add_private_keys(expanded_privkey[:32], key['key'])
if is_xprivate_key_valid(child_privkey):
finger_print = hash160(private_to_public_key(key['key'], hex=False))[:4]
return dict(version=key['version'],
key=child_privkey,
depth=key['depth'] + 1,
child=child_idx,
finger_print=finger_print,
chain_code=child_chain_code,
is_private=True)
return None
# создание дочернего публичного ключа
def create_child_pubkey(key, child_idx):
"""
Get child xpublic key from parent xpublic key
:param dict key: extended public key in dict format.
:param int8 child_idx: child index.
:return: dict (xpublic key).
"""
if not key['is_private']:
expanded_pubkey = create_expanded_key(key, child_idx)
if expanded_pubkey:
child_chain_code = expanded_pubkey[32:]
ext_value = private_to_public_key(expanded_pubkey[:32], hex=False)
child_pubkey = add_public_keys(ext_value, key['key'])
if is_xpublic_key_valid(child_pubkey):
finger_print = hash160(key['key'])[:4]
return dict(version=key['version'],
key=child_pubkey,
depth=key['depth'] + 1,
child=child_idx,
finger_print=finger_print,
chain_code=child_chain_code,
is_private=False)
return None
# Создание расширенного приватного/публичного ключа
def create_expanded_key(key, child_idx):
"""
Get intermediary expanded key from parent xprivate/xpublic key
:param dict key: extended private or public key in dict format.
:param int8 child_idx: child index.
:return: bytes string.
"""
if isinstance(key, dict):
if not key.get('is_private') and child_idx < FIRST_HARDENED_CHILD:
seed = key['key'] + pack('I', child_idx)
return hmac_sha512(key['chain_code'], seed)
elif key.get('is_private') and child_idx < FIRST_HARDENED_CHILD:
public_key = private_to_public_key(key['key'], hex=False)
seed = public_key + pack('I', child_idx)
return hmac_sha512(key['chain_code'], seed)
return None
# Создание усиленного расширенного приватного ключа
def create_expanded_hard_key(key, child_idx):
"""
Get intermediary hardened key from parent xprivate key
:param dict key: extended private key in dict format.
:param int8 child_idx: child index.
:return: bytes string.
"""
if isinstance(key, dict):
if key.get('is_private') and child_idx >= FIRST_HARDENED_CHILD:
seed = bytes([0]) + key['key'] + pack('I', child_idx)
return hmac_sha512(key['chain_code'], seed)
return None
def add_private_keys(ext_value, key):
ext_value_int = int.from_bytes(ext_value, byteorder="big")
key_int = int.from_bytes(key, byteorder="big")
ext_value_int = (ext_value_int + key_int) % MAX_INT_PRIVATE_KEY
return ext_value_int.to_bytes((ext_value_int.bit_length() + 7) // 8, byteorder="big")
def add_public_keys(ext_value, key):
pubkey_ptr = ffi.new('secp256k1_pubkey *')
if not secp256k1.secp256k1_ec_pubkey_parse(ECDSA_CONTEXT_VERIFY, pubkey_ptr, ext_value, len(ext_value)):
raise TypeError("public key format error")
if secp256k1.secp256k1_ec_pubkey_tweak_add(ECDSA_CONTEXT_ALL, pubkey_ptr, key):
pubkey = ffi.new('char [%d]' % 33)
outlen = ffi.new('size_t *', 33)
if secp256k1.secp256k1_ec_pubkey_serialize(ECDSA_CONTEXT_VERIFY, pubkey, outlen, pubkey_ptr, EC_COMPRESSED):
return bytes(ffi.buffer(pubkey, 33))
return None
def is_xpublic_key_valid(key):
"""
Check extended public key is valid according to BIP-0032.
:param key: extended public key in BASE58 or bytes string format.
:return: boolean.
"""
if isinstance(key, str):
if not key[:4] in ['xpub', 'tpub']:
return False
elif len(key) != 111:
return False
return True
def is_xprivate_key_valid(key):
"""
Check the extended private key is valid according to BIP-0032.
:param key: extended private key in BASE58 or bytes string format.
:return: boolean.
"""
if isinstance(key, bytes):
key_int = int.from_bytes(key, byteorder="big")
if key_int > 0 and key_int < MAX_INT_PRIVATE_KEY and len(key) == 32:
return True
elif isinstance(key, str):
if len(key) == 111 and key[:4] in ['xprv', 'tprv']:
return True
return False
def is_validate_path_level(path_level, testnet):
"""
Check path level is valid according to BIP-0044.
:param list path_level: list of 5 levels in BIP32 path.
:param testnet: if True, the check will be executed for TESTNET wallets.
:return: boolean.
"""
if not len(path_level):
return True
elif len(path_level) == 5:
if path_level[0] != 0x8000002C:
return False
elif testnet and path_level[1] != 0x80000001:
return False
elif not testnet and path_level[1] != 0x80000000:
return False
elif path_level[2] < 0x80000000:
return False
return True
return False
def serialize_xkey(key):
"""
Serialization of extended keys.
:param dict key: extended private or public key in Dict format.
:return: bytes string.
"""
try:
key_bytes = key['key']
if key.get('is_private'):
key_bytes = bytes(1) + key_bytes
result = key['version']
result += pack('B', key['depth'])
result += key['finger_print']
result += pack('I', key['child'])
result += key['chain_code']
result += key_bytes
chk_sum = double_sha256(result)[:4]
return result + chk_sum
except:
raise Exception('Serialization error')
def deserialize_xkey(encode_key):
"""
Deserialization of extended keys.
:param str key: extended private or public key in base58 format.
:return: bytes string.
"""
raw_key = decode_base58(encode_key)
decode_key = dict()
if raw_key[:4] in [MAINNET_PUBLIC_WALLET_VERSION, MAINNET_PRIVATE_WALLET_VERSION]:
decode_key['version'] = raw_key[:4]
decode_key['depth'] = unpack('B', raw_key[4:5])[0]
decode_key['finger_print'] = raw_key[5:9]
decode_key['child'] = unpack('I', raw_key[9:13])[0]
decode_key['chain_code'] = raw_key[13:45]
if decode_key['version'] in [MAINNET_PRIVATE_WALLET_VERSION]:
decode_key['is_private'] = True
decode_key['key'] = raw_key[46:78]
else:
decode_key['is_private'] = False
decode_key['key'] = raw_key[45:78]
chk_sum = raw_key[78:]
if chk_sum != double_sha256(raw_key[:-4])[:4]:
raise TypeError("key checksum does not match")
if decode_key:
return decode_key
return None

142
pybtc/wallet.py Normal file
View File

@ -0,0 +1,142 @@
import os
import hmac
from secp256k1 import ffi
from struct import pack, unpack
from hashlib import pbkdf2_hmac
from binascii import hexlify, unhexlify
from .constants import *
from .functions import *
# Hierarchical Deterministic Wallets (HD Wallets)
# BIP-44 supprt
class Wallet():
"""
The class for creating wallet object.
:param init_vector: (optional) initialization vector should be mnemonic phrase, extended public key,
extended private key, by default None (generate new wallet).
:param compressed: (optional) if set to True private key corresponding compressed public key,
by default set to True. Recommended use only compressed public key.
:param testnet: (optional) if set to True mean that this private key for testnet Bitcoin network.
"""
def __init__(self, init_vector=None, passphrase="", language='english', word_list_dir=None, word_list=None):
if init_vector is None:
e = generate_entropy()
m = entropy_to_mnemonic(e)
self.mnemonic = m
init_vector = create_master_xprivate_key(mnemonic_to_seed(m), base58=False)
else:
if isinstance(init_vector, str):
if is_xprivate_key_valid(init_vector):
if len(init_vector) == 156:
init_vector = bytes.fromhex(init_vector)
else:
init_vector = decode_base58_with_checksum(init_vector)
elif is_xpublic_key_valid(init_vector):
if len(init_vector) == 156:
init_vector = bytes.fromhex(init_vector)
else:
init_vector = decode_base58_with_checksum(init_vector)
else:
try:
self.mnemonic = init_vector
self.passphrase = passphrase
init_vector = create_master_xprivate_key(mnemonic_to_seed(init_vector,
passphrase=passphrase),
base58=False)
except Exception as err:
raise ValueError("invalid initial vector %s" % err)
if not isinstance(init_vector, bytes):
raise ValueError("invalid initial vector")
self.accounts = dict()
self.extended_key = self.deserialize_xkey(init_vector)
def deserialize_xkey(self, xkey):
if isinstance(xkey, str):
xkey = decode_base58_with_checksum(xkey)
extended_key = dict()
extended_key['version'] = xkey[:4].hex()
extended_key['depth'] = unpack('B', xkey[4:5])[0]
extended_key['fingerprint'] = xkey[5:9].hex()
extended_key['child'] = unpack('I', xkey[9:13])[0]
extended_key['chain_code'] = xkey[13:45].hex()
info = ["Derived"] if extended_key['depth'] != 0 else ["Master"]
if xkey[:4] in [MAINNET_XPRIVATE_KEY_PREFIX, MAINNET_XPUBLIC_KEY_PREFIX]:
info.append("Mainnet")
extended_key["testnet"] = False
else:
info.append("Testnet")
extended_key["testnet"] = True
info.append("Extended")
if xkey[:4] in [MAINNET_XPRIVATE_KEY_PREFIX, TESTNET_XPRIVATE_KEY_PREFIX]:
testnet = False if xkey[:4] == MAINNET_XPRIVATE_KEY_PREFIX else True
extended_key['private_key'] = private_key_to_wif(xkey[46:78], testnet=testnet)
info.append("Private")
extended_key["type"] = "private"
else:
info.append("Public")
extended_key['public_key'] = xkey[45:78].hex()
extended_key["type"] = "public"
info.append("Key")
extended_key["info"] = " ".join(info)
extended_key["key"] = encode_base58_with_checksum(xkey)
return extended_key
def create_account(self,name, path):
self.accounts[name] = {"extended_key": self.deserialize_xkey(derive_xkey(self.extended_key["key"],
*path)),
"path": path}
def create_bip44_account(self, account=0):
if self.extended_key["depth"] != 0:
raise Exception("Create bip44 account only possible from Master private key")
if not isinstance(account, int):
raise ValueError("account should be integer")
self.create_account("%s_external" % account, [44|HARDENED_KEY, HARDENED_KEY, account, 0])
self.create_account("%s_internal" % account, [44|HARDENED_KEY, HARDENED_KEY, account, 1])
def get_bip44_address(self, i, chain="external", account_index=0, address_type="P2WPKH"):
print(chain)
if chain not in ("internal", "external"):
raise ValueError("chain should be inetrnal or external")
account_name = "%s_%s" % (account_index, chain)
if account_name not in self.accounts:
self.create_bip44_account(account=account_index)
return self.get_chain_address(i, account=account_name, address_type=address_type)
def get_chain_address(self, i, account=None, address_type="P2WPKH"):
if account is None:
xkey = self.extended_key["key"]
key_type = self.extended_key["type"]
testnet = self.extended_key["testnet"]
else:
xkey = self.accounts[account]["extended_key"]["key"]
key_type = self.accounts[account]["extended_key"]["type"]
testnet = self.accounts[account]["extended_key"]["testnet"]
xkey = derive_xkey(xkey, i)
if key_type == "public":
address = public_key_to_address(public_from_xpublic_key(xkey), testnet=testnet)
r = {"address": address,
"public_key": public_from_xpublic_key(xkey)}
elif key_type == "private":
private_key = private_from_xprivate_key(xkey)
if address_type == "P2WPKH":
address = public_key_to_address(private_to_public_key(private_key), testnet=testnet)
elif address_type == "P2SH_P2WPKH":
address = public_key_to_address(private_to_public_key(private_key), p2sh_p2wpkh=True,
testnet=testnet)
elif address_type == "P2PKH":
address = public_key_to_address(private_to_public_key(private_key), witness_version=None,
testnet=testnet)
r = {"address": address,
"public_key": private_to_public_key(private_key),
"private_key": private_key}
return r

View File

@ -1,12 +1,12 @@
# from .hash_functions import *
# from .integer import *
# from .address_functions import *
# from .address_class import *
# from .ecdsa import *
# from .transaction_deserialize import *
# from .transaction_constructor import *
# from .sighash import *
# from .block import *
from .hash_functions import *
from .integer import *
from .address_functions import *
from .address_class import *
from .ecdsa import *
from .transaction_deserialize import *
from .transaction_constructor import *
from .sighash import *
from .block import *
from .mnemonic import *
# from .script_deserialize import *

View File

@ -22,6 +22,14 @@ class BlockDeserializeTests(unittest.TestCase):
self.assertEqual(mnemonic_to_entropy(mnemonic), entropy)
self.assertEqual(mnemonic_to_seed(mnemonic), seed)
print(generate_entropy())
print(generate_entropy(128))
self.assertEqual(create_master_xprivate_key(seed),
"xprv9s21ZrQH143K2hwbLgL4Rh1Vvk4F44e51kK2gdUWF9UbMXbySexrVp3ekFN2fbAQQpsZeakuk"
"RBpxr5y2cMwTCi7Fuyv7TYpu5zgDFB4UFE")
xpriv = "xprv9s21ZrQH143K2hwbLgL4Rh1Vvk4F44e51kK2gdUWF9UbMXbySexrVp3ekFN2fbAQQpsZeakukRBpxr5y2c" \
"MwTCi7Fuyv7TYpu5zgDFB4UFE"
xpub = "xpub661MyMwAqRbcFC24Shs4npxEUmtjTXMvNyEdV1t7oV1aEKw7zCH73cN8bWyUWRUNzJ6NyVssfhZziyTUFB6" \
"J3HQkd9xe9GGzk1rMK81JL4b"
self.assertEqual(xprivate_to_xpublic_key(xpriv), xpub)
self.assertEqual(private_from_xprivate_key(xpriv), "L2VnL3zxnNE1jRSemyP7U6PvWuNLvuV5iMJdc2RJGALjZ6HYik7y")
self.assertEqual(public_from_xpublic_key(xpub),
private_to_public_key("L2VnL3zxnNE1jRSemyP7U6PvWuNLvuV5iMJdc2RJGALjZ6HYik7y"))

View File

@ -11,7 +11,7 @@ parentPath = os.path.abspath("..")
if parentPath not in sys.path:
sys.path.insert(0, parentPath)
from pybtc.hdwallet import *
from pybtc.wallet import *
from pybtc.tools import encode_base58, decode_base58

View File

@ -3,7 +3,7 @@ import random
import hashlib
import hmac
from binascii import hexlify, unhexlify
from pybtc.hdwallet import *
from pybtc.wallet import *