Kafka is an incredibly powerful service that can help you process huge streams of data. It is written in Scala and has been undergoing lots of changes. Historically, the JVM clients have been better supported than those in the Python ecosystem. However, this doesn't need to be the case! The Kafka binary protocol has largely solidified, and many people in the open source community are working to provide first-class support for non-JVM languages.
In this post we will benchmark the three main Python Kafka clients: pykafka, kafka-python, and the newest arrival, confluent-kafka-python. This will also serve as an introduction to the different clients in the Python ecosystem and their high-level APIs.
We are going to keep the setup simple and run a single broker that will auto-create topics with one partition. Please note this is a contrived setup: any production deployment would be multi-broker and would likely use more partitions, but for simplicity's sake we are going to use one.
The plan:
- Spin up a single Kafka 0.9 broker.
- Produce and consume 1 million 100-byte messages with each client.
- Require producer acks of 1, meaning that only the leader (we only have one broker anyway) needs to acknowledge a message. Increasing this will ensure your data is not lost due to broker failure, but will slow down production. A sketch of how each client expresses this setting follows this list.
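For reference, here is roughly how an acks setting of 1 maps onto each client's producer configuration. This is an illustrative sketch rather than part of the benchmark: pykafka's required_acks and kafka-python's acks both default to 1 anyway, the confluent client takes librdkafka-style config keys (the nesting shown matches the 0.9-era client, an assumption worth checking against your version), and bootstrap_servers and topic are defined later in the notebook.

# pykafka: required_acks is an argument to get_producer() (defaults to 1)
producer = topic.get_producer(required_acks=1)

# kafka-python: acks is a KafkaProducer argument (defaults to 1)
producer = KafkaProducer(bootstrap_servers=bootstrap_servers, acks=1)

# confluent-kafka: librdkafka topic-level config key (0.9-era nesting)
producer = confluent_kafka.Producer(**{
    'bootstrap.servers': bootstrap_servers,
    'default.topic.config': {'request.required.acks': 1},
})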
I ran these tests within Vagrant hosted on a MacBook Pro with a 2.2 GHz i7.
Like all benchmarks, take this with a grain of salt. A single broker on a local machine is hardly a production deployment. All settings are largely left at their defaults. Also, the file system caching the broker benefits from really helps client consumption speed. To combat this, we run each consumption test once to warm the cache and then a second time for the measurement.
Even with all the normal stipulations, I hope you find this informative.
Installing the clients can be complicated by the C extensions some of them require. We are big fans of conda and maintain Python 3 Linux builds of some of these clients here, with recipes here.
The easiest way is to use conda to install the clients:
conda create -n kafka-benchmark python=3 ipython jupyter pandas seaborn -y
source activate kafka-benchmark
conda install -c activisiongamescience confluent-kafka pykafka -y # will also get librdkafka
pip install kafka-python # pure python version is easy to install
If you would like to run this notebook, please find the full repo here.
Included in this repo is a docker-compose file that will spin up a single Kafka 0.9 broker and a ZooKeeper instance locally. We can shell out and start it with docker-compose.
In [1]:
!docker-compose up -d
In [2]:
msg_count = 1000000
msg_size = 100
msg_payload = ('kafkatest' * 20).encode()[:msg_size]
print(msg_payload)
print(len(msg_payload))
In [3]:
bootstrap_servers = 'localhost:9092' # change if your brokers live elsewhere
In [4]:
import time
producer_timings = {}
consumer_timings = {}
In [5]:
def calculate_thoughput(timing, n_messages=1000000, msg_size=100):
    print("Processed {0} messages in {1:.2f} seconds".format(n_messages, timing))
    print("{0:.2f} MB/s".format((msg_size * n_messages) / timing / (1024*1024)))
    print("{0:.2f} Msgs/s".format(n_messages / timing))
The first client we will examine is pykafka. It is the client we have the most experience with and have used with success. It tries less hard than the others to replicate the existing Java client API. It also has partition balancing code for Kafka 0.8-series brokers, which left that responsibility to the client.
In [6]:
from pykafka import KafkaClient
def pykafka_producer_performance(use_rdkafka=False):

    # Setup client
    client = KafkaClient(hosts=bootstrap_servers)
    topic = client.topics[b'pykafka-test-topic']
    producer = topic.get_producer(use_rdkafka=use_rdkafka)

    # Start producing
    produce_start = time.time()
    for i in range(msg_count):
        producer.produce(msg_payload)
    producer.stop()  # Will flush background queue
    return time.time() - produce_start
In [7]:
producer_timings['pykafka_producer'] = pykafka_producer_performance()
calculate_thoughput(producer_timings['pykafka_producer'])
If you are monitoring this function, you will notice that the produce loop completes and then the function stalls at producer.stop(). This is because the producer is asynchronous and batches produce calls to Kafka: Kafka's speed comes from the ability to batch many messages together. To take advantage of this, the client keeps a buffer of messages in the background and batches them. So, when you call producer.produce, you are performing no external I/O; the message is queued in an in-memory buffer and the method returns immediately. We are therefore able to load the in-memory buffer faster than pykafka can send messages to Kafka, and producer.stop() will block until they have all been sent.
So when producing messages, make sure you allow the producer to flush the remaining messages before you exit.
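If your pykafka version supports using the producer as a context manager (recent versions do; treat that as an assumption for yours), the flush-on-exit pattern is harder to forget. A minimal sketch:

# Sketch: producer.stop() -- and therefore the flush of the background
# queue -- runs automatically when the with-block exits.
with topic.get_producer() as producer:
    for i in range(msg_count):
        producer.produce(msg_payload)
# by here, all buffered messages have been delivered or raised an error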
Another way to ensure that the messages were produced is to check the topic offsets.
In [8]:
client = KafkaClient(hosts=bootstrap_servers)
topic = client.topics[b'pykafka-test-topic']
print(topic.earliest_available_offsets())
print(topic.latest_available_offsets())
Pykafka has an optional producer backend that wraps the librdkafka package. librdkafka is a pure C Kafka client and holds very impressive benchmarks. Let's rerun our pykafka producer test with rdkafka enabled.
In [9]:
producer_timings['pykafka_producer_rdkafka'] = pykafka_producer_performance(use_rdkafka=True)
calculate_thoughput(producer_timings['pykafka_producer_rdkafka'])
In [10]:
def pykafka_consumer_performance(use_rdkafka=False):

    # Setup client
    client = KafkaClient(hosts=bootstrap_servers)
    topic = client.topics[b'pykafka-test-topic']

    msg_consumed_count = 0

    consumer_start = time.time()
    # Consumer starts polling messages in a background thread, so start the timer here
    consumer = topic.get_simple_consumer(use_rdkafka=use_rdkafka)

    while True:
        msg = consumer.consume()
        if msg:
            msg_consumed_count += 1
        if msg_consumed_count >= msg_count:
            break

    consumer_timing = time.time() - consumer_start
    consumer.stop()
    return consumer_timing
In [11]:
# run it once through to warm the cache
_ = pykafka_consumer_performance(use_rdkafka=False)
consumer_timings['pykafka_consumer'] = pykafka_consumer_performance(use_rdkafka=False)
calculate_thoughput(consumer_timings['pykafka_consumer'])
In [12]:
# run it once through to warm the cache
_ = pykafka_consumer_performance(use_rdkafka=True)
consumer_timings['pykafka_consumer_rdkafka'] = pykafka_consumer_performance(use_rdkafka=True)
calculate_thoughput(consumer_timings['pykafka_consumer_rdkafka'])
kafka-python aims to replicate the Java client API exactly. This is a key difference from pykafka, which tries to maintain a more "pythonic" API. In earlier versions of Kafka, partition balancing was left to the client, and pykafka was the only Python client to implement this feature. However, with Kafka 0.9 the broker provides this, so the lack of support within kafka-python is less important.
In [13]:
from kafka import KafkaProducer
def python_kafka_producer_performance():
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    topic = 'python-kafka-topic'

    producer_start = time.time()
    for i in range(msg_count):
        producer.send(topic, msg_payload)
    producer.flush()  # clear all local buffers and produce pending messages
    return time.time() - producer_start
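A side note on the kafka-python API: send() returns a future, so if you want per-message delivery confirmation you can block on it, trading away batching throughput. A minimal sketch:

# Synchronous send: block on the future returned by send().
future = producer.send(topic, msg_payload)
record_metadata = future.get(timeout=10)  # raises on delivery failure
print(record_metadata.topic, record_metadata.partition, record_metadata.offset)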
In [14]:
producer_timings['python_kafka_producer'] = python_kafka_producer_performance()
calculate_thoughput(producer_timings['python_kafka_producer'])
In [15]:
from kafka import KafkaConsumer
def python_kafka_consumer_performance():
    topic = 'python-kafka-topic'

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',  # start at the earliest offset in the topic
        group_id=None  # do not commit offsets
    )
    msg_consumed_count = 0

    consumer_start = time.time()
    consumer.subscribe([topic])
    for msg in consumer:
        msg_consumed_count += 1
        if msg_consumed_count >= msg_count:
            break

    consumer_timing = time.time() - consumer_start
    consumer.close()
    return consumer_timing
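One caveat with iterating a KafkaConsumer this way: if the topic holds fewer than msg_count messages, the for loop will block forever. kafka-python's consumer_timeout_ms setting stops iteration when no message arrives within the window. A sketch:

# Stop iterating if nothing is received for 1 second, rather than blocking forever.
consumer = KafkaConsumer(
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest',
    group_id=None,
    consumer_timeout_ms=1000,
)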
In [16]:
# run it once through to warm the cache
_ = python_kafka_consumer_performance()
consumer_timings['python_kafka_consumer'] = python_kafka_consumer_performance()
calculate_thoughput(consumer_timings['python_kafka_consumer'])
With the latest release of the Confluent platform, there is a new Python client on the scene. confluent-kafka-python is a Python wrapper around librdkafka and is largely built by the same author. The underlying library is the basis for most non-JVM clients out there. We already mentioned it when looking at pykafka's rdkafka backend.
In [22]:
import confluent_kafka
topic = 'confluent-kafka-topic'
def confluent_kafka_producer_performance():
    topic = 'confluent-kafka-topic'
    conf = {'bootstrap.servers': bootstrap_servers}
    producer = confluent_kafka.Producer(**conf)
    messages_to_retry = 0

    producer_start = time.time()
    for i in range(msg_count):
        try:
            producer.produce(topic, value=msg_payload)
        except BufferError as e:
            # local queue is full; count it and retry below
            messages_to_retry += 1

    # hacky retry of the messages that overfilled the local buffer
    for i in range(messages_to_retry):
        producer.poll(0)
        try:
            producer.produce(topic, value=msg_payload)
        except BufferError as e:
            producer.poll(0)
            producer.produce(topic, value=msg_payload)

    producer.flush()
    return time.time() - producer_start
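The retry loop above is admittedly hacky. A cleaner pattern (a sketch, not what the benchmark measured) is to let poll() serve delivery callbacks until the local queue frees up, then retry:

def produce_with_backpressure(producer, topic, payload):
    # Produce one message, blocking politely when the local buffer is full.
    while True:
        try:
            producer.produce(topic, value=payload)
            return
        except BufferError:
            # Queue is full: serve delivery callbacks to drain it, then retry.
            producer.poll(0.5)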
In [23]:
producer_timings['confluent_kafka_producer'] = confluent_kafka_producer_performance()
calculate_thoughput(producer_timings['confluent_kafka_producer'])
In [24]:
client = KafkaClient(hosts=bootstrap_servers)
topic = client.topics[b'confluent-kafka-topic']
print(topic.earliest_available_offsets())
print(topic.latest_available_offsets())
In [25]:
import confluent_kafka
import uuid
def confluent_kafka_consumer_performance():
    topic = 'confluent-kafka-topic'
    msg_consumed_count = 0
    conf = {'bootstrap.servers': bootstrap_servers,
            'group.id': str(uuid.uuid1()),  # fresh group id, so we always read from the start
            'session.timeout.ms': 6000,
            'default.topic.config': {
                'auto.offset.reset': 'earliest'
            }
    }

    consumer = confluent_kafka.Consumer(**conf)

    consumer_start = time.time()
    # This is the same as pykafka: subscribing to a topic starts a background thread
    consumer.subscribe([topic])

    while True:
        msg = consumer.poll(1)
        if msg:
            msg_consumed_count += 1
        if msg_consumed_count >= msg_count:
            break

    consumer_timing = time.time() - consumer_start
    consumer.close()
    return consumer_timing
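For brevity this loop counts every non-None message, but in real code you should check msg.error() before trusting a message, since the client can surface conditions such as partition EOF or transient broker errors as messages carrying an error. A sketch of a safer loop body:

while True:
    msg = consumer.poll(1)
    if msg is None:
        continue  # poll timed out with nothing to deliver
    if msg.error():
        continue  # e.g. partition EOF or a transient error; skip it
    msg_consumed_count += 1
    if msg_consumed_count >= msg_count:
        break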
In [26]:
_ = confluent_kafka_consumer_performance() # Warm cache
consumer_timings['confluent_kafka_consumer'] = confluent_kafka_consumer_performance()
calculate_thoughput(consumer_timings['confluent_kafka_consumer'])
The confluent_kafka client is crushingly fast. It can consume over 250,000 messages a second from a single broker. Note that the raw C client has been benchmarked at over 3 million messages/sec, so you can see how much overhead Python adds. But on the side of developer speed, you don't have to write C!
In [27]:
import pandas as pd
import seaborn as sns
%pylab inline
In [28]:
consumer_df = pd.DataFrame.from_dict(consumer_timings, orient='index').rename(columns={0: 'time_in_seconds'})
producer_df = pd.DataFrame.from_dict(producer_timings, orient='index').rename(columns={0: 'time_in_seconds'})
In [29]:
consumer_df['MBs/s'] = (len(msg_payload) * msg_count) / consumer_df.time_in_seconds / (1024*1024)
producer_df['MBs/s'] = (len(msg_payload) * msg_count) / producer_df.time_in_seconds / (1024*1024)
consumer_df['Msgs/s'] = msg_count / consumer_df.time_in_seconds
producer_df['Msgs/s'] = msg_count / producer_df.time_in_seconds
In [30]:
producer_df.sort_index(inplace=True)
producer_df
Out[30]:
In [31]:
consumer_df.sort_index(inplace=True)
consumer_df
Out[31]:
In [32]:
producer_df.plot(kind='bar', subplots=True, figsize=(10, 10), title="Producer Comparison")
Out[32]:
In [33]:
consumer_df.plot(kind='bar', subplots=True, figsize=(10, 10), title="Consumer Comparison")
Out[33]:
This is not a complete overview of these clients; there are many trade-offs to consider before you pick one. First, the confluent client and the rdkafka backend of pykafka are C-backed and will not work with PyPy. Also, packaging and distributing Python packages with C extensions can be a pain. We are big fans of conda for alleviating this pain, and we have packaged pykafka, confluent-kafka-python, and librdkafka on our public conda channel.
Another limitation of this benchmark approach, one that works against pykafka and confluent_kafka, is that the consumer does no work other than blindly ripping through messages. Pykafka uses a background thread to fetch messages before you ever call consume(). The vagaries of Python threading and the limitations of the GIL are beyond the scope of this post, but keep in mind that pykafka's background thread will continue to poll brokers while you do work elsewhere in your program. This effect is more pronounced for work that is I/O bound.
The confluent_kafka client was released on May 25th, so while the underlying librdkafka is hardened and widely used, the Python client is very fresh. Also, some of the metadata APIs are not exposed to the client. Specifically, we have found issues with the Offset API and the Offset Commit/Fetch API, so getting the starting and ending offsets for a partition, or the current lag of a consumer group, is not straightforward. Confluent seems committed to the client, so I'm sure a sane client metadata API will be added.
Not to devolve this post into the ravings of a petulant language zealot, but why should the JVM get all the goodies first! It is really exciting to see the Python ecosystem around Kafka develop. Having first-class Python support for Kafka is huge for our work here at Activision and for anyone who is interested in stream processing and streaming analytics.
In [31]:
!docker-compose down # Cleanup