from __future__ import annotations
from asyncio import AbstractEventLoop, get_event_loop
from json import JSONDecodeError
from typing import List, Optional, Union
import nest_asyncio
from aiohttp import ClientSession
from multidict import CIMultiDict
from terra_sdk.core import Coins, Dec, Numeric
from terra_sdk.exceptions import LCDResponseError
from terra_sdk.key.key import Key
from terra_sdk.util.json import dict_to_data
from terra_sdk.util.url import urljoin
from .api.auth import AsyncAuthAPI, AuthAPI
from .api.authz import AsyncAuthzAPI, AuthzAPI
from import AsyncBankAPI, BankAPI
from .api.distribution import AsyncDistributionAPI, DistributionAPI
from .api.feegrant import AsyncFeeGrantAPI, FeeGrantAPI
from import AsyncGovAPI, GovAPI
from .api.ibc import AsyncIbcAPI, IbcAPI
from .api.ibc_transfer import AsyncIbcTransferAPI, IbcTransferAPI
from import AsyncMarketAPI, MarketAPI
from import AsyncMintAPI, MintAPI
from import AsyncOracleAPI, OracleAPI
from .api.slashing import AsyncSlashingAPI, SlashingAPI
from .api.staking import AsyncStakingAPI, StakingAPI
from .api.tendermint import AsyncTendermintAPI, TendermintAPI
from .api.treasury import AsyncTreasuryAPI, TreasuryAPI
from .api.tx import AsyncTxAPI, TxAPI
from .api.wasm import AsyncWasmAPI, WasmAPI
from .lcdutils import AsyncLCDUtils, LCDUtils
from .params import APIParams
from .wallet import AsyncWallet, Wallet
def get_default(chain_id: str) -> List[Union[Coins, Numeric.Output]]:
    """Return the default gas settings for a known chain.

    Args:
        chain_id: identifier of the chain; one of ``"columbus-5"``,
            ``"bombay-12"`` or ``"localterra"``.

    Returns:
        A two-element list ``[gas_prices, gas_adjustment]``: the gas prices
        parsed from ``"0.15uusd"`` and the adjustment parsed from ``1.75``.
        A fresh list is built on every call.

    Raises:
        ValueError: if ``chain_id`` is not one of the known chains
            (including ``None``).
    """
    # All known chains currently share the same defaults, so a single
    # membership check replaces three identical branches.
    if chain_id not in ("columbus-5", "bombay-12", "localterra"):
        raise ValueError("chain_id is invalid")
    return [Coins.from_str("0.15uusd"), Numeric.parse(1.75)]
class AsyncLCDClient:
    """Asynchronous client for the HTTP API of a Terra LCD server.

    One ``aiohttp`` session is created at construction time (unless
    ``_create_session`` is false) and closed when the client is used as an
    async context manager and the ``async with`` block exits.
    """

    def __init__(
        self,
        url: str,
        chain_id: Optional[str] = None,
        gas_prices: Optional[Coins.Input] = None,
        gas_adjustment: Optional[Numeric.Input] = None,
        loop: Optional[AbstractEventLoop] = None,
        _create_session: bool = True,  # don't create a session (used for sync LCDClient)
    ):
        """Set up the connection settings and the per-module API helpers.

        Args:
            url: base URL of the LCD server.
            chain_id: chain identifier; also selects the default gas settings.
            gas_prices: gas prices for fee estimation; the chain default is
                used when this is falsy.
            gas_adjustment: gas adjustment factor; the chain default is used
                when this is falsy.
            loop: event loop to bind to; defaults to ``get_event_loop()``.
            _create_session: when false, no HTTP session is opened here.

        Raises:
            ValueError: if ``chain_id`` is not known to ``get_default``. The
                lookup happens even when both gas settings are supplied.
        """
        if loop is None:
            loop = get_event_loop()
        self.loop = loop
        if _create_session:
            self.session = ClientSession(
                headers={"Accept": "application/json"}, loop=self.loop
            )

        self.chain_id = chain_id
        self.url = url
        self.last_request_height = None

        default_price, default_adjustment = get_default(chain_id)
        self.gas_prices = Coins(gas_prices) if gas_prices else default_price
        self.gas_adjustment = gas_adjustment if gas_adjustment else default_adjustment

        self.auth = AsyncAuthAPI(self)
        self.bank = AsyncBankAPI(self)
        self.distribution = AsyncDistributionAPI(self)
        self.feegrant = AsyncFeeGrantAPI(self)
        self.gov = AsyncGovAPI(self)
        self.market = AsyncMarketAPI(self)
        self.mint = AsyncMintAPI(self)
        self.authz = AsyncAuthzAPI(self)
        self.oracle = AsyncOracleAPI(self)
        self.slashing = AsyncSlashingAPI(self)
        self.staking = AsyncStakingAPI(self)
        self.tendermint = AsyncTendermintAPI(self)
        self.treasury = AsyncTreasuryAPI(self)
        self.wasm = AsyncWasmAPI(self)
        self.ibc = AsyncIbcAPI(self)
        self.ibc_transfer = AsyncIbcTransferAPI(self)
        self.tx = AsyncTxAPI(self)
        self.utils = AsyncLCDUtils(self)

    def wallet(self, key: Key) -> AsyncWallet:
        """Creates a :class:`AsyncWallet` object from a key.

        Args:
            key (Key): key implementation
        """
        return AsyncWallet(self, key)

    @staticmethod
    def _plain_params(params):
        """Return ``params.to_dict()`` for truthy objects exposing a callable
        ``to_dict``; return anything else unchanged."""
        if params and callable(getattr(params, "to_dict", None)):
            return params.to_dict()
        return params

    @staticmethod
    async def _parse_response(response, use_message_field: bool = False):
        """Decode the JSON body of ``response`` and validate its HTTP status.

        Must be awaited while the response is still open.

        Args:
            response: the ``aiohttp`` response being handled.
            use_message_field: when true and the body is a dict, the error
                message is the body's ``"message"`` entry instead of the
                stringified body.

        Raises:
            LCDResponseError: if the body is not valid JSON or the status is
                outside the 2xx range.
        """
        try:
            # content_type=None: decode regardless of the Content-Type header.
            result = await response.json(content_type=None)
        except JSONDecodeError as err:
            raise LCDResponseError(
                message=str(response.reason), response=response
            ) from err
        if not 200 <= response.status < 300:
            # Only a dict body has a "message" entry to report; anything else
            # (None, list, ...) is reported in stringified form.
            if use_message_field and isinstance(result, dict):
                message = result.get("message")
            else:
                message = str(result)
            raise LCDResponseError(message=message, response=response)
        return result

    def _remember_height(self, result) -> None:
        """Store the ``"height"`` entry of a non-empty dict body.

        Empty or non-dict bodies leave ``last_request_height`` untouched; a
        dict without ``"height"`` resets it to ``None``.
        """
        if result and isinstance(result, dict):
            self.last_request_height = result.get("height")

    async def _get(
        self,
        endpoint: str,
        params: Optional[Union[APIParams, CIMultiDict, list, dict]] = None,
    ):
        """Send a GET request to ``endpoint`` and return the decoded JSON body.

        Raises:
            LCDResponseError: on an undecodable body or a non-2xx status.
        """
        params = self._plain_params(params)
        async with self.session.get(
            urljoin(self.url, endpoint), params=params
        ) as response:
            result = await self._parse_response(response)
        self._remember_height(result)
        return result

    async def _post(self, endpoint: str, data: Optional[dict] = None):
        """Send ``data`` as JSON in a POST request to ``endpoint``.

        A falsy ``data`` is sent as-is; otherwise it is passed through
        ``dict_to_data`` first.

        Raises:
            LCDResponseError: on an undecodable body or a non-2xx status.
        """
        async with self.session.post(
            urljoin(self.url, endpoint), json=data and dict_to_data(data)
        ) as response:
            result = await self._parse_response(response, use_message_field=True)
        self._remember_height(result)
        return result

    async def _search(
        self,
        events: List[list],
        params: Optional[Union[APIParams, CIMultiDict, list, dict]] = None,
    ):
        """Query ``/cosmos/tx/v1beta1/txs`` with one ``events`` filter per entry.

        Args:
            events: ``[name, value]`` entries. The value is wrapped in single
                quotes in the filter, except for ``tx.height``.
            params: extra query parameters appended after the event filters.

        Raises:
            LCDResponseError: on an undecodable body or a non-2xx status.
        """
        actual_params = CIMultiDict()
        for event in events:
            name, value = event[0], event[1]
            if name == "tx.height":
                actual_params.add("events", f"{name}={value}")
            else:
                actual_params.add("events", f"{name}='{value}'")

        extra = self._plain_params(params)
        if extra:
            # Mappings are walked through items() so repeated multidict keys
            # keep each of their values.
            # NOTE(review): a list is assumed to hold (key, value) pairs.
            pairs = extra.items() if hasattr(extra, "items") else extra
            for key, value in pairs:
                actual_params.add(key, value)

        async with self.session.get(
            urljoin(self.url, "/cosmos/tx/v1beta1/txs"), params=actual_params
        ) as response:
            result = await self._parse_response(response)
        self._remember_height(result)
        return result

    async def __aenter__(self):
        """Enter the async context; the client itself is the context value."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the HTTP session, if one was created."""
        # No session exists when the client was built with
        # _create_session=False.
        session = getattr(self, "session", None)
        if session is not None:
            await session.close()
class LCDClient(AsyncLCDClient):
    """An object representing a connection to a node running the Terra LCD server."""

    url: str
    """URL endpoint of LCD server."""

    chain_id: str
    """Chain ID of blockchain network connecting to."""

    gas_prices: Coins.Input
    """Gas prices to use for automatic fee estimation."""

    gas_adjustment: Union[str, float, int, Dec]
    """Gas adjustment factor for automatic fee estimation."""

    last_request_height: Optional[int]  # type: ignore
    """Height of response of last-made LCD request."""

    auth: AuthAPI
    bank: BankAPI
    distribution: DistributionAPI
    gov: GovAPI
    feegrant: FeeGrantAPI
    market: MarketAPI
    mint: MintAPI
    authz: AuthzAPI
    oracle: OracleAPI
    slashing: SlashingAPI
    staking: StakingAPI
    tendermint: TendermintAPI
    treasury: TreasuryAPI
    wasm: WasmAPI
    tx: TxAPI
    ibc: IbcAPI
    ibc_transfer: IbcTransferAPI
    utils: LCDUtils

    def __init__(
        self,
        url: str,
        chain_id: Optional[str] = None,
        gas_prices: Optional[Coins.Input] = None,
        gas_adjustment: Optional[Numeric.Input] = None,
    ):
        """Initialise the base client without a session, then install the
        synchronous API helpers over the asynchronous ones.

        Args:
            url: base URL of the LCD server.
            chain_id: chain identifier, forwarded to the base class.
            gas_prices: gas prices for fee estimation, forwarded unchanged.
            gas_adjustment: gas adjustment factor, forwarded unchanged.
        """
        loop = get_event_loop()
        # NOTE(review): patches the loop with nest_asyncio, presumably so the
        # synchronous API helpers can drive coroutines while the loop is
        # already running -- confirm against the helper implementations.
        nest_asyncio.apply(loop)
        super().__init__(
            url,
            chain_id,
            gas_prices,
            gas_adjustment,
            loop=loop,
            # A session is opened per request instead; see _run_with_session.
            _create_session=False,
        )

        self.auth = AuthAPI(self)
        self.bank = BankAPI(self)
        self.distribution = DistributionAPI(self)
        self.gov = GovAPI(self)
        self.feegrant = FeeGrantAPI(self)
        self.market = MarketAPI(self)
        self.mint = MintAPI(self)
        self.authz = AuthzAPI(self)
        self.oracle = OracleAPI(self)
        self.slashing = SlashingAPI(self)
        self.staking = StakingAPI(self)
        self.tendermint = TendermintAPI(self)
        self.treasury = TreasuryAPI(self)
        self.wasm = WasmAPI(self)
        self.ibc = IbcAPI(self)
        self.ibc_transfer = IbcTransferAPI(self)
        self.tx = TxAPI(self)
        self.utils = LCDUtils(self)

    async def __aenter__(self):
        """Not supported on the synchronous client; always raises."""
        raise NotImplementedError(
            "async context manager not implemented - you probably want AsyncLCDClient"
        )

    async def __aexit__(self, exc_type, exc, tb):
        """Not supported on the synchronous client; always raises."""
        raise NotImplementedError(
            "async context manager not implemented - you probably want AsyncLCDClient"
        )

    def wallet(self, key: Key) -> Wallet:  # type: ignore
        """Creates a :class:`Wallet` object from a key.

        Args:
            key (Key): key implementation
        """
        return Wallet(self, key)

    async def _run_with_session(self, request, *args, **kwargs):
        """Await ``request(*args, **kwargs)`` inside a short-lived session.

        The session has to be manually created and torn down for each HTTP
        request in a synchronous client. It is closed in ``finally`` so a
        failing request cannot leak it.
        """
        session = ClientSession(
            headers={"Accept": "application/json"}, loop=self.loop
        )
        self.session = session
        try:
            return await request(*args, **kwargs)
        finally:
            # Close the session opened here, even if self.session has been
            # rebound by another request in the meantime.
            await session.close()

    async def _get(self, *args, **kwargs):
        """Run the base-class ``_get`` inside a per-request session."""
        return await self._run_with_session(super()._get, *args, **kwargs)

    async def _post(self, *args, **kwargs):
        """Run the base-class ``_post`` inside a per-request session."""
        return await self._run_with_session(super()._post, *args, **kwargs)

    async def _search(self, *args, **kwargs):
        """Run the base-class ``_search`` inside a per-request session."""
        return await self._run_with_session(super()._search, *args, **kwargs)