Migrating from 3.x to 4.0
Rejected 4.0 is a major release that modernizes the framework around asyncio,
Pydantic models, and a cleaner module structure. This guide covers all breaking
changes and how to update your consumers.
Python Version
Rejected 4.0 requires Python 3.11+. Python 2.7 and 3.5-3.10 are no longer supported.
Tornado Removed
The most significant change in 4.0 is the removal of the Tornado dependency.
Rejected now uses asyncio directly via pika's AsyncioConnection adapter.
If your consumer uses any Tornado APIs (tornado.gen, tornado.ioloop,
tornado.concurrent, etc.), you must replace them with their asyncio
equivalents.
| 3.x (Tornado) | 4.0 (asyncio) |
|---|---|
@gen.coroutine / yield |
async def / await |
tornado.gen.sleep(n) |
asyncio.sleep(n) |
tornado.concurrent.Future |
asyncio.Future |
self.io_loop |
asyncio.get_event_loop() |
self.yield_to_ioloop() |
await asyncio.sleep(0) |
Consumer Classes
All methods are now async
In 3.x, consumer methods (process, prepare, finish, etc.) were
synchronous by default and could optionally use @gen.coroutine. In 4.0,
all lifecycle methods must be async def:
# 3.x
class MyConsumer(consumer.Consumer):
def process(self):
self.logger.info(self.body)
# 3.x with coroutine
class MyConsumer(consumer.Consumer):
@gen.coroutine
def process(self):
yield self.do_something()
# 4.0
import rejected
class MyConsumer(rejected.Consumer):
async def process(self):
self.logger.info(self.body)
await self.do_something()
publish_message is now async
In 3.x, publish_message was synchronous. In 4.0, it is async because
it may encode the message body using an async Avro codec:
# 3.x
self.publish_message('exchange', 'routing-key', {}, self.body)
# 4.0
await self.publish_message('exchange', 'routing-key', {}, self.body)
SmartConsumer and PublishingConsumer removed
SmartConsumer and PublishingConsumer have been removed. Their functionality
is now built into the base Consumer class:
- Auto-deserialization (previously
SmartConsumer):Consumernow automatically decodes message bodies based oncontent_typeandcontent_encoding.self.bodyreturns the deserialized value. - Publishing (previously
PublishingConsumer):Consumerhas always supportedpublish_message; the separate class was a no-op since 3.x.
# 3.x
class MyConsumer(consumer.SmartConsumer):
def process(self):
# self.body is auto-deserialized
data = self.body
self.publish_message(...)
# 4.0
import rejected
class MyConsumer(rejected.Consumer):
async def process(self):
# self.body is auto-deserialized (same behavior)
data = self.body
await self.publish_message(...)
FunctionalConsumer (new)
4.0 introduces FunctionalConsumer for concurrent message processing.
Unlike Consumer (which holds a lock), FunctionalConsumer passes a
ProcessingContext to each method and allows multiple messages to be
processed in parallel:
import rejected
class MyConcurrentConsumer(rejected.FunctionalConsumer):
async def process(self, ctx: rejected.ProcessingContext):
self.logger.info('Body: %s', ctx.message.body)
Use FunctionalConsumer with qos_prefetch > 1 in the configuration.
ACK_PROCESSING_EXCEPTIONS behavior
The ACK_PROCESSING_EXCEPTIONS class attribute still works the same way.
When True, messages that raise ProcessingException are acknowledged
instead of rejected.
Module Reorganization
New modules
| Module | Purpose |
|---|---|
rejected.codecs |
Message serialization/deserialization (extracted from Consumer) |
rejected.config |
Configuration loading and Settings class |
rejected.connection |
RabbitMQ connection management (extracted from process.py) |
rejected.exceptions |
Exception classes (extracted from consumer.py) |
rejected.measurement |
Measurement class (moved from rejected.data) |
rejected.models |
Pydantic models for config, messages, and processing context |
rejected.prometheus |
Prometheus metrics exporter (new) |
Removed modules
| Module | Replacement |
|---|---|
rejected.data |
rejected.measurement.Measurement, rejected.models.Message |
Import changes
In 4.0, the primary consumer classes, exceptions, and models are all
available directly from the rejected package:
# 3.x
from rejected.consumer import Consumer, ConsumerException, MessageException
from rejected.data import Measurement
# 4.0 (preferred — use top-level imports)
import rejected
class MyConsumer(rejected.Consumer): ...
class MyConcurrent(rejected.FunctionalConsumer): ...
# Exceptions
raise rejected.ConsumerException('...')
raise rejected.MessageException('...')
raise rejected.ProcessingException('...')
# Models
ctx: rejected.ProcessingContext
msg: rejected.Message
result: rejected.Result
Sub-module imports still work for backward compatibility or if you prefer explicit paths:
from rejected.exceptions import ConsumerException
from rejected.measurement import Measurement
from rejected.models import Message, ProcessingContext
Data Classes
Message
The rejected.data.Message and rejected.data.Properties classes have been
replaced by rejected.Message (a Pydantic model). If you accessed these
directly in tests or custom code:
# 3.x
from rejected.data import Message, Properties
msg = Message(channel, method, header, properties, body)
msg.properties.content_type
# 4.0
import rejected
msg = rejected.Message(
delivery_tag=1,
exchange='exchange',
routing_key='key',
body=b'...',
content_type='application/json',
# ... all properties are top-level fields
)
msg.content_type
Key differences:
- Properties are top-level fields on
Message, not nested undermsg.properties - The AMQP
typeproperty is accessed asmsg.type(matching the AMQP property name) Messageis a PydanticBaseModelwith validationbodystarts as raw bytes; theCodecclass decodes it asynchronously before the consumer sees it
Measurement
rejected.data.Measurement has moved to rejected.measurement.Measurement.
The API is unchanged.
Testing
AsyncTestCase
rejected.testing.AsyncTestCase now extends
unittest.IsolatedAsyncioTestCase instead of tornado.testing.AsyncTestCase.
# 3.x
from rejected import testing
class MyTest(testing.AsyncTestCase):
def get_consumer(self):
return MyConsumer
@testing.gen_test
def test_it(self):
yield self.process_message({'key': 'value'})
# 4.0
from rejected import testing
class MyTest(testing.AsyncTestCase):
def get_consumer(self):
return MyConsumer
async def test_it(self):
await self.process_message({'key': 'value'})
Key changes:
- Test methods must be
async def(no@gen_testdecorator) setUp->asyncSetUp(called automatically by the framework)create_message->create_context(returns aProcessingContext)process_messagereturns aMeasurementon success (unchanged)process_messageraises exceptions directly on consumer errors (unchanged)
PublishedMessage
rejected.testing.PublishedMessage is unchanged. Access published messages
via self.published_messages in your tests.
Configuration
TOML support
4.0 adds TOML configuration file support alongside YAML. Use a .toml file
extension and the configuration will be parsed accordingly.
schema_registry (new)
Avro schema configuration has moved from the consumer level to the application level:
# 3.x (not available - schemas were not configurable)
# 4.0
Application:
schema_registry:
type: file # or http
uri: file:///etc/avro/{0}.avsc
Connection changes
The ssl field is now a top-level boolean on connection entries, separate
from ssl_options:
# 3.x (ssl was implied by the presence of ssl_options)
Connections:
rabbitmq:
host: rabbitmq.example.com
ssl_options:
ca_certs: /etc/ssl/certs/ca-bundle.crt
# 4.0 (explicit ssl toggle + ssl_options)
Connections:
rabbitmq:
host: rabbitmq.example.com
ssl: true
ssl_options:
ca_certs: /etc/ssl/certs/ca-bundle.crt
New optional connection fields have been added:
| Field | Description | Default |
|---|---|---|
frame_max |
Maximum AMQP frame size in bytes | 131072 |
socket_timeout |
Socket timeout in seconds | 10 |
The default SSL protocol has changed from PROTOCOL_TLS to
PROTOCOL_TLS_CLIENT, which enables certificate verification by default.
If you were relying on the old default without explicit certificate
validation, you may need to provide ca_certs or ca_path in your
ssl_options.
Consumer connections
The structured connection publisher_confirmation field has been renamed to
confirm:
# 3.x
connections:
- name: rabbitmq
consume: true
publisher_confirmation: true
# 4.0
connections:
- name: rabbitmq
consume: true
confirm: true
Consumer configuration changes
The influxdb_measurement consumer field has been removed along with
InfluxDB support. Use Prometheus metrics instead.
Logging changes
The debug_only handler option has been removed. In 3.x, this was used to
suppress console handlers when running as a daemon. Since 4.0 always runs
in the foreground, this option is no longer needed. Remove debug_only from
any handler definitions in your logging configuration.
Removed configuration
| Removed | Notes |
|---|---|
Daemon section |
Rejected no longer daemonizes; use systemd/supervisord |
stats.influxdb |
InfluxDB support removed; use Prometheus |
influxdb_measurement |
Consumer-level InfluxDB measurement name removed |
dynamic_qos |
QoS is now static via qos_prefetch |
debug_only (logging handler) |
No longer needed without daemon mode |
-f / --foreground CLI flag |
Rejected always runs in the foreground |
New CLI options
| Flag | Description |
|---|---|
-n / --max-messages N |
Process N messages per consumer then shut down |
SIGHUP reload
4.0 supports configuration reloading via SIGHUP. When the process receives
SIGHUP, it reloads the configuration file and restarts consumer processes
with the updated settings — no full process restart required.
Removed APIs
| 3.x API | 4.0 Replacement |
|---|---|
consumer.SmartConsumer |
rejected.Consumer (auto-deserializes) |
consumer.PublishingConsumer |
rejected.Consumer (always could publish) |
self.io_loop |
asyncio.get_event_loop() |
self.yield_to_ioloop() |
await asyncio.sleep(0) |
self.reply(...) |
Use await self.publish_message(...) with reply_to |
self.sentry_client |
Use self.send_exception_to_sentry() |
self.stats_add_timing(...) |
self.stats_add_duration(...) |
self.statsd_add_timing(...) |
self.stats_add_duration(...) |
self.statsd_incr(...) |
self.stats_incr(...) |
self.statsd_track_duration(...) |
self.stats_track_duration(...) |
rejected.data.Data |
Removed |
rejected.data.Message |
rejected.Message |
rejected.data.Properties |
Fields are on rejected.Message |
pickle content types |
Removed for security (RCE risk) |
Prometheus Metrics (new)
4.0 adds a built-in Prometheus metrics exporter. Enable it in the configuration:
See the Configuration page for the
full list of exposed metrics. Custom metrics emitted via stats_incr,
stats_add_duration, and stats_set_value are automatically forwarded
to Prometheus.
Quick Migration Checklist
- Ensure Python >= 3.11
- Update imports to use
import rejectedandrejected.Consumer, etc. - Replace
@gen.coroutine/yieldwithasync def/await - Make
process(),prepare(),finish()async - Add
awaittopublish_message()calls - Replace
SmartConsumer/PublishingConsumerwithrejected.Consumer - Replace
tornadoimports withasyncioequivalents - Replace
from rejected.data import ...withrejected.Message,rejected.measurement.Measurement, etc. - Update tests to use
async deftest methods (no@gen_test) - Rename
publisher_confirmationtoconfirmin connection config - Add
ssl: trueto connections that usessl_options - Remove
Daemonsection from config (if present) - Remove
stats.influxdbfrom config (use Prometheus instead) - Remove
influxdb_measurementfrom consumer configs - Remove
debug_onlyfrom logging handler configs - Remove any
picklecontent type usage - Replace deprecated
statsd_*method calls withstats_*equivalents