# Copyright (C) 2012-2014 The python-bitcoinlib developers
#
# This file is part of python-bitcoinlib.
#
# It is subject to the license terms in the LICENSE file found in the top-level
# directory of this distribution.
#
# No part of python-bitcoinlib, including this file, may be copied, modified,
# propagated, or distributed except according to the terms contained in the
# LICENSE file.
from __future__ import absolute_import, division, print_function, unicode_literals
import binascii
import hashlib
import socket
import struct
import sys
import time
from .script import CScript
from .serialize import *
# Core definitions
COIN = 100000000
MAX_MONEY = 21000000 * COIN
MAX_BLOCK_SIZE = 1000000
MAX_BLOCK_SIGOPS = MAX_BLOCK_SIZE/50
BIP0031_VERSION = 60000
PROTO_VERSION = 60002
MIN_PROTO_VERSION = 209
CADDR_TIME_VERSION = 31402
[docs]def MoneyRange(nValue):
return 0 <= nValue <= MAX_MONEY
def py2_x(h):
"""Convert a hex string to bytes"""
return binascii.unhexlify(h)
[docs]def x(h):
"""Convert a hex string to bytes"""
return binascii.unhexlify(h.encode('utf8'))
def py2_b2x(b):
"""Convert bytes to a hex string"""
return binascii.hexlify(b)
[docs]def b2x(b):
"""Convert bytes to a hex string"""
return binascii.hexlify(b).decode('utf8')
def py2_lx(h):
"""Convert a little-endian hex string to bytes
Lets you write uint256's and uint160's the way the Satoshi codebase shows
them.
"""
return binascii.unhexlify(h)[::-1]
[docs]def lx(h):
"""Convert a little-endian hex string to bytes
Lets you write uint256's and uint160's the way the Satoshi codebase shows
them.
"""
return binascii.unhexlify(h.encode('utf8'))[::-1]
def py2_b2lx(b):
"""Convert bytes to a little-endian hex string
Lets you show uint256's and uint160's the way the Satoshi codebase shows
them.
"""
return binascii.hexlify(b[::-1])
[docs]def b2lx(b):
"""Convert bytes to a little-endian hex string
Lets you show uint256's and uint160's the way the Satoshi codebase shows
them.
"""
return binascii.hexlify(b[::-1]).decode('utf8')
if not (sys.version > '3'):
x = py2_x
b2x = py2_b2x
lx = py2_lx
b2lx = py2_b2lx
del py2_x
del py2_b2x
del py2_lx
del py2_b2lx
[docs]def str_money_value(value):
"""Convert an integer money value to a fixed point string"""
r = '%i.%08i' % (value // COIN, value % COIN)
r = r.rstrip('0')
if r[-1] == '.':
r += '0'
return r
[docs]class ValidationError(Exception):
"""Base class for all blockchain validation errors
Everything that is related to validating the blockchain, blocks,
transactions, scripts, etc. is derived from this class.
"""
def __make_mutable(cls):
# For speed we use a class decorator that removes the immutable
# restrictions directly. In addition the modified behavior of GetHash() and
# hash() is undone.
cls.__setattr__ = object.__setattr__
cls.__delattr__ = object.__delattr__
cls.GetHash = Serializable.GetHash
cls.__hash__ = Serializable.__hash__
return cls
[docs]class COutPoint(ImmutableSerializable):
"""The combination of a transaction hash and an index n into its vout"""
__slots__ = ['hash', 'n']
def __init__(self, hash=b'\x00'*32, n=0xffffffff):
if not len(hash) == 32:
raise ValueError('COutPoint: hash must be exactly 32 bytes; got %d bytes' % len(hash))
object.__setattr__(self, 'hash', hash)
if not (0 <= n <= 0xffffffff):
raise ValueError('COutPoint: n must be in range 0x0 to 0xffffffff; got %x' % n)
object.__setattr__(self, 'n', n)
@classmethod
[docs] def stream_deserialize(cls, f):
hash = ser_read(f,32)
n = struct.unpack(b"<I", ser_read(f,4))[0]
return cls(hash, n)
[docs] def stream_serialize(self, f):
assert len(self.hash) == 32
f.write(self.hash)
f.write(struct.pack(b"<I", self.n))
[docs] def is_null(self):
return ((self.hash == b'\x00'*32) and (self.n == 0xffffffff))
def __repr__(self):
if self.is_null():
return 'COutPoint()'
else:
return 'COutPoint(lx(%r), %i)' % (b2lx(self.hash), self.n)
@classmethod
[docs] def from_outpoint(cls, outpoint):
"""Create an immutable copy of an existing OutPoint
If output is already immutable (outpoint.__class__ is COutPoint) it is
returned directly.
"""
if output.__class__ is COutPoint:
return output
else:
return cls(outpoint.hash, outpoint.n)
@__make_mutable
[docs]class CMutableOutPoint(COutPoint):
"""A mutable COutPoint"""
__slots__ = []
@classmethod
[docs] def from_outpoint(cls, outpoint):
"""Create a mutable copy of an existing COutPoint"""
return cls(outpoint.hash, outpoint.n)
[docs]class CTxIn(ImmutableSerializable):
"""An input of a transaction
Contains the location of the previous transaction's output that it claims,
and a signature that matches the output's public key.
"""
__slots__ = ['prevout', 'scriptSig', 'nSequence']
def __init__(self, prevout=COutPoint(), scriptSig=CScript(), nSequence = 0xffffffff):
if not (0 <= nSequence <= 0xffffffff):
raise ValueError('CTxIn: nSequence must be an integer between 0x0 and 0xffffffff; got %x' % nSequence)
object.__setattr__(self, 'nSequence', nSequence)
object.__setattr__(self, 'prevout', prevout)
object.__setattr__(self, 'scriptSig', scriptSig)
@classmethod
[docs] def stream_deserialize(cls, f):
prevout = COutPoint.stream_deserialize(f)
scriptSig = script.CScript(BytesSerializer.stream_deserialize(f))
nSequence = struct.unpack(b"<I", ser_read(f,4))[0]
return cls(prevout, scriptSig, nSequence)
[docs] def stream_serialize(self, f):
COutPoint.stream_serialize(self.prevout, f)
BytesSerializer.stream_serialize(self.scriptSig, f)
f.write(struct.pack(b"<I", self.nSequence))
[docs] def is_final(self):
return (self.nSequence == 0xffffffff)
def __repr__(self):
return "CTxIn(%s, %s, 0x%x)" % (repr(self.prevout), repr(self.scriptSig), self.nSequence)
@classmethod
[docs] def from_txin(cls, txin):
"""Create an immutable copy of an existing TxIn
If txin is already immutable (txin.__class__ is CTxIn) it is returned
directly.
"""
if txin.__class__ is CTxIn:
return txin
else:
return cls(COutPoint.from_txout(txin.prevout), txin.scriptSig, txin.nSequence)
@__make_mutable
[docs]class CMutableTxIn(CTxIn):
"""A mutable CTxIn"""
__slots__ = []
def __init__(self, prevout=None, scriptSig=CScript(), nSequence = 0xffffffff):
if not (0 <= nSequence <= 0xffffffff):
raise ValueError('CTxIn: nSequence must be an integer between 0x0 and 0xffffffff; got %x' % nSequence)
self.nSequence = nSequence
if prevout is None:
prevout = CMutableOutPoint()
self.prevout = prevout
self.scriptSig = scriptSig
@classmethod
[docs] def from_txin(cls, txin):
"""Create a fully mutable copy of an existing TxIn"""
prevout = CMutableOutPoint.from_outpoint(txin.prevout)
return cls(prevout, txin.scriptSig, txin.nSequence)
[docs]class CTxOut(ImmutableSerializable):
"""An output of a transaction
Contains the public key that the next input must be able to sign with to
claim it.
"""
__slots__ = ['nValue', 'scriptPubKey']
def __init__(self, nValue=-1, scriptPubKey=script.CScript()):
object.__setattr__(self, 'nValue', int(nValue))
object.__setattr__(self, 'scriptPubKey', scriptPubKey)
@classmethod
[docs] def stream_deserialize(cls, f):
nValue = struct.unpack(b"<q", ser_read(f,8))[0]
scriptPubKey = script.CScript(BytesSerializer.stream_deserialize(f))
return cls(nValue, scriptPubKey)
[docs] def stream_serialize(self, f):
f.write(struct.pack(b"<q", self.nValue))
BytesSerializer.stream_serialize(self.scriptPubKey, f)
[docs] def is_valid(self):
if not MoneyRange(self.nValue):
return False
if not self.scriptPubKey.is_valid():
return False
return True
def __repr__(self):
if self.nValue >= 0:
return "CTxOut(%s*COIN, %r)" % (str_money_value(self.nValue), self.scriptPubKey)
else:
return "CTxOut(%d, %r)" % (self.nValue, self.scriptPubKey)
@classmethod
[docs] def from_txout(cls, txout):
"""Create an immutable copy of an existing TxOut
If txout is already immutable (txout.__class__ is CTxOut) then it will
be returned directly.
"""
if txout.__class__ is CTxOut:
return txout
else:
return cls(txout.nValue, txout.scriptPubKey)
@__make_mutable
[docs]class CMutableTxOut(CTxOut):
"""A mutable CTxOut"""
__slots__ = []
@classmethod
[docs] def from_txout(cls, txout):
"""Create a fullly mutable copy of an existing TxOut"""
return cls(txout.nValue, txout.scriptPubKey)
[docs]class CTransaction(ImmutableSerializable):
"""A transaction"""
__slots__ = ['nVersion', 'vin', 'vout', 'nLockTime']
def __init__(self, vin=(), vout=(), nLockTime=0, nVersion=1):
"""Create a new transaction
vin and vout are iterables of transaction inputs and outputs
respectively. If their contents are not already immutable, immutable
copies will be made.
"""
if not (0 <= nLockTime <= 0xffffffff):
raise ValueError('CTransaction: nLockTime must be in range 0x0 to 0xffffffff; got %x' % nLockTime)
object.__setattr__(self, 'nLockTime', nLockTime)
object.__setattr__(self, 'nVersion', nVersion)
object.__setattr__(self, 'vin', tuple(CTxIn.from_txin(txin) for txin in vin))
object.__setattr__(self, 'vout', tuple(CTxOut.from_txout(txout) for txout in vout))
@classmethod
[docs] def stream_deserialize(cls, f):
nVersion = struct.unpack(b"<i", ser_read(f,4))[0]
vin = VectorSerializer.stream_deserialize(CTxIn, f)
vout = VectorSerializer.stream_deserialize(CTxOut, f)
nLockTime = struct.unpack(b"<I", ser_read(f,4))[0]
return cls(vin, vout, nLockTime, nVersion)
[docs] def stream_serialize(self, f):
f.write(struct.pack(b"<i", self.nVersion))
VectorSerializer.stream_serialize(CTxIn, self.vin, f)
VectorSerializer.stream_serialize(CTxOut, self.vout, f)
f.write(struct.pack(b"<I", self.nLockTime))
[docs] def is_coinbase(self):
return len(self.vin) == 1 and self.vin[0].prevout.is_null()
def __repr__(self):
return "CTransaction(%r, %r, %i, %i)" % (self.vin, self.vout, self.nLockTime, self.nVersion)
@classmethod
[docs] def from_tx(cls, tx):
"""Create an immutable copy of a pre-existing transaction
If tx is already immutable (tx.__class__ is CTransaction) then it will
be returned directly.
"""
if tx.__class__ is CTransaction:
return tx
else:
return cls(tx.vin, tx.vout, tx.nLockTime, tx.nVersion)
@__make_mutable
[docs]class CMutableTransaction(CTransaction):
"""A mutable transaction"""
__slots__ = []
def __init__(self, vin=None, vout=None, nLockTime=0, nVersion=1):
if not (0 <= nLockTime <= 0xffffffff):
raise ValueError('CTransaction: nLockTime must be in range 0x0 to 0xffffffff; got %x' % nLockTime)
self.nLockTime = nLockTime
if vin is None:
vin = []
self.vin = vin
if vout is None:
vout = []
self.vout = vout
self.nVersion = nVersion
@classmethod
[docs] def from_tx(cls, tx):
"""Create a fully mutable copy of a pre-existing transaction"""
vin = [CMutableTxIn.from_txin(txin) for txin in tx.vin]
vout = [CMutableTxOut.from_txout(txout) for txout in tx.vout]
return cls(vin, vout, tx.nLockTime, tx.nVersion)
[docs]class CBlock(CBlockHeader):
"""A block including all transactions in it"""
__slots__ = ['vtx', 'vMerkleTree']
@staticmethod
[docs] def build_merkle_tree_from_txids(txids):
"""Build a full CBlock merkle tree from txids
txids - iterable of txids
Returns a new merkle tree in deepest first order. The last element is
the merkle root.
WARNING! If you're reading this because you're learning about crypto
and/or designing a new system that will use merkle trees, keep in mind
that the following merkle tree algorithm has a serious flaw related to
duplicate txids, resulting in a vulnerability. (CVE-2012-2459) Bitcoin
has since worked around the flaw, but for new applications you should
use something different; don't just copy-and-paste this code without
understanding the problem first.
"""
merkle_tree = list(txids)
size = len(txids)
j = 0
while size > 1:
for i in range(0, size, 2):
i2 = min(i+1, size-1)
merkle_tree.append(Hash(merkle_tree[j+i] + merkle_tree[j+i2]))
j += size
size = (size + 1) // 2
return merkle_tree
@staticmethod
[docs] def build_merkle_tree_from_txs(txs):
"""Build a full merkle tree from transactions"""
txids = [tx.GetHash() for tx in txs]
return CBlock.build_merkle_tree_from_txids(txids)
[docs] def calc_merkle_root(self):
"""Calculate the merkle root
The calculated merkle root is not cached; every invocation
re-calculates it from scratch.
"""
if not len(self.vtx):
raise ValueError('Block contains no transactions')
return self.build_merkle_tree_from_txs(self.vtx)[-1]
def __init__(self, nVersion=2, hashPrevBlock=b'\x00'*32, hashMerkleRoot=b'\x00'*32, nTime=0, nBits=0, nNonce=0, vtx=()):
"""Create a new block"""
super(CBlock, self).__init__(nVersion, hashPrevBlock, hashMerkleRoot, nTime, nBits, nNonce)
vMerkleTree = tuple(CBlock.build_merkle_tree_from_txs(vtx))
object.__setattr__(self, 'vMerkleTree', vMerkleTree)
object.__setattr__(self, 'vtx', tuple(CTransaction.from_tx(tx) for tx in vtx))
@classmethod
[docs] def stream_deserialize(cls, f):
self = super(CBlock, cls).stream_deserialize(f)
vtx = VectorSerializer.stream_deserialize(CTransaction, f)
vMerkleTree = tuple(CBlock.build_merkle_tree_from_txs(vtx))
object.__setattr__(self, 'vMerkleTree', vMerkleTree)
object.__setattr__(self, 'vtx', tuple(vtx))
return self
[docs] def stream_serialize(self, f):
super(CBlock, self).stream_serialize(f)
VectorSerializer.stream_serialize(CTransaction, self.vtx, f)
[docs] def GetHash(self):
"""Return the block hash
Note that this is the hash of the header, not the entire serialized
block.
"""
try:
return self._cached_GetHash
except AttributeError:
_cached_GetHash = self.get_header().GetHash()
object.__setattr__(self, '_cached_GetHash', _cached_GetHash)
return _cached_GetHash
[docs]class CoreChainParams(object):
"""Define consensus-critical parameters of a given instance of the Bitcoin system"""
GENESIS_BLOCK = None
PROOF_OF_WORK_LIMIT = None
SUBSIDY_HALVING_INTERVAL = None
NAME = None
[docs]class CoreMainParams(CoreChainParams):
NAME = 'mainnet'
GENESIS_BLOCK = CBlock.deserialize(x('0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000'))
SUBSIDY_HALVING_INTERVAL = 210000
PROOF_OF_WORK_LIMIT = 2**256-1 >> 32
[docs]class CoreTestNetParams(CoreMainParams):
NAME = 'testnet'
GENESIS_BLOCK = CBlock.deserialize(x('0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff001d1aa4ae180101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000'))
[docs]class CoreRegTestParams(CoreTestNetParams):
NAME = 'regtest'
GENESIS_BLOCK = CBlock.deserialize(x('0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff7f20020000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000'))
SUBSIDY_HALVING_INTERVAL = 150
PROOF_OF_WORK_LIMIT = 2**256-1 >> 1
"""Master global setting for what core chain params we're using"""
coreparams = CoreMainParams()
def _SelectCoreParams(name):
"""Select the core chain parameters to use
Don't use this directly, use bitcoin.SelectParams() instead so both
consensus-critical and general parameters are set properly.
"""
global coreparams
if name == 'mainnet':
coreparams = CoreMainParams()
elif name == 'testnet':
coreparams = CoreTestNetParams()
elif name == 'regtest':
coreparams = CoreRegTestParams()
else:
raise ValueError('Unknown chain %r' % name)
[docs]class CheckTransactionError(ValidationError):
pass
[docs]def CheckTransaction(tx):
"""Basic transaction checks that don't depend on any context.
Raises CheckTransactionError
"""
if not tx.vin:
raise CheckTransactionError("CheckTransaction() : vin empty")
if not tx.vout:
raise CheckTransactionError("CheckTransaction() : vout empty")
# Size limits
if len(tx.serialize()) > MAX_BLOCK_SIZE:
raise CheckTransactionError("CheckTransaction() : size limits failed")
# Check for negative or overflow output values
nValueOut = 0
for txout in tx.vout:
if txout.nValue < 0:
raise CheckTransactionError("CheckTransaction() : txout.nValue negative")
if txout.nValue > MAX_MONEY:
raise CheckTransactionError("CheckTransaction() : txout.nValue too high")
nValueOut += txout.nValue
if not MoneyRange(nValueOut):
raise CheckTransactionError("CheckTransaction() : txout total out of range")
# Check for duplicate inputs
vin_outpoints = set()
for txin in tx.vin:
if txin.prevout in vin_outpoints:
raise CheckTransactionError("CheckTransaction() : duplicate inputs")
vin_outpoints.add(txin.prevout)
if tx.is_coinbase():
if not (2 <= len(tx.vin[0].scriptSig) <= 100):
raise CheckTransactionError("CheckTransaction() : coinbase script size")
else:
for txin in tx.vin:
if txin.prevout.is_null():
raise CheckTransactionError("CheckTransaction() : prevout is null")
[docs]class CheckProofOfWorkError(CheckBlockHeaderError):
pass
[docs]def CheckProofOfWork(hash, nBits):
"""Check a proof-of-work
Raises CheckProofOfWorkError
"""
target = uint256_from_compact(nBits)
# Check range
if not (0 < target <= coreparams.PROOF_OF_WORK_LIMIT):
raise CheckProofOfWorkError("CheckProofOfWork() : nBits below minimum work")
# Check proof of work matches claimed amount
hash = uint256_from_str(hash)
if hash > target:
raise CheckProofOfWorkError("CheckProofOfWork() : hash doesn't match nBits")
[docs]class CheckBlockError(CheckBlockHeaderError):
pass
[docs]def GetLegacySigOpCount(tx):
nSigOps = 0
for txin in tx.vin:
nSigOps += txin.scriptSig.GetSigOpCount(False)
for txout in tx.vout:
nSigOps += txout.scriptPubKey.GetSigOpCount(False)
return nSigOps
[docs]def CheckBlock(block, fCheckPoW = True, fCheckMerkleRoot = True, cur_time=None):
"""Context independent CBlock checks.
CheckBlockHeader() is called first, which may raise a CheckBlockHeader
exception, followed the block tests. CheckTransaction() is called for every
transaction.
fCheckPoW - Check proof-of-work.
fCheckMerkleRoot - Check merkle root matches transactions.
cur_time - Current time. Defaults to time.time()
"""
# Block header checks
CheckBlockHeader(block.get_header(), fCheckPoW=fCheckPoW, cur_time=cur_time)
# Size limits
if not block.vtx:
raise CheckBlockError("CheckBlock() : vtx empty")
if len(block.serialize()) > MAX_BLOCK_SIZE:
raise CheckBlockError("CheckBlock() : block larger than MAX_BLOCK_SIZE")
# First transaction must be coinbase
if not block.vtx[0].is_coinbase():
raise CheckBlockError("CheckBlock() : first tx is not coinbase")
# Check rest of transactions. Note how we do things "all at once", which
# could potentially be a consensus failure if there was some obscure bug.
# For unique txid uniqueness testing. If coinbase tx is included twice
# it'll be caught by the "more than one coinbase" test.
unique_txids = set()
nSigOps = 0
for tx in block.vtx[1:]:
if tx.is_coinbase():
raise CheckBlockError("CheckBlock() : more than one coinbase")
CheckTransaction(tx)
txid = tx.GetHash()
if txid in unique_txids:
raise CheckBlockError("CheckBlock() : duplicate transaction")
unique_txids.add(txid)
nSigOps += GetLegacySigOpCount(tx)
if nSigOps > MAX_BLOCK_SIGOPS:
raise CheckBlockError("CheckBlock() : out-of-bounds SigOpCount")
# Check merkle root
if fCheckMerkleRoot and block.hashMerkleRoot != block.calc_merkle_root():
raise CheckBlockError("CheckBlock() : hashMerkleRoot mismatch")