Skip to content

pamqp.frame

pamqp.frame

Manage the marshaling and unmarshaling of AMQP frames

unmarshal will turn a raw AMQP byte stream into the appropriate AMQP objects from the specification file.

marshal will take an object created from the specification file and turn it into a raw byte stream.

marshal(frame_value, channel_id)

Marshal a frame to be sent over the wire.

Source code in pamqp/frame.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def marshal(frame_value: FrameTypes, channel_id: int) -> bytes:
    """Serialize a frame object into the bytes to be sent over the wire.

    The frame's class selects the encoder. Protocol headers and
    heartbeats encode themselves and ignore ``channel_id``; method,
    content header and content body frames are encoded together with
    ``channel_id``.

    :param frame_value: the frame object to serialize
    :param channel_id: the channel number to encode the frame for
    :returns: the marshaled frame
    :raises ValueError: if ``frame_value`` is not a supported frame type

    """
    # The checks run in a fixed order and the first matching class wins.
    if isinstance(frame_value, header.ProtocolHeader):
        return frame_value.marshal()

    if isinstance(frame_value, base.Frame):
        return _marshal_method_frame(frame_value, channel_id)

    if isinstance(frame_value, header.ContentHeader):
        return _marshal_content_header_frame(frame_value, channel_id)

    if isinstance(frame_value, body.ContentBody):
        return _marshal_content_body_frame(frame_value, channel_id)

    if isinstance(frame_value, heartbeat.Heartbeat):
        return frame_value.marshal()

    raise ValueError(f'Could not determine frame type: {frame_value}')

unmarshal(data_in)

Takes in binary data and builds the appropriate frame type, returning a frame object.

Returns:

Type Description
tuple[int, int, FrameTypes]

tuple of bytes consumed, channel, and a frame object

Source code in pamqp/frame.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def unmarshal(data_in: bytes) -> tuple[int, int, FrameTypes]:
    """Decode the frame at the start of ``data_in`` into a frame object.

    The data is first tried as a protocol header; if that does not
    match, it is decoded as a general frame made of a fixed-size
    header, a payload of the advertised size and a single frame-end
    byte.

    :param data_in: the raw bytes to decode
    :returns: tuple of bytes consumed, channel, and a frame object
    :raises: exceptions.UnmarshalingException

    """
    # A ValueError from the protocol header decoder is reported as an
    # unmarshaling failure for the ProtocolHeader class.
    try:
        protocol_header = _unmarshal_protocol_header_frame(data_in)
    except ValueError as error:
        raise exceptions.UnmarshalingException(
            header.ProtocolHeader, error
        ) from error
    if protocol_header:
        return 8, 0, protocol_header

    frame_type, channel_id, frame_size = frame_parts(data_in)

    # A heartbeat with a zero payload size is reported as 8 bytes
    # consumed without inspecting the rest of the input.
    if frame_type == constants.FRAME_HEARTBEAT and frame_size == 0:
        return 8, channel_id, heartbeat.Heartbeat()

    # Covers both a zero size and the missing size returned when the
    # frame header could not be decoded.
    if not frame_size:
        raise exceptions.UnmarshalingException('Unknown', 'No frame size')

    # Index of the frame-end byte, which directly follows the payload.
    payload_end = constants.FRAME_HEADER_SIZE + frame_size
    consumed = payload_end + 1
    if len(data_in) < consumed:
        raise exceptions.UnmarshalingException(
            'Unknown', 'Not all data received'
        )

    if data_in[payload_end] != constants.FRAME_END:
        raise exceptions.UnmarshalingException('Unknown', 'Last byte error')

    payload = data_in[constants.FRAME_HEADER_SIZE : payload_end]
    if frame_type == constants.FRAME_METHOD:
        frame = _unmarshal_method_frame(payload)
    elif frame_type == constants.FRAME_HEADER:
        frame = _unmarshal_header_frame(payload)
    elif frame_type == constants.FRAME_BODY:
        frame = _unmarshal_body_frame(payload)
    else:
        raise exceptions.UnmarshalingException(
            'Unknown', f'Unknown frame type: {frame_type}'
        )
    return consumed, channel_id, frame

frame_parts(data)

Attempt to decode a low-level frame, returning frame parts

Source code in pamqp/frame.py
104
105
106
107
108
109
def frame_parts(data: bytes) -> tuple[int, int, int | None]:
    """Decode the low-level frame header at the start of ``data``.

    The leading ``constants.FRAME_HEADER_SIZE`` bytes are unpacked as a
    big-endian unsigned byte, unsigned short and unsigned int: the
    frame type, channel number and frame size.

    :param data: the raw bytes to decode
    :returns: tuple of frame type, channel number and frame size, or
        ``UNMARSHAL_FAILURE`` when the header cannot be unpacked

    """
    header_bytes = data[: constants.FRAME_HEADER_SIZE]
    try:
        parts = struct.unpack('>BHI', header_bytes)
    except struct.error:
        # Did not receive a full frame header
        return UNMARSHAL_FAILURE
    return parts