Manage the marshaling and unmarshaling of AMQP frames
unmarshal will turn a raw AMQP byte stream into the appropriate AMQP objects
from the specification file.
marshal will take an object created from the specification file and turn it
into a raw byte stream.
marshal(frame_value, channel_id)
Marshal a frame to be sent over the wire.
Source code in pamqp/frame.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def marshal(frame_value: FrameTypes, channel_id: int) -> bytes:
    """Serialize a frame object into the bytes to be sent over the wire.

    Protocol headers and heartbeats are serialized by their own
    ``marshal()`` method and do not use ``channel_id``. Every other
    supported frame is handed, together with ``channel_id``, to the
    matching module-level marshaling helper.

    :param frame_value: The frame object to serialize
    :param channel_id: The channel the frame is being sent on
    :returns: The serialized frame
    :raises: ValueError

    """
    # The type checks run in a fixed order and the first match wins.
    if isinstance(frame_value, header.ProtocolHeader):
        return frame_value.marshal()

    if isinstance(frame_value, base.Frame):
        return _marshal_method_frame(frame_value, channel_id)

    if isinstance(frame_value, header.ContentHeader):
        return _marshal_content_header_frame(frame_value, channel_id)

    if isinstance(frame_value, body.ContentBody):
        return _marshal_content_body_frame(frame_value, channel_id)

    if isinstance(frame_value, heartbeat.Heartbeat):
        return frame_value.marshal()

    # Nothing matched: the value is not a frame this module can serialize.
    raise ValueError(f'Could not determine frame type: {frame_value}')
|
unmarshal(data_in)
Takes in binary data and builds the appropriate frame type,
returning a frame object.
Returns:
| Type |
Description |
tuple[int, int, FrameTypes]
|
tuple of bytes consumed, channel, and a frame object
|
Source code in pamqp/frame.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
def unmarshal(data_in: bytes) -> tuple[int, int, FrameTypes]:
    """Take in binary data and build the appropriate frame type,
    returning a frame object.

    The data is first tried as a protocol header. Otherwise the low-level
    frame header is decoded with :func:`frame_parts` and the complete frame
    (header, payload and frame-end octet) must be present in ``data_in``
    before a frame object is built. Heartbeat frames are validated the
    same way as every other frame; they are only special in that an empty
    payload is allowed.

    :param data_in: The raw bytes to decode, starting at a frame boundary
    :returns: tuple of bytes consumed, channel, and a frame object
    :raises: exceptions.UnmarshalingException

    """
    try:  # Look to see if it's a protocol header frame
        value = _unmarshal_protocol_header_frame(data_in)
    except ValueError as error:
        raise exceptions.UnmarshalingException(
            header.ProtocolHeader, error
        ) from error
    if value:
        return 8, 0, value

    frame_type, channel_id, frame_size = frame_parts(data_in)

    # A heartbeat is the only frame that may carry a zero-length payload;
    # for anything else a missing or zero size means the header could not
    # be decoded.
    is_heartbeat = (
        frame_type == constants.FRAME_HEARTBEAT and frame_size == 0
    )
    if not frame_size and not is_heartbeat:
        raise exceptions.UnmarshalingException('Unknown', 'No frame size')

    # Header + payload + the single frame-end octet
    byte_count = constants.FRAME_HEADER_SIZE + frame_size + 1
    if byte_count > len(data_in):
        # Includes a truncated heartbeat: never report bytes as consumed
        # that have not been received yet, or the caller's buffer would
        # fall out of sync with the frame boundaries.
        raise exceptions.UnmarshalingException(
            'Unknown', 'Not all data received'
        )
    if data_in[byte_count - 1] != constants.FRAME_END:
        raise exceptions.UnmarshalingException('Unknown', 'Last byte error')

    if is_heartbeat:
        return byte_count, channel_id, heartbeat.Heartbeat()

    frame_data = data_in[constants.FRAME_HEADER_SIZE : byte_count - 1]
    if frame_type == constants.FRAME_METHOD:
        return byte_count, channel_id, _unmarshal_method_frame(frame_data)
    elif frame_type == constants.FRAME_HEADER:
        return byte_count, channel_id, _unmarshal_header_frame(frame_data)
    elif frame_type == constants.FRAME_BODY:
        return byte_count, channel_id, _unmarshal_body_frame(frame_data)
    raise exceptions.UnmarshalingException(
        'Unknown', f'Unknown frame type: {frame_type}'
    )
|
frame_parts(data)
Attempt to decode a low-level frame, returning frame parts
Source code in pamqp/frame.py
def frame_parts(data: bytes) -> tuple[int, int, int | None]:
    """Decode the low-level frame header found at the start of ``data``.

    The first ``constants.FRAME_HEADER_SIZE`` bytes are unpacked as a
    big-endian unsigned byte, unsigned short and unsigned int: the frame
    type, channel number and frame size.

    :param data: The raw bytes to decode
    :returns: tuple of frame type, channel number and frame size, or
        ``UNMARSHAL_FAILURE`` when the header cannot be unpacked

    """
    header_bytes = data[: constants.FRAME_HEADER_SIZE]
    try:
        parts = struct.unpack('>BHI', header_bytes)
    except struct.error:
        # Did not receive a full frame header
        return UNMARSHAL_FAILURE
    return parts
|