Firehose (data streaming)
You can use the clients below to get real-time updates from the whole network. If you subscribe to repository messages, you will receive information about every post that is created, deleted, liked, reposted, and so on. If you subscribe to labels, you will receive information about labels that moderation tools add to or update on posts.
All clients come in two variants: sync and async. As a developer, you provide your own callback, which is called on each new message, to handle incoming data. Here is how to do it for repository events:
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message

client = FirehoseSubscribeReposClient()

def on_message_handler(message) -> None:
    print(message.header, parse_subscribe_repos_message(message))

client.start(on_message_handler)
For labeling events:
from atproto import FirehoseSubscribeLabelsClient, parse_subscribe_labels_message

client = FirehoseSubscribeLabelsClient()

def on_message_handler(message) -> None:
    print(message.header, parse_subscribe_labels_message(message))

client.start(on_message_handler)
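The async variant follows the same pattern, except that the callback is a coroutine and start() is awaited (see the AsyncFirehoseSubscribeReposClient signature in this page). The following is a minimal sketch, not a definitive implementation; the lazy import inside main() is only an illustration choice so that the handler can be tried without opening a connection.

```python
import asyncio


async def on_message_handler(message) -> None:
    # The async client awaits this coroutine for every incoming frame.
    print(message.header)


async def main() -> None:
    # Imported here so the handler above can be exercised on its own.
    from atproto import AsyncFirehoseSubscribeReposClient

    client = AsyncFirehoseSubscribeReposClient()
    # start() is a coroutine on the async client, so it must be awaited.
    await client.start(on_message_handler)


# To run the client: asyncio.run(main())
```

Calling asyncio.run(main()) from your entry point starts the subscription.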
More code examples: https://github.com/MarshalX/atproto/tree/main/examples/firehose
Note
For better performance, parse only the messages you need, using message.header to filter out the rest.
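One possible way to apply this filtering is sketched below. It assumes that the frame header exposes the message type discriminator as header.t (for example '#commit'); check this against the SDK version you have installed. The helper name make_filtered_handler and its parameters are hypothetical and exist only for this illustration.

```python
from typing import Any, Callable, Iterable


def make_filtered_handler(
    parse: Callable[[Any], Any],
    handle: Callable[[Any], None],
    wanted_types: Iterable[str] = ('#commit',),
) -> Callable[[Any], None]:
    """Build a message callback that parses only frames of the wanted types."""
    wanted = frozenset(wanted_types)

    def on_message_handler(message: Any) -> None:
        # Assumption: header.t holds the type discriminator, e.g. '#commit'.
        if getattr(message.header, 't', None) not in wanted:
            return  # skip the parsing cost for frames we do not need
        handle(parse(message))

    return on_message_handler
```

With the repositories client this could be used as client.start(make_filtered_handler(parse_subscribe_repos_message, print)).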
By default, parse_subscribe_repos_message doesn't decode the inner DAG-CBOR. You will probably want to decode it; to do so, use atproto.CAR. Example of a message handler that decodes CAR files (commit blocks):
from atproto import CAR, models, parse_subscribe_repos_message

def on_message_handler(message) -> None:
    commit = parse_subscribe_repos_message(message)
    # Make sure this is a commit message with .blocks inside.
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return

    if not commit.blocks:
        return

    car = CAR.from_bytes(commit.blocks)
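Once the CAR is decoded, the records can be matched to the commit operations. The sketch below assumes that each entry of commit.ops has action, path and cid attributes, and that car.blocks is a mapping from CID to the decoded record; the helper name iter_created_records is hypothetical.

```python
from typing import Any, Dict, Iterable, Iterator, Tuple


def iter_created_records(
    ops: Iterable[Any], blocks: Dict[Any, dict]
) -> Iterator[Tuple[str, dict]]:
    """Yield (path, record) for every 'create' op whose block is in the CAR."""
    for op in ops:
        # Only newly created records are of interest in this sketch.
        if op.action != 'create' or not op.cid:
            continue
        record = blocks.get(op.cid)
        if record is not None:
            yield op.path, record
```

Inside the handler above, this could be called as: for path, record in iter_created_records(commit.ops, car.blocks): print(path, record).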
Here is how you can process labeling events:
from atproto import FirehoseSubscribeLabelsClient, firehose_models, models, parse_subscribe_labels_message

client = FirehoseSubscribeLabelsClient()

def on_message_handler(message: firehose_models.MessageFrame) -> None:
    labels_message = parse_subscribe_labels_message(message)
    if not isinstance(labels_message, models.ComAtprotoLabelSubscribeLabels.Labels):
        return

    for label in labels_message.labels:
        neg = '(NEG)' if label.neg else ''
        print(f'[{label.cts}] ({label.src}) {label.uri} => {label.val} {neg}')

client.start(on_message_handler)
- class atproto_firehose.AsyncFirehoseSubscribeLabelsClient(params: dict | Params | None = None, base_uri: str | None = 'wss://mod.bsky.app/xrpc')
Async firehose subscribe labels client.
- Parameters:
params – Parameters model.
base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.
- async start(on_message_callback: Callable[[MessageFrame], Coroutine[Any, Any, None]], on_callback_error_callback: Callable[[BaseException], Coroutine[Any, Any, None]] | None = None) -> None
Subscribe to Firehose and start client.
- Parameters:
on_message_callback – Callback that will be called on the new Firehose message.
on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.
- Returns:
None
- async stop() -> None
Unsubscribe and stop the Firehose client.
- Returns:
None
- update_params(params: ParamsModelBase | Dict[str, Any]) -> None
Update params.
Warning
If you use the params argument when starting the client, you must keep params up to date. Otherwise, your client will roll back to the previous state (cursor) on reconnecting.
- class atproto_firehose.AsyncFirehoseSubscribeReposClient(params: dict | Params | None = None, base_uri: str | None = 'wss://bsky.network/xrpc')
Async firehose subscribe repos client.
- Parameters:
params – Parameters model.
base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.
- async start(on_message_callback: Callable[[MessageFrame], Coroutine[Any, Any, None]], on_callback_error_callback: Callable[[BaseException], Coroutine[Any, Any, None]] | None = None) -> None
Subscribe to Firehose and start client.
- Parameters:
on_message_callback – Callback that will be called on the new Firehose message.
on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.
- Returns:
None
- async stop() -> None
Unsubscribe and stop the Firehose client.
- Returns:
None
- update_params(params: ParamsModelBase | Dict[str, Any]) -> None
Update params.
Warning
If you use the params argument when starting the client, you must keep params up to date. Otherwise, your client will roll back to the previous state (cursor) on reconnecting.
- class atproto_firehose.FirehoseSubscribeLabelsClient(params: dict | Params | None = None, base_uri: str | None = 'wss://mod.bsky.app/xrpc')
Firehose subscribe labels client.
- Parameters:
params – Parameters model.
base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.
- start(on_message_callback: Callable[[MessageFrame], None], on_callback_error_callback: Callable[[BaseException], None] | None = None) -> None
Subscribe to Firehose and start client.
- Parameters:
on_message_callback – Callback that will be called on the new Firehose message.
on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.
- Returns:
None
- stop() -> None
Unsubscribe and stop the Firehose client.
- Returns:
None
- update_params(params: ParamsModelBase | Dict[str, Any]) -> None
Update params.
Warning
If you use the params argument when starting the client, you must keep params up to date. Otherwise, your client will roll back to the previous state (cursor) on reconnecting.
- class atproto_firehose.FirehoseSubscribeReposClient(params: dict | Params | None = None, base_uri: str | None = 'wss://bsky.network/xrpc')
Firehose subscribe repos client.
- Parameters:
params – Parameters model.
base_uri – Base websocket URI. Example: wss://bsky.social/xrpc.
- start(on_message_callback: Callable[[MessageFrame], None], on_callback_error_callback: Callable[[BaseException], None] | None = None) -> None
Subscribe to Firehose and start client.
- Parameters:
on_message_callback – Callback that will be called on the new Firehose message.
on_callback_error_callback – Callback that will be called if the on_message_callback raised an exception.
- Returns:
None
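As an illustration of the on_callback_error_callback parameter, here is a minimal sketch of an error callback that logs the exception raised by the message callback. The logger name 'firehose' and the function name are arbitrary choices for this example, not part of the library.

```python
import logging

logger = logging.getLogger('firehose')


def on_callback_error_handler(error: BaseException) -> None:
    # Record the failure; what to do next (retry, stop, ignore) is up to you.
    logger.error('Firehose message handler failed: %r', error)
```

It would be passed as the second argument: client.start(on_message_handler, on_callback_error_handler).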
- stop() -> None
Unsubscribe and stop the Firehose client.
- Returns:
None
- update_params(params: ParamsModelBase | Dict[str, Any]) -> None
Update params.
Warning
If you use the params argument when starting the client, you must keep params up to date. Otherwise, your client will roll back to the previous state (cursor) on reconnecting.
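One way to keep the cursor current is to push the latest sequence number to update_params every few messages. The sketch below assumes that the parsed commit exposes its sequence number as commit.seq and that the params accept a cursor key; the CursorCheckpointer class and its every parameter are hypothetical helpers written for this example.

```python
from typing import Callable


class CursorCheckpointer:
    """Call `update` with the latest sequence number every `every` messages."""

    def __init__(self, update: Callable[[dict], None], every: int = 20) -> None:
        self._update = update
        self._every = every
        self._count = 0

    def observe(self, seq: int) -> None:
        self._count += 1
        # Updating on every message is unnecessary; do it periodically.
        if self._count % self._every == 0:
            self._update({'cursor': seq})
```

A possible usage: create checkpointer = CursorCheckpointer(client.update_params) once, then call checkpointer.observe(commit.seq) inside the message callback after parsing a commit.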
- atproto_firehose.parse_subscribe_labels_message(message: MessageFrame) -> Labels | Info
Parse Firehose labels message to the corresponding model.
- Parameters:
message – Message frame.
- Returns:
Corresponding message model.
- Return type:
SubscribeLabelsMessage
- atproto_firehose.parse_subscribe_repos_message(message: MessageFrame) -> Commit | Handle | Migrate | Tombstone | Info | Identity
Parse Firehose repositories message to the corresponding model.
Note
To keep performance high, use decode_inner_cbor only when required.
- Parameters:
message – Message frame.
- Returns:
Corresponding message model.
- Return type:
SubscribeReposMessage