Frame models

class atproto_firehose.models.ErrorFrame(header: ErrorFrameHeader, body: ErrorFrameBody)

Bases: Frame

Firehose error frame.

body: ErrorFrameBody

Body.

header: ErrorFrameHeader

Header.

class atproto_firehose.models.ErrorFrameBody(error: str, message: str | None = None)

Bases: object

Body of error frame.

error: str

Code of the error.

message: str | None = None

Description of the error.

class atproto_firehose.models.ErrorFrameHeader(op: FrameType = FrameType.ERROR)

Bases: object

Header of the error frame.

op: FrameType = -1

Operation. Always FrameType.ERROR for an error frame header.

class atproto_firehose.models.Frame(header: MessageFrameHeader | ErrorFrameHeader, body: ErrorFrameBody | dict)

Bases: object

Firehose base frame.

body: ErrorFrameBody | dict

Body.

static from_bytes(data: bytes | bytearray) → MessageFrame | ErrorFrame

Decode a frame from bytes or a stream of bytes.

Parameters:

data – Bytes or a stream of bytes containing the frame.

Returns:

atproto.firehose_models.MessageFrame or atproto.firehose_models.ErrorFrame

Raises:

atproto.exceptions.FirehoseError – Invalid data frame.

header: MessageFrameHeader | ErrorFrameHeader

Header.

property is_error: bool

Whether the frame is an ErrorFrame.

Type:

bool

property is_message: bool

Whether the frame is a MessageFrame.

Type:

bool

property operation: FrameType

Frame operation (frame type).

Type:

FrameType
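The is_error and is_message properties allow branching on the frame kind without isinstance checks. The sketch below is illustrative only: describe_frame is a hypothetical helper, and the SimpleNamespace objects are stand-ins that mimic the documented attributes so the example runs without the library or a live stream. The error code and body fields are sample values, not output captured from a real firehose.

```python
from types import SimpleNamespace


def describe_frame(frame) -> str:
    """Return a one-line summary of a decoded firehose frame (hypothetical helper)."""
    if frame.is_error:
        # ErrorFrameBody exposes `error` (the code) and an optional `message`.
        detail = frame.body.message or "no description"
        return f"error {frame.body.error}: {detail}"
    if frame.is_message:
        # MessageFrame.type reports the type of the body; body is a plain dict.
        return f"message of type {frame.type} with {len(frame.body)} field(s)"
    return "unknown frame"


# Stand-ins that mimic the documented attributes. With the real library the
# object would instead come from Frame.from_bytes(raw_bytes).
error_frame = SimpleNamespace(
    is_error=True,
    is_message=False,
    body=SimpleNamespace(error="FutureCursor", message=None),
)
message_frame = SimpleNamespace(
    is_error=False,
    is_message=True,
    type="#commit",
    body={"seq": 1, "repo": "did:example:alice"},
)

print(describe_frame(error_frame))
print(describe_frame(message_frame))
```

Checking is_error first keeps the error path explicit, which matters because an error frame's body is an ErrorFrameBody object rather than a dict.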

atproto_firehose.models.FrameHeader

Base frame header.

alias of Union[MessageFrameHeader, ErrorFrameHeader]

class atproto_firehose.models.FrameType(value)

Bases: Enum

Type of frame.

ERROR = -1

MESSAGE = 1

classmethod has_value(value: int) → bool

class atproto_firehose.models.MessageFrame(header: MessageFrameHeader, body: dict)

Bases: Frame

Firehose message frame.

body: dict

Body.

header: MessageFrameHeader

Header.

property type: str

Type of body.

Type:

str

class atproto_firehose.models.MessageFrameHeader(op: FrameType = FrameType.MESSAGE, t: str | None = None)

Bases: object

Header of the message frame.

op: FrameType = 1

Operation. Always FrameType.MESSAGE for a message frame header.

t: str | None = None

Type of body content.

atproto_firehose.models.parse_frame(header: MessageFrameHeader | ErrorFrameHeader, raw_body: dict) → ErrorFrame | MessageFrame

atproto_firehose.models.parse_frame_header(raw_header: dict) → MessageFrameHeader | ErrorFrameHeader
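Neither parsing function is described here, so the following is only a sketch of how a raw header dict might be mapped onto the header classes. It assumes the raw header carries an "op" key and an optional "t" key, mirroring the documented header fields, and that has_value tests membership among the enum values. All classes below are local stand-ins that copy the documented signatures, and parse_header_sketch is a hypothetical name; the library's own implementation may differ.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Union


class FrameType(Enum):
    """Stand-in mirroring the documented enum values."""

    MESSAGE = 1
    ERROR = -1

    @classmethod
    def has_value(cls, value: int) -> bool:
        # Assumption: has_value reports whether `value` matches any member.
        return any(value == member.value for member in cls)


@dataclass
class MessageFrameHeader:
    op: FrameType = FrameType.MESSAGE
    t: Optional[str] = None


@dataclass
class ErrorFrameHeader:
    op: FrameType = FrameType.ERROR


def parse_header_sketch(raw_header: dict) -> Union[MessageFrameHeader, ErrorFrameHeader]:
    """Pick a header class based on the raw `op` value (illustrative only)."""
    op = raw_header.get("op")
    if not FrameType.has_value(op):
        raise ValueError(f"unknown frame op: {op!r}")
    if FrameType(op) is FrameType.ERROR:
        return ErrorFrameHeader()
    # Message headers additionally carry the type of the body content in `t`.
    return MessageFrameHeader(t=raw_header.get("t"))


print(parse_header_sketch({"op": 1, "t": "#commit"}))
print(parse_header_sketch({"op": -1}))
```

Validating op with has_value before constructing FrameType(op) lets the sketch raise one clear error for unknown operations instead of relying on the enum's own lookup failure.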