Firehose (data streaming)
You can use the clients below to get real-time updates from the whole network. If you subscribe to repository messages, you will receive an event for every post that is created, deleted, liked, reposted, and so on.
All clients come in two variants: sync and async. As a developer, you provide your own callback to handle each incoming message. Here is how to do it:
from atproto.firehose import FirehoseSubscribeReposClient, parse_subscribe_repos_message

client = FirehoseSubscribeReposClient()

def on_message_handler(message) -> None:
    # Print the frame header and the parsed message model.
    print(message.header, parse_subscribe_repos_message(message))

# Subscribe and start receiving messages; the handler runs for each one.
client.start(on_message_handler)
More code examples: https://github.com/MarshalX/atproto/tree/main/examples/firehose
Note
To improve performance, parse only the messages you need, using message.header to filter out the rest.
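A minimal sketch of header-based filtering, so that unwanted frames are dropped before any parsing happens. The helper name filter_by_type is hypothetical, and the code assumes that the frame header exposes the message type as an attribute named t (for example '#commit'); check the header model of your installed version before relying on it. The demo uses stand-in objects instead of real frames.

```python
from types import SimpleNamespace
from typing import Any, Callable, Iterable


def filter_by_type(wanted: Iterable[str], handler: Callable[[Any], None]) -> Callable[[Any], None]:
    """Wrap `handler` so it only sees frames whose header type is in `wanted`."""
    wanted_types = frozenset(wanted)

    def on_message_handler(message: Any) -> None:
        # Assumption: the header carries the message type in `t`.
        if getattr(message.header, 't', None) not in wanted_types:
            return  # skipped before any parsing cost is paid
        handler(message)

    return on_message_handler


# Demo with stand-in frames (not real firehose messages).
seen = []
on_message_handler = filter_by_type({'#commit'}, seen.append)
on_message_handler(SimpleNamespace(header=SimpleNamespace(t='#commit')))
on_message_handler(SimpleNamespace(header=SimpleNamespace(t='#handle')))
print(len(seen))
```

The wrapped function can be passed to client.start in place of the unfiltered handler; the inner handler is then free to call parse_subscribe_repos_message only on frames that passed the filter.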
By default, parse_subscribe_repos_message and parse_subscribe_labels_message do not decode the inner DAG-CBOR. You will probably want to decode it; to do so, use atproto.CAR. Example of a message handler that decodes CAR files (commit blocks):
from atproto import CAR, models
from atproto.firehose import parse_subscribe_repos_message

def on_message_handler(message) -> None:
    commit = parse_subscribe_repos_message(message)
    # Make sure this is a commit message with .blocks inside.
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return

    # Decode the CAR file carried in the commit blocks.
    car = CAR.from_bytes(commit.blocks)
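Once the CAR file is decoded, the records themselves still have to be looked up per operation. The following is a sketch under stated assumptions, not the library's own helper: iter_created_records is a hypothetical name, and the code assumes that a commit exposes repo and ops, that each operation has action, path and cid, and that the decoded CAR offers a dict-like blocks mapping keyed by CID. The function takes plain objects so it can be tried without a live connection.

```python
from types import SimpleNamespace
from typing import Any, Iterator, Mapping, Tuple


def iter_created_records(commit: Any, blocks: Mapping[Any, dict]) -> Iterator[Tuple[str, dict]]:
    """Yield (at_uri, record) for each 'create' operation in a commit."""
    for op in commit.ops:
        # Only 'create' operations with a CID point at a new record.
        if op.action != 'create' or not op.cid:
            continue
        record = blocks.get(op.cid)
        if record is None:
            continue  # the block for this CID was not included
        yield f'at://{commit.repo}/{op.path}', record


# Demo with stand-in objects (not real firehose data).
demo_commit = SimpleNamespace(
    repo='did:example:alice',
    ops=[
        SimpleNamespace(action='create', path='app.bsky.feed.post/1', cid='cid-1'),
        SimpleNamespace(action='delete', path='app.bsky.feed.post/2', cid=None),
    ],
)
demo_blocks = {'cid-1': {'text': 'hello'}}
created = list(iter_created_records(demo_commit, demo_blocks))
```

Inside a real handler, the call would look like iter_created_records(commit, car.blocks), assuming car.blocks behaves as described above.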
- class atproto.firehose.AsyncFirehoseSubscribeLabelsClient(params: Optional[Union[dict, Params]] = None, base_uri: Optional[str] = None)
Async firehose subscribe labels client.
- Parameters:
params – Parameters model.
base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.
- async start(on_message_callback: Callable[[MessageFrame], Coroutine[Any, Any, None]], on_callback_error_callback: Optional[Callable[[BaseException], Coroutine[Any, Any, None]]] = None) -> None
Subscribe to Firehose and start client.
- Parameters:
on_message_callback – Callback that will be called on the new Firehose message.
on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.
- Returns:
None
- async stop() -> None
Unsubscribe and stop the Firehose client.
- Returns:
None
- update_params(params: Union[ParamsModelBase, Dict[str, Any]]) -> None
Update params.
Warning
If you set params when the client starts, you must keep them up to date. Otherwise, on reconnect the client rolls back to the previous state (cursor).
- class atproto.firehose.AsyncFirehoseSubscribeReposClient(params: Optional[Union[dict, Params]] = None, base_uri: Optional[str] = None)
Async firehose subscribe repos client.
- Parameters:
params – Parameters model.
base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.
- async start(on_message_callback: Callable[[MessageFrame], Coroutine[Any, Any, None]], on_callback_error_callback: Optional[Callable[[BaseException], Coroutine[Any, Any, None]]] = None) -> None
Subscribe to Firehose and start client.
- Parameters:
on_message_callback – Callback that will be called on the new Firehose message.
on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.
- Returns:
None
- async stop() -> None
Unsubscribe and stop the Firehose client.
- Returns:
None
- update_params(params: Union[ParamsModelBase, Dict[str, Any]]) -> None
Update params.
Warning
If you set params when the client starts, you must keep them up to date. Otherwise, on reconnect the client rolls back to the previous state (cursor).
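The async start and stop methods documented above can be combined to run a client for a bounded time. This is a minimal sketch, assuming that start returns once stop has been awaited from another task; run_for is a hypothetical helper, and the demo uses a stand-in client rather than a real connection.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_for(client: Any, on_message: Callable[[Any], Awaitable[None]], seconds: float) -> None:
    """Run an async firehose client and stop it after `seconds`."""

    async def stop_later() -> None:
        await asyncio.sleep(seconds)
        await client.stop()

    # Schedule the stop in the background, then block on the subscription.
    stopper = asyncio.create_task(stop_later())
    try:
        await client.start(on_message)
    finally:
        stopper.cancel()  # no-op if the stopper has already finished


class FakeAsyncClient:
    """Stand-in with the same start/stop shape, used only for the demo."""

    def __init__(self) -> None:
        self.running = False

    async def start(self, on_message_callback: Callable[[Any], Awaitable[None]]) -> None:
        self.running = True
        while self.running:
            await on_message_callback('frame')
            await asyncio.sleep(0.01)

    async def stop(self) -> None:
        self.running = False


received = []


async def on_message_handler(message: Any) -> None:
    received.append(message)


fake_client = FakeAsyncClient()
asyncio.run(run_for(fake_client, on_message_handler, 0.05))
```

With the real client, the same helper would be called as asyncio.run(run_for(AsyncFirehoseSubscribeReposClient(), on_message_handler, 3)).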
- class atproto.firehose.FirehoseSubscribeLabelsClient(params: Optional[Union[dict, Params]] = None, base_uri: Optional[str] = None)
Firehose subscribe labels client.
- Parameters:
params – Parameters model.
base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.
- start(on_message_callback: Callable[[MessageFrame], None], on_callback_error_callback: Optional[Callable[[BaseException], None]] = None) -> None
Subscribe to Firehose and start client.
- Parameters:
on_message_callback – Callback that will be called on the new Firehose message.
on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.
- Returns:
None
- stop() -> None
Unsubscribe and stop the Firehose client.
- Returns:
None
- update_params(params: Union[ParamsModelBase, Dict[str, Any]]) -> None
Update params.
Warning
If you set params when the client starts, you must keep them up to date. Otherwise, on reconnect the client rolls back to the previous state (cursor).
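The start method above accepts an optional on_callback_error_callback that receives any exception raised by the message handler. A minimal sketch of such a callback for the sync clients follows; the logger name and the message format are arbitrary choices for illustration.

```python
import logging

logger = logging.getLogger('firehose')


def on_callback_error(error: BaseException) -> None:
    """Log an exception raised inside the message handler."""
    logger.error('Firehose message handler raised: %r', error)


# Usage with a sync client (not executed here, it needs a live connection):
# client = FirehoseSubscribeLabelsClient()
# client.start(on_message_handler, on_callback_error)
```

Logging rather than re-raising keeps the decision about whether to stop the client in the application's hands.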
- class atproto.firehose.FirehoseSubscribeReposClient(params: Optional[Union[dict, Params]] = None, base_uri: Optional[str] = None)
Firehose subscribe repos client.
- Parameters:
params – Parameters model.
base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.
- start(on_message_callback: Callable[[MessageFrame], None], on_callback_error_callback: Optional[Callable[[BaseException], None]] = None) -> None
Subscribe to Firehose and start client.
- Parameters:
on_message_callback – Callback that will be called on the new Firehose message.
on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.
- Returns:
None
- stop() -> None
Unsubscribe and stop the Firehose client.
- Returns:
None
- update_params(params: Union[ParamsModelBase, Dict[str, Any]]) -> None
Update params.
Warning
If you set params when the client starts, you must keep them up to date. Otherwise, on reconnect the client rolls back to the previous state (cursor).
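One way to follow the warning above is to push the latest sequence number back into the client every few messages. The sketch below is illustrative only: CursorKeeper is a hypothetical helper, and it assumes that parsed commits expose an integer seq and that the params accept a cursor key. update_params is called with a plain dict, which its signature allows.

```python
from typing import Any, List


class CursorKeeper:
    """Periodically push the latest sequence number into the client params."""

    def __init__(self, client: Any, every: int = 20) -> None:
        self._client = client
        self._every = every

    def observe(self, commit: Any) -> None:
        # Updating on every message is wasteful, so only every N-th seq is saved.
        if commit.seq % self._every == 0:
            self._client.update_params({'cursor': commit.seq})


class FakeClient:
    """Stand-in that records update_params calls, used only for the demo."""

    def __init__(self) -> None:
        self.cursors: List[int] = []

    def update_params(self, params: dict) -> None:
        self.cursors.append(params['cursor'])


class FakeCommit:
    def __init__(self, seq: int) -> None:
        self.seq = seq


demo_client = FakeClient()
keeper = CursorKeeper(demo_client, every=20)
for seq in range(1, 61):
    keeper.observe(FakeCommit(seq))
```

In a real handler, keeper.observe(commit) would be called right after the isinstance check on the parsed message. A smaller interval loses less progress on reconnect at the cost of more frequent updates.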
- atproto.firehose.SubscribeReposMessage
Subscribe Repos Message
- atproto.firehose.parse_subscribe_labels_message(message: MessageFrame) -> Union[Labels, Info]
Parse Firehose labels message to the corresponding model.
- Parameters:
message – Message frame.
- Returns:
Corresponding message model.
- Return type:
Union[Labels, Info]
- atproto.firehose.parse_subscribe_repos_message(message: MessageFrame) -> Union[Commit, Handle, Migrate, Tombstone, Info]
Parse Firehose repositories message to the corresponding model.
Note
Use decode_inner_cbor only when required to increase performance.
- Parameters:
message – Message frame.
- Returns:
Corresponding message model.
- Return type:
Union[Commit, Handle, Migrate, Tombstone, Info]