Skip to content

Websocket Client

solana.rpc.websocket_api

This module contains code for interacting with the RPC Websocket endpoint.

SolanaWsClientProtocol

Subclass of websockets.WebSocketClientProtocol tailored for Solana RPC websockets.

Source code in solana/rpc/websocket_api.py
class SolanaWsClientProtocol(WebSocketClientProtocol):
    """Subclass of `websockets.WebSocketClientProtocol` tailored for Solana RPC websockets."""

    def __init__(self, *args, **kwargs):
        """Init. Args and kwargs are passed to `websockets.WebSocketClientProtocol`."""
        super().__init__(*args, **kwargs)
        self.subscriptions: Dict[int, Body] = {}
        self.sent_subscriptions: Dict[int, Body] = {}
        self.failed_subscriptions = {}
        self.request_counter = itertools.count()

    def increment_counter_and_get_id(self) -> int:
        """Increment self.request_counter and return the latest id."""
        return next(self.request_counter) + 1

    async def send_data(self, message: Union[Body, List[Body]]) -> None:
        """Send a subscribe/unsubscribe request or list of requests.

        Basically `.send` from `websockets` with extra parsing.

        Args:
            message: The request(s) to send.
        """
        if isinstance(message, list):
            to_send = batch_to_json(message)
            for req in message:
                self.sent_subscriptions[req.id] = req
        else:
            to_send = message.to_json()
            self.sent_subscriptions[message.id] = message
        await super().send(to_send)  # type: ignore

    async def recv(  # type: ignore
        self,
    ) -> List[Union[Notification, SubscriptionResult]]:
        """Receive the next message.

        Basically `.recv` from `websockets` with extra parsing.
        """
        data = await super().recv()
        return self._process_rpc_response(cast(str, data))

    async def account_subscribe(
        self,
        pubkey: Pubkey,
        commitment: Optional[Commitment] = None,
        encoding: Optional[str] = None,
    ) -> None:
        """Subscribe to an account to receive notifications when the lamports or data change.

        Args:
            pubkey: Account pubkey.
            commitment: Commitment level.
            encoding: Encoding to use.
        """
        req_id = self.increment_counter_and_get_id()
        commitment_to_use = None if commitment is None else _COMMITMENT_TO_SOLDERS[commitment]
        encoding_to_use = None if encoding is None else _ACCOUNT_ENCODING_TO_SOLDERS[encoding]
        config = (
            None
            if commitment_to_use is None and encoding_to_use is None
            else RpcAccountInfoConfig(encoding=encoding_to_use, commitment=commitment_to_use)
        )
        req = AccountSubscribe(pubkey, config, req_id)
        await self.send_data(req)

    async def account_unsubscribe(
        self,
        subscription: int,
    ) -> None:
        """Unsubscribe from account notifications.

        Args:
            subscription: ID of subscription to cancel.
        """
        req_id = self.increment_counter_and_get_id()
        req = AccountUnsubscribe(subscription, req_id)
        await self.send_data(req)
        del self.subscriptions[subscription]

    async def logs_subscribe(
        self,
        filter_: Union[RpcTransactionLogsFilter, RpcTransactionLogsFilterMentions] = RpcTransactionLogsFilter.All,
        commitment: Optional[Commitment] = None,
    ) -> None:
        """Subscribe to transaction logging.

        Args:
            filter_: filter criteria for the logs.
            commitment: The commitment level to use.
        """
        req_id = self.increment_counter_and_get_id()
        commitment_to_use = None if commitment is None else _COMMITMENT_TO_SOLDERS[commitment]
        config = RpcTransactionLogsConfig(commitment_to_use)
        req = LogsSubscribe(filter_, config, req_id)
        await self.send_data(req)

    async def logs_unsubscribe(
        self,
        subscription: int,
    ) -> None:
        """Unsubscribe from transaction logging.

        Args:
            subscription: ID of subscription to cancel.
        """
        req_id = self.increment_counter_and_get_id()
        req = LogsUnsubscribe(subscription, req_id)
        await self.send_data(req)
        del self.subscriptions[subscription]

    async def block_subscribe(
        self,
        filter_: Union[RpcBlockSubscribeFilter, RpcBlockSubscribeFilterMentions] = RpcBlockSubscribeFilter.All,
        commitment: Optional[Commitment] = None,
        encoding: Optional[str] = None,
        transaction_details: Union[TransactionDetails, None] = None,
        show_rewards: Optional[bool] = None,
        max_supported_transaction_version: Optional[int] = None,
    ) -> None:
        """Subscribe to blocks.

        Args:
            filter_: filter criteria for the blocks.
            commitment: The commitment level to use.
            encoding: Encoding to use.
            transaction_details: level of transaction detail to return.
            show_rewards: whether to populate the rewards array.
            max_supported_transaction_version: the max transaction version to return in responses.
        """
        req_id = self.increment_counter_and_get_id()
        commitment_to_use = None if commitment is None else _COMMITMENT_TO_SOLDERS[commitment]
        encoding_to_use = None if encoding is None else _TX_ENCODING_TO_SOLDERS[encoding]
        config = RpcBlockSubscribeConfig(
            commitment=commitment_to_use,
            encoding=encoding_to_use,
            transaction_details=transaction_details,
            show_rewards=show_rewards,
            max_supported_transaction_version=max_supported_transaction_version,
        )
        req = BlockSubscribe(filter_, config, req_id)
        await self.send_data(req)

    async def block_unsubscribe(
        self,
        subscription: int,
    ) -> None:
        """Unsubscribe from blocks.

        Args:
            subscription: ID of subscription to cancel.
        """
        req_id = self.increment_counter_and_get_id()
        req = BlockUnsubscribe(subscription, req_id)
        await self.send_data(req)
        del self.subscriptions[subscription]

    async def program_subscribe(  # pylint: disable=too-many-arguments
        self,
        program_id: Pubkey,
        commitment: Optional[Commitment] = None,
        encoding: Optional[str] = None,
        data_slice: Optional[types.DataSliceOpts] = None,
        filters: Optional[Sequence[Union[int, types.MemcmpOpts]]] = None,
    ) -> None:
        """Receive notifications when the lamports or data for a given account owned by the program changes.

        Args:
            program_id: The program ID.
            commitment: Commitment level to use.
            encoding: Encoding to use.
            data_slice: (optional) Limit the returned account data using the provided `offset`: <usize> and
            `   length`: <usize> fields; only available for "base58" or "base64" encoding.
            filters: (optional) Options to compare a provided series of bytes with program account data at a particular offset.
                Note: an int entry is converted to a `dataSize` filter.
        """  # noqa: E501 # pylint: disable=line-too-long
        req_id = self.increment_counter_and_get_id()
        if commitment is None and encoding is None and data_slice is None and filters is None:
            config = None
        else:
            encoding_to_use = None if encoding is None else _ACCOUNT_ENCODING_TO_SOLDERS[encoding]
            commitment_to_use = None if commitment is None else _COMMITMENT_TO_SOLDERS[commitment]
            data_slice_to_use = (
                None if data_slice is None else UiDataSliceConfig(offset=data_slice.offset, length=data_slice.length)
            )
            account_config = RpcAccountInfoConfig(
                encoding=encoding_to_use,
                commitment=commitment_to_use,
                data_slice=data_slice_to_use,
            )
            filters_to_use: Optional[List[Union[int, Memcmp]]] = (
                None if filters is None else [x if isinstance(x, int) else Memcmp(*x) for x in filters]
            )
            config = RpcProgramAccountsConfig(account_config, filters_to_use)
        req = ProgramSubscribe(program_id, config, req_id)
        await self.send_data(req)

    async def program_unsubscribe(
        self,
        subscription: int,
    ) -> None:
        """Unsubscribe from program account notifications.

        Args:
            subscription: ID of subscription to cancel.
        """
        req_id = self.increment_counter_and_get_id()
        req = ProgramUnsubscribe(subscription, req_id)
        await self.send_data(req)
        del self.subscriptions[subscription]

    async def signature_subscribe(
        self,
        signature: Signature,
        commitment: Optional[Commitment] = None,
    ) -> None:
        """Subscribe to a transaction signature to receive notification when the transaction is confirmed.

        Args:
            signature: The transaction signature to subscribe to.
            commitment: Commitment level.
        """
        req_id = self.increment_counter_and_get_id()
        commitment_to_use = None if commitment is None else _COMMITMENT_TO_SOLDERS[commitment]
        config = None if commitment_to_use is None else RpcSignatureSubscribeConfig(commitment=commitment_to_use)
        req = SignatureSubscribe(signature, config, req_id)
        await self.send_data(req)

    async def signature_unsubscribe(
        self,
        subscription: int,
    ) -> None:
        """Unsubscribe from signature notifications.

        Args:
            subscription: ID of subscription to cancel.
        """
        req_id = self.increment_counter_and_get_id()
        req = SignatureUnsubscribe(subscription, req_id)
        await self.send_data(req)
        del self.subscriptions[subscription]

    async def slot_subscribe(self) -> None:
        """Subscribe to receive notification anytime a slot is processed by the validator."""
        req_id = self.increment_counter_and_get_id()
        req = SlotSubscribe(req_id)
        await self.send_data(req)

    async def slot_unsubscribe(
        self,
        subscription: int,
    ) -> None:
        """Unsubscribe from slot notifications.

        Args:
            subscription: ID of subscription to cancel.
        """
        req_id = self.increment_counter_and_get_id()
        req = SlotUnsubscribe(subscription, req_id)
        await self.send_data(req)
        del self.subscriptions[subscription]

    async def slots_updates_subscribe(self) -> None:
        """Subscribe to receive a notification from the validator on a variety of updates on every slot."""
        req_id = self.increment_counter_and_get_id()
        req = SlotsUpdatesSubscribe(req_id)
        await self.send_data(req)

    async def slots_updates_unsubscribe(
        self,
        subscription: int,
    ) -> None:
        """Unsubscribe from slot update notifications.

        Args:
            subscription: ID of subscription to cancel.
        """
        req_id = self.increment_counter_and_get_id()
        req = SlotsUpdatesUnsubscribe(subscription, req_id)
        await self.send_data(req)
        del self.subscriptions[subscription]

    async def root_subscribe(self) -> None:
        """Subscribe to receive notification anytime a new root is set by the validator."""
        req_id = self.increment_counter_and_get_id()
        req = RootSubscribe(req_id)
        await self.send_data(req)

    async def root_unsubscribe(
        self,
        subscription: int,
    ) -> None:
        """Unsubscribe from root notifications.

        Args:
            subscription: ID of subscription to cancel.
        """
        req_id = self.increment_counter_and_get_id()
        req = RootUnsubscribe(subscription, req_id)
        await self.send_data(req)
        del self.subscriptions[subscription]

    async def vote_subscribe(self) -> None:
        """Subscribe to receive notification anytime a new vote is observed in gossip."""
        req_id = self.increment_counter_and_get_id()
        req = VoteSubscribe(req_id)
        await self.send_data(req)

    async def vote_unsubscribe(
        self,
        subscription: int,
    ) -> None:
        """Unsubscribe from vote notifications.

        Args:
            subscription: ID of subscription to cancel.
        """
        req_id = self.increment_counter_and_get_id()
        req = VoteUnsubscribe(subscription, req_id)
        await self.send_data(req)
        del self.subscriptions[subscription]

    def _process_rpc_response(self, raw: str) -> List[Union[Notification, SubscriptionResult]]:
        parsed = parse_websocket_message(raw)
        for item in parsed:
            if isinstance(item, SoldersSubscriptionError):
                subscription = self.sent_subscriptions[item.id]
                self.failed_subscriptions[item.id] = subscription
                raise SubscriptionError(item, subscription)
            if isinstance(item, SubscriptionResult):
                self.subscriptions[item.result] = self.sent_subscriptions[item.id]
        return cast(List[Union[Notification, SubscriptionResult]], parsed)

__init__(self, *args, **kwargs) special

Init. Args and kwargs are passed to websockets.WebSocketClientProtocol.

Source code in solana/rpc/websocket_api.py
def __init__(self, *args, **kwargs):
    """Init. Args and kwargs are passed to `websockets.WebSocketClientProtocol`."""
    super().__init__(*args, **kwargs)
    self.subscriptions: Dict[int, Body] = {}
    self.sent_subscriptions: Dict[int, Body] = {}
    self.failed_subscriptions = {}
    self.request_counter = itertools.count()

account_subscribe(self, pubkey, commitment=None, encoding=None) async

Subscribe to an account to receive notifications when the lamports or data change.

Parameters:

Name Type Description Default
pubkey Pubkey

Account pubkey.

required
commitment Optional[Commitment]

Commitment level.

None
encoding Optional[str]

Encoding to use.

None
Source code in solana/rpc/websocket_api.py
async def account_subscribe(
    self,
    pubkey: Pubkey,
    commitment: Optional[Commitment] = None,
    encoding: Optional[str] = None,
) -> None:
    """Subscribe to an account to receive notifications when the lamports or data change.

    Args:
        pubkey: Account pubkey.
        commitment: Commitment level.
        encoding: Encoding to use.
    """
    req_id = self.increment_counter_and_get_id()
    commitment_to_use = None if commitment is None else _COMMITMENT_TO_SOLDERS[commitment]
    encoding_to_use = None if encoding is None else _ACCOUNT_ENCODING_TO_SOLDERS[encoding]
    config = (
        None
        if commitment_to_use is None and encoding_to_use is None
        else RpcAccountInfoConfig(encoding=encoding_to_use, commitment=commitment_to_use)
    )
    req = AccountSubscribe(pubkey, config, req_id)
    await self.send_data(req)

account_unsubscribe(self, subscription) async

Unsubscribe from account notifications.

Parameters:

Name Type Description Default
subscription int

ID of subscription to cancel.

required
Source code in solana/rpc/websocket_api.py
async def account_unsubscribe(
    self,
    subscription: int,
) -> None:
    """Unsubscribe from account notifications.

    Args:
        subscription: ID of subscription to cancel.
    """
    req_id = self.increment_counter_and_get_id()
    req = AccountUnsubscribe(subscription, req_id)
    await self.send_data(req)
    del self.subscriptions[subscription]

block_subscribe(self, filter_=RpcBlockSubscribeFilter.All, commitment=None, encoding=None, transaction_details=None, show_rewards=None, max_supported_transaction_version=None) async

Subscribe to blocks.

Parameters:

Name Type Description Default
filter_ Union[solders.rpc.config.RpcBlockSubscribeFilter, solders.rpc.config.RpcBlockSubscribeFilterMentions]

filter criteria for the blocks.

RpcBlockSubscribeFilter.All
commitment Optional[Commitment]

The commitment level to use.

None
encoding Optional[str]

Encoding to use.

None
transaction_details Optional[solders.transaction_status.TransactionDetails]

level of transaction detail to return.

None
show_rewards Optional[bool]

whether to populate the rewards array.

None
max_supported_transaction_version Optional[int]

the max transaction version to return in responses.

None
Source code in solana/rpc/websocket_api.py
async def block_subscribe(
    self,
    filter_: Union[RpcBlockSubscribeFilter, RpcBlockSubscribeFilterMentions] = RpcBlockSubscribeFilter.All,
    commitment: Optional[Commitment] = None,
    encoding: Optional[str] = None,
    transaction_details: Union[TransactionDetails, None] = None,
    show_rewards: Optional[bool] = None,
    max_supported_transaction_version: Optional[int] = None,
) -> None:
    """Subscribe to blocks.

    Args:
        filter_: filter criteria for the blocks.
        commitment: The commitment level to use.
        encoding: Encoding to use.
        transaction_details: level of transaction detail to return.
        show_rewards: whether to populate the rewards array.
        max_supported_transaction_version: the max transaction version to return in responses.
    """
    req_id = self.increment_counter_and_get_id()
    commitment_to_use = None if commitment is None else _COMMITMENT_TO_SOLDERS[commitment]
    encoding_to_use = None if encoding is None else _TX_ENCODING_TO_SOLDERS[encoding]
    config = RpcBlockSubscribeConfig(
        commitment=commitment_to_use,
        encoding=encoding_to_use,
        transaction_details=transaction_details,
        show_rewards=show_rewards,
        max_supported_transaction_version=max_supported_transaction_version,
    )
    req = BlockSubscribe(filter_, config, req_id)
    await self.send_data(req)

block_unsubscribe(self, subscription) async

Unsubscribe from blocks.

Parameters:

Name Type Description Default
subscription int

ID of subscription to cancel.

required
Source code in solana/rpc/websocket_api.py
async def block_unsubscribe(
    self,
    subscription: int,
) -> None:
    """Unsubscribe from blocks.

    Args:
        subscription: ID of subscription to cancel.
    """
    req_id = self.increment_counter_and_get_id()
    req = BlockUnsubscribe(subscription, req_id)
    await self.send_data(req)
    del self.subscriptions[subscription]

increment_counter_and_get_id(self)

Increment self.request_counter and return the latest id.

Source code in solana/rpc/websocket_api.py
def increment_counter_and_get_id(self) -> int:
    """Increment self.request_counter and return the latest id."""
    return next(self.request_counter) + 1

logs_subscribe(self, filter_=RpcTransactionLogsFilter.All, commitment=None) async

Subscribe to transaction logging.

Parameters:

Name Type Description Default
filter_ Union[solders.rpc.config.RpcTransactionLogsFilter, solders.rpc.config.RpcTransactionLogsFilterMentions]

filter criteria for the logs.

RpcTransactionLogsFilter.All
commitment Optional[Commitment]

The commitment level to use.

None
Source code in solana/rpc/websocket_api.py
async def logs_subscribe(
    self,
    filter_: Union[RpcTransactionLogsFilter, RpcTransactionLogsFilterMentions] = RpcTransactionLogsFilter.All,
    commitment: Optional[Commitment] = None,
) -> None:
    """Subscribe to transaction logging.

    Args:
        filter_: filter criteria for the logs.
        commitment: The commitment level to use.
    """
    req_id = self.increment_counter_and_get_id()
    commitment_to_use = None if commitment is None else _COMMITMENT_TO_SOLDERS[commitment]
    config = RpcTransactionLogsConfig(commitment_to_use)
    req = LogsSubscribe(filter_, config, req_id)
    await self.send_data(req)

logs_unsubscribe(self, subscription) async

Unsubscribe from transaction logging.

Parameters:

Name Type Description Default
subscription int

ID of subscription to cancel.

required
Source code in solana/rpc/websocket_api.py
async def logs_unsubscribe(
    self,
    subscription: int,
) -> None:
    """Unsubscribe from transaction logging.

    Args:
        subscription: ID of subscription to cancel.
    """
    req_id = self.increment_counter_and_get_id()
    req = LogsUnsubscribe(subscription, req_id)
    await self.send_data(req)
    del self.subscriptions[subscription]

program_subscribe(self, program_id, commitment=None, encoding=None, data_slice=None, filters=None) async

Receive notifications when the lamports or data for a given account owned by the program changes.

Parameters:

Name Type Description Default
program_id Pubkey

The program ID.

required
commitment Optional[Commitment]

Commitment level to use.

None
encoding Optional[str]

Encoding to use.

None
data_slice Optional[solana.rpc.types.DataSliceOpts]

(optional) Limit the returned account data using the provided offset: and

None
` length`

fields; only available for "base58" or "base64" encoding.

required
filters Optional[Sequence[Union[int, solana.rpc.types.MemcmpOpts]]]

(optional) Options to compare a provided series of bytes with program account data at a particular offset. Note: an int entry is converted to a dataSize filter.

None
Source code in solana/rpc/websocket_api.py
async def program_subscribe(  # pylint: disable=too-many-arguments
    self,
    program_id: Pubkey,
    commitment: Optional[Commitment] = None,
    encoding: Optional[str] = None,
    data_slice: Optional[types.DataSliceOpts] = None,
    filters: Optional[Sequence[Union[int, types.MemcmpOpts]]] = None,
) -> None:
    """Receive notifications when the lamports or data for a given account owned by the program changes.

    Args:
        program_id: The program ID.
        commitment: Commitment level to use.
        encoding: Encoding to use.
        data_slice: (optional) Limit the returned account data using the provided `offset`: <usize> and
        `   length`: <usize> fields; only available for "base58" or "base64" encoding.
        filters: (optional) Options to compare a provided series of bytes with program account data at a particular offset.
            Note: an int entry is converted to a `dataSize` filter.
    """  # noqa: E501 # pylint: disable=line-too-long
    req_id = self.increment_counter_and_get_id()
    if commitment is None and encoding is None and data_slice is None and filters is None:
        config = None
    else:
        encoding_to_use = None if encoding is None else _ACCOUNT_ENCODING_TO_SOLDERS[encoding]
        commitment_to_use = None if commitment is None else _COMMITMENT_TO_SOLDERS[commitment]
        data_slice_to_use = (
            None if data_slice is None else UiDataSliceConfig(offset=data_slice.offset, length=data_slice.length)
        )
        account_config = RpcAccountInfoConfig(
            encoding=encoding_to_use,
            commitment=commitment_to_use,
            data_slice=data_slice_to_use,
        )
        filters_to_use: Optional[List[Union[int, Memcmp]]] = (
            None if filters is None else [x if isinstance(x, int) else Memcmp(*x) for x in filters]
        )
        config = RpcProgramAccountsConfig(account_config, filters_to_use)
    req = ProgramSubscribe(program_id, config, req_id)
    await self.send_data(req)

program_unsubscribe(self, subscription) async

Unsubscribe from program account notifications.

Parameters:

Name Type Description Default
subscription int

ID of subscription to cancel.

required
Source code in solana/rpc/websocket_api.py
async def program_unsubscribe(
    self,
    subscription: int,
) -> None:
    """Unsubscribe from program account notifications.

    Args:
        subscription: ID of subscription to cancel.
    """
    req_id = self.increment_counter_and_get_id()
    req = ProgramUnsubscribe(subscription, req_id)
    await self.send_data(req)
    del self.subscriptions[subscription]

recv(self) async

Receive the next message.

Basically .recv from websockets with extra parsing.

Source code in solana/rpc/websocket_api.py
async def recv(  # type: ignore
    self,
) -> List[Union[Notification, SubscriptionResult]]:
    """Receive the next message.

    Basically `.recv` from `websockets` with extra parsing.
    """
    data = await super().recv()
    return self._process_rpc_response(cast(str, data))

root_subscribe(self) async

Subscribe to receive notification anytime a new root is set by the validator.

Source code in solana/rpc/websocket_api.py
async def root_subscribe(self) -> None:
    """Subscribe to receive notification anytime a new root is set by the validator."""
    req_id = self.increment_counter_and_get_id()
    req = RootSubscribe(req_id)
    await self.send_data(req)

root_unsubscribe(self, subscription) async

Unsubscribe from root notifications.

Parameters:

Name Type Description Default
subscription int

ID of subscription to cancel.

required
Source code in solana/rpc/websocket_api.py
async def root_unsubscribe(
    self,
    subscription: int,
) -> None:
    """Unsubscribe from root notifications.

    Args:
        subscription: ID of subscription to cancel.
    """
    req_id = self.increment_counter_and_get_id()
    req = RootUnsubscribe(subscription, req_id)
    await self.send_data(req)
    del self.subscriptions[subscription]

send_data(self, message) async

Send a subscribe/unsubscribe request or list of requests.

Basically .send from websockets with extra parsing.

Parameters:

Name Type Description Default
message Union[solders.rpc.requests.GetAccountInfo, solders.rpc.requests.GetBalance, solders.rpc.requests.GetBlock, solders.rpc.requests.GetBlockHeight, solders.rpc.requests.GetBlockProduction, solders.rpc.requests.GetBlockCommitment, solders.rpc.requests.GetBlocks, solders.rpc.requests.GetBlocksWithLimit, solders.rpc.requests.GetBlockTime, solders.rpc.requests.GetClusterNodes, solders.rpc.requests.GetEpochInfo, solders.rpc.requests.GetEpochSchedule, solders.rpc.requests.GetFeeForMessage, solders.rpc.requests.GetFirstAvailableBlock, solders.rpc.requests.GetGenesisHash, solders.rpc.requests.GetHealth, solders.rpc.requests.GetHighestSnapshotSlot, solders.rpc.requests.GetIdentity, solders.rpc.requests.GetInflationGovernor, solders.rpc.requests.GetInflationRate, solders.rpc.requests.GetInflationReward, solders.rpc.requests.GetLargestAccounts, solders.rpc.requests.GetLatestBlockhash, solders.rpc.requests.GetLeaderSchedule, solders.rpc.requests.GetMaxRetransmitSlot, solders.rpc.requests.GetMaxShredInsertSlot, solders.rpc.requests.GetMinimumBalanceForRentExemption, solders.rpc.requests.GetMultipleAccounts, solders.rpc.requests.GetProgramAccounts, solders.rpc.requests.GetRecentPerformanceSamples, solders.rpc.requests.GetSignaturesForAddress, solders.rpc.requests.GetSignatureStatuses, solders.rpc.requests.GetSlot, solders.rpc.requests.GetSlotLeader, solders.rpc.requests.GetSlotLeaders, solders.rpc.requests.GetStakeActivation, solders.rpc.requests.GetSupply, solders.rpc.requests.GetTokenAccountBalance, solders.rpc.requests.GetTokenAccountsByDelegate, solders.rpc.requests.GetTokenAccountsByOwner, solders.rpc.requests.GetTokenLargestAccounts, solders.rpc.requests.GetTokenSupply, solders.rpc.requests.GetTransaction, solders.rpc.requests.GetTransactionCount, solders.rpc.requests.GetVersion, solders.rpc.requests.GetVoteAccounts, solders.rpc.requests.IsBlockhashValid, solders.rpc.requests.MinimumLedgerSlot, solders.rpc.requests.RequestAirdrop, solders.rpc.requests.SendRawTransaction, solders.rpc.requests.SendLegacyTransaction, solders.rpc.requests.ValidatorExit, solders.rpc.requests.AccountSubscribe, solders.rpc.requests.BlockSubscribe, solders.rpc.requests.LogsSubscribe, solders.rpc.requests.ProgramSubscribe, solders.rpc.requests.SignatureSubscribe, solders.rpc.requests.SlotSubscribe, solders.rpc.requests.SlotsUpdatesSubscribe, solders.rpc.requests.RootSubscribe, solders.rpc.requests.VoteSubscribe, solders.rpc.requests.AccountUnsubscribe, solders.rpc.requests.BlockUnsubscribe, solders.rpc.requests.LogsUnsubscribe, solders.rpc.requests.ProgramUnsubscribe, solders.rpc.requests.SignatureUnsubscribe, solders.rpc.requests.SimulateLegacyTransaction, solders.rpc.requests.SlotUnsubscribe, solders.rpc.requests.SlotsUpdatesUnsubscribe, solders.rpc.requests.RootUnsubscribe, solders.rpc.requests.VoteUnsubscribe, List[Union[solders.rpc.requests.GetAccountInfo, solders.rpc.requests.GetBalance, solders.rpc.requests.GetBlock, solders.rpc.requests.GetBlockHeight, solders.rpc.requests.GetBlockProduction, solders.rpc.requests.GetBlockCommitment, solders.rpc.requests.GetBlocks, solders.rpc.requests.GetBlocksWithLimit, solders.rpc.requests.GetBlockTime, solders.rpc.requests.GetClusterNodes, solders.rpc.requests.GetEpochInfo, solders.rpc.requests.GetEpochSchedule, solders.rpc.requests.GetFeeForMessage, solders.rpc.requests.GetFirstAvailableBlock, solders.rpc.requests.GetGenesisHash, solders.rpc.requests.GetHealth, solders.rpc.requests.GetHighestSnapshotSlot, solders.rpc.requests.GetIdentity, solders.rpc.requests.GetInflationGovernor, solders.rpc.requests.GetInflationRate, solders.rpc.requests.GetInflationReward, solders.rpc.requests.GetLargestAccounts, solders.rpc.requests.GetLatestBlockhash, solders.rpc.requests.GetLeaderSchedule, solders.rpc.requests.GetMaxRetransmitSlot, solders.rpc.requests.GetMaxShredInsertSlot, solders.rpc.requests.GetMinimumBalanceForRentExemption, solders.rpc.requests.GetMultipleAccounts, solders.rpc.requests.GetProgramAccounts, solders.rpc.requests.GetRecentPerformanceSamples, solders.rpc.requests.GetSignaturesForAddress, solders.rpc.requests.GetSignatureStatuses, solders.rpc.requests.GetSlot, solders.rpc.requests.GetSlotLeader, solders.rpc.requests.GetSlotLeaders, solders.rpc.requests.GetStakeActivation, solders.rpc.requests.GetSupply, solders.rpc.requests.GetTokenAccountBalance, solders.rpc.requests.GetTokenAccountsByDelegate, solders.rpc.requests.GetTokenAccountsByOwner, solders.rpc.requests.GetTokenLargestAccounts, solders.rpc.requests.GetTokenSupply, solders.rpc.requests.GetTransaction, solders.rpc.requests.GetTransactionCount, solders.rpc.requests.GetVersion, solders.rpc.requests.GetVoteAccounts, solders.rpc.requests.IsBlockhashValid, solders.rpc.requests.MinimumLedgerSlot, solders.rpc.requests.RequestAirdrop, solders.rpc.requests.SendRawTransaction, solders.rpc.requests.SendLegacyTransaction, solders.rpc.requests.ValidatorExit, solders.rpc.requests.AccountSubscribe, solders.rpc.requests.BlockSubscribe, solders.rpc.requests.LogsSubscribe, solders.rpc.requests.ProgramSubscribe, solders.rpc.requests.SignatureSubscribe, solders.rpc.requests.SlotSubscribe, solders.rpc.requests.SlotsUpdatesSubscribe, solders.rpc.requests.RootSubscribe, solders.rpc.requests.VoteSubscribe, solders.rpc.requests.AccountUnsubscribe, solders.rpc.requests.BlockUnsubscribe, solders.rpc.requests.LogsUnsubscribe, solders.rpc.requests.ProgramUnsubscribe, solders.rpc.requests.SignatureUnsubscribe, solders.rpc.requests.SimulateLegacyTransaction, solders.rpc.requests.SlotUnsubscribe, solders.rpc.requests.SlotsUpdatesUnsubscribe, solders.rpc.requests.RootUnsubscribe, solders.rpc.requests.VoteUnsubscribe]]]

The request(s) to send.

required
Source code in solana/rpc/websocket_api.py
async def send_data(self, message: Union[Body, List[Body]]) -> None:
    """Send a subscribe/unsubscribe request or list of requests.

    Basically `.send` from `websockets` with extra parsing.

    Args:
        message: The request(s) to send.
    """
    if isinstance(message, list):
        to_send = batch_to_json(message)
        for req in message:
            self.sent_subscriptions[req.id] = req
    else:
        to_send = message.to_json()
        self.sent_subscriptions[message.id] = message
    await super().send(to_send)  # type: ignore

signature_subscribe(self, signature, commitment=None) async

Subscribe to a transaction signature to receive notification when the transaction is confirmed.

Parameters:

Name Type Description Default
signature Signature

The transaction signature to subscribe to.

required
commitment Optional[Commitment]

Commitment level.

None
Source code in solana/rpc/websocket_api.py
async def signature_subscribe(
    self,
    signature: Signature,
    commitment: Optional[Commitment] = None,
) -> None:
    """Subscribe to a transaction signature to receive notification when the transaction is confirmed.

    Args:
        signature: The transaction signature to subscribe to.
        commitment: Commitment level.
    """
    req_id = self.increment_counter_and_get_id()
    commitment_to_use = None if commitment is None else _COMMITMENT_TO_SOLDERS[commitment]
    config = None if commitment_to_use is None else RpcSignatureSubscribeConfig(commitment=commitment_to_use)
    req = SignatureSubscribe(signature, config, req_id)
    await self.send_data(req)

signature_unsubscribe(self, subscription) async

Unsubscribe from signature notifications.

Parameters:

Name Type Description Default
subscription int

ID of subscription to cancel.

required
Source code in solana/rpc/websocket_api.py
async def signature_unsubscribe(
    self,
    subscription: int,
) -> None:
    """Unsubscribe from signature notifications.

    Args:
        subscription: ID of subscription to cancel.
    """
    req_id = self.increment_counter_and_get_id()
    req = SignatureUnsubscribe(subscription, req_id)
    await self.send_data(req)
    del self.subscriptions[subscription]

slot_subscribe(self) async

Subscribe to receive notification anytime a slot is processed by the validator.

Source code in solana/rpc/websocket_api.py
async def slot_subscribe(self) -> None:
    """Subscribe to receive notification anytime a slot is processed by the validator."""
    req_id = self.increment_counter_and_get_id()
    req = SlotSubscribe(req_id)
    await self.send_data(req)

slot_unsubscribe(self, subscription) async

Unsubscribe from slot notifications.

Parameters:

Name Type Description Default
subscription int

ID of subscription to cancel.

required
Source code in solana/rpc/websocket_api.py
async def slot_unsubscribe(
    self,
    subscription: int,
) -> None:
    """Unsubscribe from slot notifications.

    Args:
        subscription: ID of subscription to cancel.
    """
    req_id = self.increment_counter_and_get_id()
    req = SlotUnsubscribe(subscription, req_id)
    await self.send_data(req)
    del self.subscriptions[subscription]

slots_updates_subscribe(self) async

Subscribe to receive a notification from the validator on a variety of updates on every slot.

Source code in solana/rpc/websocket_api.py
async def slots_updates_subscribe(self) -> None:
    """Subscribe to receive a notification from the validator on a variety of updates on every slot."""
    req_id = self.increment_counter_and_get_id()
    req = SlotsUpdatesSubscribe(req_id)
    await self.send_data(req)

slots_updates_unsubscribe(self, subscription) async

Unsubscribe from slot update notifications.

Parameters:

Name Type Description Default
subscription int

ID of subscription to cancel.

required
Source code in solana/rpc/websocket_api.py
async def slots_updates_unsubscribe(
    self,
    subscription: int,
) -> None:
    """Unsubscribe from slot update notifications.

    Args:
        subscription: ID of subscription to cancel.
    """
    req_id = self.increment_counter_and_get_id()
    req = SlotsUpdatesUnsubscribe(subscription, req_id)
    await self.send_data(req)
    del self.subscriptions[subscription]

vote_subscribe(self) async

Subscribe to receive notification anytime a new vote is observed in gossip.

Source code in solana/rpc/websocket_api.py
async def vote_subscribe(self) -> None:
    """Subscribe to receive notification anytime a new vote is observed in gossip."""
    req_id = self.increment_counter_and_get_id()
    req = VoteSubscribe(req_id)
    await self.send_data(req)

vote_unsubscribe(self, subscription) async

Unsubscribe from vote notifications.

Parameters:

Name Type Description Default
subscription int

ID of subscription to cancel.

required
Source code in solana/rpc/websocket_api.py
async def vote_unsubscribe(
    self,
    subscription: int,
) -> None:
    """Unsubscribe from vote notifications.

    Args:
        subscription: ID of subscription to cancel.
    """
    req_id = self.increment_counter_and_get_id()
    req = VoteUnsubscribe(subscription, req_id)
    await self.send_data(req)
    del self.subscriptions[subscription]

SubscriptionError

Raise when subscribing to an RPC feed fails.

Source code in solana/rpc/websocket_api.py
class SubscriptionError(Exception):
    """Raise when subscribing to an RPC feed fails."""

    def __init__(self, err: SoldersSubscriptionError, subscription: Body) -> None:
        """Init.

        Args:
            err: The RPC error object.
            subscription: The subscription message that caused the error.

        """
        self.type = err.error.__class__
        self.msg: str = err.error.message  # type: ignore #  TODO: narrow this union type
        self.subscription = subscription
        super().__init__(f"{self.type.__name__}: {self.msg}\n Caused by subscription: {subscription}")

__init__(self, err, subscription) special

Init.

Parameters:

Name Type Description Default
err SubscriptionError

The RPC error object.

required
subscription Union[solders.rpc.requests.GetAccountInfo, solders.rpc.requests.GetBalance, solders.rpc.requests.GetBlock, solders.rpc.requests.GetBlockHeight, solders.rpc.requests.GetBlockProduction, solders.rpc.requests.GetBlockCommitment, solders.rpc.requests.GetBlocks, solders.rpc.requests.GetBlocksWithLimit, solders.rpc.requests.GetBlockTime, solders.rpc.requests.GetClusterNodes, solders.rpc.requests.GetEpochInfo, solders.rpc.requests.GetEpochSchedule, solders.rpc.requests.GetFeeForMessage, solders.rpc.requests.GetFirstAvailableBlock, solders.rpc.requests.GetGenesisHash, solders.rpc.requests.GetHealth, solders.rpc.requests.GetHighestSnapshotSlot, solders.rpc.requests.GetIdentity, solders.rpc.requests.GetInflationGovernor, solders.rpc.requests.GetInflationRate, solders.rpc.requests.GetInflationReward, solders.rpc.requests.GetLargestAccounts, solders.rpc.requests.GetLatestBlockhash, solders.rpc.requests.GetLeaderSchedule, solders.rpc.requests.GetMaxRetransmitSlot, solders.rpc.requests.GetMaxShredInsertSlot, solders.rpc.requests.GetMinimumBalanceForRentExemption, solders.rpc.requests.GetMultipleAccounts, solders.rpc.requests.GetProgramAccounts, solders.rpc.requests.GetRecentPerformanceSamples, solders.rpc.requests.GetSignaturesForAddress, solders.rpc.requests.GetSignatureStatuses, solders.rpc.requests.GetSlot, solders.rpc.requests.GetSlotLeader, solders.rpc.requests.GetSlotLeaders, solders.rpc.requests.GetStakeActivation, solders.rpc.requests.GetSupply, solders.rpc.requests.GetTokenAccountBalance, solders.rpc.requests.GetTokenAccountsByDelegate, solders.rpc.requests.GetTokenAccountsByOwner, solders.rpc.requests.GetTokenLargestAccounts, solders.rpc.requests.GetTokenSupply, solders.rpc.requests.GetTransaction, solders.rpc.requests.GetTransactionCount, solders.rpc.requests.GetVersion, solders.rpc.requests.GetVoteAccounts, solders.rpc.requests.IsBlockhashValid, solders.rpc.requests.MinimumLedgerSlot, solders.rpc.requests.RequestAirdrop, solders.rpc.requests.SendRawTransaction, solders.rpc.requests.SendLegacyTransaction, solders.rpc.requests.ValidatorExit, solders.rpc.requests.AccountSubscribe, solders.rpc.requests.BlockSubscribe, solders.rpc.requests.LogsSubscribe, solders.rpc.requests.ProgramSubscribe, solders.rpc.requests.SignatureSubscribe, solders.rpc.requests.SlotSubscribe, solders.rpc.requests.SlotsUpdatesSubscribe, solders.rpc.requests.RootSubscribe, solders.rpc.requests.VoteSubscribe, solders.rpc.requests.AccountUnsubscribe, solders.rpc.requests.BlockUnsubscribe, solders.rpc.requests.LogsUnsubscribe, solders.rpc.requests.ProgramUnsubscribe, solders.rpc.requests.SignatureUnsubscribe, solders.rpc.requests.SimulateLegacyTransaction, solders.rpc.requests.SlotUnsubscribe, solders.rpc.requests.SlotsUpdatesUnsubscribe, solders.rpc.requests.RootUnsubscribe, solders.rpc.requests.VoteUnsubscribe]

The subscription message that caused the error.

required
Source code in solana/rpc/websocket_api.py
def __init__(self, err: SoldersSubscriptionError, subscription: Body) -> None:
    """Init.

    Args:
        err: The RPC error object.
        subscription: The subscription message that caused the error.

    """
    self.type = err.error.__class__
    self.msg: str = err.error.message  # type: ignore #  TODO: narrow this union type
    self.subscription = subscription
    super().__init__(f"{self.type.__name__}: {self.msg}\n Caused by subscription: {subscription}")

connect

Solana RPC websocket connector.

Source code in solana/rpc/websocket_api.py
class connect(ws_connect):  # pylint: disable=invalid-name,too-few-public-methods
    """Solana RPC websocket connector."""

    def __init__(self, uri: str = "ws://localhost:8900", **kwargs: Any) -> None:
        """Init. Kwargs are passed to `websockets.connect`.

        Args:
            uri: The websocket endpoint.
            **kwargs: Keyword arguments for ``websockets.legacy.client.connect``
        """
        # Ensure that create_protocol explicitly creates a SolanaWsClientProtocol
        kwargs.setdefault("create_protocol", SolanaWsClientProtocol)
        super().__init__(uri, **kwargs)

    async def __aenter__(self) -> SolanaWsClientProtocol:
        """Overrides to specify the type of protocol explicitly."""
        protocol = await super().__aenter__()
        return cast(SolanaWsClientProtocol, protocol)

__init__(self, uri='ws://localhost:8900', **kwargs) special

Init. Kwargs are passed to websockets.connect.

Parameters:

Name Type Description Default
uri str

The websocket endpoint.

'ws://localhost:8900'
**kwargs Any

Keyword arguments for websockets.legacy.client.connect

{}
Source code in solana/rpc/websocket_api.py
def __init__(self, uri: str = "ws://localhost:8900", **kwargs: Any) -> None:
    """Init. Kwargs are passed to `websockets.connect`.

    Args:
        uri: The websocket endpoint.
        **kwargs: Keyword arguments for ``websockets.legacy.client.connect``
    """
    # Ensure that create_protocol explicitly creates a SolanaWsClientProtocol
    kwargs.setdefault("create_protocol", SolanaWsClientProtocol)
    super().__init__(uri, **kwargs)