Welcome to Asynckafka’s documentation!

Asynckafka

Fast Python Kafka client for asyncio. Asynckafka is written in Cython on top of rdkafka, which it uses as the Kafka driver.

Right now it is a work in progress, so use it at your own risk. Before the 1.0.0 release I do not guarantee API stability between minor version numbers.

Documentation url: WIP

Performance

This project was born from the need to have a high performance kafka library for asyncio.

Benchmark

Simple benchmark with one Kafka broker, one partition, 200 bytes per message and 10 million messages:

Preparing benchmark.
Filling topic  benchmark_ad5682b7-9469-4f35-ad72-933c5e9879e1 with
10000000 messages of 200 bytes each one.
The time used to produce the messages is 21.905211210250854 seconds.
Throughput: 91.30247505050632 mb/s

Starting to consume the messages.
The time used to consume the messages is 20.685954093933105 seconds.
Throughput: 96.68396202167787 mb/s
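As a sanity check, the throughput figures above follow directly from the message count and size (10,000,000 messages of 200 bytes each is 2,000 MB total, using decimal megabytes):

```python
# 10,000,000 messages of 200 bytes each = 2000 MB (decimal megabytes).
total_mb = 10_000_000 * 200 / 10**6

produce_seconds = 21.905211210250854
consume_seconds = 20.685954093933105

print(total_mb / produce_seconds)  # ~91.30 MB/s
print(total_mb / consume_seconds)  # ~96.68 MB/s
```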

Requirements

  1. Python 3.6 or greater
  2. Rdkafka 0.11.X
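A quick way to check the Python requirement from the command line (this assumes `python3` is on your PATH):

```shell
# Fails with an AssertionError if the interpreter is older than 3.6:
python3 -c 'import sys; assert sys.version_info >= (3, 6), sys.version'
```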

Install rdkafka from source

You need the rdkafka headers to be able to compile asynckafka. Download the rdkafka source (https://github.com/edenhill/librdkafka), then unpack the package and run:

./configure
make
sudo make install

Install asynckafka package

The package is in PyPI; you can install it with pip:

$ pip install asynckafka

Examples

Simple consumer

import asyncio
import logging
import sys

from asynckafka import Consumer

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)


async def consume_messages(consumer):
    async for message in consumer:
        print(f"Received message: {message.payload}")

consumer = Consumer(
    brokers='localhost:9092',
    topics=['my_topic'],
    group_id='my_group_id',
)
consumer.start()

asyncio.ensure_future(consume_messages(consumer))

loop = asyncio.get_event_loop()
try:
    loop.run_forever()
finally:
    consumer.stop()
    loop.stop()

Simple producer

import asyncio
import logging
import sys

from asynckafka import Producer

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)


async def send_messages(producer):
    while True:
        await producer.produce("my_topic", b"my_message")
        print('sent message')
        await asyncio.sleep(1)

producer = Producer(brokers="localhost:9092")
producer.start()

asyncio.ensure_future(send_messages(producer))

loop = asyncio.get_event_loop()

try:
    loop.run_forever()
finally:
    producer.stop()
    loop.stop()

RdKafka configuration

This library is built on top of rdkafka, and as a wrapper it uses the same configuration as rdkafka itself. The rdkafka configuration is very extensive and can be found at the following link:

https://github.com/edenhill/librdkafka/blob/v0.11.3/CONFIGURATION.md

The asynckafka producer and consumer accept two configuration dicts. Both the keys and the values of these dicts must be strings.

Consumer configuration, example:

Consumer(
    brokers='127.0.0.1:9092',
    topics=['my_topic'],
    rdk_consumer_config={
        'api.version.request': 'true',
        'enable.auto.commit': 'false',
    },
    rdk_topic_config={
        'auto.offset.reset': 'smallest'
    }
)

Producer configuration, example:

producer = Producer(
    brokers='127.0.0.1:9092',
    rdk_producer_config={
        'batch.num.messages': '100000',
        'message.send.max.retries': '4',
    },
    rdk_topic_config={
        'message.timeout.ms': '10000'
    },
)

The options accepted in rdk_consumer_config correspond to those found in the 'Global configuration properties' section of the rdkafka configuration documentation, excluding the producer-only options (marked with a 'P' to the left of the name). The rdk_producer_config accepts the same global options, but excluding the consumer-only ones (marked with a 'C').

At the end of that document you can find the 'Topic configuration properties' section, which lists the options that can be passed in the rdk_topic_config argument of the consumer or producer.
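Since both configuration dicts must contain strings, a small helper like the following can normalize a config before passing it to asynckafka (plain Python, not part of the asynckafka API):

```python
def stringify_config(config):
    """Convert every key and value to str, since librdkafka
    expects string-typed configuration properties."""
    return {str(key): str(value) for key, value in config.items()}

rdk_producer_config = stringify_config({
    'batch.num.messages': 100000,   # int converted to '100000'
    'message.send.max.retries': 4,  # int converted to '4'
})
print(rdk_producer_config)
```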

Logging

Asynckafka uses the standard Python logging library, with "asynckafka" as the logger name.

To send all logging output to stdout, it is enough to do:

import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
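If you prefer not to touch the root logger, you can attach a handler to the "asynckafka" logger alone. This is plain standard-library logging, not an asynckafka-specific API:

```python
import logging
import sys

# Configure only the "asynckafka" logger instead of the root logger.
logger = logging.getLogger("asynckafka")
logger.setLevel(logging.DEBUG)

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
    logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)
logger.addHandler(handler)
```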

Some debug log lines are disabled by default in the consumer and producer. The reason is to avoid Python calls and Python string composition for the logger in the critical path of the Cython code. You can still enable them with:

import asynckafka

asynckafka.set_debug(True)

Error callback

An error callback can be passed to the consumer or producer. It should be a coroutine function that accepts one parameter, a KafkaError. The error_callback is thread safe and is executed in the loop used by the consumer or producer.

Example:

async def error_callback(kafka_error):
    print(kafka_error)

# Intentionally wrong port, to trigger the error callback
producer = Producer(
    brokers="127.0.0.1:6000",
    error_callback=error_callback
)

Develop

How to run the tests

WIP

API

Consumer

Producer

Message

class asynckafka.Message
payload

Kafka Error
