Frame models#

class atproto.firehose.models.ErrorFrame(header: ErrorFrameHeader, body: ErrorFrameBody)#

Bases: Frame

Firehose error frame.

body: ErrorFrameBody#

Body.

header: ErrorFrameHeader#

Header.

class atproto.firehose.models.ErrorFrameBody(error: str, message: Optional[str] = None)#

Bases: object

Body of error frame.

error: str#

Code of the error.

message: Optional[str] = None#

Description of the error.

class atproto.firehose.models.ErrorFrameHeader(op: FrameType = FrameType.ERROR)#

Bases: object

Header of the error frame.

op: FrameType = -1#

Operation. For Error header is FrameType.ERROR always.

class atproto.firehose.models.Frame(header: Union[MessageFrameHeader, ErrorFrameHeader], body: Union[ErrorFrameBody, dict])#

Bases: object

Firehose base frame.

body: Union[ErrorFrameBody, dict]#

Body

static from_bytes(data: Union[bytes, bytearray]) Union[MessageFrame, ErrorFrame]#

Decode frame from bytes of stream of bytes.

Parameters:

data – Bytes or stream of bytes of frame.

Returns:

atproto.firehose_models.MessageFrame or atproto.firehose_models.ErrorFrame

Raises:

atproto.exceptions.FirehoseError – Invalid data frame.

header: Union[MessageFrameHeader, ErrorFrameHeader]#

Header.

property is_error: bool#

Is frame the ErrorFrame.

Type:

bool

property is_message: bool#

Is frame the MessageFrame.

Type:

bool

property operation: FrameType#

Frame operation (frame type).

Type:

FrameType

atproto.firehose.models.FrameHeader#

Base frame header.

alias of Union[MessageFrameHeader, ErrorFrameHeader]

class atproto.firehose.models.FrameType(value)#

Bases: Enum

Type of frame.

ERROR = -1#
MESSAGE = 1#
classmethod has_value(value: int) bool#
class atproto.firehose.models.MessageFrame(header: MessageFrameHeader, body: dict)#

Bases: Frame

Firehose message frame.

body: dict#

Body.

header: MessageFrameHeader#

Header.

property type: str#

Type of body.

Type:

str

class atproto.firehose.models.MessageFrameHeader(op: FrameType = FrameType.MESSAGE, t: Optional[str] = None)#

Bases: object

Header of the message frame.

op: FrameType = 1#

Operation. For Message header is FrameType.MESSAGE always.

t: Optional[str] = None#

Type of body content.

atproto.firehose.models.parse_frame(header: Union[MessageFrameHeader, ErrorFrameHeader], raw_body: dict) Union[ErrorFrame, MessageFrame]#
atproto.firehose.models.parse_frame_header(raw_header: dict) Union[MessageFrameHeader, ErrorFrameHeader]#