Description
aiokafka
========
.. image:: https://github.com/aio-libs/aiokafka/actions/workflows/tests.yml/badge.svg?branch=master
    :target: https://github.com/aio-libs/aiokafka/actions/workflows/tests.yml?query=branch%3Amaster
    :alt: |Build status|
.. image:: https://codecov.io/github/aio-libs/aiokafka/coverage.svg?branch=master
    :target: https://codecov.io/gh/aio-libs/aiokafka/branch/master
    :alt: |Coverage|
.. image:: https://badges.gitter.im/Join%20Chat.svg
    :target: https://gitter.im/aio-libs/Lobby
    :alt: |Chat on Gitter|

asyncio client for Kafka


AIOKafkaProducer
****************

AIOKafkaProducer is a high-level, asynchronous message producer.

Example of AIOKafkaProducer usage:

.. code-block:: python

    from aiokafka import AIOKafkaProducer
    import asyncio

    async def send_one():
        producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
        # Get cluster layout and initial topic/partition leadership information
        await producer.start()
        try:
            # Produce message
            await producer.send_and_wait("my_topic", b"Super message")
        finally:
            # Wait for all pending messages to be delivered or expire.
            await producer.stop()

    asyncio.run(send_one())


AIOKafkaConsumer
****************

AIOKafkaConsumer is a high-level, asynchronous message consumer.
It interacts with the assigned Kafka Group Coordinator node to allow multiple
consumers to load balance consumption of topics (requires kafka >= 0.11).

Example of AIOKafkaConsumer usage:

.. code-block:: python

    from aiokafka import AIOKafkaConsumer
    import asyncio

    async def consume():
        consumer = AIOKafkaConsumer(
            'my_topic', 'my_other_topic',
            bootstrap_servers='localhost:9092',
            group_id="my-group")
        # Get cluster layout and join group `my-group`
        await consumer.start()
        try:
            # Consume messages
            async for msg in consumer:
                print("consumed: ", msg.topic, msg.partition, msg.offset,
                      msg.key, msg.value, msg.timestamp)
        finally:
            # Will leave consumer group; perform autocommit if enabled.
            await consumer.stop()

    asyncio.run(consume())


Documentation
-------------

https://aiokafka.readthedocs.io/


Running tests
-------------

Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes. Also note that the `lz4` compression libraries for Python require the `python-dev` package, or Python source header files, for compilation on Linux.

NOTE: You will also need a valid Java installation. It is required for the ``keytool`` utility, which is used to generate SSL keys for some tests.

Setting up test requirements (assuming you're within a virtualenv on Ubuntu 14.04+)::

    sudo apt-get install -y libkrb5-dev krb5-user
    make setup

Running tests with coverage::

    make cov

To run tests with a specific version of Kafka (the default is 2.8.1), use the KAFKA_VERSION variable::

    make cov SCALA_VERSION=2.11 KAFKA_VERSION=0.10.2.1

Test running cheat-sheet:

* ``make test FLAGS="-l -x --ff"`` - run until 1 failure, rerun failed tests first. Great for cleaning up a lot of errors, say after a big refactor.
* ``make test FLAGS="-k consumer"`` - run only the consumer tests.
* ``make test FLAGS="-m 'not ssl'"`` - run tests excluding ssl.
* ``make test FLAGS="--no-pull"`` - do not try to pull new docker image before test run.

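The producer and consumer examples above exchange raw bytes. A minimal sketch of sending structured data instead, assuming JSON payloads, is to pass a callable as the producer's ``value_serializer`` argument and a matching one as the consumer's ``value_deserializer``. The helper names ``serialize_json``, ``deserialize_json`` and ``send_event`` are illustrative and not part of aiokafka; the broker address and topic are reused from the examples above.

```python
import json


def serialize_json(value):
    """Encode a JSON-compatible object as compact UTF-8 bytes."""
    return json.dumps(value, separators=(",", ":")).encode("utf-8")


def deserialize_json(raw):
    """Decode UTF-8 JSON bytes; None is passed through defensively."""
    return None if raw is None else json.loads(raw.decode("utf-8"))


async def send_event(event):
    # Imported lazily so the helpers above work without aiokafka installed.
    from aiokafka import AIOKafkaProducer

    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=serialize_json,  # applied to every value before sending
    )
    await producer.start()
    try:
        # The dict is converted to bytes by serialize_json.
        await producer.send_and_wait("my_topic", event)
    finally:
        await producer.stop()
```

Calling ``asyncio.run(send_event({"id": 1}))`` requires a reachable broker at ``localhost:9092``. On the consuming side, ``AIOKafkaConsumer(..., value_deserializer=deserialize_json)`` would make ``msg.value`` a Python object rather than bytes.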
Release History
| Version | Changes | Urgency | Date |
|---|---|---|---|
| 0.13.0 | Imported from PyPI (0.13.0) | Low | 4/21/2026 |
| v0.13.0 | Breaking changes: * Resolve API versions at connection with brokers `api_version` parameter has been removed from the different clients (admin/consumer/producer) (pr #1136 by @vmaurin) Improved Documentation: * Fix incomplete documentation for `AIOKafkaConsumer.offset_for_times` (pr #1068 by @jzvandenoever) * Fix Java Client API reference (pr #1069 by @emmanuel-ferdman) Bugfixes: * Make KafkaStorageError retriable after metadata refresh like in other implementations | Low | 1/2/2026 |
| v0.12.0 | New features: * Build mac x86_64 wheels (pr #1029) * Add support for Python 3.13, drop support for Python 3.8 due to end of life (pr #1061) * Remove duplicate error logging during rebalance (pr #1025 by @y4n9squared) Bugfixes: * Quote username in SCRAM auth (pr #1043) Improved Documentation: * Fix building of readthedocs documentation (pr #1034) * Fix typo in producer documentation (pr #1036 by @lgo) | Low | 10/26/2024 |
| v0.11.0 | New features: * Implement DeleteRecords API ([KIP-204](https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API)) (pr #969 by @vmaurin) Bugfixes: * Fix serialization for batch (issue #886, pr #887 by @ydjin0602) * Fix type annotation for `AIOKafkaAdminClient.create_partitions` (pr #978 by @alm0ra) * Fix `NotControllerError` in `AIOKafkaAdminClient.create_topics` and other methods (issue #995) * Fix unintended ca | Low | 6/30/2024 |
| v0.10.0 | New features: * Support static membership protocol, [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances) (issue #680, pr #941 by @patkivikram and @joshuaherrera) Bugfixes: * Fix extra dependencies (issue #952) | Low | 12/15/2023 |
| v0.10.0.a0 | New features: * Support static membership protocol, [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances) (issue #680, pr #941 by @patkivikram and @joshuaherrera) | Low | 12/13/2023 |
| v0.9.0 | New features: * Include `kafka-python` into `aiokafka`'s code base (issue #928 and others) * Replace `python-snappy` and `zstandard` with `cramjam` (issue #930) * PEP518 compliant `pyproject.toml` * Python 3.12 support Bugfixes: * Fix type annotation for `ConsumerRecord` (pr #912 by @zschumacher) * Improve send performance (issue #943) Improved Documentation: * Fix `AbstractTokenProvider.token` example (pr #919 by @mtomilov) | Low | 12/4/2023 |
| 0.9.0.rc1 | New features: * Include `kafka-python` into `aiokafka`'s code base * Replace `python-snappy` and `zstandard` with `cramjam` * PEP518 compliant `pyproject.toml` * Python 3.12 support Bugfixes: * Fix type annotation for `ConsumerRecord` (pr #912 by @zschumacher) * Improve send performance (issue #943) Improved Documentation: * Fix `AbstractTokenProvider.token` example (pr #919 by @mtomilov) | Low | 11/29/2023 |
| v0.9.0.rc0 | New features: * Include `kafka-python` into `aiokafka`'s code base * Replace `python-snappy` and `zstandard` with `cramjam` * PEP518 compliant `pyproject.toml` * Python 3.12 support Bugfixes: * Fix type annotation for `ConsumerRecord` (pr #912 by @zschumacher) Improved Documentation: * Fix `AbstractTokenProvider.token` example (pr #919 by @mtomilov) | Low | 11/21/2023 |
| v0.8.1 | New features: * Drop support for Python 3.7 due to end of life (pr #893) Bugfixes: * Add SASL authentication support to `AIOKafkaAdminClient` (issue #889, pr #890 by @selevit) Improved Documentation: * Update `security_protocol` argument docstring (issue #883, pr #884 by @gabrielmbmb) * Remove incorrect `await` for `AIOKafkaConsumer.highwater()` (pr #858 by @yi-jiayu) | Low | 6/1/2023 |
| v0.8.0 | New features: * Add codec for ZStandard compression (KIP-110) (pr #801) * Add basic admin client functionality (pr #811 started by @gabriel-tincu) * Drop support for Python 3.6, add support and pre-built packages for Python 3.10 (pr #841) Bugfixes: * Fix `KeyError` on solitary abort marker (issue #781, pr #782 by @pikulmar) * Fix handling unsupported compression codec (issue #795) * Handled other SASL mechanism in logging (issue #852, pr #861 by @mangin) Improved Documenta | Low | 11/21/2022 |
| v0.7.2 | Bugfixes: * Fix `CancelledError` handling in sender (issue #710) * Fix exception for weakref use after object deletion (issue #755) * Fix consumer's `start()` method hanging after being idle for more than `max_poll_interval_ms` (issue #764) Improved Documentation: * Add `SASL_PLAINTEXT` and `SASL_SSL` to valid values of security protocol attribute (pr #768 by @pawelrubin) | Low | 9/2/2021 |
| v0.7.1 | Bugfixes: * Allow group coordinator to close when all brokers are unavailable (issue #659 and pr #660 by @dkilgore90) * Exclude `.so` from source distribution to fix usage of sdist tarball (issue #681 and pr #684 by ods) * Add `dataclasses` backport package to dependencies for Python 3.6 (pr #690 by @ods) * Fix initialization without running loop (issue #689 and pr #690 by @ods) * Fix consumer fetcher for python3.9 (pr #672 by @dutradda) * Make sure generation and member id are | Low | 6/4/2021 |
| v0.7.0 | New features: * Add support for Python 3.8 and 3.9. (issue #569, pr #669 and #676 by @ods) * Drop support for Python 3.5. (pr #667 by @ods) * Add OAUTHBEARER as a new sasl_mechanism. (issue #618 and pr #630 by @oulydna) Bugfixes: * Fix memory leak in kafka consumer when consumer is in idle state not consuming any message. (issue #628 and pr #629 by @iamsinghrajat) | Low | 10/28/2020 |
| v0.6.1.dev0 | Release v0.6.1.dev0 | Low | 6/19/2020 |
| v0.6.0 | New features: * Add async context manager support for both Producer and Consumer. (pr #613 and #494 by @nimish) * Upgrade to kafka-python version 2.0.0 and set it as non-strict parameter. (issue #590 by @yumendy and #558 by @originalgremlin) * Make loop argument optional (issue #544) * SCRAM-SHA-256 and SCRAM-SHA-512 support for SASL authentication (issue #571 and pr #588 by @SukiCZ) * Added headers param to AIOKafkaProducer.send_and_wait (pr #553 by @megabotan) * Add `consumer.last_p | Low | 5/15/2020 |
| v0.5.2 | Bugfixes: * Fix ConnectionError breaking metadata sync background task (issue #517 and #512) * Fix event_waiter reference before assignment (pr #504 by @romantolkachyov) * Bump version of kafka-python | Low | 5/15/2020 |
| v0.5.1 | New features: * Add SASL support with both SASL plain and SASL GSSAPI. Support also includes Broker v0.9.0, but you will need to explicitly pass ``api_version="0.9"``. (Big thanks to @cyrbil and @jsurloppe for working on this) * Added support for max_poll_interval_ms and rebalance_timeout_ms settings (issue #67) * Added pause/resume API for AIOKafkaConsumer. (issue #304) * Added header support to both AIOKafkaConsumer and AIOKafkaProducer for brokers v0.11 and above. (issue #4 | Low | 3/10/2019 |
| v0.5.0 | New features: * Add full support for V2 format messages with a Cython extension. Those are used for Kafka >= 0.11.0.0 * Added support for transactional producing (issue #182) * Added support for idempotent producing with `enable_idempotence` parameter * Added support for `fetch_max_bytes` in AIOKafkaConsumer. This can help limit the amount of data transferred in a single roundtrip to the broker, which is essential for consumers with a large number of partitions Bugfixes: * | Low | 12/28/2018 |
| v0.4.3 | Fix issue #444 and #436 related to a memory leak in `asyncio.shield()` | Low | 11/1/2018 |
| v0.4.2 | The work here was concentrated on fixing bugs after the coordination refactors on v0.4.0. Hope it will serve you better now! Bugfix: * Added error propagation from coordinator to the main consumer. Before consumer just stopped with error logged. (issue #294) * Fix manual partition assignment, broken in 0.4.0 (issue #394) * Fixed RecursionError in MessageAccumulator.add_message (issue #409) * Update kafka-python to latest 1.4.3 and added support for Python3.7 * Dropped support for P | Low | 9/12/2018 |
| v0.4.1 | * Fix issue when offset commit error reports wrong partition in log (issue #353) * Add ResourceWarning when Producer, Consumer or Connections are not closed properly (issue #295) * Fix Subscription None in GroupCoordinator._do_group_rejoin (issue #306) | Low | 5/13/2018 |
| v0.4.0 | Major changes: * Full refactor of the internals of AIOKafkaConsumer. Needed to avoid several race conditions in code (PR #286, fixes #258, #264 and #261) * Rewrote Records parsing protocol to allow implementation of newer protocol versions later * Added C extension for Records parsing protocol, boosting the speed of produce/consume routines significantly * Added an experimental batch producer API for unique cases where the user wants to control batching themselves (by @shargan) | Low | 2/1/2018 |
| v0.3.1 | * Added `AIOKafkaProducer.flush()` method. (PR #209 by @vineet-rh) * Fixed a bug with uvloop involving `float("inf")` for timeout. (PR #210 by dmitry-moroz) * Changed test runner to allow running tests on OSX. (PR #213 by @shargan) | Low | 2/1/2018 |
| v0.3.0 | Starting from 0.3.0 we discourage imports from `kafka` namespace. Starting from `0.4.0` aiokafka will stop supporting objects imported from `kafka` namespace. Please import from `aiokafka` namespaces instead. * Moved all public structures and errors to `aiokafka` namespace. You will no longer need to import from `kafka` namespace. * Changed ConsumerRebalanceListener to support either function or coroutine for `on_partitions_assigned` and `on_partitions_revoked` callbacks. (PR #190 b | Low | 8/17/2017 |
| v0.2.3 | * Fixed retry problem in Producer, when buffer is not reset to 0 offset. Thanks to @ngavrysh for the fix in Tubular/aiokafka fork. (issue #184) * Fixed how Producer handles retries on Leader node failure. It just did not work before... Thanks to @blugowski for the help in locating the problem. (issue #176, issue #173) * Fixed degrade in v0.2.2 on Consumer with no group_id. (issue #166) | Low | 7/23/2017 |
| v0.2.2 | * Force reconnect after KafkaTimeoutException. (PR #149 by @Artimi) * Fixed compacted topic handling. It could skip messages if some of them were compacted (issue #71) * Fixed old issue with new topics not visible in pattern subscription (issue #46) * Another fix for Consumer race condition on JoinGroup. This forces Leader to wait for new metadata before assigning partitions. (issue #118) * Changed metadata listener in Coordinator to avoid 2 rejoins in a rare condition (issue #108) * `ge | Low | 4/17/2017 |
| v0.2.1 | - Add a check to wait topic autocreation in Consumer, instead of raising UnknownTopicOrPartitionError (PR #92 by fabregas) - Consumer now stops consumption after `consumer.stop()` call. Any new `get*` calls will result in ConsumerStoppedError (PR #81) - Added `exclude_internal_topics` option for Consumer (PR #111) - Better support for pattern subscription when used with `group_id` (part of PR #111) - Fix for Consumer `subscribe` and JoinGroup race condition (issue #88). Coordinator will now noti | Low | 2/19/2017 |
| v0.2.0 | - Added SSL support. (PR #81 by Drizzt1991) - Fixed UnknownTopicOrPartitionError error on first message for autocreated topic (PR #96 by fabregas) - Fixed `next_record` recursion (PR #94 by fabregas) - Fixed Heartbeat fail if no consumers (PR #92 by fabregas) - Added docs addressing kafka-python and aiokafka differences (PR #70 by Drizzt1991) - Added `max_poll_records` option for Consumer (PR #72 by Drizzt1991) - Fix kafka-python typos in docs (PR #69 by jeffwidman) - Topics and partitions are n | Low | 12/18/2016 |
| v0.1.4 | - Bumped python-kafka version to 1.3.1 and Kafka to 0.10.1.0. - Fixed auto version detection, to correctly handle 0.10.0.0 version - Updated Fetch and Produce requests to use v2 with v0.10.0 message format on brokers. This allows a `timestamp` to be associated with messages. - Changed lz4 compression framing, as it was changed due to KIP-57 in new message format. - Minor refactorings Big thanks to @fabregas for the hard work on this release (PR #60) | Low | 11/7/2016 |
| v0.1.3 | - Fixed bug with infinite loop on heartbeats with autocommit=True. #44 - Bumped python-kafka to version 1.1.1 - Fixed docker test runner with multiple interfaces - Minor documentation fixes | Low | 10/18/2016 |
| v0.1.2 | - Added Python3.5 usage example to docs - Don't raise retryable exceptions in 3.5's async for iterator - Fix Cancellation issue with producer's `send_and_wait` method | Low | 4/30/2016 |
| v0.1.1 | Fix packaging issues. Removed unneeded files from package. | Low | 4/30/2016 |
| v0.1.0 | Added full support for Kafka 0.9.0. Older Kafka versions are not tested. | Low | 4/15/2016 |
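
The v0.6.0 entry above mentions async context manager support for both Producer and Consumer. As a rough sketch of what that enables, ``async with`` can replace the explicit ``start()`` / ``try`` / ``finally`` / ``stop()`` pattern from the README examples. The helper ``send_batch`` and the coroutine ``main`` are hypothetical names used only for illustration; the broker address and topic are the ones used in the README.

```python
async def send_batch(producer, topic, payloads):
    """Send each payload and wait for its delivery; return the number sent.

    `producer` is any object following the producer's async context manager
    protocol; entering the block starts it, leaving the block stops it.
    """
    async with producer:
        for payload in payloads:
            await producer.send_and_wait(topic, payload)
    return len(payloads)


async def main():
    # Imported lazily so send_batch can be exercised without aiokafka installed.
    from aiokafka import AIOKafkaProducer

    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await send_batch(producer, "my_topic", [b"first", b"second"])
```

Running ``asyncio.run(main())`` requires a reachable broker. Because the producer is stopped when the ``async with`` block exits, including on an exception, a caller does not need a separate ``finally`` clause.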
