Skip to content

API Reference

Consumer

The primary base classes for building message consumers.

Consumer

rejected.consumer.Consumer

Bases: _Consumer

Backward-compatible consumer for rejected 3.x.

Processes one message at a time (locked). Message properties are accessible via self.body, self.content_type, etc. Override prepare(), process(), finish().

Source code in rejected/consumer.py
class Consumer(_Consumer):
    """Backward-compatible consumer for rejected 3.x.

    Processes one message at a time (locked). Message properties are
    accessible via ``self.body``, ``self.content_type``, etc.
    Override ``prepare()``, ``process()``, ``finish()``.

    """

    def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
        super().__init__(*args, **kwargs)
        # Per-message state, populated by _run_consumer while the lock is held
        self._context: models.ProcessingContext | None = None
        self._message_body: typing.Any = _UNSET
        # Created lazily in _run_consumer so constructing the consumer does
        # not require a running event loop
        self._lock: asyncio.Lock | None = None

    async def prepare(self) -> None:
        """Called before process. Override to add pre-processing."""
        pass

    async def process(self) -> None:
        """Implement your consumer logic here."""
        raise NotImplementedError

    async def finish(self) -> None:
        """Called after process. Override to add post-processing."""
        pass

    async def on_finish(self) -> None:
        """Called after processing completes (success or failure)."""
        pass

    async def _run_consumer(
        self, ctx: models.ProcessingContext
    ) -> models.Result:
        """Serialize message handling and manage per-message state."""
        if self._lock is None:
            self._lock = asyncio.Lock()
        async with self._lock:
            self._context = ctx
            self._message_body = _UNSET
            try:
                return await self._handle_execution(
                    ctx, self._process_standard
                )
            finally:
                # Always reset state so stale message data cannot leak into
                # the next delivery
                await self.on_finish()
                self._context = None
                self._message_body = _UNSET

    async def _process_standard(self) -> None:
        # Run the full prepare -> process -> finish pipeline, mirroring
        # FunctionalConsumer._process_transactional. Bug fix: finish() is
        # documented as the post-processing hook but was previously never
        # invoked.
        await self.prepare()
        await self.process()
        await self.finish()

    # --- Quick-access properties (read from self._context) ---
    # All return None when no message is currently being processed.

    @property
    def app_id(self) -> str | None:
        if not self._context:
            return None
        return self._context.message.app_id

    @property
    def body(self) -> typing.Any:
        if not self._context:
            return None
        # Cache the body on first access for the duration of the message
        if self._message_body is not _UNSET:
            return self._message_body
        msg = self._context.message
        self._message_body = msg.body
        return self._message_body

    @property
    def content_encoding(self) -> str | None:
        if not self._context:
            return None
        return self._context.message.content_encoding

    @property
    def content_type(self) -> str | None:
        if not self._context:
            return None
        return self._context.message.content_type

    @property
    def exchange(self) -> str | None:
        if not self._context:
            return None
        return self._context.message.exchange

    @property
    def expiration(self) -> str | None:
        if not self._context:
            return None
        return self._context.message.expiration

    @property
    def headers(self) -> dict[str, typing.Any] | None:
        if not self._context:
            return None
        # Normalizes falsy headers to an empty dict; only returns None when
        # there is no message in flight
        return self._context.message.headers or {}

    @property
    def message_id(self) -> str | None:
        if not self._context:
            return None
        return self._context.message.message_id

    @property
    def message_type(self) -> str | None:
        if not self._context:
            return None
        return self._context.message.type

    @property
    def priority(self) -> int | None:
        if not self._context:
            return None
        return self._context.message.priority

    @property
    def properties(self) -> dict[str, typing.Any] | None:
        if not self._context:
            return None
        return self._context.message.model_dump()

    @property
    def redelivered(self) -> bool | None:
        if not self._context:
            return None
        return self._context.message.redelivered

    @property
    def reply_to(self) -> str | None:
        if not self._context:
            return None
        return self._context.message.reply_to

    @property
    def returned(self) -> bool | None:
        if not self._context:
            return None
        return self._context.message.returned

    @property
    def routing_key(self) -> str | None:
        if not self._context:
            return None
        return self._context.message.routing_key

    @property
    def timestamp(self) -> datetime.datetime | None:
        if not self._context:
            return None
        return self._context.message.timestamp

    @property
    def user_id(self) -> str | None:
        if not self._context:
            return None
        return self._context.message.user_id

    def _publish_channel(self, name: str | None = None) -> channel.Channel:
        """Return the channel to publish on, defaulting to the channel the
        current message arrived on.

        :raises ValueError: when no name is given and no message is in flight

        """
        if name:
            return super()._publish_channel(name)
        if self._context:
            return typing.cast(channel.Channel, self._context.channel)
        raise ValueError('No channel available for publishing')

    # --- Stats helpers that auto-use self._context ---

    def stats_add_duration(
        self,
        key: str,
        duration: float,
        ctx: models.ProcessingContext | None = None,
    ) -> None:
        """Record a duration, defaulting to the current message context."""
        super().stats_add_duration(key, duration, ctx or self._context)

    def stats_incr(
        self,
        key: str,
        value: int = 1,
        ctx: models.ProcessingContext | None = None,
    ) -> None:
        """Increment a counter, defaulting to the current message context."""
        super().stats_incr(key, value, ctx or self._context)

    def stats_set_tag(
        self,
        key: str,
        value: str | bool | int = 1,
        ctx: models.ProcessingContext | None = None,
    ) -> None:
        """Set a tag, defaulting to the current message context."""
        super().stats_set_tag(key, value, ctx or self._context)

    def stats_set_value(
        self,
        key: str,
        value: int | float = 1,
        ctx: models.ProcessingContext | None = None,
    ) -> None:
        """Set a value, defaulting to the current message context."""
        super().stats_set_value(key, value, ctx or self._context)

    @contextlib.contextmanager
    def stats_track_duration(
        self, key: str, ctx: models.ProcessingContext | None = None
    ) -> typing.Generator[None, None, None]:
        """Context manager that records the wall-clock duration of its body."""
        start = time.monotonic()
        try:
            yield
        finally:
            self.stats_add_duration(
                key, time.monotonic() - start, ctx or self._context
            )

Attributes

body property

app_id property

content_encoding property

content_type property

exchange property

expiration property

headers property

message_id property

message_type property

priority property

properties property

redelivered property

reply_to property

returned property

routing_key property

timestamp property

user_id property

Functions

process() async

Implement your consumer logic here.

Source code in rejected/consumer.py
async def process(self) -> None:
    """Implement your consumer logic here."""
    raise NotImplementedError

prepare() async

Called before process. Override to add pre-processing.

Source code in rejected/consumer.py
async def prepare(self) -> None:
    """Called before process. Override to add pre-processing."""
    pass

finish() async

Called after process. Override to add post-processing.

Source code in rejected/consumer.py
async def finish(self) -> None:
    """Called after process. Override to add post-processing."""
    pass

on_finish() async

Called after processing completes.

Source code in rejected/consumer.py
async def on_finish(self) -> None:
    """Called after processing completes."""
    pass

stats_add_duration(key, duration, ctx=None)

Source code in rejected/consumer.py
def stats_add_duration(
    self,
    key: str,
    duration: float,
    ctx: models.ProcessingContext | None = None,
) -> None:
    super().stats_add_duration(key, duration, ctx or self._context)

stats_incr(key, value=1, ctx=None)

Source code in rejected/consumer.py
def stats_incr(
    self,
    key: str,
    value: int = 1,
    ctx: models.ProcessingContext | None = None,
) -> None:
    super().stats_incr(key, value, ctx or self._context)

stats_set_tag(key, value=1, ctx=None)

Source code in rejected/consumer.py
def stats_set_tag(
    self,
    key: str,
    value: str | bool | int = 1,
    ctx: models.ProcessingContext | None = None,
) -> None:
    super().stats_set_tag(key, value, ctx or self._context)

stats_set_value(key, value=1, ctx=None)

Source code in rejected/consumer.py
def stats_set_value(
    self,
    key: str,
    value: int | float = 1,
    ctx: models.ProcessingContext | None = None,
) -> None:
    super().stats_set_value(key, value, ctx or self._context)

stats_track_duration(key, ctx=None)

Source code in rejected/consumer.py
@contextlib.contextmanager
def stats_track_duration(
    self, key: str, ctx: models.ProcessingContext | None = None
) -> typing.Generator[None, None, None]:
    start = time.monotonic()
    try:
        yield
    finally:
        self.stats_add_duration(
            key, time.monotonic() - start, ctx or self._context
        )

FunctionalConsumer

rejected.consumer.FunctionalConsumer

Bases: _Consumer

Concurrent consumer that receives a ProcessingContext.

No lock — multiple messages may be processed concurrently. Override prepare(ctx), process(ctx), finish(ctx).

Source code in rejected/consumer.py
class FunctionalConsumer(_Consumer):
    """Concurrent, context-passing consumer.

    Unlike :class:`Consumer`, no lock is held, so several messages may be
    in flight at once. Subclasses override ``prepare(ctx)``,
    ``process(ctx)`` and ``finish(ctx)``.

    """

    async def prepare(self, ctx: models.ProcessingContext) -> None:
        """Hook invoked before :meth:`process`; override for setup work."""

    async def process(self, ctx: models.ProcessingContext) -> None:
        """Override with the message-handling logic."""
        raise NotImplementedError

    async def finish(self, ctx: models.ProcessingContext) -> None:
        """Hook invoked after :meth:`process`; override for teardown work."""

    async def _run_consumer(
        self, ctx: models.ProcessingContext
    ) -> models.Result:
        async def run() -> None:
            await self._process_transactional(ctx)

        return await self._handle_execution(ctx, run)

    async def _process_transactional(
        self, ctx: models.ProcessingContext
    ) -> None:
        # prepare -> process -> finish, awaited in order; a raise in any
        # step short-circuits the remaining steps.
        for step in (self.prepare, self.process, self.finish):
            await step(ctx)

Functions

process(ctx) async

Implement your consumer logic here.

Source code in rejected/consumer.py
async def process(self, ctx: models.ProcessingContext) -> None:
    """Implement your consumer logic here."""
    raise NotImplementedError

prepare(ctx) async

Called before process. Override to add pre-processing.

Source code in rejected/consumer.py
async def prepare(self, ctx: models.ProcessingContext) -> None:
    """Called before process. Override to add pre-processing."""
    pass

finish(ctx) async

Called after process. Override to add post-processing.

Source code in rejected/consumer.py
async def finish(self, ctx: models.ProcessingContext) -> None:
    """Called after process. Override to add post-processing."""
    pass

Exceptions

rejected.exceptions.ConsumerException

Bases: RejectedException

May be raised when processing a message to indicate a problem with the consumer itself that should cause it to stop.

:param str value: An optional value used in string representation :param str metric: An optional value for auto-instrumentation

Source code in rejected/exceptions.py
class ConsumerException(RejectedException):
    """Raise while processing a message to indicate a problem with the
    consumer itself that should cause it to stop.

    :param str value: An optional value used in string representation
    :param str metric: An optional value for auto-instrumentation

    """

rejected.exceptions.MessageException

Bases: RejectedException

Raise when a message should be rejected and not re-queued, but not due to a processing error that should cause the consumer to stop.

:param str value: An optional value used in string representation :param str metric: An optional value for auto-instrumentation

Source code in rejected/exceptions.py
class MessageException(RejectedException):
    """Raise when a message should be rejected and not re-queued, but
    not due to a processing error that should cause the consumer to
    stop.

    :param str value: An optional value used in string representation
    :param str metric: An optional value for auto-instrumentation

    """

rejected.exceptions.ProcessingException

Bases: RejectedException

Raise when a message should be rejected and not re-queued, but not due to a processing error that should cause the consumer to stop. Use this when you want the message republished to a retry queue, without anything being stated about the underlying exception.

:param str value: An optional value used in string representation :param str metric: An optional value for auto-instrumentation

Source code in rejected/exceptions.py
class ProcessingException(RejectedException):
    """Raise when a message should be rejected and not re-queued, but
    not due to a processing error that should cause the consumer to
    stop. Use this when you want the message republished to a retry
    queue, without anything being stated about the underlying
    exception.

    :param str value: An optional value used in string representation
    :param str metric: An optional value for auto-instrumentation

    """

rejected.exceptions.RejectedException

Bases: Exception

Base exception for :py:class:`~rejected.consumer.Consumer` related exceptions.

If provided, the metric will be used to automatically record exception metric counts using the path [prefix].[consumer-name].exceptions.[exception-type].[metric].

Positional and keyword arguments are used to format the value that is passed in when providing the string value of the exception.

:param str value: An optional value used in string representation :param str metric: An optional value for auto-instrumentation

.. versionadded:: 3.19.0

Source code in rejected/exceptions.py
class RejectedException(Exception):
    """Base exception for :py:class:`~rejected.consumer.Consumer` related
    exceptions.

    If provided, the metric will be used to automatically record exception
    metric counts using the path
    `[prefix].[consumer-name].exceptions.[exception-type].[metric]`.

    Positional and keyword arguments are used to format the value that is
    passed in when providing the string value of the exception.

    :param str value: An optional value used in string representation
    :param str metric: An optional value for auto-instrumentation

    """

    METRIC_NAME: typing.ClassVar[str] = 'rejected-exception'

    def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
        # The first positional is the format template; drop it from the
        # format args unless an explicit value= keyword supersedes it.
        if len(args) > 1 and 'value' not in kwargs:
            self.args = args[1:]
        else:
            self.args = args
        self.metric: str | None = kwargs.pop('metric', None)
        template = args[0] if args else '{!r} {!r}'
        self.value: str = str(kwargs.pop('value', template))
        # Remaining keywords become named format arguments for __str__
        self.kwargs = kwargs

    def __str__(self) -> str:
        if self.args or self.kwargs:
            return self.value.format(*self.args, **self.kwargs)
        return repr(self)

    def __repr__(self) -> str:
        name = self.__class__.__name__
        if self.args or self.kwargs:
            return f'{name}({self!s})'
        return f'{name}()'

Models

Message

rejected.models.Message

Bases: BaseModel

The deserialized message to be processed by the consumer.

Source code in rejected/models.py
class Message(pydantic.BaseModel):
    """The deserialized message to be processed by the consumer."""

    # Broker delivery/routing information
    delivery_tag: int | None
    exchange: str | None
    routing_key: str | None
    returned: bool = False

    # Decoded payload; the concrete type depends on the codec decode step
    body: typing.Any

    # AMQP basic properties carried alongside the body
    app_id: str | None
    content_encoding: str | None
    content_type: str | None
    correlation_id: str | None
    delivery_mode: int | None
    expiration: str | None
    headers: dict[
        str, bool | dict[str, typing.Any] | float | int | str | bytes
    ]
    message_id: str | None
    type: str | None
    priority: int | None
    redelivered: bool
    reply_to: str | None
    timestamp: datetime.datetime | None
    user_id: str | None

ProcessingContext

rejected.models.ProcessingContext

Bases: BaseModel

Tracks a single in-flight message being processed.

Source code in rejected/models.py
class ProcessingContext(pydantic.BaseModel):
    """Tracks a single in-flight message being processed."""

    # Required for the non-pydantic connection/channel/measurement fields
    model_config = pydantic.ConfigDict(arbitrary_types_allowed=True)

    connection: typing.Any  # connection.Connection (avoid circular import)
    channel: typing.Any  # pika.channel.Channel
    raw_body: bytes = b''  # original bytes before decoding
    # Monotonic clock reading taken when the context is created
    received_at: float = pydantic.Field(default_factory=time.monotonic)
    message: Message
    # Per-message stats collector used by the consumer stats_* helpers
    measurement: measurement_mod.Measurement = pydantic.Field(
        default_factory=measurement_mod.Measurement
    )
    result: Result | None = None  # set once processing resolves

Result

rejected.models.Result

Bases: IntEnum

Result codes returned by Consumer.execute() to indicate how the message should be handled by the process.

Source code in rejected/models.py
class Result(enum.IntEnum):
    """Result codes returned by Consumer.execute() to indicate how the
    message should be handled by the process."""

    # Normal outcomes
    MESSAGE_ACK = 1
    MESSAGE_DROP = 2
    MESSAGE_REQUEUE = 3
    # Exception outcomes; each maps to the like-named exception class
    # (see rejected.exceptions and testing.AsyncTestCase.process_message)
    CONSUMER_EXCEPTION = 10
    MESSAGE_EXCEPTION = 11
    PROCESSING_EXCEPTION = 12
    UNHANDLED_EXCEPTION = 13

Testing

rejected.testing.AsyncTestCase

Bases: IsolatedAsyncioTestCase

:class:`unittest.IsolatedAsyncioTestCase` subclass for testing :class:`~rejected.consumer.Consumer` and :class:`~rejected.consumer.FunctionalConsumer` classes.

Source code in rejected/testing.py
class AsyncTestCase(unittest.IsolatedAsyncioTestCase):
    """:class:`unittest.IsolatedAsyncioTestCase` subclass for testing
    :class:`~rejected.consumer.Consumer` and
    :class:`~rejected.consumer.FunctionalConsumer` classes.

    """

    _consumer: consumer._Consumer | None = None
    _last_ctx: models.ProcessingContext | None = None

    async def asyncSetUp(self) -> None:
        await super().asyncSetUp()
        self.correlation_id = str(uuid.uuid4())
        self.process = self._create_process()
        self.consumer = self._create_consumer()
        self.channel = self.process.connections['mock'].channel
        self.exc_info: (
            tuple[type[BaseException], BaseException, typing.Any] | None
        ) = None

    @property
    def published_messages(self) -> list['PublishedMessage']:
        """Return a list of :class:`PublishedMessage` extracted from
        all calls to :meth:`pika.channel.Channel.basic_publish` during
        the test.

        """
        return [
            PublishedMessage(
                body=c[2]['body'],
                exchange=c[2]['exchange'],
                properties=c[2]['properties'],
                routing_key=c[2]['routing_key'],
            )
            for c in self.channel.basic_publish.mock_calls
        ]

    def get_consumer(self) -> type[consumer._Consumer]:
        """Override to return the consumer class for testing."""
        return consumer.Consumer

    def get_settings(self) -> dict[str, typing.Any]:
        """Override to provide settings to the consumer during
        construction.

        """
        return {}

    def create_context(
        self,
        message_body: typing.Any = None,
        content_type: str = 'application/json',
        message_type: str | None = None,
        properties: dict[str, typing.Any] | None = None,
        exchange: str = 'rejected',
        routing_key: str = 'test',
    ) -> models.ProcessingContext:
        """Create a :class:`~rejected.models.ProcessingContext` for
        testing.

        The body is stored as raw bytes on the message, matching what
        RabbitMQ delivers. Non-bytes/str bodies are serialized via the
        :class:`~rejected.codecs.Codec`. Use :meth:`process_message`
        to also run the codec decode step before the consumer sees the
        message (matching production behavior).

        """
        properties = properties or {}
        properties.setdefault('content_type', content_type)
        properties.setdefault('correlation_id', self.correlation_id)
        properties.setdefault(
            'timestamp',
            int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
        )
        properties.setdefault('type', message_type)

        if (
            isinstance(message_body, dict)
            and properties.get('content_type') == 'application/json'
        ):
            message_body = json.dumps(message_body)
        if isinstance(message_body, str):
            message_body = message_body.encode('utf-8')

        mock_conn = mock.Mock(spec=connection.Connection)
        mock_conn.is_running = True

        msg = models.Message(
            delivery_tag=1,
            exchange=exchange,
            routing_key=routing_key,
            body=message_body,
            app_id=properties.get('app_id', 'rejected.testing'),
            content_encoding=properties.get('content_encoding'),
            content_type=properties.get('content_type'),
            correlation_id=properties.get(
                'correlation_id', self.correlation_id
            ),
            delivery_mode=properties.get('delivery_mode', 1),
            expiration=properties.get('expiration'),
            headers=properties.get('headers', {}),
            message_id=properties.get('message_id', str(uuid.uuid4())),
            type=properties.get('type'),
            priority=properties.get('priority'),
            redelivered=False,
            reply_to=properties.get('reply_to'),
            returned=False,
            timestamp=(
                datetime.datetime.fromtimestamp(
                    properties['timestamp'], tz=datetime.UTC
                )
                if properties.get('timestamp')
                else None
            ),
            user_id=properties.get('user_id'),
        )
        return models.ProcessingContext(
            connection=mock_conn,
            channel=self.process.connections['mock'].channel,
            message=msg,
        )

    @property
    def measurement(self) -> measurement_mod.Measurement | None:
        """Return the Measurement for the last processed message."""
        if self._last_ctx:
            return self._last_ctx.measurement
        return None

    async def process_message(
        self,
        message_body: typing.Any = None,
        content_type: str = 'application/json',
        message_type: str | None = None,
        properties: dict[str, typing.Any] | None = None,
        exchange: str = 'rejected',
        routing_key: str = 'routing-key',
    ) -> measurement_mod.Measurement:
        """Process a message as if it were being delivered by RabbitMQ.

        Builds a :class:`~rejected.models.ProcessingContext` and
        passes it through the consumer's ``execute`` method.

        If an exception is not raised, returns the
        :class:`~rejected.measurement.Measurement` collected during
        processing.

        :raises: :exc:`rejected.exceptions.ConsumerException`
        :raises: :exc:`rejected.exceptions.MessageException`
        :raises: :exc:`rejected.exceptions.ProcessingException`

        """
        ctx = self.create_context(
            message_body,
            content_type,
            message_type,
            properties,
            exchange,
            routing_key,
        )
        self._last_ctx = ctx

        # Decode body through the codec, matching process.py behavior
        if self.process.codec and ctx.message.body is not None:
            msg = ctx.message
            ctx.raw_body = msg.body if isinstance(msg.body, bytes) else b''
            try:
                msg.body = await self.process.codec.decode(
                    msg.body, msg.content_type, msg.content_encoding, msg.type
                )
            except codecs.DecodeError as err:
                ctx.result = models.Result.MESSAGE_EXCEPTION
                raise exceptions.MessageException(
                    'Failed to decode message body'
                ) from err

        # Patch _log_exception to capture exc_info for re-raising
        original_log = self.consumer._log_exception

        def _capture_log(
            ctx_: models.ProcessingContext, msg_format: str, *args: typing.Any
        ) -> None:
            import sys

            self.exc_info = sys.exc_info()  # type: ignore[assignment]
            original_log(ctx_, msg_format, *args)

        self.consumer._log_exception = _capture_log  # type: ignore[assignment]

        await self.consumer.execute(ctx)

        match ctx.result:
            case models.Result.CONSUMER_EXCEPTION:
                raise exceptions.ConsumerException()
            case models.Result.MESSAGE_EXCEPTION:
                raise exceptions.MessageException()
            case models.Result.PROCESSING_EXCEPTION:
                raise exceptions.ProcessingException()
            case models.Result.UNHANDLED_EXCEPTION:
                if self.exc_info:
                    raise self.exc_info[1]
                raise AssertionError('UNHANDLED_EXCEPTION')
            case models.Result.MESSAGE_REQUEUE:
                raise AssertionError(
                    'Message was requeued — consumer returned MESSAGE_REQUEUE'
                )
        return ctx.measurement

    @staticmethod
    def _create_channel() -> mock.Mock:
        return mock.Mock(spec=channel.Channel)

    def _create_connection(self) -> mock.Mock:
        obj = mock.Mock(spec=asyncio_connection.AsyncioConnection)
        obj.channel = self._create_channel()
        obj.channel.connection = obj
        return obj

    def _create_consumer(self) -> consumer._Consumer:
        cls = self.get_consumer()
        obj = cls(config_module.Settings(self.get_settings()), self.process)
        obj.set_channel('mock', self.process.connections['mock'].channel)
        return obj

    def _create_process(self) -> mock.Mock:
        obj = mock.Mock(spec=process.Process)
        obj.connections = {'mock': self._create_connection()}
        obj.sentry_client = True if sentry_sdk else None
        obj.codec = codecs.Codec()
        return obj

Attributes

published_messages property

Return a list of :class:PublishedMessage extracted from all calls to :meth:pika.channel.Channel.basic_publish during the test.

measurement property

Return the Measurement for the last processed message.

Functions

get_consumer()

Override to return the consumer class for testing.

Source code in rejected/testing.py
def get_consumer(self) -> type[consumer._Consumer]:
    """Override to return the consumer class for testing."""
    return consumer.Consumer

get_settings()

Override to provide settings to the consumer during construction.

Source code in rejected/testing.py
def get_settings(self) -> dict[str, typing.Any]:
    """Override to provide settings to the consumer during
    construction.

    """
    return {}

create_context(message_body=None, content_type='application/json', message_type=None, properties=None, exchange='rejected', routing_key='test')

Create a :class:~rejected.models.ProcessingContext for testing.

The body is stored as raw bytes on the message, matching what RabbitMQ delivers. Non-bytes/str bodies are serialized via the :class:~rejected.codecs.Codec. Use :meth:process_message to also run the codec decode step before the consumer sees the message (matching production behavior).

Source code in rejected/testing.py
def create_context(
    self,
    message_body: typing.Any = None,
    content_type: str = 'application/json',
    message_type: str | None = None,
    properties: dict[str, typing.Any] | None = None,
    exchange: str = 'rejected',
    routing_key: str = 'test',
) -> models.ProcessingContext:
    """Create a :class:`~rejected.models.ProcessingContext` for
    testing.

    The body is stored as raw bytes on the message, matching what
    RabbitMQ delivers. Non-bytes/str bodies are serialized via the
    :class:`~rejected.codecs.Codec`. Use :meth:`process_message`
    to also run the codec decode step before the consumer sees the
    message (matching production behavior).

    """
    properties = properties or {}
    properties.setdefault('content_type', content_type)
    properties.setdefault('correlation_id', self.correlation_id)
    properties.setdefault(
        'timestamp',
        int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
    )
    properties.setdefault('type', message_type)

    if (
        isinstance(message_body, dict)
        and properties.get('content_type') == 'application/json'
    ):
        message_body = json.dumps(message_body)
    if isinstance(message_body, str):
        message_body = message_body.encode('utf-8')

    mock_conn = mock.Mock(spec=connection.Connection)
    mock_conn.is_running = True

    msg = models.Message(
        delivery_tag=1,
        exchange=exchange,
        routing_key=routing_key,
        body=message_body,
        app_id=properties.get('app_id', 'rejected.testing'),
        content_encoding=properties.get('content_encoding'),
        content_type=properties.get('content_type'),
        correlation_id=properties.get(
            'correlation_id', self.correlation_id
        ),
        delivery_mode=properties.get('delivery_mode', 1),
        expiration=properties.get('expiration'),
        headers=properties.get('headers', {}),
        message_id=properties.get('message_id', str(uuid.uuid4())),
        type=properties.get('type'),
        priority=properties.get('priority'),
        redelivered=False,
        reply_to=properties.get('reply_to'),
        returned=False,
        timestamp=(
            datetime.datetime.fromtimestamp(
                properties['timestamp'], tz=datetime.UTC
            )
            if properties.get('timestamp')
            else None
        ),
        user_id=properties.get('user_id'),
    )
    return models.ProcessingContext(
        connection=mock_conn,
        channel=self.process.connections['mock'].channel,
        message=msg,
    )

process_message(message_body=None, content_type='application/json', message_type=None, properties=None, exchange='rejected', routing_key='routing-key') async

Process a message as if it were being delivered by RabbitMQ.

Builds a :class:~rejected.models.ProcessingContext and passes it through the consumer's execute method.

If an exception is not raised, returns the :class:~rejected.measurement.Measurement collected during processing.

:raises: :exc:rejected.exceptions.ConsumerException :raises: :exc:rejected.exceptions.MessageException :raises: :exc:rejected.exceptions.ProcessingException

Source code in rejected/testing.py
async def process_message(
    self,
    message_body: typing.Any = None,
    content_type: str = 'application/json',
    message_type: str | None = None,
    properties: dict[str, typing.Any] | None = None,
    exchange: str = 'rejected',
    routing_key: str = 'routing-key',
) -> measurement_mod.Measurement:
    """Process a message as if it were being delivered by RabbitMQ.

    Builds a :class:`~rejected.models.ProcessingContext` and
    passes it through the consumer's ``execute`` method.

    If an exception is not raised, returns the
    :class:`~rejected.measurement.Measurement` collected during
    processing.

    :raises: :exc:`rejected.exceptions.ConsumerException`
    :raises: :exc:`rejected.exceptions.MessageException`
    :raises: :exc:`rejected.exceptions.ProcessingException`

    """
    ctx = self.create_context(
        message_body,
        content_type,
        message_type,
        properties,
        exchange,
        routing_key,
    )
    self._last_ctx = ctx

    # Decode body through the codec, matching process.py behavior
    if self.process.codec and ctx.message.body is not None:
        msg = ctx.message
        ctx.raw_body = msg.body if isinstance(msg.body, bytes) else b''
        try:
            msg.body = await self.process.codec.decode(
                msg.body, msg.content_type, msg.content_encoding, msg.type
            )
        except codecs.DecodeError as err:
            ctx.result = models.Result.MESSAGE_EXCEPTION
            raise exceptions.MessageException(
                'Failed to decode message body'
            ) from err

    # Patch _log_exception to capture exc_info for re-raising
    original_log = self.consumer._log_exception

    def _capture_log(
        ctx_: models.ProcessingContext, msg_format: str, *args: typing.Any
    ) -> None:
        import sys

        self.exc_info = sys.exc_info()  # type: ignore[assignment]
        original_log(ctx_, msg_format, *args)

    self.consumer._log_exception = _capture_log  # type: ignore[assignment]

    await self.consumer.execute(ctx)

    match ctx.result:
        case models.Result.CONSUMER_EXCEPTION:
            raise exceptions.ConsumerException()
        case models.Result.MESSAGE_EXCEPTION:
            raise exceptions.MessageException()
        case models.Result.PROCESSING_EXCEPTION:
            raise exceptions.ProcessingException()
        case models.Result.UNHANDLED_EXCEPTION:
            if self.exc_info:
                raise self.exc_info[1]
            raise AssertionError('UNHANDLED_EXCEPTION')
        case models.Result.MESSAGE_REQUEUE:
            raise AssertionError(
                'Message was requeued — consumer returned MESSAGE_REQUEUE'
            )
    return ctx.measurement

rejected.testing.PublishedMessage

Contains information about messages published during a test.

:param str exchange: The exchange the message was published to
:param str routing_key: The routing key used
:param pika.spec.BasicProperties properties: AMQP message properties
:param bytes body: AMQP message body

.. versionadded:: 3.18.9

Source code in rejected/testing.py
class PublishedMessage:
    """Contains information about messages published during a test.

    :param str exchange: The exchange the message was published to
    :param str routing_key: The routing key used
    :param pika.spec.BasicProperties properties: AMQP message properties
    :param bytes body: AMQP message body

    .. versionadded:: 3.18.9

    """

    __slots__ = ['body', 'exchange', 'properties', 'routing_key']

    def __init__(
        self,
        exchange: str,
        routing_key: str,
        properties: spec.BasicProperties,
        body: bytes,
    ) -> None:
        # Plain value-object: stash the publish parameters verbatim
        self.exchange = exchange
        self.routing_key = routing_key
        self.properties = properties
        self.body = body

    def __repr__(self) -> str:
        detail = f'exchange="{self.exchange}" routing_key="{self.routing_key}"'
        return f'<PublishedMessage {detail}>'

Measurement

rejected.measurement.Measurement

Per-message instrumentation collector.

Accumulates counters, durations, tags, and values during message processing; the collected data is then submitted to statsd and/or Prometheus by the :class:`~rejected.process.Process`.

.. versionadded:: 3.13.0

Source code in rejected/measurement.py
class Measurement:
    """Per-message instrumentation collector.

    Accumulates counters, durations, tags, and values during message
    processing, then submitted to statsd and/or Prometheus by the
    :class:`~rejected.process.Process`.

    .. versionadded:: 3.13.0

    """

    def __init__(self) -> None:
        self.durations: dict[str, list[float]] = {}
        self.counters: collections.Counter[str] = collections.Counter()
        self.message_age: float = 0.0
        self.tags: dict[str, str | bool | int] = {}
        self.values: dict[str, int | float] = {}

    def decr(self, key: str, value: int = 1) -> None:
        """Decrement a counter.

        :param key: The key to decrement
        :param value: The value to decrement by

        """
        self.counters[key] -= value

    def incr(self, key: str, value: int = 1) -> None:
        """Increment a counter.

        :param key: The key to increment
        :param value: The value to increment by

        """
        self.counters[key] += value

    def add_duration(self, key: str, value: float) -> None:
        """Add a duration for the specified key.

        :param key: The value name
        :param value: The value

        .. versionadded:: 3.19.0

        """
        if key not in self.durations:
            self.durations[key] = []
        self.durations[key].append(value)

    def set_tag(self, key: str, value: str | bool | int) -> None:
        """Set a tag for metrics submission.

        :param key: The tag name
        :param value: The tag value

        """
        self.tags[key] = value

    def set_value(self, key: str, value: int | float) -> None:
        """Set a numeric value.

        :param key: The value name
        :param value: The value

        """
        self.values[key] = value

    @contextlib.contextmanager
    def track_duration(self, key: str) -> typing.Generator[None, None, None]:
        """Context manager that records the duration of the wrapped
        block.

        :param key: The timing name

        """
        if key not in self.durations:
            self.durations[key] = []
        start_time = time.monotonic()
        try:
            yield
        finally:
            self.durations[key].append(time.monotonic() - start_time)

Functions

add_duration(key, value)

Add a duration for the specified key.

:param key: The value name :param value: The value

.. versionadded:: 3.19.0

Source code in rejected/measurement.py
def add_duration(self, key: str, value: float) -> None:
    """Record a duration observation under the given key.

    :param key: The value name
    :param value: The value

    .. versionadded:: 3.19.0

    """
    self.durations.setdefault(key, []).append(value)

decr(key, value=1)

Decrement a counter.

:param key: The key to decrement :param value: The value to decrement by

Source code in rejected/measurement.py
def decr(self, key: str, value: int = 1) -> None:
    """Reduce the named counter by ``value``.

    :param key: The key to decrement
    :param value: The value to decrement by

    """
    self.counters[key] = self.counters[key] - value

incr(key, value=1)

Increment a counter.

:param key: The key to increment :param value: The value to increment by

Source code in rejected/measurement.py
def incr(self, key: str, value: int = 1) -> None:
    """Raise the named counter by ``value``.

    :param key: The key to increment
    :param value: The value to increment by

    """
    self.counters[key] = self.counters[key] + value

set_tag(key, value)

Set a tag for metrics submission.

:param key: The tag name :param value: The tag value

Source code in rejected/measurement.py
def set_tag(self, key: str, value: str | bool | int) -> None:
    """Set a tag for metrics submission.

    :param key: The tag name
    :param value: The tag value

    """
    self.tags[key] = value

set_value(key, value)

Set a numeric value.

:param key: The value name :param value: The value

Source code in rejected/measurement.py
def set_value(self, key: str, value: int | float) -> None:
    """Set a numeric value.

    :param key: The value name
    :param value: The value

    """
    self.values[key] = value

track_duration(key)

Context manager that records the duration of the wrapped block.

:param key: The timing name

Source code in rejected/measurement.py
@contextlib.contextmanager
def track_duration(self, key: str) -> typing.Generator[None, None, None]:
    """Context manager that records the duration of the wrapped
    block.

    :param key: The timing name

    """
    bucket = self.durations.setdefault(key, [])
    started = time.monotonic()
    try:
        yield
    finally:
        # Record even when the wrapped block raises
        bucket.append(time.monotonic() - started)

Mixins

rejected.mixins.GarbageCollectorMixin

Consumer mixin that periodically calls ``gc.collect`` in the :meth:`on_finish` method.

By default, gc.collect is invoked every 10,000 messages.

To configure the frequency of collection, include a ``gc_collection_frequency`` setting in the consumer configuration.

Source code in rejected/mixins.py
class GarbageCollectorMixin:
    """Consumer mixin to periodically call ``gc.collect`` in the
    :meth:`on_finish` method.

    By default, ``gc.collect`` is invoked every 10,000 messages.

    To configure frequency of collection, include a
    ``gc_collection_frequency`` setting in the consumer configuration.

    """

    DEFAULT_GC_FREQUENCY: typing.ClassVar[int] = 10000

    def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
        super().__init__(*args, **kwargs)
        settings = kwargs.get('settings') or (args[0] if args else None)
        gc_freq = self.DEFAULT_GC_FREQUENCY
        if settings is not None and hasattr(settings, 'get'):
            gc_freq = settings.get(
                'gc_collection_frequency', self.DEFAULT_GC_FREQUENCY
            )
        self._collection_cycle: int = gc_freq
        self._cycles_left: int = self._collection_cycle

    @property
    def collection_cycle(self) -> int:
        """Call :func:`gc.collect` every this many messages."""
        return self._collection_cycle

    @collection_cycle.setter
    def collection_cycle(self, value: int | None) -> None:
        """Set the number of messages to process before invoking
        ``gc.collect``.

        """
        if value is not None:
            self._collection_cycle = value
            self._cycles_left = min(self._cycles_left, self._collection_cycle)

    async def on_finish(self) -> None:
        """Used to initiate the garbage collection"""
        if hasattr(super(), 'on_finish'):
            await super().on_finish()  # type: ignore[misc]
        self._cycles_left -= 1
        if self._cycles_left <= 0:
            num_collected = gc.collect()
            self._cycles_left = self._collection_cycle
            LOGGER.debug(
                'garbage collection run, %d objects evicted', num_collected
            )

Attributes

collection_cycle property writable

Call :func:`gc.collect` once every this many messages.

Functions

on_finish() async

Used to initiate the garbage collection

Source code in rejected/mixins.py
async def on_finish(self) -> None:
    """Used to initiate the garbage collection"""
    parent = super()
    if hasattr(parent, 'on_finish'):
        await parent.on_finish()  # type: ignore[misc]
    self._cycles_left -= 1
    if self._cycles_left > 0:
        return
    collected = gc.collect()
    self._cycles_left = self._collection_cycle
    LOGGER.debug(
        'garbage collection run, %d objects evicted', collected
    )