Serialization and compressionΒΆ

Kafka supports several compression types: gzip, snappy and lz4. You only need to specify the compression in Kafka Producer, Consumer will decompress automatically.

Note:

Messages are compressed in batches, so you will have more efficiency on larger batches. You can consider setting linger_ms to batch more data before sending.

By default value and key attributes of returned ConsumerRecord instances are bytes. You can use custom serializer/deserializer hooks to operate on objects instead of bytes in those attributes.

Producer

import json
import asyncio
from aiokafka import AIOKafkaProducer

def serializer(value):
    return json.dumps(value).encode()

async def produce():
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=serializer,
        compression_type="gzip")

    await producer.start()
    data = {"a": 123.4, "b": "some string"}
    await producer.send('foobar', data)
    data = [1,2,3,4]
    await producer.send('foobar', data)
    await producer.stop()

asyncio.run(produce())

Consumer

import json
import asyncio
from aiokafka.errors import KafkaError
from aiokafka import AIOKafkaConsumer

def deserializer(serialized):
    return json.loads(serialized)

async def consume():
    # consumer will decompress messages automatically
    # in accordance to compression type specified in producer
    consumer = AIOKafkaConsumer(
        'foobar',
        bootstrap_servers='localhost:9092',
        value_deserializer=deserializer,
        auto_offset_reset='earliest')
    await consumer.start()
    data = await consumer.getmany(timeout_ms=10000)
    for tp, messages in data.items():
        for message in messages:
            print(type(message.value), message.value)
    await consumer.stop()

asyncio.run(consume())

Output:

>>> python3 producer.py
>>> python3 consumer.py
<class 'dict'> {'a': 123.4, 'b': 'some string'}
<class 'list'> [1,2,3,4]