aiokafka

Kafka integration with asyncio

Why this rank: Strong adoption, release freshness, healthy release cadence

Description

aiokafka
========
.. image:: https://github.com/aio-libs/aiokafka/actions/workflows/tests.yml/badge.svg?branch=master
    :target: https://github.com/aio-libs/aiokafka/actions/workflows/tests.yml?query=branch%3Amaster
    :alt: |Build status|
.. image:: https://codecov.io/github/aio-libs/aiokafka/coverage.svg?branch=master
    :target: https://codecov.io/gh/aio-libs/aiokafka/branch/master
    :alt: |Coverage|
.. image:: https://badges.gitter.im/Join%20Chat.svg
    :target: https://gitter.im/aio-libs/Lobby
    :alt: |Chat on Gitter|

asyncio client for Kafka


AIOKafkaProducer
****************

AIOKafkaProducer is a high-level, asynchronous message producer.

Example of AIOKafkaProducer usage:

.. code-block:: python

    from aiokafka import AIOKafkaProducer
    import asyncio

    async def send_one():
        producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
        # Get cluster layout and initial topic/partition leadership information
        await producer.start()
        try:
            # Produce message
            await producer.send_and_wait("my_topic", b"Super message")
        finally:
            # Wait for all pending messages to be delivered or expire.
            await producer.stop()

    asyncio.run(send_one())


AIOKafkaConsumer
****************

AIOKafkaConsumer is a high-level, asynchronous message consumer.
It interacts with the assigned Kafka Group Coordinator node to allow multiple
consumers to load balance consumption of topics (requires kafka >= 0.11).

Example of AIOKafkaConsumer usage:

.. code-block:: python

    from aiokafka import AIOKafkaConsumer
    import asyncio

    async def consume():
        consumer = AIOKafkaConsumer(
            'my_topic', 'my_other_topic',
            bootstrap_servers='localhost:9092',
            group_id="my-group")
        # Get cluster layout and join group `my-group`
        await consumer.start()
        try:
            # Consume messages
            async for msg in consumer:
                print("consumed: ", msg.topic, msg.partition, msg.offset,
                      msg.key, msg.value, msg.timestamp)
        finally:
            # Will leave consumer group; perform autocommit if enabled.
            await consumer.stop()

    asyncio.run(consume())


Documentation
-------------

https://aiokafka.readthedocs.io/


Running tests
-------------

Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes. Also note that the ``lz4`` compression libraries for Python require the ``python-dev`` package, or the Python source header files, for compilation on Linux.

NOTE: You will also need a valid Java installation. It is required for the ``keytool`` utility, which is used to generate SSL keys for some tests.

Setting up test requirements (assuming you're within a virtualenv on Ubuntu 14.04+)::

    sudo apt-get install -y libkrb5-dev krb5-user
    make setup

Running tests with coverage::

    make cov

To run tests with a specific version of Kafka (the default is 2.8.1), use the ``KAFKA_VERSION`` variable::

    make cov SCALA_VERSION=2.11 KAFKA_VERSION=0.10.2.1

Test running cheat-sheet:

* ``make test FLAGS="-l -x --ff"`` - run until 1 failure, rerun failed tests first. Great for cleaning up a lot of errors, say after a big refactor.
* ``make test FLAGS="-k consumer"`` - run only the consumer tests.
* ``make test FLAGS="-m 'not ssl'"`` - run tests excluding ssl.
* ``make test FLAGS="--no-pull"`` - do not try to pull new docker image before test run.

Release History

Version | Changes | Urgency | Date
v0.14.0 | New features: * Add rack-aware fetching from the closest in-sync replica (KIP-392) via the new ``client_rack`` option on :class:`AIOKafkaConsumer`. When set and the brokers support ``FetchRequest v11`` (Kafka 2.4+) with a ``replica.selector.class`` configured, the consumer will fetch from a same-rack follower instead of the partition leader, reducing cross-AZ traffic and tail latency. (prs #1159 and #1160 by @GlebShipilov) Bugfixes: * Fix type annotation for `AIOKafkaAdmi | High | 4/29/2026
0.13.0 | Imported from PyPI (0.13.0) | Low | 4/21/2026
v0.13.0 | Breaking changes: * Resolve API versions at connection with brokers; the `api_version` parameter has been removed from the different clients (admin/consumer/producer) (pr #1136 by @vmaurin) Improved Documentation: * Fix incomplete documentation for `AIOKafkaConsumer.offset_for_times` (pr #1068 by @jzvandenoever) * Fix Java Client API reference (pr #1069 by @emmanuel-ferdman) Bugfixes: * Make KafkaStorageError retriable after metadata refresh like in other implementations | Low | 1/2/2026
v0.12.0 | New features: * Build mac x86_64 wheels (pr #1029) * Add support for Python 3.13, drop support for Python 3.8 due to end of life (pr #1061) * Remove duplicate error logging during rebalance (pr #1025 by @y4n9squared) Bugfixes: * Quote username in SCRAM auth (pr #1043) Improved Documentation: * Fix building of readthedocs documentation (pr #1034) * Fix typo in producer documentation (pr #1036 by @lgo) | Low | 10/26/2024
v0.11.0 | New features: * Implement DeleteRecords API ([KIP-204](https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API)) (pr #969 by @vmaurin) Bugfixes: * Fix serialization for batch (issue #886, pr #887 by @ydjin0602) * Fix type annotation for `AIOKafkaAdminClient.create_partitions` (pr #978 by @alm0ra) * Fix `NotControllerError` in `AIOKafkaAdminClient.create_topics` and other methods (issue #995) * Fix unintended ca | Low | 6/30/2024
v0.10.0 | New features: * Support static membership protocol, [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances) (issue #680, pr #941 by @patkivikram and @joshuaherrera) Bugfixes: * Fix extra dependencies (issue #952) | Low | 12/15/2023
v0.10.0.a0 | New features: * Support static membership protocol, [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances) (issue #680, pr #941 by @patkivikram and @joshuaherrera) | Low | 12/13/2023
v0.9.0 | New features: * Include `kafka-python` into `aiokafka`'s code base (issue #928 and others) * Replace `python-snappy` and `zstandard` with `cramjam` (issue #930) * PEP518 compliant `pyproject.toml` * Python 3.12 support Bugfixes: * Fix type annotation for `ConsumerRecord` (pr #912 by @zschumacher) * Improve send performance (issue #943) Improved Documentation: * Fix `AbstractTokenProvider.token` example (pr #919 by @mtomilov) | Low | 12/4/2023
0.9.0.rc1 | New features: * Include `kafka-python` into `aiokafka`'s code base * Replace `python-snappy` and `zstandard` with `cramjam` * PEP518 compliant `pyproject.toml` * Python 3.12 support Bugfixes: * Fix type annotation for `ConsumerRecord` (pr #912 by @zschumacher) * Improve send performance (issue #943) Improved Documentation: * Fix `AbstractTokenProvider.token` example (pr #919 by @mtomilov) | Low | 11/29/2023
v0.9.0.rc0 | New features: * Include `kafka-python` into `aiokafka`'s code base * Replace `python-snappy` and `zstandard` with `cramjam` * PEP518 compliant `pyproject.toml` * Python 3.12 support Bugfixes: * Fix type annotation for `ConsumerRecord` (pr #912 by @zschumacher) Improved Documentation: * Fix `AbstractTokenProvider.token` example (pr #919 by @mtomilov) | Low | 11/21/2023
v0.8.1 | New features: * Drop support for Python 3.7 due to end of life (pr #893) Bugfixes: * Add SASL authentication support to `AIOKafkaAdminClient` (issue #889, pr #890 by @selevit) Improved Documentation: * Update `security_protocol` argument docstring (issue #883, pr #884 by @gabrielmbmb) * Remove incorrect `await` for `AIOKafkaConsumer.highwater()` (pr #858 by @yi-jiayu) | Low | 6/1/2023
v0.8.0 | New features: * Add codec for ZStandard compression (KIP-110) (pr #801) * Add basic admin client functionality (pr #811 started by @gabriel-tincu) * Drop support for Python 3.6, add support and pre-built packages for Python 3.10 (pr #841) Bugfixes: * Fix `KeyError` on solitary abort marker (issue #781, pr #782 by @pikulmar) * Fix handling unsupported compression codec (issue #795) * Handled other SASL mechanism in logging (issue #852, pr #861 by @mangin) Improved Documenta | Low | 11/21/2022
v0.7.2 | Bugfixes: * Fix `CancelledError` handling in sender (issue #710) * Fix exception for weakref use after object deletion (issue #755) * Fix consumer's `start()` method hanging after being idle for more than `max_poll_interval_ms` (issue #764) Improved Documentation: * Add `SASL_PLAINTEXT` and `SASL_SSL` to valid values of security protocol attribute (pr #768 by @pawelrubin) | Low | 9/2/2021
v0.7.1 | Bugfixes: * Allow group coordinator to close when all brokers are unavailable (issue #659 and pr #660 by @dkilgore90) * Exclude `.so` from source distribution to fix usage of sdist tarball (issue #681 and pr #684 by ods) * Add `dataclasses` backport package to dependencies for Python 3.6 (pr #690 by @ods) * Fix initialization without running loop (issue #689 and pr #690 by @ods) * Fix consumer fetcher for python3.9 (pr #672 by @dutradda) * Make sure generation and member id are | Low | 6/4/2021
v0.7.0 | New features: * Add support for Python 3.8 and 3.9. (issue #569, pr #669 and #676 by @ods) * Drop support for Python 3.5. (pr #667 by @ods) * Add OAUTHBEARER as a new sasl_mechanism. (issue #618 and pr #630 by @oulydna) Bugfixes: * Fix memory leak in kafka consumer when consumer is in idle state not consuming any message. (issue #628 and pr #629 by @iamsinghrajat) | Low | 10/28/2020
v0.6.1.dev0 | Release v0.6.1.dev0 | Low | 6/19/2020
v0.6.0 | New features: * Add async context manager support for both Producer and Consumer. (pr #613 and #494 by @nimish) * Upgrade to kafka-python version 2.0.0 and set it as non-strict parameter. (issue #590 by @yumendy and #558 by @originalgremlin) * Make loop argument optional (issue #544) * SCRAM-SHA-256 and SCRAM-SHA-512 support for SASL authentication (issue #571 and pr #588 by @SukiCZ) * Added headers param to AIOKafkaProducer.send_and_wait (pr #553 by @megabotan) * Add `consumer.last_p | Low | 5/15/2020
v0.5.2 | Bugfixes: * Fix ConnectionError breaking metadata sync background task (issue #517 and #512) * Fix event_waiter reference before assignment (pr #504 by @romantolkachyov) * Bump version of kafka-python | Low | 5/15/2020
v0.5.1 | New features: * Add SASL support with both SASL plain and SASL GSSAPI. Support also includes Broker v0.9.0, but you will need to explicitly pass ``api_version="0.9"``. (Big thanks to @cyrbil and @jsurloppe for working on this) * Added support for max_poll_interval_ms and rebalance_timeout_ms settings (issue #67) * Added pause/resume API for AIOKafkaConsumer. (issue #304) * Added header support to both AIOKafkaConsumer and AIOKafkaProducer for brokers v0.11 and above. (issue #4 | Low | 3/10/2019
v0.5.0 | New features: * Add full support for V2 format messages with a Cython extension. Those are used for Kafka >= 0.11.0.0 * Added support for transactional producing (issue #182) * Added support for idempotent producing with `enable_idempotence` parameter * Added support for `fetch_max_bytes` in AIOKafkaConsumer. This can help limit the amount of data transferred in a single roundtrip to the broker, which is essential for consumers with a large number of partitions Bugfixes: * | Low | 12/28/2018
v0.4.3 | Fix issue #444 and #436 related to a memory leak in `asyncio.shield()` | Low | 11/1/2018
v0.4.2 | The work here was concentrated on fixing bugs after the coordination refactors on v0.4.0. Hope it will serve you better now! Bugfix: * Added error propagation from coordinator to the main consumer. Before consumer just stopped with error logged. (issue #294) * Fix manual partition assignment, broken in 0.4.0 (issue #394) * Fixed RecursionError in MessageAccumulator.add_message (issue #409) * Update kafka-python to latest 1.4.3 and added support for Python3.7 * Dropped support for P | Low | 9/12/2018
v0.4.1 | * Fix issue when offset commit error reports wrong partition in log (issue #353) * Add ResourceWarning when Producer, Consumer or Connections are not closed properly (issue #295) * Fix Subscription None in GroupCoordinator._do_group_rejoin (issue #306) | Low | 5/13/2018
v0.4.0 | Major changes: * Full refactor of the internals of AIOKafkaConsumer. Needed to avoid several race conditions in code (PR #286, fixes #258, #264 and #261) * Rewrote Records parsing protocol to allow implementation of newer protocol versions later * Added C extension for Records parsing protocol, boosting the speed of produce/consume routines significantly * Added an experimental batch producer API for unique cases, where the user wants to control batching themselves (by @shargan) | Low | 2/1/2018
v0.3.1 | * Added `AIOKafkaProducer.flush()` method. (PR #209 by @vineet-rh) * Fixed a bug with uvloop involving `float("inf")` for timeout. (PR #210 by dmitry-moroz) * Changed test runner to allow running tests on OSX. (PR #213 by @shargan) | Low | 2/1/2018
v0.3.0 | Starting from 0.3.0 we discourage imports from `kafka` namespace. Starting from `0.4.0` aiokafka will stop supporting objects imported from `kafka` namespace. Please import from `aiokafka` namespaces instead. * Moved all public structures and errors to `aiokafka` namespace. You will no longer need to import from `kafka` namespace. * Changed ConsumerRebalanceListener to support either function or coroutine for `on_partitions_assigned` and `on_partitions_revoked` callbacks. (PR #190 b | Low | 8/17/2017
v0.2.3 | * Fixed retry problem in Producer, when buffer is not reset to 0 offset. Thanks to @ngavrysh for the fix in Tubular/aiokafka fork. (issue #184) * Fixed how Producer handles retries on Leader node failure. It just did not work before... Thanks to @blugowski for the help in locating the problem. (issue #176, issue #173) * Fixed degrade in v0.2.2 on Consumer with no group_id. (issue #166) | Low | 7/23/2017
v0.2.2 | * Force reconnect after KafkaTimeoutException. (PR #149 by @Artimi) * Fixed compacted topic handling. It could skip messages if some of them were compacted (issue #71) * Fixed old issue with new topics not visible in pattern subscription (issue #46) * Another fix for Consumer race condition on JoinGroup. This forces Leader to wait for new metadata before assigning partitions. (issue #118) * Changed metadata listener in Coordinator to avoid 2 rejoins in a rare condition (issue #108) * `ge | Low | 4/17/2017
v0.2.1 | - Add a check to wait for topic autocreation in Consumer, instead of raising UnknownTopicOrPartitionError (PR #92 by fabregas) - Consumer now stops consumption after `consumer.stop()` call. Any new `get*` calls will result in ConsumerStoppedError (PR #81) - Added `exclude_internal_topics` option for Consumer (PR #111) - Better support for pattern subscription when used with `group_id` (part of PR #111) - Fix for Consumer `subscribe` and JoinGroup race condition (issue #88). Coordinator will now noti | Low | 2/19/2017
v0.2.0 | - Added SSL support. (PR #81 by Drizzt1991) - Fixed UnknownTopicOrPartitionError error on first message for autocreated topic (PR #96 by fabregas) - Fixed `next_record` recursion (PR #94 by fabregas) - Fixed Heartbeat fail if no consumers (PR #92 by fabregas) - Added docs addressing kafka-python and aiokafka differences (PR #70 by Drizzt1991) - Added `max_poll_records` option for Consumer (PR #72 by Drizzt1991) - Fix kafka-python typos in docs (PR #69 by jeffwidman) - Topics and partitions are n | Low | 12/18/2016
v0.1.4 | - Bumped kafka-python version to 1.3.1 and Kafka to 0.10.1.0. - Fixed auto version detection, to correctly handle 0.10.0.0 version - Updated Fetch and Produce requests to use v2 with v0.10.0 message format on brokers. This allows a `timestamp` to be associated with messages. - Changed lz4 compression framing, as it was changed due to KIP-57 in new message format. - Minor refactorings Big thanks to @fabregas for the hard work on this release (PR #60) | Low | 11/7/2016
v0.1.3 | - Fixed bug with infinite loop on heartbeats with autocommit=True. #44 - Bumped kafka-python to version 1.1.1 - Fixed docker test runner with multiple interfaces - Minor documentation fixes | Low | 10/18/2016
v0.1.2 | - Added Python3.5 usage example to docs - Don't raise retryable exceptions in 3.5's async for iterator - Fix Cancellation issue with producer's `send_and_wait` method | Low | 4/30/2016
v0.1.1 | Fix packaging issues. Removed unneeded files from package. | Low | 4/30/2016
v0.1.0 | Added full support for Kafka 0.9.0. Older Kafka versions are not tested. | Low | 4/15/2016
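
Two of the changes listed for v0.6.0 above, async context manager support and the `headers` parameter on `AIOKafkaProducer.send_and_wait`, can be sketched together as follows. This is a minimal illustration, not code from the project: the helper `build_headers`, the `trace-id` header name, the sample trace value, and the broker address `localhost:9092` are all assumptions made for the example. It assumes headers are passed as a list of `(str, bytes)` pairs.

```python
import asyncio


def build_headers(trace_id: str) -> list[tuple[str, bytes]]:
    """Hypothetical helper: build Kafka headers as (str key, bytes value) pairs."""
    return [("trace-id", trace_id.encode("utf-8"))]


async def send_with_headers() -> None:
    # Imported lazily so build_headers() can be used and tested
    # without aiokafka installed or a broker running.
    from aiokafka import AIOKafkaProducer

    # The async context manager starts the producer on entry and
    # stops it on exit, replacing the explicit start()/stop() calls.
    async with AIOKafkaProducer(bootstrap_servers="localhost:9092") as producer:
        await producer.send_and_wait(
            "my_topic",
            b"Super message",
            headers=build_headers("abc123"),  # sample value, assumed for illustration
        )


# To run against a reachable broker (assumed at localhost:9092):
# asyncio.run(send_with_headers())
```

Keeping the header construction in a plain function makes it easy to check in isolation, while the coroutine itself only runs when a Kafka broker is available.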
