Firehose (data streaming)#

You can use the clients below to get real-time updates from the whole network. If you subscribe to retrieve messages from repositories you will get information about each created, deleted, liked, reposted post, etc.

All clients present in two variants: sync and async. As a developer, you should create your own callback on a new message to handle incoming data. Here is how to do it:

from atproto.firehose import FirehoseSubscribeReposClient, parse_subscribe_repos_message

client = FirehoseSubscribeReposClient()


def on_message_handler(message) -> None:
    print(message.header, parse_subscribe_repos_message(message))


client.start(on_message_handler)

More code examples: https://github.com/MarshalX/atproto/tree/main/examples/firehose

Note

To achieve more performance you could parse only required messages using message.header to filter.

By default parse_subscribe_repos_message and parse_subscribe_labels_message doesn’t decode inner DAG-CBOR. Probably you want to decode it. To do so use atproto.CAR. Example of message handler with decoding of CAR files (commit blocks):

from atproto import CAR, models

def on_message_handler(message) -> None:
    commit = parse_subscribe_repos_message(message)
    # we need to be sure that it's commit message with .blocks inside
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return

    car = CAR.from_bytes(commit.blocks)
class atproto.firehose.AsyncFirehoseSubscribeLabelsClient(params: Optional[Union[dict, Params]] = None, base_uri: Optional[str] = None)#

Async firehose subscribe labels client.

Parameters:
  • params – Parameters model.

  • base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.

async start(on_message_callback: Callable[[MessageFrame], Coroutine[Any, Any, None]], on_callback_error_callback: Optional[Callable[[BaseException], Coroutine[Any, Any, None]]] = None) None#

Subscribe to Firehose and start client.

Parameters:
  • on_message_callback – Callback that will be called on the new Firehose message.

  • on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.

Returns:

None

async stop() None#

Unsubscribe and stop the Firehose client.

Returns:

None

update_params(params: Union[ParamsModelBase, Dict[str, Any]]) None#

Update params.

Warning

If you are using params arg at the client start, you must care about keeping params up to date. Otherwise, your client will be rolled back to the previous state (cursor) on reconnecting.

class atproto.firehose.AsyncFirehoseSubscribeReposClient(params: Optional[Union[dict, Params]] = None, base_uri: Optional[str] = None)#

Async firehose subscribe repos client.

Parameters:
  • params – Parameters model.

  • base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.

async start(on_message_callback: Callable[[MessageFrame], Coroutine[Any, Any, None]], on_callback_error_callback: Optional[Callable[[BaseException], Coroutine[Any, Any, None]]] = None) None#

Subscribe to Firehose and start client.

Parameters:
  • on_message_callback – Callback that will be called on the new Firehose message.

  • on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.

Returns:

None

async stop() None#

Unsubscribe and stop the Firehose client.

Returns:

None

update_params(params: Union[ParamsModelBase, Dict[str, Any]]) None#

Update params.

Warning

If you are using params arg at the client start, you must care about keeping params up to date. Otherwise, your client will be rolled back to the previous state (cursor) on reconnecting.

class atproto.firehose.FirehoseSubscribeLabelsClient(params: Optional[Union[dict, Params]] = None, base_uri: Optional[str] = None)#

Firehose subscribe labels client.

Parameters:
  • params – Parameters model.

  • base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.

start(on_message_callback: Callable[[MessageFrame], None], on_callback_error_callback: Optional[Callable[[BaseException], None]] = None) None#

Subscribe to Firehose and start client.

Parameters:
  • on_message_callback – Callback that will be called on the new Firehose message.

  • on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.

Returns:

None

stop() None#

Unsubscribe and stop the Firehose client.

Returns:

None

update_params(params: Union[ParamsModelBase, Dict[str, Any]]) None#

Update params.

Warning

If you are using params arg at the client start, you must care about keeping params up to date. Otherwise, your client will be rolled back to the previous state (cursor) on reconnecting.

class atproto.firehose.FirehoseSubscribeReposClient(params: Optional[Union[dict, Params]] = None, base_uri: Optional[str] = None)#

Firehose subscribe repos client.

Parameters:
  • params – Parameters model.

  • base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.

start(on_message_callback: Callable[[MessageFrame], None], on_callback_error_callback: Optional[Callable[[BaseException], None]] = None) None#

Subscribe to Firehose and start client.

Parameters:
  • on_message_callback – Callback that will be called on the new Firehose message.

  • on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.

Returns:

None

stop() None#

Unsubscribe and stop the Firehose client.

Returns:

None

update_params(params: Union[ParamsModelBase, Dict[str, Any]]) None#

Update params.

Warning

If you are using params arg at the client start, you must care about keeping params up to date. Otherwise, your client will be rolled back to the previous state (cursor) on reconnecting.

atproto.firehose.SubscribeLabelsMessage#

Subscribe Labels Message

alias of Union[Labels, Info]

atproto.firehose.SubscribeReposMessage#

Subscribe Repos Message

alias of Union[Commit, Handle, Migrate, Tombstone, Info]

atproto.firehose.parse_subscribe_labels_message(message: MessageFrame) Union[Labels, Info]#

Parse Firehose labels message to the corresponding model.

Parameters:

message – Message frame.

Returns:

Corresponding message model.

Return type:

SubscribeLabelsMessage

atproto.firehose.parse_subscribe_repos_message(message: MessageFrame) Union[Commit, Handle, Migrate, Tombstone, Info]#

Parse Firehose repositories message to the corresponding model.

Note

Use decode_inner_cbor only when required to increase performance.

Parameters:

message – Message frame.

Returns:

Corresponding message model.

Return type:

SubscribeReposMessage

Submodules#