API Documentation¶
Producer class¶
- class aiokafka.AIOKafkaProducer(*, loop=None, bootstrap_servers='localhost', client_id=None, metadata_max_age_ms=300000, request_timeout_ms=40000, api_version='auto', acks=<object object>, key_serializer=None, value_serializer=None, compression_type=None, max_batch_size=16384, partitioner=<aiokafka.partitioner.DefaultPartitioner object>, max_request_size=1048576, linger_ms=0, send_backoff_ms=100, retry_backoff_ms=100, security_protocol='PLAINTEXT', ssl_context=None, connections_max_idle_ms=540000, enable_idempotence=False, transactional_id=None, transaction_timeout_ms=60000, sasl_mechanism='PLAIN', sasl_plain_password=None, sasl_plain_username=None, sasl_kerberos_service_name='kafka', sasl_kerberos_domain_name=None, sasl_oauth_token_provider=None)[source]¶
A Kafka client that publishes records to the Kafka cluster.
The producer consists of a pool of buffer space that holds records that haven’t yet been transmitted to the server as well as a background task that is responsible for turning these records into requests and transmitting them to the cluster.
The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.

The acks config controls the criteria under which requests are considered complete. The all setting will result in waiting for all replicas to respond, the slowest but most durable setting.

The key_serializer and value_serializer instruct how to turn the key and value objects the user provides into bytes.

- Parameters
bootstrap_servers (str, list(str)) – a host[:port] string or list of host[:port] strings that the producer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092.
client_id (str) – a name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Default: aiokafka-producer-# (appended with a unique number per instance)
key_serializer (Callable) – used to convert user-supplied keys to bytes. If not None, called as f(key), should return bytes. Default: None.
value_serializer (Callable) – used to convert user-supplied message values to bytes. If not None, called as f(value), should return bytes. Default: None.
acks (Any) – one of 0, 1, all. The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:

0: Producer will not wait for any acknowledgment from the server at all. The message will immediately be added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset given back for each record will always be set to -1.
1: The broker leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case, should the leader fail immediately after acknowledging the record but before the followers have replicated it, then the record will be lost.
all: The broker leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.

If unset, defaults to acks=1. If enable_idempotence is True, defaults to acks=all.
compression_type (str) – The compression type for all data generated by the producer. Valid values are gzip, snappy, lz4, zstd or None. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression). Default: None.
max_batch_size (int) – Maximum size of buffered data per partition. After this amount the send() coroutine will block until the batch is drained. Default: 16384
linger_ms (int) – The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay; that is, if the first request is processed faster than linger_ms, the producer will wait linger_ms - process_time before sending the batch. Default: 0 (i.e. no delay).
partitioner (Callable) – Callable used to determine which partition each message is assigned to. Called (after key serialization): partitioner(key_bytes, all_partitions, available_partitions). The default partitioner implementation hashes each non-None key using the same murmur2 algorithm as the Java client so that messages with the same key are assigned to the same partition. When a key is None, the message is delivered to a random partition (filtered to partitions with available leaders only, if possible).
max_request_size (int) – The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. Default: 1048576.
metadata_max_age_ms (int) – The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions. Default: 300000
request_timeout_ms (int) – Produce request timeout in milliseconds. As it’s sent as part of ProduceRequest (it’s a blocking call), maximum waiting time can be up to 2 * request_timeout_ms. Default: 40000.
retry_backoff_ms (int) – Milliseconds to backoff when retrying on errors. Default: 100.
api_version (str) – specify which kafka API version to use. If set to auto, will attempt to infer the broker version by probing various APIs. Default: auto
security_protocol (str) – Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Default: PLAINTEXT.
ssl_context (ssl.SSLContext) – pre-configured SSLContext for wrapping socket connections. Directly passed into asyncio’s create_connection(). For more information see SSL Authentication. Default: None
connections_max_idle_ms (int) – Close idle connections after the number of milliseconds specified by this config. Specifying None will disable idle checks. Default: 540000 (9 minutes).
enable_idempotence (bool) – When set to True, the producer will ensure that exactly one copy of each message is written in the stream. If False, producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires acks to be set to all. If acks is not explicitly set by the user, it will be chosen automatically. If incompatible values are set, a ValueError will be thrown. New in version 0.5.0.
sasl_mechanism (str) – Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER. Default: PLAIN
sasl_plain_username (str) – username for SASL PLAIN authentication. Default: None
sasl_plain_password (str) – password for SASL PLAIN authentication. Default: None
sasl_oauth_token_provider (AbstractTokenProvider) – OAuthBearer token provider instance. Default: None
Note
Many configuration parameters are taken from the Java client: https://kafka.apache.org/documentation.html#producerconfigs
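A minimal usage sketch follows; the topic name and broker address are placeholders, not part of the API:

    import asyncio
    from aiokafka import AIOKafkaProducer

    async def main():
        producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
        # Connect to the cluster and fetch initial metadata
        await producer.start()
        try:
            # send_and_wait() resolves to RecordMetadata once the broker acknowledges
            metadata = await producer.send_and_wait("my_topic", b"some value")
            print(metadata.topic, metadata.partition, metadata.offset)
        finally:
            # Flush pending messages and close connections
            await producer.stop()

    asyncio.run(main())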
- create_batch()[source]¶
Create and return an empty BatchBuilder.

The batch is not queued for send until submission to send_batch().

- Returns
empty batch to be filled and submitted by the caller.
- Return type
- async send(topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None)[source]¶
Publish a message to a topic.
- Parameters
topic (str) – topic where the message will be published
value (Optional) – message value. Must be type bytes, or be serializable to bytes via the configured value_serializer. If value is None, key is required and the message acts as a delete. See Kafka compaction documentation for more details (compaction requires kafka >= 0.8.1).
partition (int, Optional) – optionally specify a partition. If not set, the partition will be selected using the configured partitioner.
key (Optional) – a key to associate with the message. Can be used to determine which partition to send the message to. If partition is None (and the producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, the partition is chosen randomly). Must be type bytes, or be serializable to bytes via the configured key_serializer.
timestamp_ms (int, Optional) – epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.
headers (Optional) – Kafka headers to be included in the message using the format [("key", b"value")]. Iterable of tuples where key is a normal string and value is a byte string.
- Returns
object that will be set when message is processed
- Return type
- Raises
KafkaTimeoutError – if we can’t schedule this record (pending buffer is full) in up to request_timeout_ms milliseconds.
Note
The returned future will wait based on the request_timeout_ms setting. Cancelling the returned future will not stop the message from being sent, but cancelling the send() coroutine itself will.
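A short sketch of the two-step pattern (schedule, then await delivery); the topic name and key are placeholders:

    # Schedules the message and returns quickly with a future
    fut = await producer.send("my_topic", b"value", key=b"user-1")
    # ... do other work while the batch is being delivered ...
    metadata = await fut  # resolves to RecordMetadata once acknowledged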
- async send_and_wait(topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None)[source]¶
Publish a message to a topic and wait for the result.
- async send_batch(batch, topic, *, partition)[source]¶
Submit a BatchBuilder for publication.
- Parameters
batch (BatchBuilder) – batch object to be published.
topic (str) – topic where the batch will be published.
partition (int) – partition where this batch will be published.
- Returns
object that will be set when the batch is delivered.
- Return type
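A hedged sketch of building and submitting a batch; the topic is a placeholder, and BatchBuilder.append() is assumed to take key, value and timestamp keyword arguments as in the project’s batch example:

    batch = producer.create_batch()
    for i in range(3):
        # append() returns metadata, or None if the batch is already full
        batch.append(key=None, value=b"msg %d" % i, timestamp=None)
    # send_batch() requires an explicit partition; partitions_for() is one way to pick one
    partitions = await producer.partitions_for("my_topic")
    partition = sorted(partitions)[0]
    fut = await producer.send_batch(batch, "my_topic", partition=partition)
    metadata = await fut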
Consumer class¶
- class aiokafka.AIOKafkaConsumer(*topics, loop=None, bootstrap_servers='localhost', client_id='aiokafka-0.10.0', group_id=None, group_instance_id=None, key_deserializer=None, value_deserializer=None, fetch_max_wait_ms=500, fetch_max_bytes=52428800, fetch_min_bytes=1, max_partition_fetch_bytes=1048576, request_timeout_ms=40000, retry_backoff_ms=100, auto_offset_reset='latest', enable_auto_commit=True, auto_commit_interval_ms=5000, check_crcs=True, metadata_max_age_ms=300000, partition_assignment_strategy=(<class 'aiokafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor'>, ), max_poll_interval_ms=300000, rebalance_timeout_ms=None, session_timeout_ms=10000, heartbeat_interval_ms=3000, consumer_timeout_ms=200, max_poll_records=None, ssl_context=None, security_protocol='PLAINTEXT', api_version='auto', exclude_internal_topics=True, connections_max_idle_ms=540000, isolation_level='read_uncommitted', sasl_mechanism='PLAIN', sasl_plain_password=None, sasl_plain_username=None, sasl_kerberos_service_name='kafka', sasl_kerberos_domain_name=None, sasl_oauth_token_provider=None)[source]¶
A client that consumes records from a Kafka cluster.
The consumer will transparently handle the failure of servers in the Kafka cluster, and adapt as topic-partitions are created or migrate between brokers.
It also interacts with the assigned Kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (feature of Kafka >= 0.9.0.0).
- Parameters
*topics (list(str)) – optional list of topics to subscribe to. If not set, call subscribe() or assign() before consuming records. Passing topics directly is the same as calling the subscribe() API.
bootstrap_servers (str, list(str)) – a host[:port] string (or list of host[:port] strings) that the consumer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092.
client_id (str) – a name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: aiokafka-{version}
group_id (str or None) – name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None
group_instance_id (str or None) – name of the group instance ID used for static membership (KIP-345)
key_deserializer (Callable) – Any callable that takes a raw message key and returns a deserialized key.
value_deserializer (Callable, Optional) – Any callable that takes a raw message value and returns a deserialized value.
fetch_min_bytes (int) – Minimum amount of data the server should return for a fetch request, otherwise wait up to fetch_max_wait_ms for more data to accumulate. Default: 1.
fetch_max_bytes (int) – The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch is larger than this value, the message will still be returned to ensure that the consumer can make progress. NOTE: consumer performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 Mb).
fetch_max_wait_ms (int) – The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by fetch_min_bytes. Default: 500.
max_partition_fetch_bytes (int) – The maximum amount of data per-partition the server will return. The maximum total memory used for a request = #partitions * max_partition_fetch_bytes. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. Default: 1048576.
max_poll_records (int) – The maximum number of records returned in a single call to getmany(). Defaults to None, no limit.
request_timeout_ms (int) – Client request timeout in milliseconds. Default: 40000.
retry_backoff_ms (int) – Milliseconds to backoff when retrying on errors. Default: 100.
auto_offset_reset (str) – A policy for resetting offsets on OffsetOutOfRangeError errors: earliest will move to the oldest available message, latest will move to the most recent, and none will raise an exception so you can handle this case. Default: latest.
enable_auto_commit (bool) – If true the consumer’s offset will be periodically committed in the background. Default: True.
auto_commit_interval_ms (int) – milliseconds between automatic offset commits, if enable_auto_commit is True. Default: 5000.
check_crcs (bool) – Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. Default: True
metadata_max_age_ms (int) – The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions. Default: 300000
partition_assignment_strategy (list) – List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order of the strategies in the list. When the assignment strategy changes: to support a change to the assignment strategy, new versions must enable support both for the old assignment strategy and the new one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. Default: [RoundRobinPartitionAssignor]
max_poll_interval_ms (int) – Maximum allowed time between calls to consume messages (e.g., getmany()). If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. See KIP-62 for more information. Default: 300000
rebalance_timeout_ms (int) – The maximum time the server will wait for this consumer to rejoin the group in case of a rebalance. In the Java client this behaviour is bound to the max.poll.interval.ms configuration, but as aiokafka will rejoin the group in the background, we decouple this setting to allow finer tuning by users that use ConsumerRebalanceListener to delay rebalancing. Defaults to session_timeout_ms
session_timeout_ms (int) – Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the broker configuration properties group.min.session.timeout.ms and group.max.session.timeout.ms. Default: 10000
heartbeat_interval_ms (int) – The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management feature. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session_timeout_ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. Default: 3000
consumer_timeout_ms (int) – maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. Default: 200
api_version (str) – specify which kafka API version to use. AIOKafkaConsumer supports Kafka API versions >=0.9 only. If set to auto, will attempt to infer the broker version by probing various APIs. Default: auto
security_protocol (str) – Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Default: PLAINTEXT.
ssl_context (ssl.SSLContext) – pre-configured SSLContext for wrapping socket connections. Directly passed into asyncio’s create_connection(). For more information see SSL Authentication. Default: None.
exclude_internal_topics (bool) – Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. Requires 0.10+ Default: True
connections_max_idle_ms (int) – Close idle connections after the number of milliseconds specified by this config. Specifying None will disable idle checks. Default: 540000 (9 minutes).
isolation_level (str) – Controls how to read messages written transactionally.

If set to read_committed, getmany() will only return transactional messages which have been committed. If set to read_uncommitted (the default), getmany() will return all messages, even transactional messages which have been aborted.

Non-transactional messages will be returned unconditionally in either mode.

Messages will always be returned in offset order. Hence, in read_committed mode, getmany() will only return messages up to the last stable offset (LSO), which is the one less than the offset of the first open transaction. In particular any messages appearing after messages belonging to ongoing transactions will be withheld until the relevant transaction has been completed. As a result, read_committed consumers will not be able to read up to the high watermark when there are in-flight transactions. Further, when in read_committed the seek_to_end method will return the LSO. See method docs below. Default: read_uncommitted
sasl_mechanism (str) – Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER. Default: PLAIN
sasl_plain_username (str) – username for SASL PLAIN authentication. Default: None
sasl_plain_password (str) – password for SASL PLAIN authentication. Default: None
sasl_oauth_token_provider (AbstractTokenProvider) – OAuthBearer token provider instance. Default: None
Note
Many configuration parameters are taken from Java Client: https://kafka.apache.org/documentation.html#newconsumerconfigs
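A minimal consumption sketch; the topic, group and broker address are placeholders:

    import asyncio
    from aiokafka import AIOKafkaConsumer

    async def main():
        consumer = AIOKafkaConsumer(
            "my_topic",
            bootstrap_servers="localhost:9092",
            group_id="my-group",   # omit to disable group management
        )
        await consumer.start()
        try:
            # async for fetches messages continuously as they arrive
            async for msg in consumer:
                print(msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        finally:
            # Commits the last consumed offsets (if autocommit) and leaves the group
            await consumer.stop()

    asyncio.run(main())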
- assign(partitions)[source]¶
Manually assign a list of TopicPartition to this consumer.

This interface does not support incremental assignment and will replace the previous assignment (if there was one).
- Parameters
partitions (list(TopicPartition)) – assignment for this instance.
- Raises
IllegalStateError – if consumer has already called subscribe()

Warning

It is not possible to use both manual partition assignment with assign() and group assignment with subscribe().

Note
Manual topic assignment through this method does not use the consumer’s group management functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic metadata change.
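A short sketch of manual assignment; the topic and partition numbers are placeholders:

    from aiokafka import TopicPartition

    consumer = AIOKafkaConsumer(bootstrap_servers="localhost:9092")
    await consumer.start()
    # Take ownership of two specific partitions; no group rebalancing happens
    consumer.assign([TopicPartition("my_topic", 0), TopicPartition("my_topic", 1)])
    msg = await consumer.getone()  # waits until a new message arrives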
- assignment()[source]¶
Get the set of partitions currently assigned to this consumer.
If partitions were directly assigned using assign(), then this will simply return the same partitions that were previously assigned.

If topics were subscribed using subscribe(), then this will give the set of topic partitions currently assigned to the consumer (which may be empty if the assignment hasn’t happened yet or if the partitions are in the process of being reassigned).

- Returns
the set of partitions currently assigned to this consumer
- Return type
- async beginning_offsets(partitions)[source]¶
Get the first offset for the given partitions.
This method does not change the current consumer position of the partitions.
Note
This method may block indefinitely if the partition does not exist.
- Parameters
partitions (list[TopicPartition]) – List of TopicPartition instances to fetch offsets for.

- Returns
mapping of partition to earliest available offset.
- Return type
- Raises
UnsupportedVersionError – If the broker does not support looking up the offsets by timestamp.
KafkaTimeoutError – If fetch failed in request_timeout_ms.
New in version 0.3.0.
- async commit(offsets=None)[source]¶
Commit offsets to Kafka.
This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.
Currently only supports kafka-topic offset storage (not Zookeeper)
When explicitly passing offsets, use either the offset of the next record, or a tuple of offset and metadata:

    tp = TopicPartition(msg.topic, msg.partition)
    metadata = "Some utf-8 metadata"

    # Either
    await consumer.commit({tp: msg.offset + 1})

    # Or position directly
    await consumer.commit({tp: (msg.offset + 1, metadata)})
Note
If you want a fire-and-forget commit, like commit_async() in kafka-python, just run it in a task. Something like:

    fut = loop.create_task(consumer.commit())
    fut.add_done_callback(on_commit_done)
- Parameters
offsets (dict, Optional) – A mapping from TopicPartition to (offset, metadata) to commit with the configured group_id. Defaults to current consumed offsets for all subscribed partitions.

- Raises
CommitFailedError – If membership already changed on broker.
IllegalOperation – If used with group_id == None.
IllegalStateError – If partitions not assigned.
KafkaError – If commit failed on broker side. This could be due to invalid offset, too long metadata, authorization failure, etc.
ValueError – If offsets is of wrong format.
Changed in version 0.4.0: Changed AssertionError to IllegalStateError in case of unassigned partition.

Changed in version 0.4.0: Will now raise CommitFailedError in case membership changed, as (possibly) this partition is handled by another consumer.
- async committed(partition)[source]¶
Get the last committed offset for the given partition (whether the commit happened by this process or another).
This offset will be used as the position for the consumer in the event of a failure.
This call will block to do a remote call to get the latest committed offset, as those are not cached by the consumer (a Transactional Producer can change them without the Consumer’s knowledge as of Kafka 0.11.0).
- Parameters
partition (TopicPartition) – the partition to check
- Returns
The last committed offset, or None if there was no prior commit.
- Raises
IllegalOperation – If used with group_id == None
- async end_offsets(partitions)[source]¶
Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
This method does not change the current consumer position of the partitions.
Note
This method may block indefinitely if the partition does not exist.
- Parameters
partitions (list[TopicPartition]) – List of TopicPartition instances to fetch offsets for.

- Returns
mapping of partition to last available offset + 1.
- Return type
- Raises
UnsupportedVersionError – If the broker does not support looking up the offsets by timestamp.
KafkaTimeoutError – If fetch failed in request_timeout_ms
New in version 0.3.0.
- async getmany(*partitions, timeout_ms=0, max_records=None) Dict[aiokafka.structs.TopicPartition, List[aiokafka.structs.ConsumerRecord]] [source]¶
Get messages from assigned topics / partitions.
Prefetched messages are returned in batches by topic-partition. If messages are not available in the prefetched buffer, this method waits up to timeout_ms milliseconds.
- Parameters
partitions (list[TopicPartition]) – The partitions to fetch messages from. If no partitions are specified, all subscribed partitions will be used.
timeout_ms (int, Optional) – milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative. Default: 0
- Returns
topic to list of records since the last fetch for the subscribed list of topics and partitions
- Return type
Example usage:
    data = await consumer.getmany()
    for tp, messages in data.items():
        topic = tp.topic
        partition = tp.partition
        for message in messages:
            # Process message
            print(message.offset, message.key, message.value)
- async getone(*partitions) aiokafka.structs.ConsumerRecord [source]¶
Get one message from Kafka. If no new messages prefetched, this method will wait for it.
- Parameters
partitions (list(TopicPartition)) – Optional list of partitions to return from. If no partitions are specified, the returned message may come from any partition the consumer is subscribed to.
- Returns
the message
- Return type
Will return an instance of collections.namedtuple("ConsumerRecord", ["topic", "partition", "offset", "key", "value"]).
Example usage:
    while True:
        message = await consumer.getone()
        topic = message.topic
        partition = message.partition
        # Process message
        print(message.offset, message.key, message.value)
- highwater(partition)[source]¶
Last known highwater offset for a partition.
A highwater offset is the offset that will be assigned to the next message that is produced. It may be useful for calculating lag, by comparing with the reported position. Note that both position and highwater refer to the next offset – i.e., highwater offset is one greater than the newest available message.
Highwater offsets are returned as part of FetchResponse, so will not be available if messages for this partition were not requested yet.

- Parameters
partition (TopicPartition) – partition to check
- Returns
offset if available
- Return type
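A hedged sketch of combining highwater with position to estimate consumer lag; the topic and partition are placeholders, and highwater may be None before the first fetch:

    tp = TopicPartition("my_topic", 0)
    position = await consumer.position(tp)   # offset of the next record to fetch
    highwater = consumer.highwater(tp)       # next offset to be produced, if known
    if highwater is not None:
        lag = highwater - position
        print("lag for", tp, "is", lag)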
- last_poll_timestamp(partition)[source]¶
Returns the timestamp of the last poll of this partition (in ms). It is the last time highwater() and last_stable_offset() were updated. However it does not mean that new messages were received.

As with highwater(), it will not be available until some messages are consumed.

- Parameters
partition (TopicPartition) – partition to check
- Returns
timestamp if available
- Return type
- last_stable_offset(partition)[source]¶
Returns the Last Stable Offset of a partition. It will be the last offset up to which all transactions were completed. Only available with isolation_level read_committed; in read_uncommitted it will always return -1. Will return None for older brokers.

As with highwater(), it will not be available until some messages are consumed.

- Parameters
partition (TopicPartition) – partition to check
- Returns
offset if available
- Return type
- async offsets_for_times(timestamps)[source]¶
Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
The consumer does not have to be assigned the partitions.
If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, None will be returned for that partition.

Note
This method may block indefinitely if the partition does not exist.
- Parameters
timestamps (dict(TopicPartition, int)) – mapping from partition to the timestamp to look up. Unit should be milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC))
- Returns
mapping from partition to the timestamp and offset of the first message with timestamp greater than or equal to the target timestamp.
- Return type
- Raises
ValueError – If the target timestamp is negative
UnsupportedVersionError – If the broker does not support looking up the offsets by timestamp.
KafkaTimeoutError – If fetch failed in request_timeout_ms
New in version 0.3.0.
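A hedged sketch of looking up the offset for a point in time; the topic, partition and one-hour window are placeholders, and an entry may be None when no message is that recent:

    import time
    from aiokafka import TopicPartition

    tp = TopicPartition("my_topic", 0)
    one_hour_ago = int(time.time() * 1000) - 3600 * 1000
    offsets = await consumer.offsets_for_times({tp: one_hour_ago})
    result = offsets[tp]
    if result is not None:
        # Rewind to the first message produced within the last hour
        consumer.seek(tp, result.offset)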
- partitions_for_topic(topic)[source]¶
Get metadata about the partitions for a given topic.
This method will return None if Consumer does not already have metadata for this topic.
- pause(*partitions)[source]¶
Suspend fetching from the requested partitions.
Future calls to getmany() will not return any records from these partitions until they have been resumed using resume().

Note: This method does not affect partition subscription. In particular, it does not cause a group rebalance when automatic assignment is used.
- Parameters
*partitions (list[TopicPartition]) – Partitions to pause.
- paused()[source]¶
Get the partitions that were previously paused using pause().

- Returns
partitions
- Return type
- async position(partition)[source]¶
Get the offset of the next record that will be fetched (if a record with that offset exists on broker).
- Parameters
partition (TopicPartition) – partition to check
- Returns
offset
- Return type
- Raises
IllegalStateError – partition is not assigned
Changed in version 0.4.0: Changed AssertionError to IllegalStateError in case of unassigned partition
- resume(*partitions)[source]¶
Resume fetching from the specified (paused) partitions.
- Parameters
*partitions (list[TopicPartition]) – Partitions to resume.
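A hedged sketch of pausing a partition during a slow processing step and resuming it later; the topic and partition are placeholders:

    tp = TopicPartition("my_topic", 0)
    # Stop fetching from this partition while we catch up
    consumer.pause(tp)
    assert tp in consumer.paused()
    # ... drain an internal queue, flush a cache, etc. ...
    consumer.resume(tp)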
- seek(partition, offset)[source]¶
Manually specify the fetch offset for a TopicPartition.

Overrides the fetch offsets that the consumer will use on the next getmany()/getone() call. If this API is invoked for the same partition more than once, the latest offset will be used on the next fetch.

Note
You may lose data if this API is arbitrarily used in the middle of consumption to reset the fetch offsets. Use it either on rebalance listeners or after all pending messages are processed.
- Parameters
partition (TopicPartition) – partition for seek operation
offset (int) – message offset in partition
- Raises
ValueError – if offset is not a positive integer
IllegalStateError – partition is not currently assigned
Changed in version 0.4.0: Changed AssertionError to IllegalStateError and ValueError in respective cases.
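A hedged sketch of rewinding a partition by a fixed number of records; the topic, partition and rewind size are placeholders:

    tp = TopicPartition("my_topic", 0)
    position = await consumer.position(tp)
    # Re-read the last 10 records (clamped at offset 0)
    consumer.seek(tp, max(position - 10, 0))
    batch = await consumer.getmany(tp, timeout_ms=1000)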
- async seek_to_beginning(*partitions)[source]¶
Seek to the oldest available offset for partitions.
- Parameters
*partitions – Optionally provide specific TopicPartition, otherwise default to all assigned partitions.

- Raises
IllegalStateError – If any partition is not currently assigned
TypeError – If partitions are not instances of TopicPartition
New in version 0.3.0.
- async seek_to_committed(*partitions)[source]¶
Seek to the committed offset for partitions.
- Parameters
*partitions – Optionally provide specific TopicPartition, otherwise default to all assigned partitions.

- Returns
mapping of the currently committed offsets.
- Return type
- Raises
IllegalStateError – If any partition is not currently assigned
IllegalOperation – If used with group_id == None

Changed in version 0.3.0: Changed AssertionError to IllegalStateError in case of unassigned partition
- async seek_to_end(*partitions)[source]¶
Seek to the most recent available offset for partitions.
- Parameters
*partitions – Optionally provide specific TopicPartition, otherwise default to all assigned partitions.

- Raises
IllegalStateError – If any partition is not currently assigned
TypeError – If partitions are not instances of TopicPartition
New in version 0.3.0.
- async start()[source]¶
Connect to Kafka cluster. This will:
Load metadata for all cluster nodes and partition allocation
Wait for possible topic autocreation
Join group if group_id provided
- async stop()[source]¶
Close the consumer, while waiting for finalizers:
Commit the last consumed message offsets, if autocommit is enabled
Leave the group, if Consumer Groups are used
- subscribe(topics=(), pattern=None, listener=None)[source]¶
Subscribe to a list of topics, or a topic regex pattern.
Partitions will be dynamically assigned via a group coordinator. Topic subscriptions are not incremental: this list will replace the current assignment (if there is one).
This method is incompatible with assign().

- Parameters
topics (list) – List of topics for subscription.
pattern (str) – Pattern to match available topics. You must provide either topics or pattern, but not both.
listener (ConsumerRebalanceListener) –
Optionally include a listener callback, which will be called before and after each rebalance operation. As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and will trigger a rebalance operation if one of the following events occurs:
Number of partitions change for any of the subscribed topics
Topic is created or deleted
An existing member of the consumer group dies
A new member is added to the consumer group
When any of these events are triggered, the provided listener will be invoked first to indicate that the consumer’s assignment has been revoked, and then again when the new assignment has been received. Note that this listener will immediately override any listener set in a previous call to subscribe. It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics subscribed in this call.
- Raises
IllegalStateError – if called after previously calling assign()
ValueError – if neither topics nor pattern is provided, or both are provided
TypeError – if listener is not a ConsumerRebalanceListener
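A hedged sketch of pattern subscription with a rebalance listener; the pattern is a placeholder and MyRebalancer is a hypothetical class implementing ConsumerRebalanceListener (see Abstracts below):

    # Subscribe to every topic matching the pattern; assignment is managed by the group
    consumer.subscribe(pattern="^events-.*", listener=MyRebalancer())
    # Or subscribe to an explicit topic list instead
    # consumer.subscribe(topics=["events-payments", "events-logins"])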
Helpers¶
- aiokafka.helpers.create_ssl_context(*, cafile=None, capath=None, cadata=None, certfile=None, keyfile=None, password=None, crlfile=None)[source]¶
Simple helper that creates an SSLContext based on params similar to those in kafka-python, but with some restrictions:

check_hostname is not optional, and will be set to True
the crlfile option is missing, as it is fairly hard to test
- Parameters
cafile (str) – Certificate Authority file path containing certificates used to sign broker certificates. If CA is not specified (by either cafile, capath, cadata) the default system CA will be used if found by OpenSSL. For more information see load_verify_locations(). Default: None
capath (str) – Same as cafile, but points to a directory containing several CA certificates. For more information see load_verify_locations(). Default: None
cadata (str, bytes) – Same as cafile, but instead contains already read data in either ASCII or bytes format. Can be used to specify DER-encoded certificates, rather than PEM ones. For more information see load_verify_locations(). Default: None
certfile (str) – optional filename of file in PEM format containing the client certificate, as well as any CA certificates needed to establish the certificate’s authenticity. For more information see load_cert_chain(). Default: None.
keyfile (str) – optional filename containing the client private key. For more information see load_cert_chain(). Default: None.
password (str) – optional password to be used when loading the certificate chain. For more information see load_cert_chain(). Default: None.
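A hedged sketch of wiring the helper into a client; all file paths, the password and the broker address are placeholders:

    from aiokafka import AIOKafkaProducer
    from aiokafka.helpers import create_ssl_context

    ssl_context = create_ssl_context(
        cafile="ca-cert.pem",          # CA used to sign the broker certificates
        certfile="client-cert.pem",    # client certificate chain
        keyfile="client-key.pem",      # client private key
        password="key-password",
    )
    producer = AIOKafkaProducer(
        bootstrap_servers="broker.example.com:9093",
        security_protocol="SSL",
        ssl_context=ssl_context,
    )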
Abstracts¶
- class aiokafka.abc.AbstractTokenProvider(**config)[source]¶
A Token Provider must be used for the SASL OAuthBearer protocol.
The implementation should ensure token reuse so that multiple calls at connect time do not create multiple tokens. The implementation should also periodically refresh the token in order to guarantee that each call returns an unexpired token.
A timeout error should be returned after a short period of inactivity so that the broker can log debugging info and retry.
Token Providers MUST implement the token() method.

- abstract async token()[source]¶
An async callback returning a str ID/Access Token to be sent to the Kafka client. In cases where a synchronous callback is needed, an implementation like the following can be used:

    from aiokafka.abc import AbstractTokenProvider

    class CustomTokenProvider(AbstractTokenProvider):
        async def token(self):
            return await asyncio.get_running_loop().run_in_executor(
                None, self._token)

        def _token(self):
            # The actual synchronous token callback.
            ...
- class aiokafka.abc.ConsumerRebalanceListener[source]¶
A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the consumer changes.
This is applicable when the consumer is having Kafka auto-manage group membership. If the consumer directly assigns partitions, those partitions will never be reassigned and this callback is not applicable.
When Kafka is managing the group membership, a partition re-assignment will be triggered any time the members of the group change or the subscription of the members changes. This can occur when processes die, new process instances are added or old instances come back to life after failure. Rebalances can also be triggered by changes affecting the subscribed topics (e.g. when the number of partitions is administratively adjusted).
There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in the on_partitions_revoked() call, we can ensure that any time partition assignment changes the offset gets saved.

Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example, consider a case where the consumer is subscribed to a topic containing user page views, and the goal is to count the number of page views per user for each five minute window. Let’s say the topic is partitioned by the user id so that all events for a particular user will go to a single consumer instance. The consumer can keep in memory a running tally of actions per user and only flush these out to a remote data store when its cache gets too big. However if a partition is reassigned it may want to automatically trigger a flush of this cache, before the new owner takes over consumption.
This callback will execute during the rebalance process, and Consumer will wait for callbacks to finish before proceeding with group join.
It is guaranteed that all consumer processes will invoke on_partitions_revoked() prior to any process invoking on_partitions_assigned(). So if offsets or other state is saved in the on_partitions_revoked() call, it should be saved by the time the process taking over that partition has its on_partitions_assigned() callback called to load the state.

- abstract on_partitions_revoked(revoked)[source]¶
A coroutine or function the user can implement to provide cleanup or custom state save on the start of a rebalance operation.
This method will be called before a rebalance operation starts and after the consumer stops fetching data.
If you are using manual commit you have to commit all consumed offsets here, to avoid duplicate message delivery after rebalance is finished.
Note
This method is only called before rebalances. It is not called prior to AIOKafkaConsumer.stop()
- Parameters
revoked (list(TopicPartition)) – the partitions that were assigned to the consumer on the last rebalance
- abstract on_partitions_assigned(assigned)[source]¶
A coroutine or function the user can implement to provide load of custom consumer state or cache warmup on completion of a successful partition re-assignment.
This method will be called after partition re-assignment completes and before the consumer starts fetching data again.
It is guaranteed that all the processes in a consumer group will execute their on_partitions_revoked() callback before any instance executes its on_partitions_assigned() callback.

- Parameters
assigned (list(TopicPartition)) – the partitions assigned to the consumer (may include partitions that were previously assigned)
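A hedged sketch of a listener that commits on revocation; the commit-everything strategy is an assumption, adapt it to your offset store:

    from aiokafka.abc import ConsumerRebalanceListener

    class CommitOnRevoke(ConsumerRebalanceListener):
        def __init__(self, consumer):
            self._consumer = consumer

        async def on_partitions_revoked(self, revoked):
            # Commit everything consumed so far before partitions are taken away
            await self._consumer.commit()

        async def on_partitions_assigned(self, assigned):
            # Warm up caches or restore state for the newly assigned partitions
            print("assigned:", assigned)

    consumer.subscribe(topics=["my_topic"], listener=CommitOnRevoke(consumer))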
SSL Authentication¶
Security is not an easy thing, at least when you want to do it right. Before diving in on how to setup aiokafka to work with SSL, make sure there is a need for SSL Authentication and go through the official documentation for SSL support in Kafka itself.
aiokafka provides only ssl_context
as a parameter for Consumer and
Producer classes. This is done intentionally, as it is recommended that you
read through the
Python ssl documentation
to have some understanding of the topic. That said, if you know what you are
doing, there is a simple helper function aiokafka.helpers.create_ssl_context(),
which will create an ssl.SSLContext based on similar params to kafka-python.
A few notes on Kafka’s SSL store types. Java uses the JKS store type, which contains normal certificates, the same as the ones OpenSSL (and Python, as it’s based on OpenSSL) uses, but encodes them into a single, encrypted file, protected by another password. Search the internet for how to extract the CARoot, Certificate and Key from a JKS store.
See also the Using SSL with aiokafka example.
SASL Authentication¶
As of version 0.5.1 aiokafka supports SASL authentication using both PLAIN
and GSSAPI
SASL methods. Be sure to install gssapi python module to use
GSSAPI
.
Please consult the official documentation
for setup instructions on Broker side. Client configuration is pretty much the
same as Java’s, consult the sasl_*
options in Consumer and Producer API
Reference for more details. See AbstractTokenProvider
.
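A hedged sketch of SASL/PLAIN client configuration; credentials and broker address are placeholders, and SASL_SSL with an ssl_context is recommended in production so credentials are not sent in clear text:

    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        "my_topic",
        bootstrap_servers="broker.example.com:9092",
        security_protocol="SASL_PLAINTEXT",
        sasl_mechanism="PLAIN",
        sasl_plain_username="alice",
        sasl_plain_password="alice-secret",
    )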
Error handling¶
Both consumer and producer can raise exceptions that inherit from the
aiokafka.errors.KafkaError
class.
Exception handling example:
from aiokafka.errors import KafkaError, KafkaTimeoutError
# ...
try:
send_future = await producer.send('foobar', b'test data')
response = await send_future # wait until message is produced
except KafkaTimeoutError:
print("produce timeout... maybe we want to resend data again?")
except KafkaError as err:
print("some kafka error on produce: {}".format(err))
Consumer errors¶
The Consumer’s async for and getone()/getmany() interfaces
will handle those differently. Possible consumer errors include:
TopicAuthorizationFailedError - topic requires authorization. Always raised
OffsetOutOfRangeError - if you don’t specify an auto_offset_reset policy and consumption started from an invalid offset. Always raised
RecordTooLargeError - broker has a MessageSet larger than max_partition_fetch_bytes. async for - log error, get* will raise it.
InvalidMessageError - CRC check on MessageSet failed due to connection failure or bug. Always raised. Changed in version 0.5.0, before we ignored this error in async for.
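A hedged sketch of recovering from OffsetOutOfRangeError when auto_offset_reset is set to none; reading the affected partitions from err.args[0] is an assumption borrowed from the aiokafka examples, and rewinding to the beginning is just one possible policy:

    from aiokafka.errors import OffsetOutOfRangeError

    try:
        msg = await consumer.getone()
    except OffsetOutOfRangeError as err:
        # err.args[0] is assumed to map the affected TopicPartitions to offsets
        tps = err.args[0].keys()
        await consumer.seek_to_beginning(*tps)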
Other references¶
- class aiokafka.producer.message_accumulator.BatchBuilder(magic, batch_size, compression_type, *, is_transactional)[source]¶
- class aiokafka.consumer.group_coordinator.GroupCoordinator(client, subscription, *, group_id='aiokafka-default-group', group_instance_id=None, session_timeout_ms=10000, heartbeat_interval_ms=3000, retry_backoff_ms=100, enable_auto_commit=True, auto_commit_interval_ms=5000, assignors=(<class 'aiokafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor'>, ), exclude_internal_topics=True, max_poll_interval_ms=300000, rebalance_timeout_ms=30000)[source]¶
GroupCoordinator implements group management for single group member by interacting with a designated Kafka broker (the coordinator). Group semantics are provided by extending this class.
From a high level, Kafka’s group management protocol consists of the following sequence of actions:
Group Registration: Group members register with the coordinator providing their own metadata (such as the set of topics they are interested in).
Group/Leader Selection: The coordinator (one of the Kafka nodes) selects the members of the group and chooses one member (one of the clients) as the leader.
State Assignment: The leader receives metadata for all members and assigns partitions to them.
Group Stabilization: Each member receives the state assigned by the leader and begins processing. Between each phase the coordinator waits for all clients to respond. If some do not respond in time, it will revoke their membership.
- NOTE: Try to maintain the same log messages and behaviour as the Java and kafka-python clients:
https://github.com/apache/kafka/blob/0.10.1.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
https://github.com/apache/kafka/blob/0.10.1.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
- class aiokafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor[source]¶
The roundrobin assignor lays out all the available partitions and all the available consumers. It then proceeds to do a roundrobin assignment from partition to consumer. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumers.)
For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
- The assignment will be:
C0: [t0p0, t0p2, t1p1]
C1: [t0p1, t1p0, t1p2]
When subscriptions differ across consumer instances, the assignment process still considers each consumer instance in round robin fashion but skips over an instance if it is not subscribed to the topic. Unlike the case when subscriptions are identical, this can result in imbalanced assignments.
For example, suppose we have three consumers C0, C1, C2, and three topics t0, t1, t2, with unbalanced partitions t0p0, t1p0, t1p1, t2p0, t2p1, t2p2, where C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2.
- The assignment will be:
C0: [t0p0]
C1: [t1p0]
C2: [t1p1, t2p0, t2p1, t2p2]
Errors¶
- exception aiokafka.errors.ConsumerStoppedError[source]¶
Raised on get* methods of the Consumer, even pending ones, if it is cancelled.
- aiokafka.errors.CoordinatorLoadInProgressError¶
- aiokafka.errors.CoordinatorNotAvailableError¶
- exception aiokafka.errors.IllegalOperation[source]¶
Raised if you try to execute an operation that is not available with the current configuration. For example, trying to commit if no group_id was given.
- aiokafka.errors.InvalidMessageError¶
- aiokafka.errors.NotCoordinatorError¶
- exception aiokafka.errors.ProducerFenced(msg='There is a newer producer using the same transactional_id or transaction timeout occurred (check that processing time is below transaction_timeout_ms)')[source]¶
Another producer with the same transactional ID went online. NOTE: it seems the broker will also raise this if a transaction timeout occurred.
Structs¶
- class aiokafka.structs.TopicPartition(topic: str, partition: int)[source]¶
A topic and partition tuple
- class aiokafka.structs.RecordMetadata(topic: str, partition: int, topic_partition: aiokafka.structs.TopicPartition, offset: int, timestamp: Optional[int], timestamp_type: int, log_start_offset: Optional[int])[source]¶
Returned when an AIOKafkaProducer sends a message.

- offset: int¶
The unique offset of the message in this partition.
See Offsets and Consumer Position for more details on offsets.
- timestamp_type: int¶
The timestamp type of this record.
If the broker respected the timestamp passed to AIOKafkaProducer.send(), 0 will be returned (CreateTime).

If the broker set its own timestamp, 1 will be returned (LogAppendTime).
- topic_partition: aiokafka.structs.TopicPartition¶
- class aiokafka.structs.ConsumerRecord(*args, **kwds)[source]¶
- key: Optional[aiokafka.structs.KT]¶
The key (or None if no key is specified)
- value: Optional[aiokafka.structs.VT]¶
The value
- class aiokafka.structs.KT¶
The type of a key.
- class aiokafka.structs.VT¶
The type of a value.
Protocols¶
- aiokafka.protocol.produce.ProduceRequest¶
alias of [<class ‘aiokafka.protocol.produce.ProduceRequest_v0’>, <class ‘aiokafka.protocol.produce.ProduceRequest_v1’>, <class ‘aiokafka.protocol.produce.ProduceRequest_v2’>, <class ‘aiokafka.protocol.produce.ProduceRequest_v3’>, <class ‘aiokafka.protocol.produce.ProduceRequest_v4’>, <class ‘aiokafka.protocol.produce.ProduceRequest_v5’>, <class ‘aiokafka.protocol.produce.ProduceRequest_v6’>, <class ‘aiokafka.protocol.produce.ProduceRequest_v7’>, <class ‘aiokafka.protocol.produce.ProduceRequest_v8’>]