Skip to content

pamqp.header

pamqp.header

AMQP Header Class Definitions

For encoding AMQP Header frames into binary AMQP stream data and decoding AMQP binary data into AMQP Header frames.

ProtocolHeader

Class that represents the AMQP Protocol Header

Source code in pamqp/header.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class ProtocolHeader:
    """Class that represents the AMQP Protocol Header"""

    name: typing.ClassVar[str] = 'ProtocolHeader'

    def __init__(
        self,
        major_version: int = constants.VERSION[0],
        minor_version: int = constants.VERSION[1],
        revision: int = constants.VERSION[2],
    ) -> None:
        """Construct a Protocol Header frame object for the specified AMQP
        version.

        :param major_version: The AMQP major version (``0``)
        :param minor_version: The AMQP major version (``9``)
        :param revision: The AMQP major version (``1``)

        """
        self.major_version = major_version
        self.minor_version = minor_version
        self.revision = revision

    def marshal(self) -> bytes:
        """Return the full AMQP wire protocol frame data representation of the
        ProtocolHeader frame.

        """
        return constants.AMQP + struct.pack(
            'BBBB', 0, self.major_version, self.minor_version, self.revision
        )

    def unmarshal(self, data: bytes) -> int:
        """Dynamically decode the frame data applying the values to the method
        object by iterating through the attributes in order and decoding them.

        :param data: The frame value to unpack
        :raises: ValueError

        """
        try:
            (self.major_version, self.minor_version, self.revision) = (
                struct.unpack('BBB', data[5:8])
            )
        except struct.error as err:
            raise ValueError(
                f'Could not unpack protocol header from {data!r}'
            ) from err
        return 8

__init__(major_version=constants.VERSION[0], minor_version=constants.VERSION[1], revision=constants.VERSION[2])

Construct a Protocol Header frame object for the specified AMQP version.

Parameters:

Name Type Description Default
major_version int

The AMQP major version (0)

VERSION[0]
minor_version int

The AMQP major version (9)

VERSION[1]
revision int

The AMQP major version (1)

VERSION[2]
Source code in pamqp/header.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(
    self,
    major_version: int = constants.VERSION[0],
    minor_version: int = constants.VERSION[1],
    revision: int = constants.VERSION[2],
) -> None:
    """Construct a Protocol Header frame object for the specified AMQP
    version.

    :param major_version: The AMQP major version (``0``)
    :param minor_version: The AMQP major version (``9``)
    :param revision: The AMQP major version (``1``)

    """
    self.major_version = major_version
    self.minor_version = minor_version
    self.revision = revision

marshal()

Return the full AMQP wire protocol frame data representation of the ProtocolHeader frame.

Source code in pamqp/header.py
40
41
42
43
44
45
46
47
def marshal(self) -> bytes:
    """Return the full AMQP wire protocol frame data representation of the
    ProtocolHeader frame.

    """
    return constants.AMQP + struct.pack(
        'BBBB', 0, self.major_version, self.minor_version, self.revision
    )

unmarshal(data)

Dynamically decode the frame data applying the values to the method object by iterating through the attributes in order and decoding them.

Parameters:

Name Type Description Default
data bytes

The frame value to unpack

required
Source code in pamqp/header.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def unmarshal(self, data: bytes) -> int:
    """Dynamically decode the frame data applying the values to the method
    object by iterating through the attributes in order and decoding them.

    :param data: The frame value to unpack
    :raises: ValueError

    """
    try:
        (self.major_version, self.minor_version, self.revision) = (
            struct.unpack('BBB', data[5:8])
        )
    except struct.error as err:
        raise ValueError(
            f'Could not unpack protocol header from {data!r}'
        ) from err
    return 8

ContentHeader

Represent a content header frame

A Content Header frame is received after a Basic.Deliver or Basic.GetOk frame and has the data and properties for the Content Body frames that follow.

Source code in pamqp/header.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class ContentHeader:
    """Represent a content header frame

    A Content Header frame is received after a Basic.Deliver or Basic.GetOk
    frame and has the data and properties for the Content Body frames that
    follow.

    """

    name: typing.ClassVar[str] = 'ContentHeader'

    def __init__(
        self,
        weight: int = 0,
        body_size: int = 0,
        properties: commands.Basic.Properties | None = None,
    ) -> None:
        """Initialize the Exchange.DeleteOk class

        Weight is unused and must be `0`

        :param weight: The unused content weight field
        :param body_size: The size in bytes of the message body
        :param properties: The message properties

        """
        self.class_id = None
        self.weight = weight
        self.body_size = body_size
        self.properties = properties or commands.Basic.Properties()

    def marshal(self) -> bytes:
        """Return the AMQP binary encoded value of the frame"""
        return (
            struct.pack('>HxxQ', commands.Basic.frame_id, self.body_size)
            + self.properties.marshal()
        )

    def unmarshal(self, data: bytes) -> None:
        """Dynamically decode the frame data applying the values to the method
        object by iterating through the attributes in order and decoding them.

        :param data: The raw frame data to unmarshal

        """
        self.class_id, self.weight, self.body_size = struct.unpack(
            '>HHQ', data[0:12]
        )
        offset, flags = self._get_flags(data[12:])
        self.properties.unmarshal(flags, data[12 + offset :])

    @staticmethod
    def _get_flags(data: bytes) -> tuple[int, int]:
        """Decode the flags from the data returning the bytes consumed and
        flags.

        """
        bytes_consumed, flags, flagword_index = 0, 0, 0
        while True:
            consumed, partial_flags = decode.short_int(data)
            bytes_consumed += consumed
            flags |= partial_flags << (flagword_index * 16)
            if not partial_flags & 1:  # pragma: nocover
                break
            flagword_index += 1  # pragma: nocover
        return bytes_consumed, flags

__init__(weight=0, body_size=0, properties=None)

Initialize the Exchange.DeleteOk class

Weight is unused and must be 0

Parameters:

Name Type Description Default
weight int

The unused content weight field

0
body_size int

The size in bytes of the message body

0
properties Properties | None

The message properties

None
Source code in pamqp/header.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def __init__(
    self,
    weight: int = 0,
    body_size: int = 0,
    properties: commands.Basic.Properties | None = None,
) -> None:
    """Initialize the Exchange.DeleteOk class

    Weight is unused and must be `0`

    :param weight: The unused content weight field
    :param body_size: The size in bytes of the message body
    :param properties: The message properties

    """
    self.class_id = None
    self.weight = weight
    self.body_size = body_size
    self.properties = properties or commands.Basic.Properties()

marshal()

Return the AMQP binary encoded value of the frame

Source code in pamqp/header.py
 99
100
101
102
103
104
def marshal(self) -> bytes:
    """Return the AMQP binary encoded value of the frame"""
    return (
        struct.pack('>HxxQ', commands.Basic.frame_id, self.body_size)
        + self.properties.marshal()
    )

unmarshal(data)

Dynamically decode the frame data applying the values to the method object by iterating through the attributes in order and decoding them.

Parameters:

Name Type Description Default
data bytes

The raw frame data to unmarshal

required
Source code in pamqp/header.py
106
107
108
109
110
111
112
113
114
115
116
117
def unmarshal(self, data: bytes) -> None:
    """Dynamically decode the frame data applying the values to the method
    object by iterating through the attributes in order and decoding them.

    :param data: The raw frame data to unmarshal

    """
    self.class_id, self.weight, self.body_size = struct.unpack(
        '>HHQ', data[0:12]
    )
    offset, flags = self._get_flags(data[12:])
    self.properties.unmarshal(flags, data[12 + offset :])