.. _kafka_python_difference:

Difference between aiokafka and kafka-python
--------------------------------------------

.. _kip-41:
    https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records

.. _kip-62:
    https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread

.. _a lot of code:
  https://gist.github.com/tvoinarovskyi/05a5d083a0f96cae3e9b4c2af580be74

.. _kafka-python: https://github.com/dpkp/kafka-python
.. _Java Client API: https://kafka.apache.org/documentation/#api


Why do we need another library?
===============================

`kafka-python`_ is a great project that tries to fully mimic the interface
of the `Java Client API`_. It is oriented more towards *features* than
*speed*, but still gives quite good throughput. It is actively developed and
quick to react to changes in the Java client.

While `kafka-python`_ has a lot of great features, it is made to be used in a
**threaded** environment. More than that, by mimicking the Java client it
targets **Java's threaded** environment, which offers few *asynchronous* ways
of doing things. This is not **bad** in itself: Java threads are very powerful
and can make use of multiple cores.

Its API, however, cannot be adapted for asynchronous use (even though the
library does its IO asynchronously using *selectors*). It has too much
blocking behaviour, including *blocking* socket usage, thread synchronization,
etc. For example:

* *bootstrap*, which blocks in the constructor itself
* blocking iterator for consumption
* sending a produce request blocks if the buffer is full

None of these can be changed to use the :class:`~concurrent.futures.Future`
API seamlessly. So, to get a normal, non-blocking interface based on
:class:`~concurrent.futures.Future` objects and coroutines, a new library had
to be written.
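
The first point, bootstrapping in the constructor, can be illustrated without
Kafka at all. The sketch below uses two hypothetical classes:
``BlockingClient`` performs a simulated bootstrap round trip inside
``__init__``, while ``AsyncClient`` keeps its constructor free of IO and
bootstraps in an ``async def start()``, which is the shape the **aiokafka**
clients take with their ``start()`` coroutine. The ``time.sleep()`` and
``asyncio.sleep()`` calls are stand-ins for network IO.

.. code:: python

    import asyncio
    import time


    class BlockingClient:
        """Hypothetical client that bootstraps inside its constructor."""

        def __init__(self, name, log):
            log.append(f"{name}: connecting")
            time.sleep(0.05)  # simulated blocking round trip to a broker
            log.append(f"{name}: ready")


    class AsyncClient:
        """Hypothetical client with an IO-free constructor and async start()."""

        def __init__(self, name, log):
            self.name, self.log = name, log  # no IO in the constructor

        async def start(self):
            self.log.append(f"{self.name}: connecting")
            await asyncio.sleep(0.05)  # simulated round trip; loop stays free
            self.log.append(f"{self.name}: ready")


    async def main():
        blocking_log = []
        for name in ("a", "b"):
            # Each constructor freezes the event loop until it returns.
            BlockingClient(name, blocking_log)

        async_log = []
        clients = [AsyncClient(name, async_log) for name in ("a", "b")]
        # Both bootstraps are in flight at the same time.
        await asyncio.gather(*(client.start() for client in clients))
        return blocking_log, async_log


    blocking_log, async_log = asyncio.run(main())
    print(blocking_log)
    print(async_log)

With the blocking constructor the log is strictly sequential, because nothing
else can run on the event loop during each bootstrap; with ``start()`` both
bootstraps begin before either one finishes.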


API differences and rationale
=============================

**aiokafka** has some differences in API design. While the **Producer** is
mostly the same, the **Consumer** has some significant differences, which are
described below.


Consumer has no ``poll()`` method
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In `kafka-python`_, :meth:`kafka.KafkaConsumer.poll` is a blocking call that
does not only fetch messages, but also:

* polls sockets using ``epoll``, ``kqueue`` or another polling API of your OS;
* ensures the liveness of the consumer group;
* performs autocommit.

With **aiokafka** your application owns the IO loop, so this can never be the
case, at least not for socket polling, which the asyncio event loop performs.
To avoid misunderstandings about why the two methods behave differently,
:class:`.AIOKafkaConsumer` exposes this interface under a different name,
:meth:`~.AIOKafkaConsumer.getmany`, with some other differences described
below.
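
A minimal sketch of a consumption loop built on
:meth:`~.AIOKafkaConsumer.getmany`. The topic name, broker address and group
id are placeholders, and the ``print()`` call stands in for real processing.
Unlike ``poll()``, ``getmany()`` is a coroutine: while it waits for data, the
event loop is free to run other tasks.

.. code:: python

    async def consume(topic="my_topic", bootstrap_servers="localhost:9092"):
        # Imported lazily so this sketch can be loaded without aiokafka.
        from aiokafka import AIOKafkaConsumer

        consumer = AIOKafkaConsumer(
            topic, bootstrap_servers=bootstrap_servers, group_id="my-group")
        await consumer.start()  # bootstrap happens here, not in __init__
        try:
            while True:
                # Waits up to 1 s; returns {TopicPartition: [records, ...]}
                batch = await consumer.getmany(timeout_ms=1000)
                for tp, records in batch.items():
                    for record in records:
                        print(tp.topic, tp.partition, record.offset, record.value)
        finally:
            await consumer.stop()

It can be run with ``asyncio.run(consume())`` against a reachable broker.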


Rebalances are happening in the background
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In the original Kafka Java client before 0.10.1, heartbeats were only sent
when ``poll()`` was called. This led to a lot of issues (the reasons for the
`KIP-41`_ and `KIP-62`_ proposals) and to workarounds that used
:meth:`~kafka.KafkaConsumer.pause` and :meth:`poll(0) <kafka.KafkaConsumer.poll>`
just to send heartbeats. Since then, both the Java client and kafka-python have
moved heartbeat sending to a background thread, which mitigated most of these
issues.

**aiokafka** delegates heartbeating to a background *Task* and will send
heartbeats to the group coordinator as long as the *event loop* is running.
This behaviour is very similar to the Java client, with one exception: no
heartbeats are sent while a long CPU-bound function blocks the event loop.
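
The exception is easy to reproduce with plain asyncio. In the sketch below,
which does not use Kafka, a hypothetical ``heartbeat()`` task records a beat
every 10 ms, standing in for the **aiokafka** heartbeat task, and the
hypothetical ``cpu_bound()`` busy loop stands in for heavy record processing.
Calling ``cpu_bound()`` directly in a coroutine starves the heartbeat task;
running it via ``asyncio.to_thread()`` (Python 3.9+) keeps the event loop, and
so the heartbeats, going.

.. code:: python

    import asyncio
    import time


    def cpu_bound(seconds):
        # Busy loop standing in for heavy record processing.
        deadline = time.monotonic() + seconds
        iterations = 0
        while time.monotonic() < deadline:
            iterations += 1
        return iterations


    async def heartbeat(beats):
        # Stand-in for a background heartbeat task.
        while True:
            beats.append(time.monotonic())
            await asyncio.sleep(0.01)


    async def count_beats(offload):
        beats = []
        task = asyncio.create_task(heartbeat(beats))
        await asyncio.sleep(0)  # let the heartbeat task start
        before = len(beats)
        if offload:
            # Runs in a worker thread; the event loop keeps scheduling tasks.
            await asyncio.to_thread(cpu_bound, 0.2)
        else:
            # Runs on the event loop thread; no other task can run meanwhile.
            cpu_bound(0.2)
        sent = len(beats) - before
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        return sent


    starved = asyncio.run(count_beats(offload=False))
    offloaded = asyncio.run(count_beats(offload=True))
    print(f"beats during inline work: {starved}, with to_thread: {offloaded}")

A worker thread still shares the GIL with the event loop, so for processing
heavy enough to matter, a :class:`~concurrent.futures.ProcessPoolExecutor`
passed to ``loop.run_in_executor()`` may be the better choice.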

But **aiokafka** also performs group rebalancing in the same background Task.
This means that the processing time between :meth:`~.AIOKafkaConsumer.getmany`
calls does not affect rebalancing. `KIP-62`_ proposed ``max.poll.interval.ms``
as the configuration for both the *rebalance timeout* and the *consumer
processing timeout*. In **aiokafka** that does not make much sense, as the two
are unrelated, so both are configured separately (``rebalance_timeout_ms`` and
``max_poll_interval_ms``).
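
For illustration, both settings are passed to the :class:`.AIOKafkaConsumer`
constructor. The values below are placeholders rather than recommendations
(``300_000`` ms is the default ``max_poll_interval_ms``), and the broker
address, topic and group id are assumptions.

.. code:: python

    async def create_consumer(topic="my_topic"):
        # Imported lazily so this sketch can be loaded without aiokafka.
        from aiokafka import AIOKafkaConsumer

        # Created inside a coroutine so that an event loop is already running.
        return AIOKafkaConsumer(
            topic,
            bootstrap_servers="localhost:9092",
            group_id="my-group",
            # Consumer processing timeout: the longest allowed gap between
            # getmany() calls before the consumer is considered failed.
            max_poll_interval_ms=300_000,
            # Rebalance timeout: how long the broker waits for this member
            # to rejoin; it must cover ConsumerRebalanceListener callbacks.
            rebalance_timeout_ms=60_000,
        )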

Providing a :class:`.ConsumerRebalanceListener` is critical if you need to
control the moments when a rebalance starts and ends. In that case, set
``rebalance_timeout_ms`` to the maximum time your application can spend
waiting in the callback. If your callback waits for the last
:meth:`~.AIOKafkaConsumer.getmany` result to be processed, it is safe to set
this value to ``max_poll_interval_ms``, the same as in the Java client.
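
The "wait for the last batch" pattern can be sketched with an
:class:`asyncio.Lock` that is held while a batch is processed and acquired in
the revocation callback. Everything here is hypothetical and Kafka-free:
``RevocationGuard`` mimics the ``on_partitions_revoked`` hook (in a real
application it would be a method of a :class:`.ConsumerRebalanceListener`
subclass, defined with ``async def``), partitions are plain integers, and
``asyncio.sleep()`` simulates per-record work.

.. code:: python

    import asyncio


    class RevocationGuard:
        """Hypothetical stand-in for a ConsumerRebalanceListener."""

        def __init__(self, log):
            self.lock = asyncio.Lock()  # held while a batch is processed
            self.log = log

        async def on_partitions_revoked(self, revoked):
            # Delays the rebalance until the current batch is finished;
            # rebalance_timeout_ms must be long enough to cover this wait.
            async with self.lock:
                self.log.append(f"revoked {sorted(revoked)}")


    async def process_batch(guard, records):
        async with guard.lock:  # keep revocation out for the whole batch
            for record in records:
                await asyncio.sleep(0.01)  # simulated per-record work
                guard.log.append(f"processed {record}")


    async def main():
        log = []
        guard = RevocationGuard(log)
        processing = asyncio.create_task(process_batch(guard, [1, 2, 3]))
        await asyncio.sleep(0)  # let processing start and take the lock
        await guard.on_partitions_revoked({0, 1})  # a rebalance mid-batch
        await processing
        return log


    log = asyncio.run(main())
    print(log)

The revocation callback only completes after record 3 is processed, so its
wait is bounded by the processing time of one batch, which is what
``rebalance_timeout_ms`` has to cover.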


Prefetching is more sophisticated
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In the `Kafka Java Client <Java Client API_>`_ and `kafka-python`_,
prefetching is very simple, as prefetches are only performed:

* in a :meth:`~kafka.KafkaConsumer.poll` call, if not enough data is buffered
  to satisfy another :meth:`~kafka.KafkaConsumer.poll`;
* in the *iterator* interface, once *nearly* all buffered data has been
  processed.

A very simplified version would be:

.. code:: python

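    def poll(self):
        max_records = self.config['max_poll_records']
        # Take up to max_records records that were already fetched
        records = self._fetcher.fetched_records(max_records)
        if not self._fetcher.has_enough_records(max_records):
            # Prefetch the next batch while the caller processes this one
            self._fetcher.send_fetches()
        return records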

This works well for throughput, as the algorithm is simple and network IO is
pipelined with record processing.

But it does not perform as well with **semantic partitioning**, where each
partition may be processed separately. In that case, latency is bound by the
time it takes to process the buffered data of all assigned partitions.

This is why **aiokafka** tries to prefetch **per partition**. For example,
once all pending data for a partition has been processed through the
*iterator* interface, **aiokafka** will *try* to prefetch new data for that
partition right away. The same behaviour could be built on top of
`kafka-python`_'s :meth:`~kafka.KafkaConsumer.pause` API, but it would require
`a lot of code`_.
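
The difference between the two decision rules can be shown with a toy model.
Both functions are hypothetical simplifications, not the actual fetcher code
of either library: ``buffered`` maps a partition number to the count of
fetched but not yet processed records, and each function returns the
partitions it would send fetch requests for.

.. code:: python

    def drained(buffered):
        # Partitions whose fetched records have all been processed.
        return sorted(tp for tp, pending in buffered.items() if pending == 0)


    def global_prefetch(buffered, max_records):
        # Consumer-wide gate: only fetch once the whole buffer can no
        # longer satisfy another batch of max_records.
        if sum(buffered.values()) < max_records:
            return drained(buffered)
        return []


    def per_partition_prefetch(buffered):
        # No consumer-wide gate: refill a drained partition right away.
        return drained(buffered)


    # Partition 0 is drained; partition 1 still holds a large backlog.
    buffered = {0: 0, 1: 500}
    print(global_prefetch(buffered, max_records=100))  # []
    print(per_partition_prefetch(buffered))            # [0]

With partition 0 drained next to a large backlog on partition 1, the global
rule fetches nothing until the backlog shrinks below ``max_records``, so
partition 0 sits idle; the per-partition rule refills it immediately.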

.. note::

    Using :meth:`~.AIOKafkaConsumer.getmany` without specifying partitions will result in the same
    prefetch behaviour as using :meth:`~kafka.KafkaConsumer.poll`.
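
To benefit from per-partition prefetching, partitions can be passed to
:meth:`~.AIOKafkaConsumer.getmany` explicitly. A sketch, assuming ``consumer``
is a started :class:`.AIOKafkaConsumer` whose partitions were assigned
manually with ``consumer.assign()`` (so the assignment does not change under a
rebalance) and ``handle`` is a hypothetical async callback for one record:

.. code:: python

    import asyncio


    async def consume_partition(consumer, tp, handle):
        # Fetch and process a single partition at its own pace.
        while True:
            batch = await consumer.getmany(tp, timeout_ms=1000)
            for record in batch.get(tp, []):
                await handle(record)


    async def consume_per_partition(consumer, handle):
        # One task per assigned partition, so a slow partition does not
        # hold back fetching for the others.
        await asyncio.gather(*(
            consume_partition(consumer, tp, handle)
            for tp in consumer.assignment()))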