Using SSL with aiokafka
An example of SSL usage with aiokafka. Please read SSL Authentication for more information.
import asyncio
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context
from aiokafka.errors import TopicPartition
# SSL context handed to both the producer and the consumer in this example.
context = create_ssl_context(
    # CA used to sign the certificate (`CARoot` of the JKS store container).
    cafile="./ca-cert",
    # Signed certificate.
    certfile="./cert-signed",
    # Private key file of the `certfile` certificate.
    keyfile="./cert-key",
    # NOTE(review): presumably the passphrase protecting `keyfile` -- confirm
    # against the create_ssl_context documentation.
    password="123123",
)
async def produce_and_consume():
    """Send one message over SSL, then fetch that same message back.

    A producer publishes ``b"Super Message"`` to partition 0 of ``my_topic``
    and waits for the send result. A consumer then seeks to the offset
    reported by that result and reads a single record. Both clients are
    stopped in ``finally`` blocks, so they are shut down even when the
    send or the fetch raises. On success the send result and the fetched
    record are printed.
    """
    topic = 'my_topic'
    # Connection settings common to the producer and the consumer.
    ssl_options = dict(
        bootstrap_servers='localhost:9093',
        security_protocol="SSL",
        ssl_context=context,
    )

    # Produce
    producer = AIOKafkaProducer(**ssl_options)
    await producer.start()
    try:
        sent = await producer.send_and_wait(
            topic, b"Super Message", partition=0)
    finally:
        await producer.stop()

    # Consume
    consumer = AIOKafkaConsumer(topic, **ssl_options)
    await consumer.start()
    try:
        # Jump straight to the record that was just produced.
        # NOTE(review): TopicPartition is imported from aiokafka.errors at
        # the top of this file; it is defined in aiokafka.structs -- confirm
        # that import resolves with the installed aiokafka version.
        target = TopicPartition(topic, 0)
        consumer.seek(target, sent.offset)
        received = await consumer.getone()
    finally:
        await consumer.stop()

    print("Success", sent, received)
if __name__ == "__main__":
    # Script entry point: run the produce/consume round trip to completion.
    demo = produce_and_consume()
    asyncio.run(demo)
Output:
$ python3 ssl_consume_produce.py
Success RecordMetadata(topic='my_topic', partition=0, topic_partition=TopicPartition(topic='my_topic', partition=0), offset=32) ConsumerRecord(topic='my_topic', partition=0, offset=32, timestamp=1479393347381, timestamp_type=0, key=None, value=b'Super Message', checksum=469650252, serialized_key_size=-1, serialized_value_size=13)