Initial commit

This commit is contained in:
admin 2018-01-10 18:02:32 +04:00
parent c60963feab
commit 1b20ae72bd
16 changed files with 1803 additions and 0 deletions

6
.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

6
pybtc/__init__.py Normal file
View File

@ -0,0 +1,6 @@
# from .tools import *
# from .opcodes import *
from .consensus import *
from .blockchain import *
version = "2.0.1"

612
pybtc/blockchain.py Normal file
View File

@ -0,0 +1,612 @@
import io
import json
import math
from .opcodes import *
from .tools import *
from .consensus import *
from binascii import hexlify, unhexlify
def get_stream(stream):
if type(stream) != io.BytesIO:
if type(stream) == str:
stream = unhexlify(stream)
if type(stream) == bytes:
stream = io.BytesIO(stream)
else:
raise TypeError
return stream
class Opcode():
""" Class opcode """
def __init__(self, raw_opcode, data, data_length = b""):
self.raw = raw_opcode
if self.raw in RAW_OPCODE:
if self.raw in (OPCODE["OP_PUSHDATA1"], OPCODE["OP_PUSHDATA2"], OPCODE["OP_PUSHDATA4"]):
self.str = '<%s>' % len(data)
else:
self.str = RAW_OPCODE[self.raw]
elif self.raw < b'L':
self.str = '<%s>' % len(data)
else:
self.str = '[?]'
self.data = data
self.data_length = data_length
def __str__(self):
return self.str
@classmethod
def to_raw(cls, name):
if name in OPCODE:
return OPCODE[name]
else:
return b''
@classmethod
def pop_from_stream (cls, stream):
b = stream.read(1)
if not b: return None
data = b''
data_length = b''
if b <= OPCODE["OP_PUSHDATA4"]:
if b < OPCODE["OP_PUSHDATA1"]: s = int.from_bytes(b,'little')
elif b == OPCODE["OP_PUSHDATA1"]:
data_length = stream.read(1)
s = int.from_bytes( data_length ,'little')
elif b == OPCODE["OP_PUSHDATA2"]:
data_length = stream.read(2)
s = int.from_bytes( data_length ,'little')
elif b == OPCODE["OP_PUSHDATA4"]:
data_length = stream.read(4)
s = int.from_bytes( data_length ,'little')
data = stream.read(s)
if len(data)!=s:
return None
raise Exception('opcode read error')
return cls(b,data,data_length)
class Script():
"""
Bitcoin script class
"""
def __init__(self, raw_script, coinbase = False, segwit = True):
if type(raw_script) == str:
raw_script = unhexlify(raw_script)
self.raw = raw_script
stream = io.BytesIO(raw_script)
self.script = []
self.address = list()
self.pattern = bytearray()
self.asm = bytearray()
self.data = b''
self.type = "NON_STANDARD"
self.ntype = 7
self.op_sig_count = 0
if coinbase:
self.pattern = b"<coinbase>"
self.asm = hexlify(raw_script).decode()
return
t = time.time()
while True:
o = Opcode.pop_from_stream(stream)
if o is None:
break
if o.raw == OPCODE["OP_CHECKSIG"] or o.raw == OPCODE["OP_CHECKSIGVERIFY"]:
self.op_sig_count += 1
if o.raw ==OPCODE["OP_CHECKMULTISIG"]:
self.op_sig_count += 20
self.script.append(o)
self.pattern += o.str.encode() + b' '
if o.data:
self.asm += hexlify(o.data) + b' '
else:
self.asm += o.str.encode() + b' '
self.asm = self.asm.decode().rstrip()
self.pattern= self.pattern.decode().rstrip()
# check script type
if self.pattern == "OP_DUP OP_HASH160 <20> OP_EQUALVERIFY OP_CHECKSIG":
self.type = "P2PKH"
self.ntype = 0
self.address.append(self.script[2].data)
elif self.pattern == "OP_HASH160 <20> OP_EQUAL":
self.type = "P2SH"
self.ntype = 1
self.address.append(self.script[1].data)
elif self.pattern == "<65> OP_CHECKSIG" or self.pattern == "<33> OP_CHECKSIG" :
self.type = "PUBKEY"
self.ntype = 2
self.address.append(hash160(self.script[0].data))
elif len(self.script) == 2 and self.script[0].raw == OPCODE["OP_RETURN"]:
# OP_RETURN
if len(self.script[1].data) < NULL_DATA_LIMIT: # <0 to 80 bytes of data>
self.data = self.script[1].data
self.type = "NULL_DATA"
self.ntype = 3
elif len(self.script)>= 4:
if self.script[-1].raw == OPCODE["OP_CHECKMULTISIG"] \
and self.script[-2].raw <= OPCODE["OP_15"] \
and self.script[-2].raw >= OPCODE["OP_1"] : # OP_CHECKMULTISIG "OP_1" "OP_16"
if self.script[0].raw <= OPCODE["OP_15"] \
and self.script[0].raw >= OPCODE["OP_1"]:
self.op_sig_count = 0
for o in self.script[1:-2]:
if not o.data:
self.op_sig_count = 20
break
self.op_sig_count += 1
self.address.append(hash160(o.data))
else:
self.bare_multisig_accepted = ord(self.script[0].raw) - 80
self.bare_multisig_from = ord(self.script[-2].raw) - 80
self.type = "MULTISIG"
self.ntype = 4
elif segwit:
if self.pattern == "OP_0 <20>":
self.type = "P2WPKH"
self.op_sig_count = 1
self.ntype = 5
self.address.append(b"\x00"+self.script[1].data)
elif self.pattern == "OP_0 <32>":
self.type = "P2WSH"
self.ntype = 6
self.address.append(b"\x00"+self.script[1].data)
class Input:
""" Transaction Input class """
# outpoint = (b'00f0f09...',n')
# script = raw bytes
# sequense = int
def __init__(self, outpoint, script, sequence, amount = None, private_key = None):
if type(outpoint[0]) == str:
outpoint = (unhexlify(outpoint[0])[::-1], outpoint[1])
if type(outpoint[0]) == str:
private_key = WIF2priv(private_key)
self.outpoint = outpoint
self.sequence = sequence
self.pk_script = None
self.amount = amount
self.private_key = private_key
self.p2sh_type = None
self.coinbase = False
if outpoint == (b'\x00'*32 ,0xffffffff): self.coinbase = True
self.sig_script = Script(script, self.coinbase)
self.double_spend = None
self.lock = False
self.addresses = []
self.redeem_script = None
if len(self.sig_script.script) > 0:
try:
if len(self.sig_script.script[-1].data) <= 520:
self.redeem_script = Script(self.sig_script.script[-1].data)
else:
pass
except Exception as err:
pass
@classmethod
def deserialize(cls, stream):
stream = get_stream(stream)
outpoint = stream.read(32), int.from_bytes(stream.read(4), 'little')
script_len = from_var_int(read_var_int(stream))
script = stream.read(script_len)
sequence = int.from_bytes(stream.read(4), 'little')
return cls(outpoint, script, sequence)
class Output:
""" Transactin output class """
def __init__(self, value, script):
self.value = value
self.pk_script = Script(script)
@classmethod
def deserialize(cls, stream):
stream = get_stream(stream)
value = int.from_bytes(stream.read(8), 'little')
script_len = from_var_int(read_var_int(stream))
pk_script = stream.read(script_len)
return cls(value, pk_script)
class Witness:
def __init__(self, data, empty = False):
self.empty = empty
self.witness = [b"\x00"] if empty else data
def __str__(self):
return json.dumps([hexlify(w).decode() for w in self.witness])
def hex(self):
return [hexlify(w).decode() for w in self.witness]
@classmethod
def deserialize(cls, stream):
stream = get_stream(stream)
empty = True
witness_len = from_var_int(read_var_int(stream))
witness = []
if witness_len:
for i in range(witness_len):
l = from_var_int(read_var_int(stream))
w = stream.read(l)
witness.append(w)
empty = False
return cls(witness, empty)
def serialize(self):
if self.empty:
return b'\x00'
n = to_var_int(len(self.witness))
for w in self.witness:
n += to_var_int(len(w)) + w
return n
class Transaction():
def __init__(self, version = 1, tx_in = [], tx_out = [] , lock_time = 0,
hash=None, size = 0, timestamp = None,
marker = None, flag = None, witness = [],
whash = None, vsize = None):
self.hash = hash
self.whash = whash
self.vsize = vsize
self.witness = witness
self.marker = marker
self.flag = flag
self.valid = True
self.lock = False
self.orphaned = False
self.in_sum = None
self.tx_fee = None
self.version = version
self.tx_in_count = len(tx_in)
self.tx_in = tx_in
self.tx_out_count = len (tx_out)
self.tx_out = tx_out
self.lock_time = lock_time
if self.tx_in:
self.coinbase = self.tx_in[0].coinbase
else:
self.coinbase = False
self.double_spend = 0
self.data = None
self.ip = None
self.size = size
if timestamp is not None : self.timestamp = timestamp
else: self.timestamp = int(time.time())
self.op_sig_count = 0
self.sum_value_age = 0
self.total_outs_value = 0
for i in self.tx_out:
self.op_sig_count += i.pk_script.op_sig_count
if i.pk_script.type=="NULL_DATA":
self.data = i.pk_script.data
for out in self.tx_out:
self.total_outs_value += out.value
if witness is None:
self.witness = [Witness.deserialize(b"\x00") for i in range(len(tx_in))]
if hash is None:
self.recalculate_txid()
def recalculate_txid(self):
self.hash = double_sha256(self.serialize(segwit=False))
self.whash = double_sha256(self.serialize(segwit=True))
def add_input(self, tx_hash, output_number,
sequence = 0xffffffff,
sig_script = b"",
amount = None,
private_key = None):
self.tx_in.append(Input((tx_hash, output_number), sig_script, sequence, amount, private_key))
self.witness.append(Witness.deserialize(b"\x00"))
self.tx_in_count += 1
self.recalculate_txid()
def add_P2SH_output(self, amount, p2sh_address):
if type(p2sh_address)==str:
p2sh_address = decode_base58(p2sh_address)[1:-4]
if len(p2sh_address) != 20:
raise Exception("Invalid output hash160")
self.tx_out.append(Output(amount,
OPCODE["OP_HASH160"] + b'\x14' + p2sh_address + OPCODE["OP_EQUAL"]))
self.tx_out_count += 1
self.recalculate_txid()
def add_P2PKH_output(self, amount, p2pkh_address):
if type(p2pkh_address)==str:
p2pkh_address = decode_base58(p2pkh_address)[1:-4]
if len(p2pkh_address) != 20:
raise p2pkh_address("Invalid output hash160")
self.tx_out.append(Output(amount,
OPCODE["OP_DUP"] + OPCODE["OP_HASH160"] + b'\x14' + \
p2pkh_address + OPCODE["OP_EQUALVERIFY"] + OPCODE["OP_CHECKSIG"]))
self.tx_out_count += 1
self.recalculate_txid()
def __str__(self):
return 'Transaction object [%s] [%s]'% (hexlify(self.hash[::-1]),id(self))
def serialize(self, segwit = False, hex = False):
version = self.version.to_bytes(4,'little')
ninputs = to_var_int(self.tx_in_count)
inputs = []
for number, i in enumerate(self.tx_in):
input = i.outpoint[0]+i.outpoint[1].to_bytes(4,'little')
input += to_var_int(len(i.sig_script.raw)) + i.sig_script.raw
input += i.sequence.to_bytes(4,'little')
inputs.append(input)
nouts = to_var_int(self.tx_out_count)
outputs = []
for number, i in enumerate(self.tx_out):
outputs.append(i.value.to_bytes(8,'little')+to_var_int(len(i.pk_script.raw))+i.pk_script.raw)
marke_flag = b"\x00\x01" if segwit else b""
witness = b""
if segwit:
for w in self.witness:
witness += w.serialize()
result = version + marke_flag + ninputs + b''.join(inputs) +\
nouts + b''.join(outputs) + witness + self.lock_time.to_bytes(4,'little')
if hex:
return hexlify(result).decode()
else:
return result
def sign_P2SHP2WPKH_input(self, sighash_type, input_index, private_key = None, amount = None):
if type(private_key) == str:
private_key = WIF2priv(private_key)
if amount is not None:
self.tx_in[input_index].amount = amount
else:
amount = self.tx_in[input_index].amount
if private_key is not None:
self.tx_in[input_index].private_key = private_key
else:
private_key = self.tx_in[input_index].private_key
pubkey = priv2pub(private_key, True)
pubkey_hash160 = hash160(pubkey)
scriptCode = b"\x19" + OPCODE["OP_DUP"] + OPCODE["OP_HASH160"]
scriptCode += b'\x14' + pubkey_hash160 + OPCODE["OP_EQUALVERIFY"] + OPCODE["OP_CHECKSIG"]
self.tx_in[input_index].sig_script = Script(b'\x16\x00\x14' + pubkey_hash160) # P2WPKHredeemScript
sighash = self.sighash_segwit(sighash_type, input_index, scriptCode, amount)
signature = sign_message(sighash, private_key) + sighash_type.to_bytes(1,'little')
self.witness[input_index] = Witness([signature, pubkey])
self.recalculate_txid()
def sign_P2PKH_input(self, sighash_type, input_index, compressed = True, private_key = None):
if private_key is not None:
self.tx_in[input_index].private_key = private_key
else:
private_key = self.tx_in[input_index].private_key
pubkey = priv2pub(private_key, compressed)
pubkey_hash160 = hash160(pubkey)
scriptCode = OPCODE["OP_DUP"] + OPCODE["OP_HASH160"] + b'\x14' + \
pubkey_hash160 + OPCODE["OP_EQUALVERIFY"] + OPCODE["OP_CHECKSIG"]
sighash = self.sighash(sighash_type, input_index, scriptCode)
signature = sign_message(sighash, private_key) + sighash_type.to_bytes(1, 'little')
sig_script = len(signature).to_bytes(1, 'little') + signature + \
len(pubkey).to_bytes(1, 'little') + pubkey
self.tx_in[input_index].sig_script = Script(sig_script)
self.recalculate_txid()
def sighash(self, sighash_type, input_index, scriptCode, hex = False):
if type(scriptCode) == str:
scriptCode = unhexlify(scriptCode)
if len(self.tx_in) - 1 < input_index:
raise Exception('Input not exist')
preimage = bytearray()
if ((sighash_type&31) == SIGHASH_SINGLE) and (input_index>(len(self.tx_out)-1)):
return double_sha256(b'\x01'+b'\x00'*31 + sighash_type.to_bytes(4, 'little'))
preimage += self.version.to_bytes(4,'little')
preimage += b'\x01' if sighash_type & SIGHASH_ANYONECANPAY else to_var_int(self.tx_in_count)
for number, i in enumerate(self.tx_in):
if (sighash_type & SIGHASH_ANYONECANPAY) and (input_index != number): continue
input = i.outpoint[0]+i.outpoint[1].to_bytes(4,'little')
if sighash_type == 0 or input_index == number:
input += ((to_var_int(len(scriptCode)) + scriptCode) if sighash_type else \
(to_var_int(len(i.sig_script.raw)) + i.sig_script.raw)) + i.sequence.to_bytes(4,'little')
else:
input += b'\x00' + (i.sequence.to_bytes(4,'little') if \
((sighash_type&31) == SIGHASH_ALL) else b'\x00\x00\x00\x00')
preimage += input
preimage += b'\x00' if (sighash_type&31) == SIGHASH_NONE else ( to_var_int(input_index + 1) if \
(sighash_type&31) == SIGHASH_SINGLE else to_var_int(self.tx_out_count))
if (sighash_type&31) != SIGHASH_NONE:
for number, i in enumerate(self.tx_out):
if number > input_index and (sighash_type&31) == SIGHASH_SINGLE: continue
preimage +=(b'\xff'*8+b'\x00' if (sighash_type&31) == SIGHASH_SINGLE and (input_index != number)\
else i.value.to_bytes(8,'little')+to_var_int(len(i.pk_script.raw))+i.pk_script.raw)
preimage += self.lock_time.to_bytes(4,'little')
preimage += sighash_type.to_bytes(4, 'little')
return double_sha256(preimage) if not hex else hexlify(double_sha256(preimage)).decode()
def sighash_segwit(self, sighash_type, input_index, scriptCode, amount, hex = False):
if type(scriptCode) == str:
scriptCode = unhexlify(scriptCode)
if len(self.tx_in)-1 < input_index:
raise Exception('Input not exist')
preimage = bytearray()
# 1. nVersion of the transaction (4-byte little endian)
preimage += self.version.to_bytes(4,'little')
# 2. hashPrevouts (32-byte hash)
# 3. hashSequence (32-byte hash)
# 4. outpoint (32-byte hash + 4-byte little endian)
# 5. scriptCode of the input (serialized as scripts inside CTxOuts)
# 6. value of the output spent by this input (8-byte little endian)
# 7. nSequence of the input (4-byte little endian)
hp = bytearray()
hs = bytearray()
for n, i in enumerate(self.tx_in):
if not (sighash_type & SIGHASH_ANYONECANPAY):
hp += i.outpoint[0] + i.outpoint[1].to_bytes(4,'little')
if (sighash_type&31) != SIGHASH_SINGLE and (sighash_type&31) != SIGHASH_NONE:
hs += i.sequence.to_bytes(4,'little')
if n == input_index:
outpoint = i.outpoint[0]+i.outpoint[1].to_bytes(4,'little')
nSequence = i.sequence.to_bytes(4,'little')
hashPrevouts = double_sha256(hp) if hp else b'\x00'*32
hashSequence = double_sha256(hs) if hs else b'\x00'*32
value = amount.to_bytes(8,'little')
# 8. hashOutputs (32-byte hash)
ho = bytearray()
for n, o in enumerate(self.tx_out):
if (sighash_type&31) != SIGHASH_SINGLE and (sighash_type&31) != SIGHASH_NONE:
ho += o.value.to_bytes(8,'little')+to_var_int(len(o.pk_script.raw))+o.pk_script.raw
elif (sighash_type&31) == SIGHASH_SINGLE and input_index < len(self.tx_out):
if input_index == n:
ho += o.value.to_bytes(8, 'little') + to_var_int(len(o.pk_script.raw)) + o.pk_script.raw
hashOutputs = double_sha256(ho) if ho else b'\x00'*32
preimage += hashPrevouts + hashSequence + outpoint + scriptCode + value + nSequence + hashOutputs
preimage += self.lock_time.to_bytes(4, 'little')
preimage += sighash_type.to_bytes(4, 'little')
return double_sha256(preimage) if not hex else hexlify(double_sha256(preimage)).decode()
def json(self):
r = dict()
r["txid"] = rh2s(self.hash)
r["wtxid"] = r["txid"] if self.whash is None else rh2s(self.whash)
r["size"] = self.size
r["vsize"] = self.vsize
r["version"] = self.version
r["locktime"] = self.lock_time
r["vin"] = list()
r["vout"] = list()
for i in self.tx_in:
input = {"txid": rh2s(i.outpoint[0]),
"vout": i.outpoint[1],
"scriptSig": {"hex": hexlify(i.sig_script.raw).decode(),
"asm": i.sig_script.asm},
"sequence": i.sequence}
if i.coinbase:
input["coinbase"] = hexlify(i.sig_script.raw).decode()
r["vin"].append(input)
if self.witness is not None:
for index, w in enumerate(self.witness):
r["vin"][index]["witness"] = w.hex()
for index, o in enumerate(self.tx_out):
out = {"value": o.value,
"n": index,
"scriptPubKey": {"hex": hexlify(o.pk_script.raw).decode()},
"asm": o.pk_script.asm,
"type": o.pk_script.type}
r["vout"].append(out)
return json.dumps(r)
@classmethod
def deserialize(cls, stream):
stream = get_stream(stream)
raw_tx = bytearray()
raw_wtx = bytearray()
start = stream.tell()
version = int.from_bytes(stream.read(4), 'little')
marker = stream.read(1)
flag = stream.read(1)
if marker == b"\x00" and flag == b"\x01":
# segwit format
point1 = stream.tell()
tx_in = read_var_list(stream, Input)
tx_out = read_var_list(stream, Output)
point2 = stream.tell()
inputs_count = len(tx_in)
witness = [Witness.deserialize(stream) for i in range(inputs_count)]
point3 = stream.tell()
lock_time = int.from_bytes(stream.read(4), 'little')
# calculate tx_id hash
size = stream.tell() - start
stream.seek(start)
raw_tx += stream.read(4)
stream.seek(2,1)
raw_tx += stream.read(point2 - point1)
stream.seek(point3-point2, 1)
raw_tx += stream.read(4)
tx_id = double_sha256(raw_tx)
for w in witness:
if not w.empty:
# caluculate wtx_id
stream.seek(start)
data = stream.read(size)
wtx_id = double_sha256(data)
break
else:
wtx_id = tx_id
vsize = math.ceil((len(raw_tx) * 3 + size) / 4)
else:
stream.seek(start)
marker = b"\x00"
flag = b"\x01"
version = int.from_bytes(stream.read(4), 'little')
tx_in = read_var_list(stream, Input)
tx_out = read_var_list(stream, Output)
witness = [Witness.deserialize(b"\x00") for i in range(len(tx_in))]
lock_time = int.from_bytes(stream.read(4), 'little')
size = stream.tell() - start
stream.seek(start)
data = stream.read(size)
tx_id = double_sha256(data)
wtx_id = None
vsize = size
return cls(version, tx_in, tx_out, lock_time,
hash = tx_id, size = size,
marker = marker, flag = flag,
witness = witness, whash = wtx_id, vsize = vsize)
class Block():
def __init__(self, version, prev_block, merkle_root,
timestamp, bits, nonce, txs, block_size,hash=None):
self.hash = hash
self.version = version
self.prev_block = prev_block
self.merkle_root = merkle_root
self.timestamp = timestamp
self.bits = bits
self.nonce = nonce
self.txs = txs
self.block_size = block_size
self.height = None
self.id = None
self.chain = None
self.amount = 0
self.mountpoint = None
self.side_branch_set = None
self.tx_hash_list = list()
self.op_sig_count = 0
for t in txs:
if t.hash in txs:
raise Exception("CVE-2012-2459") # merkle tree malleability
self.op_sig_count += t.op_sig_count
self.tx_hash_list.append(t.hash)
self.target = None
self.fee = 0
@classmethod
def deserialize(cls, stream):
stream = get_stream(stream)
header = stream.read(80)
stream.seek(-80, 1)
kwargs = {
'hash': hashlib.sha256(hashlib.sha256(header).digest()).digest(),
'version': int.from_bytes(stream.read(4), 'little'),
'prev_block': stream.read(32),
'merkle_root': stream.read(32),
'timestamp': int.from_bytes(stream.read(4), 'little'),
'bits': int.from_bytes(stream.read(4), 'little'),
'nonce': int.from_bytes(stream.read(4), 'little'),
'txs': read_var_list(stream, Transaction),
'block_size': stream.tell()
}
return cls(**kwargs)

68
pybtc/consensus.py Normal file
View File

@ -0,0 +1,68 @@
MAX_BLOCK_SIZE = 1000000
MAX_STANDARD_TX_SIZE = 100000
MAX_P2SH_SIGOPS = 15
MAX_BLOCK_SIGOPS = MAX_BLOCK_SIZE/50
MAX_STANDARD_TX_SIGOPS = MAX_BLOCK_SIGOPS/5
MIN_FEE = 10
MAX_SCRIPT_ELEMENT_SIZE = 520
MAX_OPS_PER_SCRIPT = 201
MAX_PUBKEYS_PER_MULTISIG = 20
NULL_DATA_LIMIT = 80
# SCRIPT VERIFICATION FLAGS
SCRIPT_VERIFY_NONE = 0b0000000000000001
# ??
SCRIPT_VERIFY_P2SH = 0b0000000000000010
# Evaluate P2SH subscripts (softfork safe, BIP16).
SCRIPT_VERIFY_STRICTENC = 0b0000000000000100
# Passing a non-strict-DER signature or one with undefined hashtype to a
# checksig operation causes script failure.
# Evaluating a pubkey that is not (0x04 + 64 bytes) or (0x02 or 0x03 + 32 bytes)
# by checksig causes script failure.
# (softfork safe, but not used or intended as a consensus rule).
SCRIPT_VERIFY_DERSIG = 0b0000000000001000
# Passing a non-strict-DER signature to a checksig operation causes script failure
# (softfork safe, BIP62 rule 1)
SCRIPT_VERIFY_LOW_S = 0b0000000000010000
# Passing a non-strict-DER signature or one with S > order/2 to a checksig operation
# causes script failure
# (softfork safe, BIP62 rule 5).
SCRIPT_VERIFY_NULLDUMMY = 0b0000000000100000
# verify dummy stack item consumed by CHECKMULTISIG is of zero-length
# (softfork safe, BIP62 rule 7).
SCRIPT_VERIFY_SIGPUSHONLY = 0b0000000001000000
# Using a non-push operator in the scriptSig causes script failure
# (softfork safe, BIP62 rule 2).
SCRIPT_VERIFY_MINIMALDATA = 0b0000000010000000
# Require minimal encodings for all push operations (OP_0... OP_16, OP_1NEGATE
# where possible, direct pushes up to 75 bytes, OP_PUSHDATA up to 255 bytes,
# OP_PUSHDATA2 for anything larger). Evaluating any other push causes the script
# to fail (BIP62 rule 3). In addition, whenever a stack element is interpreted
# as a number, it must be of minimal length (BIP62 rule 4).
SCRIPT_VERIFY_DISCOURAGE_UPGRADABLE_NOPS = 0b0000000100000000
# Discourage use of NOPs reserved for upgrades (NOP1-10)
# Provided so that nodes can avoid accepting or mining transactions
# containing executed NOP's whose meaning may change after a soft-fork,
# thus rendering the script invalid; with this flag set executing
# discouraged NOPs fails the script. This verification flag will never be
# a mandatory flag applied to scripts in a block. NOPs that are not
# executed, e.g. within an unexecuted IF ENDIF block, are *not* rejected.
SCRIPT_VERIFY_CLEANSTACK = 0b0000001000000000
# Require that only a single stack element remains after evaluation. This changes the success criterion from
# "At least one stack element must remain, and when interpreted as a boolean, it must be true" to
# "Exactly one stack element must remain, and when interpreted as a boolean, it must be true".
# (softfork safe, BIP62 rule 6)
# Note: CLEANSTACK should never be used without P2SH.
SCRIPT_VERIFY_CHECKLOCKTIMEVERIFY = 0b0000010000000000
# See BIP65 for details.
SCRIPT_VERIFY_CHECKSEQUENCEVERIFY = 0b0000100000000000
# See BIP112 for details
MANDATORY_SCRIPT_VERIFY_FLAGS = SCRIPT_VERIFY_P2SH
STANDARD_SCRIPT_VERIFY_FLAGS = MANDATORY_SCRIPT_VERIFY_FLAGS |\
SCRIPT_VERIFY_STRICTENC |\
SCRIPT_VERIFY_MINIMALDATA |\
SCRIPT_VERIFY_NULLDUMMY |\
SCRIPT_VERIFY_DISCOURAGE_UPGRADABLE_NOPS |\
SCRIPT_VERIFY_CLEANSTACK

148
pybtc/opcodes.py Normal file
View File

@ -0,0 +1,148 @@
OPCODE = {"OP_0": b'\x00',
"OP_PUSHDATA1": b'L',
"OP_PUSHDATA2": b'M',
"OP_PUSHDATA4": b'N',
"OP_1NEGATE": b'O',
"OP_RESERVED": b'P',
"OP_1": b'Q',
"OP_2": b'R',
"OP_3": b'S',
"OP_4": b'T',
"OP_5": b'U',
"OP_6": b'V',
"OP_7": b'W',
"OP_8": b'X',
"OP_9": b'Y',
"OP_10": b'Z',
"OP_11": b'[',
"OP_12": b'\\',
"OP_13": b']',
"OP_14": b'^',
"OP_15": b'_',
"OP_16": b'`',
"OP_NOP": b'a',
"OP_VER": b'b',
"OP_IF": b'c',
"OP_NOTIF": b'd',
"OP_VERIF": b'e',
"OP_VERNOTIF": b'f',
"OP_ELSE": b'g',
"OP_ENDIF": b'h',
"OP_VERIFY": b'i',
"OP_RETURN": b'j',
"OP_TOALTSTACK": b'k',
"OP_FROMALTSTACK": b'l',
"OP_2DROP": b'm',
"OP_2DUP": b'n',
"OP_3DUP": b'o',
"OP_2OVER": b'p',
"OP_2ROT": b'q',
"OP_2SWAP": b'r',
"OP_IFDUP": b's',
"OP_DEPTH": b't',
"OP_DROP": b'u',
"OP_DUP": b'v',
"OP_NIP": b'w',
"OP_OVER": b'x',
"OP_PICK": b'y',
"OP_ROLL": b'z',
"OP_ROT": b'{',
"OP_SWAP": b'|',
"OP_TUCK": b'}',
"OP_CAT": b'~',
"OP_SUBSTR": b'\x7f',
"OP_LEFT": b'\x80',
"OP_RIGHT": b'\x81',
"OP_SIZE": b'\x82',
"OP_INVERT": b'\x83',
"OP_AND": b'\x84',
"OP_OR": b'\x85',
"OP_XOR": b'\x86',
"OP_EQUAL": b'\x87',
"OP_EQUALVERIFY": b'\x88',
"OP_RESERVED1": b'\x89',
"OP_RESERVED2": b'\x8a',
"OP_1ADD": b'\x8b',
"OP_1SUB": b'\x8c',
"OP_2MUL": b'\x8d',
"OP_2DIV": b'\x8e',
"OP_NEGATE": b'\x8f',
"OP_ABS": b'\x90',
"OP_NOT": b'\x91',
"OP_0NOTEQUAL": b'\x92',
"OP_ADD": b'\x93',
"OP_SUB": b'\x94',
"OP_MUL": b'\x95',
"OP_DIV": b'\x96',
"OP_MOD": b'\x97',
"OP_LSHIFT": b'\x98',
"OP_RSHIFT": b'\x99',
"OP_BOOLAND": b'\x9a',
"OP_BOOLOR": b'\x9b',
"OP_NUMEQUAL": b'\x9c',
"OP_NUMEQUALVERIFY": b'\x9d',
"OP_NUMNOTEQUAL": b'\x9e',
"OP_LESSTHAN": b'\x9f',
"OP_GREATERTHAN": b'\xa0',
"OP_LESSTHANOREQUAL": b'\xa1',
"OP_GREATERTHANOREQUAL": b'\xa2',
"OP_MIN": b'\xa3',
"OP_MAX": b'\xa4',
"OP_WITHIN": b'\xa5',
"OP_RIPEMD160": b'\xa6',
"OP_SHA1": b'\xa7',
"OP_SHA256": b'\xa8',
"OP_HASH160": b'\xa9',
"OP_HASH256": b'\xaa',
"OP_CODESEPARATOR": b'\xab',
"OP_CHECKSIG": b'\xac',
"OP_CHECKSIGVERIFY": b'\xad',
"OP_CHECKMULTISIG": b'\xae',
"OP_CHECKMULTISIGVERIFY": b'\xaf',
"OP_NOP1": b'\xb0',
"OP_NOP2": b'\xb1',
"OP_NOP3": b'\xb2',
"OP_NOP4": b'\xb3',
"OP_NOP5": b'\xb4',
"OP_NOP6": b'\xb5',
"OP_NOP7": b'\xb6',
"OP_NOP8": b'\xb7',
"OP_NOP9": b'\xb8',
"OP_NOP10": b'\xb9',
"OP_NULLDATA": b'\xfc',
"OP_PUBKEYHASH": b'\xfd',
"OP_PUBKEY": b'\xfe',
"OP_INVALIDOPCODE": b'\xff'}
RAW_OPCODE = dict ( (OPCODE[i], i) for i in OPCODE )
DISABLED_OPCODE = set ((
# OPCODE["OP_RETURN"],
OPCODE["OP_CAT"],
OPCODE["OP_SUBSTR"],
OPCODE["OP_LEFT"],
OPCODE["OP_RIGHT"],
OPCODE["OP_LEFT"],
OPCODE["OP_LEFT"],
OPCODE["OP_AND"],
OPCODE["OP_OR"],
OPCODE["OP_XOR"],
OPCODE["OP_2MUL"],
OPCODE["OP_2DIV"],
OPCODE["OP_MUL"],
OPCODE["OP_DIV"],
OPCODE["OP_MOD"],
OPCODE["OP_LSHIFT"],
OPCODE["OP_RSHIFT"],
OPCODE["OP_RESERVED"],
# OPCODE["OP_VER"],
OPCODE["OP_VERIF"],
OPCODE["OP_VERNOTIF"],
OPCODE["OP_RESERVED1"],
OPCODE["OP_RESERVED2"],
OPCODE["OP_PUBKEYHASH"],
OPCODE["OP_PUBKEY"],
OPCODE["OP_INVALIDOPCODE"]
))

586
pybtc/tools.py Normal file
View File

@ -0,0 +1,586 @@
import hashlib
from binascii import hexlify, unhexlify
import time
import random
import struct
import hmac
from secp256k1 import lib as secp256k1
from secp256k1 import ffi
SIGHASH_ALL = 0x00000001
SIGHASH_NONE = 0x00000002
SIGHASH_SINGLE = 0x00000003
SIGHASH_ANYONECANPAY = 0x00000080
MAX_INT_PRIVATE_KEY = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141
EC_COMPRESSED = secp256k1.SECP256K1_EC_COMPRESSED
EC_UNCOMPRESSED = secp256k1.SECP256K1_EC_UNCOMPRESSED
FLAG_SIGN = secp256k1.SECP256K1_CONTEXT_SIGN
FLAG_VERIFY = secp256k1.SECP256K1_CONTEXT_VERIFY
ALL_FLAGS = FLAG_SIGN | FLAG_VERIFY
NO_FLAGS = secp256k1.SECP256K1_CONTEXT_NONE
HAS_RECOVERABLE = hasattr(secp256k1, 'secp256k1_ecdsa_sign_recoverable')
HAS_SCHNORR = hasattr(secp256k1, 'secp256k1_schnorr_sign')
HAS_ECDH = hasattr(secp256k1, 'secp256k1_ecdh')
ECDSA_CONTEXT_SIGN = secp256k1.secp256k1_context_create(FLAG_SIGN)
ECDSA_CONTEXT_VERIFY = secp256k1.secp256k1_context_create(FLAG_VERIFY)
ECDSA_CONTEXT_ALL = secp256k1.secp256k1_context_create(ALL_FLAGS)
secp256k1.secp256k1_context_randomize(ECDSA_CONTEXT_SIGN,
random.SystemRandom().randint(0,MAX_INT_PRIVATE_KEY).to_bytes(32,byteorder="big"))
SCRIPT_TYPES = { "P2PKH": 0,
"P2SH" : 1,
"PUBKEY": 2,
"NULL_DATA": 3,
"MULTISIG": 4,
"NON_STANDART": 5,
"SP2PKH": 6
}
b58_digits = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
#
# Encoding functions
#
def encode_base58(b):
"""Encode bytes to a base58-encoded string"""
# Convert big-endian bytes to integer
n = int('0x0' + hexlify(b).decode('utf8'), 16)
# Divide that integer into bas58
res = []
while n > 0:
n, r = divmod(n, 58)
res.append(b58_digits[r])
res = ''.join(res[::-1])
# Encode leading zeros as base58 zeros
czero = 0
pad = 0
for c in b:
if c == czero:
pad += 1
else:
break
return b58_digits[0] * pad + res
def decode_base58(s):
"""Decode a base58-encoding string, returning bytes"""
if not s:
return b''
# Convert the string to an integer
n = 0
for c in s:
n *= 58
if c not in b58_digits:
raise Exception('Character %r is not a valid base58 character' % c)
digit = b58_digits.index(c)
n += digit
# Convert the integer to bytes
h = '%x' % n
if len(h) % 2:
h = '0' + h
res = unhexlify(h.encode('utf8'))
# Add padding back.
pad = 0
for c in s[:-1]:
if c == b58_digits[0]:
pad += 1
else:
break
return b'\x00' * pad + res
#
# Hash functions
#
def sha256(bytes):
return hashlib.sha256(bytes).digest()
def double_sha256(bytes):
return sha256(sha256(bytes))
def hmac_sha512(key, data):
return hmac.new(key, data, hashlib.sha512).digest()
def ripemd160(bytes):
h = hashlib.new('ripemd160')
h.update(bytes)
return h.digest()
def hash160(bytes):
return ripemd160(sha256(bytes))
#
# Bitcoin keys/ addresses
#
def create_priv():
"""
:return: 32 bytes private key
"""
q = time.time()
rnd = random.SystemRandom()
a = rnd.randint(0,MAX_INT_PRIVATE_KEY)
i = int((time.time()%0.01)*100000)
h = a.to_bytes(32,byteorder="big")
while True:
h = hashlib.sha256(h).digest()
if i>1: i -= 1
else:
if int.from_bytes(h,byteorder="big")<MAX_INT_PRIVATE_KEY:
break
return h
def priv_from_int(k):
return int.to_bytes(k,byteorder="big",length=32)
def priv2WIF(h, compressed = False, testnet = False):
#uncompressed: 0x80 + [32-byte secret] + [4 bytes of Hash() of previous 33 bytes], base58 encoded
#compressed: 0x80 + [32-byte secret] + 0x01 + [4 bytes of Hash() previous 34 bytes], base58 encoded
prefix = b'\x80'
if testnet:
prefix = b'\xef'
h = prefix + h
if compressed: h += b'\x01'
h += hashlib.sha256(hashlib.sha256(h).digest()).digest()[:4]
return encode_base58(h)
def WIF2priv(h):
h = decode_base58(h)
return h[1:33]
def is_WIF_valid(wif):
if wif[0] not in ['5', 'K', 'L', '9', 'c']:
return False
h = decode_base58(wif)
if len(h) != 37: return False
checksum = h[-4:]
if hashlib.sha256(hashlib.sha256(h[:-4]).digest()).digest()[:4] != checksum: return False
return True
def priv2pub(private_key, compressed = True, hex = False):
if type(private_key)!= bytes:
if type(private_key) == bytearray:
private_key = bytes(private_key)
elif type(private_key) == str:
private_key = unhexlify(private_key)
else:
raise TypeError("private key must be a bytes or hex encoded string")
pubkey_ptr = ffi.new('secp256k1_pubkey *')
r = secp256k1.secp256k1_ec_pubkey_create(ECDSA_CONTEXT_ALL, pubkey_ptr, private_key)
assert r == 1
len_key = 33 if compressed else 65
pubkey = ffi.new('char [%d]' % len_key)
outlen = ffi.new('size_t *', len_key)
compflag = EC_COMPRESSED if compressed else EC_UNCOMPRESSED
r = secp256k1.secp256k1_ec_pubkey_serialize(ECDSA_CONTEXT_VERIFY, pubkey, outlen, pubkey_ptr, compflag)
assert r == 1
pub = bytes(ffi.buffer(pubkey, len_key))
return hexlify(pub).decode() if hex else pub
def is_valid_pub(key):
if len(key) < 33:
return False
if key[0] == 0x04 and len(key) != 65:
return False
elif key[0] == 0x02 or key[0] == 0x03:
if len(key) != 33:
return False
# else: return False
return True
#
# Bitcoin addresses
#
def hash1602address(hash160, testnet = False, p2sh = False):
if type(hash160) == str:
hash160 = unhexlify(hash160)
if not p2sh:
prefix = b'\x6f' if testnet else b'\x00'
else:
prefix = b'\xc4' if testnet else b'\x05'
hash160 = prefix + hash160
hash160 += double_sha256(hash160)[:4]
return encode_base58(hash160)
def address2hash160(address):
return decode_base58(address)[1:-4]
def address_type(address):
if address[0] in ('2', '3'):
return 'P2SH'
if address[0] in ('1', 'm', 'n'):
return 'P2PKH'
return 'UNKNOWN'
def pub2address(pubkey, testnet = False, p2sh = False):
h = hash160(pubkey)
return hash1602address(h, testnet = testnet, p2sh = p2sh)
def pub2segwit(pubkey, testnet = False):
return hash1602address(pub2hash160segwit(pubkey),
testnet=testnet,
p2sh=True)
def pub2hash160segwit(pubkey):
return hash160(b'\x00\x14' + hash160(pubkey))
def is_address_valid(addr, testnet = False):
if testnet:
if addr[0] not in ('m', 'n', '2'):
return False
else:
if addr[0] not in ('1','3'):
return False
h = decode_base58(addr)
if len(h) != 25: return False
checksum = h[-4:]
if hashlib.sha256(hashlib.sha256(h[:-4]).digest()).digest()[:4] != checksum: return False
return True
#
# ECDSA
#
def verify_signature(sig, pubKey, msg):
if type(sig) != bytes:
if type(sig) == bytearray:
sig = bytes(sig)
elif type(sig) == str:
sig = unhexlify(sig)
else :
raise TypeError("signature must be a bytes or hex encoded string")
if type(pubKey) != bytes:
if type(pubKey) == bytearray:
pubKey = bytes(pubKey)
elif type(pubKey) == str:
pubKey = unhexlify(pubKey)
else :
raise TypeError("public key must be a bytes or hex encoded string")
if type(msg) != bytes:
if type(msg) == bytearray:
msg = bytes(msg)
elif type(msg) == str:
msg = unhexlify(msg)
else:
raise TypeError("message must be a bytes or hex encoded string")
raw_sig = ffi.new('secp256k1_ecdsa_signature *')
raw_pubkey = ffi.new('secp256k1_pubkey *')
if not secp256k1.secp256k1_ecdsa_signature_parse_der(ECDSA_CONTEXT_VERIFY , raw_sig, sig, len(sig)):
raise TypeError("signature must be DER encoded")
if not secp256k1.secp256k1_ec_pubkey_parse(ECDSA_CONTEXT_VERIFY, raw_pubkey, pubKey, len(pubKey)):
raise TypeError("public key format error")
result = secp256k1.secp256k1_ecdsa_verify(ECDSA_CONTEXT_VERIFY, raw_sig, msg, raw_pubkey)
return True if result else False
def sign_message(msg, private_key, hex = False):
"""
:param msg: message to sign
:param private_key: private key (bytes, hex encoded string)
:param hex:
:return: DER encoded sinature
"""
if type(msg) != bytes:
if type(msg) == bytearray:
msg = bytes(msg)
elif type(msg) == str:
msg = unhexlify(msg)
else :
raise TypeError("message must be a bytes or hex encoded string")
if type(private_key)!= bytes:
if type(private_key) == bytearray:
private_key = bytes(private_key)
elif type(private_key) == str:
private_key = unhexlify(private_key)
else:
raise TypeError("private key must be a bytes or hex encoded string")
raw_sig = ffi.new('secp256k1_ecdsa_signature *')
signed = secp256k1.secp256k1_ecdsa_sign(ECDSA_CONTEXT_SIGN, raw_sig, msg, private_key, ffi.NULL, ffi.NULL)
assert signed == 1
len_sig = 74
output = ffi.new('unsigned char[%d]' % len_sig)
outputlen = ffi.new('size_t *', len_sig)
res = secp256k1.secp256k1_ecdsa_signature_serialize_der(ECDSA_CONTEXT_SIGN, output, outputlen, raw_sig)
assert res == 1
signature = bytes(ffi.buffer(output, outputlen[0]))
return hexlify(signature).decode() if hex else signature
def is_valid_signature_encoding(sig):
# Format: 0x30 [total-length] 0x02 [R-length] [R] 0x02 [S-length] [S] [sighash]
# * total-length: 1-byte length descriptor of everything that follows,
# excluding the sighash byte.
# * R-length: 1-byte length descriptor of the R value that follows.
# * R: arbitrary-length big-endian encoded R value. It must use the shortest
# possible encoding for a positive integers (which means no null bytes at
# the start, except a single one when the next byte has its highest bit set).
# * S-length: 1-byte length descriptor of the S value that follows.
# * S: arbitrary-length big-endian encoded S value. The same rules apply.
# * sighash: 1-byte value indicating what data is hashed (not part of the DER
# signature)
length = len(sig)
# Minimum and maximum size constraints.
if (length < 9) or (length > 73):
return False
# A signature is of type 0x30 (compound).
if sig[0] != 0x30:
return False
# Make sure the length covers the entire signature.
if sig[1] != (length - 3):
return False
# Extract the length of the R element.
lenR = sig[3]
# Make sure the length of the S element is still inside the signature.
if (5 + lenR) >= length:
return False
# Extract the length of the S element.
lenS = sig[5 + lenR]
# Verify that the length of the signature matches the sum of the length
# of the elements.
if (lenR + lenS + 7) != length:
return False
# Check whether the R element is an integer.
if sig[2] != 0x02:
return False
# Zero-length integers are not allowed for R.
if lenR == 0:
return False
# Negative numbers are not allowed for R.
if sig[4] & 0x80:
return False
# Null bytes at the start of R are not allowed, unless R would
# otherwise be interpreted as a negative number.
if (lenR > 1) and (sig[4] == 0x00) and (not sig[5] & 0x80):
return False
# Check whether the S element is an integer.
if sig[lenR + 4] != 0x02:
return False
# Zero-length integers are not allowed for S.
if lenS == 0:
return False
# Negative numbers are not allowed for S.
if sig[lenR + 6] & 0x80:
return False
# Null bytes at the start of S are not allowed, unless S would otherwise be
# interpreted as a negative number.
if (lenS > 1) and (sig[lenR + 6] == 0x00) and (not sig[lenR + 7] & 0x80):
return False
return True
#
# Transaction encoding
#
def rh2s(tthash):
return hexlify(tthash[::-1]).decode()
def s2rh(hash_string):
return unhexlify(hash_string)[::-1]
def merkleroot(tx_hash_list):
tx_hash_list = list(tx_hash_list)
if len(tx_hash_list) == 1:
return tx_hash_list[0]
while True:
new_hash_list = list()
while tx_hash_list:
h1 = tx_hash_list.pop()
try:
h2 = tx_hash_list.pop()
except:
h2 = h1
new_hash_list.insert(0, double_sha256(h1 + h2))
if len(new_hash_list) > 1:
tx_hash_list = new_hash_list
else:
return new_hash_list[0]
#
#
#
def var_int(data):
e, s = 1, 0
if data[:1] == b'\xfd':
s, e = 1, 3
elif data[:1] == b'\xfe':
s = 1
e = 5
elif data[:1] == b'\xff':
s = 1
e = 9
i = int.from_bytes(data[s:e], byteorder='little', signed=False)
return (i, e)
def from_var_int(data):
# retrun
e = 1
s = 0
if data[:1] == b'\xfd':
s = 1
e = 3
elif data[:1] == b'\xfe':
s = 1
e = 5
elif data[:1] == b'\xff':
s = 1
e = 9
i = int.from_bytes(data[s:e], byteorder='little', signed=False)
return i
def var_int_len(byte):
e = 1
if byte == 253:
e = 3
elif byte == 254:
e = 5
elif byte == 255:
e = 9
return e
def to_var_int(i):
if i < 253:
return i.to_bytes(1, byteorder='little')
if i <= 0xffff:
return b'\xfd' + i.to_bytes(2, byteorder='little')
if i <= 0xffffffff:
return b'\xfe' + i.to_bytes(4, byteorder='little')
return b'\xff' + i.to_bytes(8, byteorder='little')
def read_var_int(stream):
l = stream.read(1)
bytes_length = var_int_len(l[0])
return l + stream.read(bytes_length - 1)
def read_var_list(stream, data_type):
count = from_var_int(read_var_int(stream))
return [data_type.deserialize(stream) for i in range(count)]
# generic big endian MPI format
def bn_bytes(v, have_ext=False):
ext = 0
if have_ext:
ext = 1
return ((v.bit_length() + 7) // 8) + ext
def bn2bin(v):
s = bytearray()
i = bn_bytes(v)
while i > 0:
s.append((v >> ((i - 1) * 8)) & 0xff)
i -= 1
return s
def bin2bn(s):
l = 0
for ch in s:
l = (l << 8) | ch
return l
def bn2mpi(v):
have_ext = False
if v.bit_length() > 0:
have_ext = (v.bit_length() & 0x07) == 0
neg = False
if v < 0:
neg = True
v = -v
s = struct.pack(b">I", bn_bytes(v, have_ext))
ext = bytearray()
if have_ext:
ext.append(0)
v_bin = bn2bin(v)
if neg:
if have_ext:
ext[0] |= 0x80
else:
v_bin[0] |= 0x80
return s + ext + v_bin
def mpi2bn(s):
if len(s) < 4:
return None
s_size = bytes(s[:4])
v_len = struct.unpack(b">I", s_size)[0]
if len(s) != (v_len + 4):
return None
if v_len == 0:
return 0
v_str = bytearray(s[4:])
neg = False
i = v_str[0]
if i & 0x80:
neg = True
i &= ~0x80
v_str[0] = i
v = bin2bn(v_str)
if neg:
return -v
return v
# bitcoin-specific little endian format, with implicit size
def mpi2vch(s):
r = s[4:] # strip size
# if r:
r = r[::-1] # reverse string, converting BE->LE
# else: r=b'\x00'
return r
def bn2vch(v):
return bytes(mpi2vch(bn2mpi(v)))
def vch2mpi(s):
r = struct.pack(b">I", len(s)) # size
r += s[::-1] # reverse string, converting LE->BE
return r
def vch2bn(s):
return mpi2bn(vch2mpi(s))
def i2b(i): return bn2vch(i)
def b2i(b): return vch2bn(b)

1
requirements.txt Normal file
View File

@ -0,0 +1 @@
secp256k1

13
setup.py Normal file
View File

@ -0,0 +1,13 @@
#!/usr/bin/python3
# coding: utf-8
# todo install libsec256 part
from distutils.core import setup
setup(name='pybtc',
version='1.0.1',
description='Bitcoin library',
author='Alexsei Karpov',
author_email='admin@bitaps.com',
url='https://github.com/bitaps-com/pybtc',
packages=['pybtc', ],
)

8
test.py Normal file
View File

@ -0,0 +1,8 @@
import unittest
import test
testLoad = unittest.TestLoader()
suites = testLoad.loadTestsFromModule(test)
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suites)

6
test/__init__.py Normal file
View File

@ -0,0 +1,6 @@
from .script_deserialize import *
from .hash_functions import *
from .address_functions import *
from .transaction_deserialize import *
from .sighash import *
from .ecdsa import *

14
test/address_functions.py Normal file
View File

@ -0,0 +1,14 @@
import unittest
from pybtc import tools
from binascii import unhexlify
class AddressFunctionsTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
print("\nTesting address functions:\n")
def test_pub2segwit(self):
print("pub2segwit")
self.assertEqual(tools.pub2segwit(unhexlify("03db633162d49193d1178a5bbb90bde2f3c196ba0296f010b12a2320a7c6568582")),
"3PjV3gFppqmDEHjLvqDWv3Y4riLMQg7X1y")

26
test/ecdsa.py Normal file
View File

@ -0,0 +1,26 @@
import unittest
from pybtc import *
class ECDSATests(unittest.TestCase):
@classmethod
def setUpClass(cls):
print("\nTesting ECDSA:\n")
def test_private_to_public(self):
"""
["raw_transaction, script, input_index, hashType, signature_hash (result)"],
:return:
"""
print("\nPrivate key to Public key ")
k = bytearray.fromhex("eb696a065ef48a2192da5b28b694f87544b30fae8327c4510137a922f32c6dcf")
self.assertEqual(priv2pub(k, hex=True),
"03ad1d8e89212f0b92c74d23bb710c00662ad1470198ac48c43f7d6f93a2a26873")
print("Sign message")
msg = bytearray.fromhex('64f3b0f4dd2bb3aa1ce8566d220cc74dda9df97d8490cc81d89d735c92e59fb6')
self.assertEqual(sign_message(msg, k, True),
"3044022047ac8e878352d3ebbde1c94ce3a10d057c24175747116f8288e5d794d12d482f0220217f36a485cae903c713331d877c1f64677e3622ad4010726870540656fe9dcb")
print("Verify signature")
s = '3044022047ac8e878352d3ebbde1c94ce3a10d057c24175747116f8288e5d794d12d482f0220217f36a485cae903c713331d877c1f64677e3622ad4010726870540656fe9dcb'
self.assertEqual(verify_signature(s,priv2pub(k, hex=True), msg), True)

25
test/hash_functions.py Normal file
View File

@ -0,0 +1,25 @@
import unittest
from pybtc import tools
from binascii import unhexlify
class HashFunctionsTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
print("\nTesting hash functions:\n")
def test_double_sha256(self):
print("Double SHA256")
self.assertEqual(tools.double_sha256(b"test double sha256"),
unhexlify("1ab3067efb509c48bda198f48c473f034202537c28b7b4c3b2ab2c4bf4a95c8d"))
def test_ripemd160(self):
print("RIPEMD160")
self.assertEqual(tools.ripemd160(b"test ripemd160"),
unhexlify("45b17861a7defaac439f740d890f3dac4813cc37"))
def test_hash160(self):
print("HASH160")
self.assertEqual(tools.ripemd160(b"test hash160"),
unhexlify("46a80bd289028559818a222eea64552d7a6a966f"))

165
test/script_deserialize.py Normal file
View File

@ -0,0 +1,165 @@
import unittest
from pybtc import blockchain
from binascii import unhexlify
from pybtc import address2hash160
class ScriptDeserializeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
print("\nTesting Script class deserialization:\n")
def test_p2pkh(self):
print("Deserialize P2PKH")
s = blockchain.Script("76a9143520dd524f6ca66f63182bb23efff6cc8ee3ee6388ac")
self.assertEqual(s.type, "P2PKH")
self.assertEqual(s.ntype, 0)
self.assertEqual(s.asm, "OP_DUP OP_HASH160 3520dd524f6ca66f63182bb23efff6cc8ee3ee63 OP_EQUALVERIFY OP_CHECKSIG")
self.assertEqual(s.address[0], address2hash160("15qvBdqSWQCuLQPXVoWViG2GvjeARmpYPw"))
self.assertEqual(s.pattern, "OP_DUP OP_HASH160 <20> OP_EQUALVERIFY OP_CHECKSIG")
self.assertEqual(s.op_sig_count, 1)
def test_p2sh(self):
print("Deserialize P2SH")
s = blockchain.Script("a91469f37572ab1b69f304f987b119e2450e0b71bf5c87")
self.assertEqual(s.type, "P2SH")
self.assertEqual(s.ntype, 1)
self.assertEqual(s.asm, "OP_HASH160 69f37572ab1b69f304f987b119e2450e0b71bf5c OP_EQUAL")
self.assertEqual(s.address[0], address2hash160("3BMEXVsYyfKB5h3m53XRSFHkqi1zPwsvcK"))
self.assertEqual(s.pattern, "OP_HASH160 <20> OP_EQUAL")
self.assertEqual(s.op_sig_count, 0)
def test_null_data(self):
print("Deserialize NULL_DATA")
# 20 bytes valid
s = blockchain.Script("6a144279b52d6ee8393a9a755e8c6f633b5dd034bd67")
self.assertEqual(s.type, "NULL_DATA")
self.assertEqual(s.ntype, 3)
self.assertEqual(s.asm, "OP_RETURN 4279b52d6ee8393a9a755e8c6f633b5dd034bd67")
self.assertEqual(len(s.address), 0)
self.assertEqual(s.pattern, "OP_RETURN <20>")
self.assertEqual(s.op_sig_count, 0)
# 81 bytes invalid
s = blockchain.Script("6a4c51000000000000000000000000000000000000000000000000000000000000"
"000000000000000000000000000000000000000000000000000000000000"
"000000000000000000000000000000000000000000")
self.assertEqual(s.asm, "OP_RETURN 000000000000000000000000000000000000000000000000000"
"0000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000")
self.assertEqual(s.pattern, "OP_RETURN <81>")
self.assertEqual(len(s.address), 0)
self.assertEqual(s.op_sig_count, 0)
self.assertEqual(s.type, "NON_STANDARD")
self.assertEqual(s.ntype, 7)
def test_multisig(self):
print("Deserialize MULTISIG")
# 15 from 15
s = blockchain.Script("5f210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"71210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"71210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"71210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"71210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"71210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"71210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"71210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"71210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"71210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"71210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"71210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"71210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"71210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"71210378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c"
"715fae")
self.assertEqual(s.pattern, "OP_15 <33> <33> <33> <33> <33> <33> <33> <33> <33> <33> <33> <33> "
"<33> <33> <33> OP_15 OP_CHECKMULTISIG")
self.assertEqual(s.type, "MULTISIG")
self.assertEqual(s.ntype, 4)
self.assertEqual(s.asm, "OP_15 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 0378d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab"
"35c71 OP_15 OP_CHECKMULTISIG")
self.assertEqual(len(s.address), 15)
self.assertEqual(s.op_sig_count, 15)
# 1 from 3
s = blockchain.Script("512102953397b893148acec2a9da8341159e9e7fb3d32987c3563e8bdf22116213623"
"441048da561da64584fb1457e906bc2840a1f963b401b632ab98761d12d74dd795bbf"
"410c7b6d6fd39acf9d870cb9726578eaf8ba430bb296eac24957f3fb3395b8b042060"
"466616fb675310aeb024f957b4387298dc28305bc7276bf1f7f662a6764bcdffb6a97"
"40de596f89ad8000f8fa6741d65ff1338f53eb39e98179dd18c6e6be8e3953ae")
self.assertEqual(s.pattern, "OP_1 <33> <65> <66> OP_3 OP_CHECKMULTISIG")
self.assertEqual(s.type, "MULTISIG")
self.assertEqual(s.ntype, 4)
self.assertEqual(s.asm, "OP_1 02953397b893148acec2a9da8341159e9e7fb3d32987c3563e8bdf22116213"
"6234 048da561da64584fb1457e906bc2840a1f963b401b632ab98761d12d74dd79"
"5bbf410c7b6d6fd39acf9d870cb9726578eaf8ba430bb296eac24957f3fb3395b8b"
"0 060466616fb675310aeb024f957b4387298dc28305bc7276bf1f7f662a6764bcd"
"ffb6a9740de596f89ad8000f8fa6741d65ff1338f53eb39e98179dd18c6e6be8e39"
" OP_3 OP_CHECKMULTISIG")
self.assertEqual(len(s.address), 3)
self.assertEqual(s.op_sig_count, 3)
def test_segwit_p2wpkh(self):
print("Deserialize segwit P2WPKH")
s = blockchain.Script("00144160bb1870159a08724557f75c7bb665a3a132e0")
self.assertEqual(s.type, "P2WPKH")
self.assertEqual(s.ntype, 5)
self.assertEqual(s.asm, "OP_0 4160bb1870159a08724557f75c7bb665a3a132e0")
self.assertEqual(s.address[0], unhexlify("004160bb1870159a08724557f75c7bb665a3a132e0"))
self.assertEqual(s.pattern, "OP_0 <20>")
self.assertEqual(s.op_sig_count, 1)
s = blockchain.Script("00144160bb1870159a08724557f75c7bb665a3a132e0", segwit=False)
self.assertEqual(s.type, "NON_STANDARD")
self.assertEqual(s.ntype, 7)
self.assertEqual(s.asm, "OP_0 4160bb1870159a08724557f75c7bb665a3a132e0")
self.assertEqual(len(s.address), 0)
self.assertEqual(s.pattern, "OP_0 <20>")
self.assertEqual(s.op_sig_count, 0)
def test_segwit_p2wsh(self):
print("Deserialize segwit P2WSH")
s = blockchain.Script("0020cdbf909e935c855d3e8d1b61aeb9c5e3c03ae8021b286839b1a72f2e48fdba70")
self.assertEqual(s.type, "P2WSH")
self.assertEqual(s.ntype, 6)
self.assertEqual(s.asm, "OP_0 cdbf909e935c855d3e8d1b61aeb9c5e3c03ae8021b286839b1a72f2e48fdba70")
self.assertEqual(s.address[0], unhexlify("00cdbf909e935c855d3e8d1b61aeb9c5e3c03ae8021b286839b1a72f2e48fdba70"))
self.assertEqual(s.pattern, "OP_0 <32>")
self.assertEqual(s.op_sig_count, 0)
s = blockchain.Script("0020cdbf909e935c855d3e8d1b61aeb9c5e3c03ae8021b286839b1a72f2e48fdba70", segwit=False)
self.assertEqual(s.type, "NON_STANDARD")
self.assertEqual(s.ntype, 7)
self.assertEqual(s.asm, "OP_0 cdbf909e935c855d3e8d1b61aeb9c5e3c03ae8021b286839b1a72f2e48fdba70")
self.assertEqual(len(s.address), 0)
self.assertEqual(s.pattern, "OP_0 <32>")
self.assertEqual(s.op_sig_count, 0)
# class TransactionDeserializeTests(unittest.TestCase):
# @classmethod
# def setUpClass(cls):
# print("\nTesting Transaction class deserialization:\n")
#
# def test_p2pkh(self):
# print("Deserialize P2PKH")
# tx = blockchain.Transaction.deserialize("0200000001df2d507c1c765e526f8dbe6527fc34be1c30b27fdb88c521060b82"
# "45dd9f4e2e010000006b483045022100b00c4efde901f2efb92679681bef359c"
# "7639ba79979e0c436cb10a266d0b4c5d0220730d854aa51fbe715387256956f3"
# "71d2199297f7b4ea6bf9b4b87bf8bba8417e012102530c548d402670b13ad888"
# "7ff99c294e67fc18097d236d57880c69261b42def70000000001b88800000000"
# "00001600144160bb1870159a08724557f75c7bb665a3a132e000000000")
# "0200000001df2d507c1c765e526f8dbe6527fc34be1c30b27fdb88c521060b8245dd9f4e2e010000006b483045022100b00c4efde901f2efb92679681bef359c7639ba79979e0c436cb10a266d0b4c5d0220730d854aa51fbe715387256956f371d2199297f7b4ea6bf9b4b87bf8bba8417e012102530c548d402670b13ad8887ff99c294e67fc18097d236d57880c69261b42def70000000001b8880000000000001600144160bb1870159a08724557f75c7bb665a3a132e000000000"
# t = "0100000000010198f4f454fbb0aeae5bd167879ca65560b3b7073cd57aaee9cb5903d6f0f883860000000000ffffffff01204e0000000000001976a9143e2585f7b9e642ad9030c0812490ea5b83c511fa88ac02483045022100849b9929f619933224743d14dde08ad521540d9c2b1b85a20ec1ce5db77e4fa1022035222f713f5753119783e049e1c3ef8f819b2af4946cf150c98fe66fd1898409012102530c548d402670b13ad8887ff99c294e67fc18097d236d57880c69261b42def700000000"
# tx = blockchain.Transaction.deserialize(t)
# print(tx.json())

88
test/sighash.py Normal file
View File

@ -0,0 +1,88 @@
import unittest
from pybtc import *
from binascii import unhexlify
from pybtc import address2hash160
class SighashTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
print("\nTesting sighash:\n")
def test_sighash_segwit(self):
"""
["raw_transaction, script, input_index, hashType, signature_hash (result)"],
:return:
"""
print("\nNative P2WPKH")
raw_tx = "0100000002fff7f7881a8099afa6940d42d1e7f6362bec38171ea3edf433541db4e4ad969f0000000000eeffffffef51e1b804cc89d182d279655c3aa89e815b1b309fe287d9b2b55d57b90ec68a0100000000ffffffff02202cb206000000001976a9148280b37df378db99f66f85c95a783a76ac7a6d5988ac9093510d000000001976a9143bde42dbee7e4dbe6a21b2d50ce2f0167faa815988ac11000000"
self.assertEqual((Transaction.deserialize(raw_tx).sighash_segwit(SIGHASH_ALL,
1,
"1976a9141d0f172a0ecb48aee1be1f2687d2963ae33f71a188ac",
600000000,
True)),
"c37af31116d1b27caf68aae9e3ac82f1477929014d5b917657d0eb49478cb670")
print(Script("c37af31116d1b27caf68aae9e3ac82f1477929014d5b917657d0eb49478cb670").type)
print("P2SH-P2WPKH")
raw_tx = "0100000001db6b1b20aa0fd7b23880be2ecbd4a98130974cf4748fb66092ac4d3ceb1a54770100000000feffffff02b8b4eb0b000000001976a914a457b684d7f0d539a46a45bbc043f35b59d0d96388ac0008af2f000000001976a914fd270b1ee6abcaea97fea7ad0402e8bd8ad6d77c88ac92040000"
self.assertEqual((Transaction.deserialize(raw_tx).sighash_segwit(SIGHASH_ALL,
0,
"1976a91479091972186c449eb1ded22b78e40d009bdf008988ac",
1000000000,
True)),
"64f3b0f4dd2bb3aa1ce8566d220cc74dda9df97d8490cc81d89d735c92e59fb6")
print("Native P2WSH")
raw_tx = "0100000002fe3dc9208094f3ffd12645477b3dc56f60ec4fa8e6f5d67c565d1c6b9216b36e0000000000ffffffff0815cf020f013ed6cf91d29f4202e8a58726b1ac6c79da47c23d1bee0a6925f80000000000ffffffff0100f2052a010000001976a914a30741f8145e5acadf23f751864167f32e0963f788ac00000000"
self.assertEqual((Transaction.deserialize(raw_tx).sighash_segwit(SIGHASH_SINGLE,
1,
"23210255a9626aebf5e29c0e6538428ba0d1dcf6ca98ffdf086aa8ced5e0d0215ea465ac",
4900000000,
True)),
"fef7bd749cce710c5c052bd796df1af0d935e59cea63736268bcbe2d2134fc47")
print("P2SH-P2WSH SIGHASH_ALL")
raw_tx = "010000000136641869ca081e70f394c6948e8af409e18b619df2ed74aa106c1ca29787b96e0100000000ffffffff0200e9a435000000001976a914389ffce9cd9ae88dcc0631e88a821ffdbe9bfe2688acc0832f05000000001976a9147480a33f950689af511e6e84c138dbbd3c3ee41588ac00000000"
self.assertEqual((Transaction.deserialize(raw_tx).sighash_segwit(SIGHASH_ALL,
0,
"cf56210307b8ae49ac90a048e9b53357a2354b3334e9c8bee813ecb98e99a7e07e8c3ba32103b28f0c28bfab54554ae8c658ac5c3e0ce6e79ad336331f78c428dd43eea8449b21034b8113d703413d57761b8b9781957b8c0ac1dfe69f492580ca4195f50376ba4a21033400f6afecb833092a9a21cfdf1ed1376e58c5d1f47de74683123987e967a8f42103a6d48b1131e94ba04d9737d61acdaa1322008af9602b3b14862c07a1789aac162102d8b661b0b3302ee2f162b09e07a55ad5dfbe673a9f01d9f0c19617681024306b56ae",
987654321,
True)),
"185c0be5263dce5b4bb50a047973c1b6272bfbd0103a89444597dc40b248ee7c")
print("P2SH-P2WSH SIGHASH_NONE")
self.assertEqual((Transaction.deserialize(raw_tx).sighash_segwit(SIGHASH_NONE,
0,
"cf56210307b8ae49ac90a048e9b53357a2354b3334e9c8bee813ecb98e99a7e07e8c3ba32103b28f0c28bfab54554ae8c658ac5c3e0ce6e79ad336331f78c428dd43eea8449b21034b8113d703413d57761b8b9781957b8c0ac1dfe69f492580ca4195f50376ba4a21033400f6afecb833092a9a21cfdf1ed1376e58c5d1f47de74683123987e967a8f42103a6d48b1131e94ba04d9737d61acdaa1322008af9602b3b14862c07a1789aac162102d8b661b0b3302ee2f162b09e07a55ad5dfbe673a9f01d9f0c19617681024306b56ae",
987654321,
True)),
"e9733bc60ea13c95c6527066bb975a2ff29a925e80aa14c213f686cbae5d2f36")
print("P2SH-P2WSH SIGHASH_SINGLE")
self.assertEqual((Transaction.deserialize(raw_tx).sighash_segwit(SIGHASH_SINGLE,
0,
"cf56210307b8ae49ac90a048e9b53357a2354b3334e9c8bee813ecb98e99a7e07e8c3ba32103b28f0c28bfab54554ae8c658ac5c3e0ce6e79ad336331f78c428dd43eea8449b21034b8113d703413d57761b8b9781957b8c0ac1dfe69f492580ca4195f50376ba4a21033400f6afecb833092a9a21cfdf1ed1376e58c5d1f47de74683123987e967a8f42103a6d48b1131e94ba04d9737d61acdaa1322008af9602b3b14862c07a1789aac162102d8b661b0b3302ee2f162b09e07a55ad5dfbe673a9f01d9f0c19617681024306b56ae",
987654321,
True)),
"1e1f1c303dc025bd664acb72e583e933fae4cff9148bf78c157d1e8f78530aea")
print("P2SH-P2WSH SIGHASH_ALL + SIGHASH_ANYONECANPAY")
self.assertEqual((Transaction.deserialize(raw_tx).sighash_segwit(SIGHASH_ALL + SIGHASH_ANYONECANPAY,
0,
"cf56210307b8ae49ac90a048e9b53357a2354b3334e9c8bee813ecb98e99a7e07e8c3ba32103b28f0c28bfab54554ae8c658ac5c3e0ce6e79ad336331f78c428dd43eea8449b21034b8113d703413d57761b8b9781957b8c0ac1dfe69f492580ca4195f50376ba4a21033400f6afecb833092a9a21cfdf1ed1376e58c5d1f47de74683123987e967a8f42103a6d48b1131e94ba04d9737d61acdaa1322008af9602b3b14862c07a1789aac162102d8b661b0b3302ee2f162b09e07a55ad5dfbe673a9f01d9f0c19617681024306b56ae",
987654321,
True)),
"2a67f03e63a6a422125878b40b82da593be8d4efaafe88ee528af6e5a9955c6e")
print("P2SH-P2WSH SIGHASH_NONE + SIGHASH_ANYONECANPAY")
self.assertEqual((Transaction.deserialize(raw_tx).sighash_segwit(SIGHASH_NONE + SIGHASH_ANYONECANPAY,
0,
"cf56210307b8ae49ac90a048e9b53357a2354b3334e9c8bee813ecb98e99a7e07e8c3ba32103b28f0c28bfab54554ae8c658ac5c3e0ce6e79ad336331f78c428dd43eea8449b21034b8113d703413d57761b8b9781957b8c0ac1dfe69f492580ca4195f50376ba4a21033400f6afecb833092a9a21cfdf1ed1376e58c5d1f47de74683123987e967a8f42103a6d48b1131e94ba04d9737d61acdaa1322008af9602b3b14862c07a1789aac162102d8b661b0b3302ee2f162b09e07a55ad5dfbe673a9f01d9f0c19617681024306b56ae",
987654321,
True)),
"781ba15f3779d5542ce8ecb5c18716733a5ee42a6f51488ec96154934e2c890a")
print("P2SH-P2WSH SIGHASH_SINGLE + SIGHASH_ANYONECANPAY")
self.assertEqual((Transaction.deserialize(raw_tx).sighash_segwit(SIGHASH_SINGLE + SIGHASH_ANYONECANPAY,
0,
"cf56210307b8ae49ac90a048e9b53357a2354b3334e9c8bee813ecb98e99a7e07e8c3ba32103b28f0c28bfab54554ae8c658ac5c3e0ce6e79ad336331f78c428dd43eea8449b21034b8113d703413d57761b8b9781957b8c0ac1dfe69f492580ca4195f50376ba4a21033400f6afecb833092a9a21cfdf1ed1376e58c5d1f47de74683123987e967a8f42103a6d48b1131e94ba04d9737d61acdaa1322008af9602b3b14862c07a1789aac162102d8b661b0b3302ee2f162b09e07a55ad5dfbe673a9f01d9f0c19617681024306b56ae",
987654321,
True)),
"511e8e52ed574121fc1b654970395502128263f62662e076dc6baf05c2e6a99b")

View File

@ -0,0 +1,31 @@
import unittest
from pybtc import blockchain
from binascii import unhexlify
from pybtc import address2hash160
class TransactionDeserializeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
print("\nTesting Transaction class deserialization:\n")
def test_segwit_deserialize(self):
non_segwit_view = "020000000140d43a99926d43eb0e619bf0b3d83b4a31f60c176beecfb9d35bf45e54d0f7420100000017160014a4b4ca48de0b3fffc15404a1acdc8dbaae226955ffffffff0100e1f5050000000017a9144a1154d50b03292b3024370901711946cb7cccc38700000000"
segwit_view = "0200000000010140d43a99926d43eb0e619bf0b3d83b4a31f60c176beecfb9d35bf45e54d0f7420100000017160014a4b4ca48de0b3fffc15404a1acdc8dbaae226955ffffffff0100e1f5050000000017a9144a1154d50b03292b3024370901711946cb7cccc387024830450221008604ef8f6d8afa892dee0f31259b6ce02dd70c545cfcfed8148179971876c54a022076d771d6e91bed212783c9b06e0de600fab2d518fad6f15a2b191d7fbd262a3e0121039d25ab79f41f75ceaf882411fd41fa670a4c672c23ffaf0e361a969cde0692e800000000"
print("Deserialize Segwit transaction")
ns = blockchain.Transaction.deserialize(non_segwit_view)
s = blockchain.Transaction.deserialize(segwit_view)
s.serialize(True,True)
self.assertEqual(s.serialize(False,True), non_segwit_view)
self.assertEqual(s.serialize(True,True), segwit_view)
self.assertEqual(ns.serialize(False,True), non_segwit_view)
non_segwit_view = "01000000060c02c24bbfefd94cdc4f58a3f83f93e05b14ad968ec6aba54190c3dcba6eef1b00000000da00483045022100f4dbf2ca7b5da97bd78818635d48004e6bf1a6f7e5e24bcecb7d93f554e49eaf02200a05025d93475b6372d14bd8fe8366fe10570ade772b19d124d3b0175b9f6eda0147304402202290feb53fc4cb077c5d3eed0ed5367fef4011ac708c1acaaa5003e4ed680ddf022012c52160ae6b85fc59ceed80c7cacd5358b80712371733de7c76fef552114ee60147522103ee30ea502d692ecfc79dfcb6fb2d6914fc70ba80db971beebacccdbaed40d7c52102f52db885d04dc43ca8562471759dd621b899faf508caba70963d9228bfd2415e52ae00000000dab020ee0a80a818e4d20a52aa7ba367a0a2d430d22c26ccb4572527e259e14a01000000d900473044022064745ac8cae859bb19305a550981b8d39b69efec55b9a423dca645cd3b5c845502205cf375839d7f056235feb550a8138a03e75fa140503a2ce61fe239a3bfe42145014730440220728b0393d5427d8abb56a608c6b1a0b14c5455f3abeb56ce8a5c7f70035af71d022052a99e4e389790b364f6180caf1523d6da2b3daabe85952705023be2b5409b360147522103e296937dbdafdae4d0a45b33d9a691b0c0e71d97bd2ffc82da895a251f74bd7e2103ead7ad0c398f928cbe851a6b387a5e74d139487caf4d4ac3dc3d478254dbbb4452ae0000000067a6c2e2f14fc412b3f5627fafac2fe43009bc403ec680839579444df2ce042b00000000da00483045022100d3bdc055fa5dcce335a1d572b1c3ccb8cc9ba91960c6361c77e29ded716e233102200e7ebb43fd39fb98c714098d4fda32d94cdbefdd96c0c918b788aacc6164953c0147304402202f4338d2710edb1960dcf7136411f065a16bee4e44b86604d64c660315bc55040220238c1c3216feb31051f7798297317819da1dfa830d24a6a9e38a37367a68ebd101475221037b2987df626510ce25e6ce5fdb716705e23fecb7398b2cbb0a1c0af7ca5da717210345e358653b4580b5bd68d263089a0a2bf9fcc1e145fcf2a3d4b9ab5cd7e1a76752ae000000004f28d63103dfb86a5d92d2daf328bbb35d72239766a5853b7076a90d1745813200000000da00483045022100e89ac8215ee87186de284c419b2522ebfb2ecb8063d0f91942f2ad63f383d3d4022036485902bb1f2e0b2cc645aab8781def27f25e91d8256d48dd48d5cfca1a21c20147304402201449379f1d57f2b7ad1dc0882f59627287a6c32180ffa7637941b0eaa666dd4b022028eb0eed77e1b92de046098c855834a5feeadea55d17160bc6d11d47184e8b51014752210283db605dc305201ab9be509a22d2c83b388002fb54ecd82d86efe83c0a1d35822103146f745eff0ae31fe899aafd27d51d2c0f5b0c03f2f47b3c65bb26ec7581ad8652ae0000000021cb3b00d1f22455e76e86872e00ef556578bcc112071e6a5b4ac02ab682fdb301000000232200206ea344e9a4a8f8a8983479af2ae3ed29fab153955af14457780a304a6832b9c50000000016dcc4b40a514c43ed61d6c01a9006d7f21a6d30b99b3e580d21578e35002502000000002322002049ea1f7c280b32fee0dce2e1801df2218df59d64614c4fe76c043ee2c80116700000000002005ed0b20000000017a91495c5c19257aa52bd4b702ba1a5e29b8d72a75a3a876d6b4e010000000017a91487b6255a5df746188f0bd22ed0194a40ec98f2de87be810700"
segwit_view = "010000000001060c02c24bbfefd94cdc4f58a3f83f93e05b14ad968ec6aba54190c3dcba6eef1b00000000da00483045022100f4dbf2ca7b5da97bd78818635d48004e6bf1a6f7e5e24bcecb7d93f554e49eaf02200a05025d93475b6372d14bd8fe8366fe10570ade772b19d124d3b0175b9f6eda0147304402202290feb53fc4cb077c5d3eed0ed5367fef4011ac708c1acaaa5003e4ed680ddf022012c52160ae6b85fc59ceed80c7cacd5358b80712371733de7c76fef552114ee60147522103ee30ea502d692ecfc79dfcb6fb2d6914fc70ba80db971beebacccdbaed40d7c52102f52db885d04dc43ca8562471759dd621b899faf508caba70963d9228bfd2415e52ae00000000dab020ee0a80a818e4d20a52aa7ba367a0a2d430d22c26ccb4572527e259e14a01000000d900473044022064745ac8cae859bb19305a550981b8d39b69efec55b9a423dca645cd3b5c845502205cf375839d7f056235feb550a8138a03e75fa140503a2ce61fe239a3bfe42145014730440220728b0393d5427d8abb56a608c6b1a0b14c5455f3abeb56ce8a5c7f70035af71d022052a99e4e389790b364f6180caf1523d6da2b3daabe85952705023be2b5409b360147522103e296937dbdafdae4d0a45b33d9a691b0c0e71d97bd2ffc82da895a251f74bd7e2103ead7ad0c398f928cbe851a6b387a5e74d139487caf4d4ac3dc3d478254dbbb4452ae0000000067a6c2e2f14fc412b3f5627fafac2fe43009bc403ec680839579444df2ce042b00000000da00483045022100d3bdc055fa5dcce335a1d572b1c3ccb8cc9ba91960c6361c77e29ded716e233102200e7ebb43fd39fb98c714098d4fda32d94cdbefdd96c0c918b788aacc6164953c0147304402202f4338d2710edb1960dcf7136411f065a16bee4e44b86604d64c660315bc55040220238c1c3216feb31051f7798297317819da1dfa830d24a6a9e38a37367a68ebd101475221037b2987df626510ce25e6ce5fdb716705e23fecb7398b2cbb0a1c0af7ca5da717210345e358653b4580b5bd68d263089a0a2bf9fcc1e145fcf2a3d4b9ab5cd7e1a76752ae000000004f28d63103dfb86a5d92d2daf328bbb35d72239766a5853b7076a90d1745813200000000da00483045022100e89ac8215ee87186de284c419b2522ebfb2ecb8063d0f91942f2ad63f383d3d4022036485902bb1f2e0b2cc645aab8781def27f25e91d8256d48dd48d5cfca1a21c20147304402201449379f1d57f2b7ad1dc0882f59627287a6c32180ffa7637941b0eaa666dd4b022028eb0eed77e1b92de046098c855834a5feeadea55d17160bc6d11d47184e8b51014752210283db605dc305201ab9be509a22d2c83b388002fb54ecd82d86efe83c0a1d35822103146f745eff0ae31fe899aafd27d51d2c0f5b0c03f2f47b3c65bb26ec7581ad8652ae0000000021cb3b00d1f22455e76e86872e00ef556578bcc112071e6a5b4ac02ab682fdb301000000232200206ea344e9a4a8f8a8983479af2ae3ed29fab153955af14457780a304a6832b9c50000000016dcc4b40a514c43ed61d6c01a9006d7f21a6d30b99b3e580d21578e35002502000000002322002049ea1f7c280b32fee0dce2e1801df2218df59d64614c4fe76c043ee2c80116700000000002005ed0b20000000017a91495c5c19257aa52bd4b702ba1a5e29b8d72a75a3a876d6b4e010000000017a91487b6255a5df746188f0bd22ed0194a40ec98f2de87000000000400473044022100d0d2ded141c9369bcc99de23d3d41d1d99d6cff47126df1b0c4d4797f947eacf021f790f1c112b3425ebc3251d719aae6ef0f9830b688585275591a5353f1f973801483045022100bf06c762e6ab64258d2f2777a66fe32ddd8f36e232b80bf5afa6ff9b9aa73ee0022049caf991fce808e60a9b17499f5e0dc11f6163e3ef7bca8109b72b5695d674210147522103edd556806048b319d71f43466c4415001bb32d8afe3aac06532d3ac210fd0e86210215e16727cf1389b4ee377487385f3ec595841a6bb747eb9c3a5cd559e9b1c8dc52ae040047304402204e9cc87526e148d236d692fa70104d26b8df632f30f4e3be38a2e99cec76d0f80220354ae575c3537c0ad2399a6037a9164b0cb147b12f262efc906649ca7950e2eb0147304402203553bcd1565804ec71c997c87006bd91c639b74a004b19a239c7f551aab5635a0220753f74e065c0b7cdf67d16b00f6a20dda7159a49f20aa493a7889b2851b2fea30147522103b09ac1fa65a55fa4feadea57c4cf417d7490065d8b844ada60c242a441e0e3a42103c0625169b46dbbde3492db7c62f1be8f582131467620cef17335306bad7ef88a52aebe810700"
print("Deserialize Segwit transaction")
ns = blockchain.Transaction.deserialize(non_segwit_view)
s = blockchain.Transaction.deserialize(segwit_view)
s.serialize(True, True)
self.assertEqual(s.serialize(False, True), non_segwit_view)
self.assertEqual(s.serialize(True, True), segwit_view)
self.assertEqual(ns.serialize(False, True), non_segwit_view)