Source code for solidbyte.deploy.objects

""" Contract deployer """
import sys
from typing import TYPE_CHECKING, Union, Any, Optional, Dict, List, Tuple, Set
from attrdict import AttrDict
from getpass import getpass
from eth_utils.exceptions import ValidationError
from web3.eth import Contract as Web3Contract
from ..accounts import Accounts
from ..compile import link_library, clean_bytecode
from ..compile.artifacts import contract_artifacts
from ..compile.linker import hash_linked_bytecode
from ..common import pop_key_from_dict, MAX_PRODUCTION_NETWORK_ID
from ..common.web3 import (
    web3c,
    normalize_hexstring,
    create_deploy_tx,
)
from ..common import store
from ..common.exceptions import DeploymentError, DeploymentValidationError
from ..common.logging import getLogger

# datetime.fromisoformat() isn't available until Python 3.7.  Monkeypatch!
if sys.version_info[0] == 3 and sys.version_info[1] < 7:
    from ..common.utils import Py36Datetime as datetime
else:
    from datetime import datetime

if TYPE_CHECKING:
    from ..common.metafile import MetaFile
    from web3 import Web3

log = getLogger(__name__)

# Typing
T = Union[Any, None]
MultiDict = Union[AttrDict, dict]

DISABLED_REMOTE_NOTICE = (
    "Deployment with remote accounts is currently disabled. If you're interested in having this "
    "enabled, please add your feedback to this issue: "
    "https://github.com/mikeshultz/solidbyte/issues/32"
)
ROOT_LEAF_NAME = "__root__"


[docs]def get_lineage(leaf: 'ContractLeaf') -> Set['ContractLeaf']: """ Climb a deptree and return all elements "above" the provided leaf """ _parents: Set['ContractLeaf'] = set() if not leaf.parent or leaf.parent.name == ROOT_LEAF_NAME: return _parents else: _parents.add(leaf.parent) _parents.update(get_lineage(leaf.parent)) return _parents
[docs]class ContractLeaf: """ A leaf object in the dependency tree :Definitions: - dependent: Leaves that this leaf depends on - dependency: A leaf that depends on this leaf """
[docs] def __init__(self, name: str, tree: 'ContractDependencyTree', parent: 'ContractLeaf' = None) -> None: self.name = name self.tree = tree self.parent = parent self.dependents: Set['ContractLeaf'] = set()
def __repr__(self) -> str: return self.name
[docs] def add_dependent(self, name: str) -> 'ContractLeaf': """ Add a dependent leaf """ # If this element already exists, move it to be our dependent el, _ = self.tree.search_tree(name) if el is not None: self.tree.move(name, self) return el # Otherwise, create a new elemenet new_leaf = ContractLeaf(name, self.tree, self) self.dependents.add(new_leaf) return new_leaf
[docs] def attach_dependent(self, el: 'ContractLeaf') -> None: """ Attach an element to this Leaf as dependent """ self.dependents.add(el)
[docs] def get_parent(self) -> Optional['ContractLeaf']: """ Return the parent ContractLeaf """ return self.parent
[docs] def is_root(self) -> bool: """ Is this the root leaf? """ return self.parent is None
[docs] def has_dependencies(self) -> bool: """ Does this leaf have dependencies? """ return self.parent is not None and self.parent.name != ROOT_LEAF_NAME
[docs] def has_dependents(self) -> bool: """ Does this leaf have dependents """ return len(self.dependents) > 0
[docs] def get_dependents(self) -> Set['ContractLeaf']: """ Resolve and return all dependents in a flat set """ _dependents: Set['ContractLeaf'] = self.dependents for d in self.dependents: if d.has_dependents(): _dependents.update(d.get_dependents()) return _dependents
[docs] def get_dependencies(self) -> Set['ContractLeaf']: """ Resolve and return all dependencies in a flat set """ return get_lineage(self)
[docs]class ContractDependencyTree: """ A tree of Leafs describing contract library dependencies :Example: >>> deptree = ContractDependencyTree() """
[docs] def __init__(self): self.root = ContractLeaf(ROOT_LEAF_NAME, self)
def __repr__(self): strong = '[root]\n' for el in self.root.dependents: strong += '- {} (Dependants: {}, Has Dependencies: {})\n'.format( el, el.get_dependents(), el.has_dependencies() ) return strong def _search(self, name: str, el: ContractLeaf = None, depth: int = 0) -> Tuple[Optional[ContractLeaf], int]: """ Internal recursive tree search function :param name: The name of the leaf to look for :param el: The current leaf to search down from; for recursive use :param depth: depth tracking, for recursive use """ if el is None: el = self.root depth = -1 if el.name == name: return (el, depth) else: found = None for dep in el.dependents: found, found_depth = self._search(name, dep, depth+1) if found: return (found, found_depth) return (None, depth)
[docs] def search_tree(self, name: str) -> Tuple[Optional[ContractLeaf], int]: """ Search a tree for a named leaf :param name: The name of the leaf to look for """ return self._search(name)
[docs] def has_dependents(self, name: str): """ Check of name has dependents """ el, _ = self.search_tree(name) if el and len(el.dependents) > 0: return True return False
[docs] def has_dependencies(self, name: str): """ Check of name has dependencies """ el, depth = self.search_tree(name) if el and depth > 0: return True return False
[docs] def add_dependent(self, name: str, parent: str = None) -> ContractLeaf: """ Add a child dependent """ if parent: el, _ = self.search_tree(parent) if el is None: el = self.root return el.add_dependent(name)
[docs] def move(self, name: str, new_parent: ContractLeaf) -> ContractLeaf: """ Move an element to be a child of another """ el, _ = self.search_tree(name) if el is None or el.parent is None: raise Exception('Element to move was not found') el.parent.dependents.remove(el) new_parent.attach_dependent(el) el.parent = new_parent return el
[docs]class Deployment: """ representation of a simgle contract deployment """
[docs] def __init__(self, network: str, address: str, bytecode_hash: str, date: datetime, abi: List[Dict[str, T]]): self.network = network self.address = address self.bytecode_hash = bytecode_hash self.date = date self.abi = abi
[docs]class Contract: """ The representation of a smart contract deployment state on a specific network. This object is exposed to users' in their deploy script, instantiated for each of their contracts. The general use is to provide information about the contract state, a Web3 Contract instance and deploy it if necessary. There's a bunch of properties, but two primary methods to interact with the contract. * check_needs_deployment(bytecode: str) -> bool: This function will take the provided bytecode and return whether or not there's been a change compared to the known, deployed bytecode. * deployed(*args: T, **kwargs: T) -> web3.eth.Contract: This is the primary interaction a user has with this object in a deploy script. It will return an instantiated web3.eth.Contract instance and in the process, deploy the smart contract if necessary. """
[docs] def __init__(self, name: str, network_name: str, from_account: str, metafile: 'MetaFile', web3: 'Web3' = None): """ Initialize the Contract :param name: The name of... me. The name of the contract I represent. :param network_name: The name of of the network, as defined in networks.yml. :param from_account: The address of the account to deploy with. :param metafile: An instantiated MetaFile object. :param web3: An instantiated Web3 object :Example: >>> from solidbyte.deploy.objects import Contract >>> MyContract = Contract( ... 'TestContract', ... 'test', ... '0xdeadbeef00000000000000000000000000000000', ... MetaFile(), ... Web3(), ... ) >>> my_contract = MyContract.deployed() """ log.debug("Contract.__init__({}, {}, {}, {}, {})".format(name, network_name, from_account, metafile, web3)) self.name = name self.new_deployment = False self.deployedHash = None self.source_bytecode = None self.source_abi = None self.links = None # This will only populate after a _deploy() self.deployments: List = [] self.from_account = from_account if web3: self.web3 = web3 else: self.web3 = web3c.get_web3(network_name) self.network_id = self.web3.net.chainId or self.web3.net.version self.accounts = Accounts(web3=self.web3) self.metafile = metafile self.refresh()
def __repr__(self) -> str: return self.name @property def address(self) -> Optional[str]: """ The latest deployed address """ if len(self.deployments) < 1: return None return self.deployments[-1].address @property def abi(self) -> Optional[List[Dict[str, T]]]: """ The latest deployed ABI """ if len(self.deployments) < 1: return None return self.deployments[-1].abi @property def bytecode_hash(self)-> Optional[str]: """ The latest deployed bytecode hash """ if len(self.deployments) < 1: return None return self.deployments[-1].bytecode_hash
[docs] def is_deployed(self) -> bool: """ Return if this contract has deployments """ return len(self.deployments) > 0
[docs] def refresh(self) -> None: """ Refresh metadata from MetaFile and the compiled artifacts """ self._load_metafile_contract() self._load_artifacts()
[docs] def check_needs_deployment(self, bytecode: str) -> bool: """ Check if this contract has been changed since last deployment **NOTE**: This method does not take into account dependencies. Check with Deployer :param bytecode: The hex bytecode to compare to the latest known deployment. :returns: If the bytecode differs from the last known deployment. :Example: >>> from solidbyte.deploy.objects import Contract >>> MyContract = Contract('test', '0xdeadbeef00000000000000000000000000000000', { ... 'abi': [], ... 'bytecode': '0x1234...' ... 'name': 'MyContract' ... }, {}, MetaFile()) >>> assert Mycontract.check_needs_deployment('0x2234...') """ if not bytecode: raise DeploymentValidationError("bytecode is required") bytecode_hash = hash_linked_bytecode(bytecode) self.refresh() return ( not self.bytecode_hash or bytecode_hash != self.bytecode_hash )
[docs] def deployed(self, *args, **kwargs): """ Return an instantiated web3.eth.Contract tinstance and deploy the contract if necessary. :param *args: Any args to provide the constructor. :param **kargs: Any kwargs to provide the constructor OR one of the following special kwargs: - gas: The gas limit for the deploy transaction. - gas_price: The gas price to use, in wei. - links: This is a dict of {name,address} of library links for the contract. :returns: If the bytecode differs from the last known deployment. :Example: >>> from solidbyte.deploy.objects import Contract >>> MyContract = Contract('test', '0xdeadbeef00000000000000000000000000000000', { ... 'abi': [], ... 'bytecode': '0x1234...' ... 'name': 'MyContract' ... }, {}, MetaFile()) >>> contract = Mycontract.deploy(links={ 'MyLibrary': '0xdeadbeef00000000000000000000000000000001' }) >>> assert contract.functions.owner().call() == contract.from_account """ if not self.check_needs_deployment(self.source_bytecode): return self._get_web3_contract() try: return self._deploy(*args, **kwargs) except Exception as e: log.exception("Unknown error deploying {}".format(self.name)) raise e
def _process_instances(self, metafile_instances: List[Dict[str, T]]) -> None: """ Process the deployedInstances dict for this contract from the metafile. :param metafile_instances: A list of deployedInstances from metafile.json. """ self.deployments: List[Deployment] = [] last_date = None for inst in metafile_instances: # We want the latest deployed address for the active network if inst.get('date'): this_date = datetime.fromisoformat(inst['date']) if last_date is None or this_date > last_date: last_date = this_date self.deployments.append(Deployment( bytecode_hash=inst.get('hash'), date=this_date, address=inst.get('address'), network=inst.get('network_id'), abi=inst.get('abi'), )) def _load_metafile_contract(self) -> None: """ Process the contract dict for this contract from the metafile. """ metafile_contract = self.metafile.get_contract(self.name) if not metafile_contract: log.debug('No metafile.json entry for contract {}.'.format(self.name)) return if metafile_contract['networks'].get(self.network_id): self.deployedHash = metafile_contract['networks'][self.network_id].get('deployedHash') deployed_instances = metafile_contract['networks'][self.network_id].get( 'deployedInstances' ) if deployed_instances and len(deployed_instances) > 0: self._process_instances( metafile_contract['networks'][self.network_id]['deployedInstances'] ) def _load_artifacts(self) -> None: """ Process the artifact dict from metafile.json. """ source = contract_artifacts(self.name) if source: self.name = source['name'] self.source_abi = source['abi'] self.source_bytecode = normalize_hexstring(source['bytecode']) def _create_deploy_transaction(self, bytecode: str, gas: int, gas_price: int, *args, **kwargs) -> dict: """ Create the transaction to deploy the contract :param bytecode: The bytecode we're deploying. :param gas: The gas limit for the transaction. :param gas_price: The gas price in wei for the transaction. :param *args: Constructor arguments :param **kargs: Constructor keyword arguments :returns: a transaction dict from Web3 """ nonce = self.web3.eth.getTransactionCount(self.from_account) # Create the tx object deploy_tx = create_deploy_tx(self.web3, self.source_abi, bytecode, { 'chainId': int(self.network_id), 'gas': gas, 'gasPrice': gas_price, 'nonce': nonce, 'from': self.from_account, }, *args, **kwargs) return deploy_tx def _transact(self, tx: MultiDict) -> str: """ Execute the deploy transaction :param tx: The transaction dict. :returns: A transaction hash. """ if self.accounts.account_known(self.from_account): # Sign it signed_tx = self.accounts.sign_tx(self.from_account, tx) # Send it try: deploy_txhash = self.web3.eth.sendRawTransaction(signed_tx.rawTransaction) except ( ValidationError, ValueError, ) as err: str_err = str(err) if 'out of gas' in str_err or 'exceeds gas' in str_err: log.error('TX ran out of gas when deploying {}.'.format(self.name)) elif 'cannot afford txn gas' in str_err: log.error('Deployer account unable to afford network fees') raise err else: if int(self.network_id) < MAX_PRODUCTION_NETWORK_ID and not self.web3.is_eth_tester: """ Disabling personal.unlock for official chains. It's not really a secure way to deal with sending transactions. At least on go-ethereum, when you unlock an account, that allows any party that can send a JSON-RPC request to the node to send transactions on that acocunt's behalf. If the machine isn't strictly local or firewalled in some way to prevent malicious parties from communicating with it (rare), that leaves the account easily compromised once unlocked. If you want to disagree or call me an idiot, please do: https://github.com/mikeshultz/solidbyte/issues/32 """ raise DeploymentValidationError(DISABLED_REMOTE_NOTICE) else: tx['from'] = self.from_account if self.web3.is_eth_tester: deploy_txhash = self.web3.eth.sendTransaction(tx) else: passphrase = store.get(store.Keys.DECRYPT_PASSPHRASE) if not passphrase: passphrase = getpass("Enter password to unlock account ({}):".format( self.from_account )) if self.web3.personal.unlockAccount(self.from_account, passphrase, duration=60*5): deploy_txhash = self.web3.eth.sendTransaction(tx) else: raise DeploymentError("Unable to unlock account {}".format( self.from_account )) log.debug("Deployment transaction hash for {}: {}".format(self.name, deploy_txhash.hex())) return deploy_txhash.hex() def _assemble_and_hash_bytecode(self, bytecode: str, links: Optional[dict] = None) -> Tuple[str, str]: """ Link bytecode(if necessary), and hash in a way that links are irrelevant :param bytecode: The bytecode from compiler output :param links: A dict with links(key: contract name, value: deployed address). :returns: A Tuple of the bytecode hash and linked bytecode. """ bytecode_hash = hash_linked_bytecode(bytecode) # Hash before linking if links: bytecode = link_library(bytecode, links) return (bytecode_hash, clean_bytecode(bytecode)) def _deploy(self, *args, **kwargs) -> Web3Contract: """ Deploy the contract :param *args: Any args to provide the constructor. :param **kargs: Any kwargs to provide the constructor OR one of the following special kwargs: - gas: The gas limit for the deploy transaction. - gasPrice: The gas price to use, in wei. - links: This is a dict of {name,address} of library links for the contract. :returns: A instantiated web3.eth.Contract object. """ self.links = pop_key_from_dict(kwargs, 'links') bytecode_hash, bytecode = self._assemble_and_hash_bytecode(self.source_bytecode, self.links) assert len(bytecode_hash) == 66, "Invalid response from linker." # Just in case. Got bit. gas = pop_key_from_dict(kwargs, 'gas') or int(6e6) gas_price = ( pop_key_from_dict(kwargs, 'gasPrice') or pop_key_from_dict(kwargs, 'gas_price') or self.web3.toWei('3', 'gwei') ) assert type(gas) == int and type(gas_price) == int, ( "Invalid gas or gas_price type. Expected (<class 'int'>/<class 'int'>) " "got ({}/{})".format( type(gas), type(gas_price) )) max_fee_wei = gas * gas_price deployer_balance = self.web3.eth.getBalance(self.from_account) log.debug("Max network fee: {} ({} Ether)".format( max_fee_wei, self.web3.fromWei(max_fee_wei, 'ether') )) log.debug("Deployer balance: {} ({})".format(deployer_balance, self.from_account)) if deployer_balance < max_fee_wei: raise DeploymentValidationError( "Deployer account {} under-funded! Has: {} Needed: {}".format( self.from_account, deployer_balance, max_fee_wei, ) ) log.debug("Creating deploy transaction...") deploy_tx = self._create_deploy_transaction(bytecode, gas, gas_price, *args, **kwargs) deploy_txhash = self._transact(deploy_tx) log.info("Sending deploy transaction {} for contract {}. This may take a moment...".format( deploy_txhash, self.name, )) # Wait for it to be mined deploy_receipt = self.web3.eth.waitForTransactionReceipt(deploy_txhash) # Verify all the things if deploy_receipt.status == 0: log.info("Receipt: {}".format(deploy_receipt)) raise DeploymentError("Deploy transaction failed!") log.debug("Contract Deploy Receipt: {}".format(deploy_receipt)) code = self.web3.eth.getCode(deploy_receipt.contractAddress) if not code or code == '0x': raise DeploymentError( "Bytecode for {} not found at address {}. This could mean the node is out " "of sync or that deployment failed for an unknown reason.".format( self.name, deploy_receipt.contractAddress, ) ) log.info("Successfully deployed {}. Transaction has been mined.".format(self.name)) self.deployments.append(Deployment( bytecode_hash=bytecode_hash, date=datetime.now().isoformat(), address=deploy_receipt.contractAddress, network=self.network_id, abi=self.source_abi, )) self.metafile.add(self.name, self.network_id, deploy_receipt.contractAddress, self.source_abi, bytecode_hash) self.new_deployment = True log.debug("Updated metadata for new deployment.") return self._get_web3_contract() def _get_web3_contract(self) -> Web3Contract: """ Instantiate a web3.eth.Contract instance with their factory and return. :returns: A instantiated web3.eth.Contract object. """ assert self.abi is not None, "ABI appears to be missing for contract {}".format(self.name) assert self.address is not None, "Address appears to be missing for contract {}".format( self.name ) return self.web3.eth.contract(abi=self.abi, address=self.address)