Firehose (data streaming)

Use the clients below to receive real-time updates from the whole network. If you subscribe to repository messages, you will get an event for every change to a repository: created and deleted posts, likes, reposts, and so on. If you subscribe to labels, you will get an event for every label that moderation tools add to or update on a post.

All clients come in two variants: sync and async. To handle incoming data, write your own callback; the client calls it for every new message. Here is how to do it for repository events:

from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message

client = FirehoseSubscribeReposClient()


def on_message_handler(message) -> None:
    print(message.header, parse_subscribe_repos_message(message))


client.start(on_message_handler)

For labeling events:

from atproto import FirehoseSubscribeLabelsClient, parse_subscribe_labels_message

client = FirehoseSubscribeLabelsClient()


def on_message_handler(message) -> None:
    print(message.header, parse_subscribe_labels_message(message))


client.start(on_message_handler)
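
The two examples above use the sync clients. Below is a minimal sketch of the async variant, assuming AsyncFirehoseSubscribeReposClient can be imported from the top-level atproto package in the same way as the sync client. The main wrapper is a name chosen here for illustration, and the imports sit inside it only so that loading the module does not open a connection.

```python
import asyncio


async def main() -> None:
    # Assumption: both names are exported from the top-level atproto package.
    from atproto import AsyncFirehoseSubscribeReposClient, parse_subscribe_repos_message

    client = AsyncFirehoseSubscribeReposClient()

    # The async client expects a coroutine function as its callback.
    async def on_message_handler(message) -> None:
        print(message.header, parse_subscribe_repos_message(message))

    await client.start(on_message_handler)
```

To run it, call asyncio.run(main()) from your entry point.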

More code examples: https://github.com/MarshalX/atproto/tree/main/examples/firehose

Note

To improve performance, parse only the messages you need, using message.header as a filter.

By default, parse_subscribe_repos_message doesn’t decode the inner DAG-CBOR. You will probably want to decode it; to do so, use atproto.CAR. Here is an example of a message handler that decodes CAR files (commit blocks):
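
One possible way to filter on the header before parsing is sketched below. It assumes the frame header exposes the message type as an attribute named t with values such as '#commit'; check this against the frames you actually receive. make_filtered_handler, parse and handle are hypothetical names introduced for this sketch, not part of the library.

```python
def make_filtered_handler(parse, handle, wanted_types=('#commit',)):
    """Build a callback that parses only frames whose header type is wanted."""
    wanted = frozenset(wanted_types)

    def on_message_handler(message) -> None:
        # Assumption: the frame header carries the message type in `t`.
        if getattr(message.header, 't', None) not in wanted:
            return  # skipped without paying the parsing cost
        handle(parse(message))

    return on_message_handler
```

With the sync repos client this could be used as client.start(make_filtered_handler(parse_subscribe_repos_message, print)).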

from atproto import CAR, models, parse_subscribe_repos_message


def on_message_handler(message) -> None:
    commit = parse_subscribe_repos_message(message)
    # Only commit messages carry .blocks, so skip every other message type.
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return

    if not commit.blocks:
        return

    car = CAR.from_bytes(commit.blocks)
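
Once the CAR is decoded, the handler usually needs the records behind each operation. The sketch below shows one way to do that. It assumes each entry in commit.ops has action, path and cid attributes, and that car.blocks behaves like a mapping from CID to the decoded record. split_op_path and iter_created_records are hypothetical helpers written for this example.

```python
from typing import Any, Iterable, Iterator, Mapping, Tuple


def split_op_path(path: str) -> Tuple[str, str]:
    """Split 'collection/rkey' into its two parts."""
    collection, _, rkey = path.partition('/')
    return collection, rkey


def iter_created_records(ops: Iterable[Any], blocks: Mapping[Any, Any]) -> Iterator[Tuple[str, str, Any]]:
    """Yield (collection, rkey, record) for every 'create' op found in blocks."""
    for op in ops:
        # Assumption: ops expose `action`, `path` and `cid`.
        if op.action != 'create' or op.cid is None:
            continue
        record = blocks.get(op.cid)
        if record is None:
            continue  # the block for this op is not part of the CAR
        collection, rkey = split_op_path(op.path)
        yield collection, rkey, record
```

Inside the handler this would be called as iter_created_records(commit.ops, car.blocks).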

Here is how you can process labeling events:

from atproto import FirehoseSubscribeLabelsClient, firehose_models, models, parse_subscribe_labels_message

client = FirehoseSubscribeLabelsClient()


def on_message_handler(message: firehose_models.MessageFrame) -> None:
    labels_message = parse_subscribe_labels_message(message)
    if not isinstance(labels_message, models.ComAtprotoLabelSubscribeLabels.Labels):
        return

    for label in labels_message.labels:
        neg = '(NEG)' if label.neg else ''
        print(f'[{label.cts}] ({label.src}) {label.uri} => {label.val} {neg}')


client.start(on_message_handler)
class atproto_firehose.AsyncFirehoseSubscribeLabelsClient(params: dict | Params | None = None, base_uri: str | None = 'wss://mod.bsky.app/xrpc')

Async firehose subscribe labels client.

Parameters:
  • params – Parameters model.

  • base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.

async start(on_message_callback: Callable[[MessageFrame], Coroutine[Any, Any, None]], on_callback_error_callback: Callable[[BaseException], Coroutine[Any, Any, None]] | None = None) None

Subscribe to Firehose and start client.

Parameters:
  • on_message_callback – Callback that will be called on the new Firehose message.

  • on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.

Returns:

None

async stop() None

Unsubscribe and stop the Firehose client.

Returns:

None

update_params(params: ParamsModelBase | Dict[str, Any]) None

Update params.

Warning

If you pass the params argument to the client, you are responsible for keeping params up to date. Otherwise, the client will roll back to the previous state (cursor) when it reconnects.

class atproto_firehose.AsyncFirehoseSubscribeReposClient(params: dict | Params | None = None, base_uri: str | None = 'wss://bsky.network/xrpc')

Async firehose subscribe repos client.

Parameters:
  • params – Parameters model.

  • base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.

async start(on_message_callback: Callable[[MessageFrame], Coroutine[Any, Any, None]], on_callback_error_callback: Callable[[BaseException], Coroutine[Any, Any, None]] | None = None) None

Subscribe to Firehose and start client.

Parameters:
  • on_message_callback – Callback that will be called on the new Firehose message.

  • on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.

Returns:

None

async stop() None

Unsubscribe and stop the Firehose client.

Returns:

None

update_params(params: ParamsModelBase | Dict[str, Any]) None

Update params.

Warning

If you pass the params argument to the client, you are responsible for keeping params up to date. Otherwise, the client will roll back to the previous state (cursor) when it reconnects.

class atproto_firehose.FirehoseSubscribeLabelsClient(params: dict | Params | None = None, base_uri: str | None = 'wss://mod.bsky.app/xrpc')

Firehose subscribe labels client.

Parameters:
  • params – Parameters model.

  • base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.

start(on_message_callback: Callable[[MessageFrame], None], on_callback_error_callback: Callable[[BaseException], None] | None = None) None

Subscribe to Firehose and start client.

Parameters:
  • on_message_callback – Callback that will be called on the new Firehose message.

  • on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.

Returns:

None

stop() None

Unsubscribe and stop the Firehose client.

Returns:

None

update_params(params: ParamsModelBase | Dict[str, Any]) None

Update params.

Warning

If you pass the params argument to the client, you are responsible for keeping params up to date. Otherwise, the client will roll back to the previous state (cursor) when it reconnects.

class atproto_firehose.FirehoseSubscribeReposClient(params: dict | Params | None = None, base_uri: str | None = 'wss://bsky.network/xrpc')

Firehose subscribe repos client.

Parameters:
  • params – Parameters model.

  • base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.

start(on_message_callback: Callable[[MessageFrame], None], on_callback_error_callback: Callable[[BaseException], None] | None = None) None

Subscribe to Firehose and start client.

Parameters:
  • on_message_callback – Callback that will be called on the new Firehose message.

  • on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.

Returns:

None
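
As a rough illustration of on_callback_error_callback, the sketch below records and logs every exception raised by the message callback. The logger name, the errors list and the function name are hypothetical choices made for this example.

```python
import logging
from typing import List

logger = logging.getLogger('firehose')

# Hypothetical in-memory record of failures, useful for metrics or tests.
errors: List[BaseException] = []


def on_callback_error(error: BaseException) -> None:
    """Called by the client when the message callback raises."""
    errors.append(error)
    logger.error('Firehose message handler failed: %r', error)
```

It would be passed as the second argument: client.start(on_message_handler, on_callback_error).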

stop() None

Unsubscribe and stop the Firehose client.

Returns:

None
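
If start blocks the calling thread, as is assumed here for the sync client, stop has to be called from somewhere else. The sketch below schedules it on a timer thread. stop_after is a hypothetical helper, and it only relies on the client having a stop method.

```python
import threading


def stop_after(client, seconds: float) -> threading.Timer:
    """Call client.stop() from a background thread after `seconds`."""
    timer = threading.Timer(seconds, client.stop)
    timer.daemon = True  # do not keep the process alive just for the timer
    timer.start()
    return timer
```

A typical use would be to call stop_after(client, 5) right before client.start(on_message_handler).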

update_params(params: ParamsModelBase | Dict[str, Any]) None

Update params.

Warning

If you pass the params argument to the client, you are responsible for keeping params up to date. Otherwise, the client will roll back to the previous state (cursor) when it reconnects.
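
One way to keep the cursor fresh is to push the latest sequence number to update_params every few messages. The sketch below assumes that parsed messages expose a seq attribute and that update_params accepts a dict with a cursor key (the signature above allows a dict). CursorCheckpointer is a hypothetical helper, and the interval of 20 is arbitrary.

```python
from typing import Any, Callable, Dict


class CursorCheckpointer:
    """Pushes the latest sequence number to the client every `every` messages."""

    def __init__(self, update_params: Callable[[Dict[str, Any]], None], every: int = 20) -> None:
        if every < 1:
            raise ValueError('every must be at least 1')
        self._update_params = update_params
        self._every = every
        self._seen = 0

    def observe(self, seq: int) -> None:
        # Count messages and update the cursor on every `every`-th one.
        self._seen += 1
        if self._seen % self._every == 0:
            self._update_params({'cursor': seq})
```

Create it once with CursorCheckpointer(client.update_params) and call observe(commit.seq) inside the message handler after parsing.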

atproto_firehose.parse_subscribe_labels_message(message: MessageFrame) Labels | Info

Parse Firehose labels message to the corresponding model.

Parameters:

message – Message frame.

Returns:

Corresponding message model.

Return type:

SubscribeLabelsMessage

atproto_firehose.parse_subscribe_repos_message(message: MessageFrame) Commit | Handle | Migrate | Tombstone | Info | Identity

Parse Firehose repositories message to the corresponding model.

Note

To keep performance high, use decode_inner_cbor only when required.

Parameters:

message – Message frame.

Returns:

Corresponding message model.

Return type:

SubscribeReposMessage

Submodules