Source code for terra_sdk.key.key

import abc
import copy
from typing import Optional

import attr

from terra_sdk.core import (
    AccAddress,
    AccPubKey,
    ModeInfo,
    ModeInfoSingle,
    SignatureV2,
    SignDoc,
    ValAddress,
    ValPubKey,
)
from terra_sdk.core.bech32 import get_bech
from terra_sdk.core.public_key import (
    PublicKey,
    address_from_public_key,
    amino_pubkey_from_public_key,
)
from terra_sdk.core.signature_v2 import Descriptor
from terra_sdk.core.signature_v2 import Single as SingleDescriptor
from terra_sdk.core.tx import AuthInfo, SignerInfo, SignMode, Tx

# Public names exported when this module is star-imported.
__all__ = ["Key", "SignOptions"]


@attr.s
class SignOptions:
    """Signing parameters consumed by :meth:`Key.sign_tx`.

    ``chain_id``, ``account_number`` and ``sequence`` are copied into the
    ``SignDoc`` that gets signed; ``sign_mode`` selects which signature
    routine of the key is used.
    """

    # Account number placed in the SignDoc; converted with ``int`` on construction.
    account_number: int = attr.ib(converter=int)
    # Sequence placed in the SignDoc; converted with ``int`` on construction.
    sequence: int = attr.ib(converter=int)
    # ``SIGN_MODE_LEGACY_AMINO_JSON`` selects amino signing in ``Key.sign_tx``;
    # any other value falls through to ``Key.create_signature``.
    sign_mode: SignMode = attr.ib()
    # Chain identifier placed in the SignDoc.
    chain_id: str = attr.ib()


class Key:
    """Abstract Key interface, representing an agent with transaction-signing capabilities.

    Subclasses are expected to override :meth:`sign`. Note that this class does
    not use ``abc.ABCMeta``, so the ``abstractmethod`` marker on :meth:`sign` is
    not enforced at instantiation time; the base implementation raises
    ``NotImplementedError`` when called.

    Args:
        public_key (Optional[PublicKey]): public key of the signer, if available
    """

    public_key: Optional[PublicKey]
    """Public key, used to derive :data:`raw_address` and :data:`raw_pubkey`."""

    # The derived fields default to ``None`` at class level so that a Key built
    # without a public key still has them defined; the properties below can then
    # raise their documented ValueError instead of an AttributeError.
    raw_address: Optional[bytes] = None
    """Raw Bech32 words of address, used to derive associated account and validator
    operator addresses.
    """

    raw_pubkey: Optional[bytes] = None
    """Raw Bech32 words of pubkey, used to derive associated account and validator
    pubkeys.
    """

    def __init__(self, public_key: Optional[PublicKey] = None):
        self.public_key = public_key
        if public_key:
            self.raw_address = address_from_public_key(public_key)
            self.raw_pubkey = amino_pubkey_from_public_key(public_key)

    @abc.abstractmethod
    def sign(self, payload: bytes) -> bytes:
        """Signs the data payload. An implementation of Key is expected to override this method.

        Args:
            payload (bytes): arbitrary data payload

        Raises:
            NotImplementedError: if not implemented

        Returns:
            bytes: signed payload
        """
        raise NotImplementedError("an instance of Key must implement Key.sign")

    @property
    def acc_address(self) -> AccAddress:
        """Terra Bech32 account address. Default derivation via :data:`public_key` is provided.

        Raises:
            ValueError: if Key was not initialized with proper public key

        Returns:
            AccAddress: account address
        """
        if not self.raw_address:
            raise ValueError("could not compute acc_address: missing raw_address")
        return AccAddress(get_bech("terra", self.raw_address.hex()))

    @property
    def val_address(self) -> ValAddress:
        """Terra Bech32 validator operator address. Default derivation via :data:`public_key` is provided.

        Raises:
            ValueError: if Key was not initialized with proper public key

        Returns:
            ValAddress: validator operator address
        """
        if not self.raw_address:
            raise ValueError("could not compute val_address: missing raw_address")
        return ValAddress(get_bech("terravaloper", self.raw_address.hex()))

    @property
    def acc_pubkey(self) -> AccPubKey:
        """Terra Bech32 account pubkey. Default derivation via :data:`public_key` is provided.

        Raises:
            ValueError: if Key was not initialized with proper public key

        Returns:
            AccPubKey: account pubkey
        """
        if not self.raw_pubkey:
            raise ValueError("could not compute acc_pubkey: missing raw_pubkey")
        return AccPubKey(get_bech("terrapub", self.raw_pubkey.hex()))

    @property
    def val_pubkey(self) -> ValPubKey:
        """Terra Bech32 validator pubkey. Default derivation via :data:`public_key` is provided.

        Raises:
            ValueError: if Key was not initialized with proper public key

        Returns:
            ValPubKey: validator pubkey
        """
        if not self.raw_pubkey:
            raise ValueError("could not compute val_pubkey: missing raw_pubkey")
        return ValPubKey(get_bech("terravaloperpub", self.raw_pubkey.hex()))

    def create_signature_amino(self, sign_doc: SignDoc) -> SignatureV2:
        """Signs the amino-JSON encoding of the transaction and outputs the signature.

        The payload handed to :meth:`sign` is ``sign_doc.to_amino_json()`` and the
        resulting signature is tagged ``SIGN_MODE_LEGACY_AMINO_JSON``.

        Args:
            sign_doc (SignDoc): unsigned transaction

        Raises:
            ValueError: if missing ``public_key``

        Returns:
            SignatureV2: signature object
        """
        if self.public_key is None:
            raise ValueError(
                "signature could not be created: Key instance missing public_key"
            )

        return SignatureV2(
            public_key=self.public_key,
            data=Descriptor(
                single=SingleDescriptor(
                    mode=SignMode.SIGN_MODE_LEGACY_AMINO_JSON,
                    signature=self.sign(sign_doc.to_amino_json()),
                )
            ),
            sequence=sign_doc.sequence,
        )

    def create_signature(self, sign_doc: SignDoc) -> SignatureV2:
        """Signs the transaction with the signing algorithm provided by this Key implementation,
        and outputs the signature. The signature is only returned, and must be manually added to
        the ``signatures`` field of an :class:`Tx`.

        While signing, ``sign_doc.auth_info.signer_infos`` is temporarily replaced
        by a single ``SIGN_MODE_DIRECT`` entry for this key; the previous value is
        put back before returning, including when signing fails.

        Args:
            sign_doc (SignDoc): unsigned transaction

        Raises:
            ValueError: if missing ``public_key``

        Returns:
            SignatureV2: signature object
        """
        if self.public_key is None:
            raise ValueError(
                "signature could not be created: Key instance missing public_key"
            )

        # Back up the caller's signer infos before overriding them for signing.
        si_backup = copy.deepcopy(sign_doc.auth_info.signer_infos)
        sign_doc.auth_info.signer_infos = [
            SignerInfo(
                public_key=self.public_key,
                sequence=sign_doc.sequence,
                mode_info=ModeInfo(
                    single=ModeInfoSingle(mode=SignMode.SIGN_MODE_DIRECT)
                ),
            )
        ]
        try:
            signature = self.sign(sign_doc.to_bytes())
        finally:
            # Restore even if serialization or signing raised, so the caller's
            # SignDoc is never left holding the temporary signer info.
            sign_doc.auth_info.signer_infos = si_backup

        return SignatureV2(
            public_key=self.public_key,
            data=Descriptor(
                single=SingleDescriptor(
                    mode=SignMode.SIGN_MODE_DIRECT, signature=signature
                )
            ),
            sequence=sign_doc.sequence,
        )

    def sign_tx(self, tx: Tx, options: SignOptions) -> Tx:
        """Signs the transaction with the signing algorithm provided by this Key implementation,
        and creates a ready-to-broadcast :class:`Tx` object with the signature applied.

        The input ``tx`` is not modified: a new :class:`Tx` is built that shares
        its body and fee, carries over the signatures and signer infos already on
        ``tx``, and appends this key's signature and signer info after them.

        Args:
            tx (Tx): unsigned transaction
            options (SignOptions): options for signing

        Returns:
            Tx: ready-to-broadcast transaction object
        """
        # The document is signed with empty signer infos; existing entries from
        # ``tx`` are only copied onto the result after the signature is made.
        signed_tx = Tx(
            body=tx.body,
            auth_info=AuthInfo(signer_infos=[], fee=tx.auth_info.fee),
            signatures=[],
        )
        sign_doc = SignDoc(
            chain_id=options.chain_id,
            account_number=options.account_number,
            sequence=options.sequence,
            auth_info=signed_tx.auth_info,
            tx_body=signed_tx.body,
        )

        signature: SignatureV2
        if options.sign_mode == SignMode.SIGN_MODE_LEGACY_AMINO_JSON:
            signature = self.create_signature_amino(sign_doc)
        else:
            signature = self.create_signature(sign_doc)

        sig_data: SingleDescriptor = signature.data.single

        signed_tx.signatures.extend(tx.signatures)
        signed_tx.signatures.append(sig_data.signature)

        signed_tx.auth_info.signer_infos.extend(tx.auth_info.signer_infos)
        signed_tx.auth_info.signer_infos.append(
            SignerInfo(
                public_key=signature.public_key,
                sequence=signature.sequence,
                mode_info=ModeInfo(single=ModeInfoSingle(mode=sig_data.mode)),
            )
        )
        return signed_tx