# aiokafka

> Kafka integration with asyncio

- **URL**: https://www.freshcrate.ai/projects/aiokafka
- **Author**: pypi
- **Category**: Frameworks
- **Latest version**: `v0.14.0` (2026-04-29)
- **License**: Unknown
- **Source**: https://github.com/aio-libs/aiokafka/blob/master/CHANGES.rst
- **Homepage**: https://pypi.org/project/aiokafka/
- **Language**: Python
- **GitHub**: 1,380 stars, 259 forks
- **Registry**: pypi (`aiokafka`)
- **Tags**: `pypi`

## Description

aiokafka
========
.. image:: https://github.com/aio-libs/aiokafka/actions/workflows/tests.yml/badge.svg?branch=master
    :target: https://github.com/aio-libs/aiokafka/actions/workflows/tests.yml?query=branch%3Amaster
    :alt: |Build status|
.. image:: https://codecov.io/github/aio-libs/aiokafka/coverage.svg?branch=master
    :target: https://codecov.io/gh/aio-libs/aiokafka/branch/master
    :alt: |Coverage|
.. image:: https://badges.gitter.im/Join%20Chat.svg
    :target: https://gitter.im/aio-libs/Lobby
    :alt: |Chat on Gitter|

asyncio client for Kafka


AIOKafkaProducer
****************

AIOKafkaProducer is a high-level, asynchronous message producer.

Example of AIOKafkaProducer usage:

.. code-block:: python

    from aiokafka import AIOKafkaProducer
    import asyncio

    async def send_one():
        producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
        # Get cluster layout and initial topic/partition leadership information
        await producer.start()
        try:
            # Produce message
            await producer.send_and_wait("my_topic", b"Super message")
        finally:
            # Wait for all pending messages to be delivered or expire.
            await producer.stop()

    asyncio.run(send_one())
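
The example above sends raw bytes. As a minimal sketch, assuming structured payloads are wanted, the producer's ``value_serializer`` argument can convert Python objects to bytes before sending. The helper name ``encode_json``, the function ``send_json`` and the broker address are illustrative assumptions, not part of the original README.

```python
import json


def encode_json(value) -> bytes:
    """Serialize a Python object to compact UTF-8 JSON bytes."""
    return json.dumps(value, separators=(",", ":")).encode("utf-8")


async def send_json(topic: str, payload: dict) -> None:
    # Imported lazily so encode_json() stays usable without aiokafka installed.
    from aiokafka import AIOKafkaProducer

    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",  # assumed local broker
        value_serializer=encode_json,        # applied to every message value
    )
    await producer.start()
    try:
        # The dict is passed as-is; the serializer turns it into bytes.
        await producer.send_and_wait(topic, payload)
    finally:
        await producer.stop()
```

Running ``asyncio.run(send_json("my_topic", {"id": 1}))`` requires a reachable broker; the serializer itself can be exercised on its own.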


AIOKafkaConsumer
****************

AIOKafkaConsumer is a high-level, asynchronous message consumer.
It interacts with the assigned Kafka Group Coordinator node to allow multiple
consumers to load balance consumption of topics (requires Kafka >= 0.11).

Example of AIOKafkaConsumer usage:

.. code-block:: python

    from aiokafka import AIOKafkaConsumer
    import asyncio

    async def consume():
        consumer = AIOKafkaConsumer(
            'my_topic', 'my_other_topic',
            bootstrap_servers='localhost:9092',
            group_id="my-group")
        # Get cluster layout and join group `my-group`
        await consumer.start()
        try:
            # Consume messages
            async for msg in consumer:
                print("consumed: ", msg.topic, msg.partition, msg.offset,
                      msg.key, msg.value, msg.timestamp)
        finally:
            # Will leave consumer group; perform autocommit if enabled.
            await consumer.stop()

    asyncio.run(consume())
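
The example above relies on autocommit. As a hedged sketch of an alternative, assuming at-least-once processing is wanted, autocommit can be disabled with ``enable_auto_commit=False`` and offsets committed explicitly after each message is handled. The helper ``record_label``, the function name and the broker address are illustrative assumptions.

```python
def record_label(topic: str, partition: int, offset: int) -> str:
    """Build a short 'topic[partition]@offset' label for log lines."""
    return f"{topic}[{partition}]@{offset}"


async def consume_with_manual_commit() -> None:
    # Imported lazily so record_label() stays usable without aiokafka installed.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        "my_topic",
        bootstrap_servers="localhost:9092",  # assumed local broker
        group_id="my-group",
        enable_auto_commit=False,  # offsets are committed explicitly below
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print("processed:", record_label(msg.topic, msg.partition, msg.offset))
            # Commit only after the message has been handled.
            await consumer.commit()
    finally:
        await consumer.stop()
```

Committing after every message is simple but adds a round trip per record; committing in batches is a common trade-off when throughput matters.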


Documentation
-------------

https://aiokafka.readthedocs.io/


Running tests
-------------

Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes. Note that the ``lz4`` compression libraries for Python require the ``python-dev`` package,
or the Python source header files, for compilation on Linux.
NOTE: You will also need a valid Java installation. It is required for the ``keytool`` utility, which is used to
generate SSL keys for some tests.

Setting up test requirements (assuming you're within a virtualenv on Ubuntu 14.04+)::

    sudo apt-get install -y libkrb5-dev krb5-user
    make setup

Running tests with coverage::

    make cov

To run tests with a specific version of Kafka (the default is 2.8.1), set the ``KAFKA_VERSION`` variable::

    make cov SCALA_VERSION=2.11 KAFKA_VERSION=0.10.2.1

Test running cheat-sheet:

 * ``make test FLAGS="-l -x --ff"`` - stop at the first failure and rerun previously failed tests first. Great for cleaning up a lot of errors, say after a big refactor.
 * ``make test FLAGS="-k consumer"`` - run only the consumer tests.
 * ``make test FLAGS="-m 'not ssl'"`` - run tests excluding ssl.
 * ``make test FLAGS="--no-pull"`` - do not try to pull new docker image before test run.

## Recent releases

| Version | Date | Urgency | Changes |
| --- | --- | --- | --- |
| `v0.14.0` | 2026-04-29 | High | New features:  * Add rack-aware fetching from the closest in-sync replica (KIP-392) via the new   ``client_rack`` option on :class:`AIOKafkaConsumer`. When set and the brokers   support ``FetchRequest v11`` (Kafka 2.4+) with a ``replica.selector.class``   configured, the consumer will fetch from a same-rack follower instead of the   partition leader, reducing cross-AZ traffic and tail latency.   (prs #1159 and #1160 by @GlebShipilov)  Bugfixes:  * Fix type annotation for `AIOKafkaAdmi |
| `0.13.0` | 2026-04-21 | Low | Imported from PyPI (0.13.0) |
| `v0.13.0` | 2026-01-02 | Low | Breaking changes:  * Resolve API versions at connection with brokers   `api_version` parameter has been removed from the different clients (admin/consumer/producer)   (pr #1136 by @vmaurin)  Improved Documentation:  * Fix incomplete documentation for `AIOKafkaConsumer.offset_for_times`   (pr #1068 by @jzvandenoever) * Fix Java Client API reference (pr #1069 by @emmanuel-ferdman)   Bugfixes:  * Make KafkaStorageError retriable after metadata refresh like in other   implementations |
| `v0.12.0` | 2024-10-26 | Low | New features:  * Build mac x86_64 wheels (pr #1029) * Add support for Python 3.13, drop support for Python 3.8 due to end of life (pr #1061) * Remove duplicate error logging during rebalance (pr #1025 by @y4n9squared)   Bugfixes:  * Quote username in SCRAM auth (pr #1043)   Improved Documentation:  * Fix building of readthedocs documentation (pr #1034) * Fix typo in producer documentation (pr #1036 by @lgo) |
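
The `v0.14.0` entry above mentions a new `client_rack` option on `AIOKafkaConsumer`. A minimal sketch of how it might be wired in, assuming the option is passed as a keyword argument with the rack identifier as a string. The helper names, the rack value `us-east-1a` and the broker address are illustrative assumptions; check the aiokafka documentation for the exact semantics.

```python
from typing import Optional


def rack_aware_consumer_kwargs(rack: Optional[str]) -> dict:
    """Build consumer keyword arguments, adding client_rack only when a rack is known."""
    kwargs = {
        "bootstrap_servers": "localhost:9092",  # assumed local broker
        "group_id": "my-group",
    }
    if rack:
        # Assumed usage of the option named in the v0.14.0 release notes.
        kwargs["client_rack"] = rack
    return kwargs


async def consume_from_rack(rack: Optional[str]) -> None:
    # Imported lazily so the helper above stays usable without aiokafka installed.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer("my_topic", **rack_aware_consumer_kwargs(rack))
    await consumer.start()
    try:
        async for msg in consumer:
            print("consumed:", msg.topic, msg.partition, msg.offset)
    finally:
        await consumer.stop()
```

Per the release notes, the option only has an effect when the brokers support `FetchRequest v11` (Kafka 2.4+) and have a `replica.selector.class` configured.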

## Dependency audit

- **Score**: 84/100
- **Total deps**: 5
- **Resolved**: 3
- **Unresolved**: 2
- **License conflicts**: 0
- **Warnings**: 4
- **Scanned**: 2026-05-18

## Citation

- HTML: https://www.freshcrate.ai/projects/aiokafka
- Markdown: https://www.freshcrate.ai/projects/aiokafka.md
- Dependencies JSON: https://www.freshcrate.ai/api/projects/aiokafka/deps

_Generated by freshcrate.ai. Indexes pypi releases for AI-agent ecosystem packages._
