Working with Kafka data

During a simulation, the producer and the marketplace continuously log sales and market activity to Kafka. This information is organised in topics. To estimate customer demand and predict good prices, merchants can use the Kafka API to access this data.

Merchants receive the data in the form of a pandas DataFrame.

If you want to try the following examples, make sure that the Pricewars platform is running, either by deploying its components individually or by using the Docker setup.

The following step is specific to this notebook. It is not necessary if your merchant is in the repository root.


In [1]:
import sys
sys.path.append('../')

Initialize Kafka API

You need a merchant token to use the Kafka API. To get one, register the merchant at the marketplace.


In [6]:
from api import Marketplace
marketplace = Marketplace()
registration = marketplace.register(
    'http://nobody:55000/',
    merchant_name='kafka_notebook_merchant',
    algorithm_name='human')

registration


Out[6]:
{'algorithm_name': 'human', 'api_endpoint_url': 'http://nobody:55000/', 'merchant_name': 'kafka_notebook_merchant', 'merchant_token': 'ddObEKSew8sIa9aZvJrdSs78K5zVIuCvYaXlu0Rs8KBjUlJQ5B3W4AeOGetYZ6U7', 'merchant_id': 'YWKtz6tuqjA6hfYSSZ2Uf+9jmi0PrzlWNKQfoRv1oew='}

If you get the following error, the connection to the marketplace failed:

ConnectionError: HTTPConnectionPool(host='marketplace', port=8080)

In that case, make sure that the marketplace is running and that host and port are correct. If the host or port is wrong, you can change it by creating the Marketplace object with the host argument:

marketplace = Marketplace(host='www.another_host.com:1234')

The same applies to the Kafka API introduced next.
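If the marketplace is still starting up, a single failed connection attempt does not have to be fatal. The following is a minimal sketch of a retry wrapper. The helper name retry_on_connection_error and its parameters are hypothetical and not part of the Pricewars API. It relies on the fact that the ConnectionError raised by the requests library is a subclass of OSError.

```python
import time


def retry_on_connection_error(action, attempts=3, delay_seconds=2.0):
    """Call action() until it succeeds or the attempts are used up.

    Hypothetical helper for illustration, not part of the Pricewars API.
    """
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except OSError:
            # Covers the built-in ConnectionError and requests' ConnectionError.
            if attempt == attempts:
                raise  # give up and surface the last error
            time.sleep(delay_seconds)


# Possible usage with the objects from this notebook:
# registration = retry_on_connection_error(
#     lambda: marketplace.register(
#         'http://nobody:55000/',
#         merchant_name='kafka_notebook_merchant',
#         algorithm_name='human'))
```

Passing a callable keeps the helper independent of what is being retried, so the same wrapper could also be used around a Kafka request.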

Next, initialize the Kafka API:


In [9]:
from api import Kafka
kafka = Kafka(token=registration.merchant_token)

Request topic

You can request data for specific topics. The most important topics are buyOffer, which contains your own sales, and marketSituation, which contains a history of market situations. The call returns the data as a pandas DataFrame. Depending on how active the simulation is and how much data has been logged, this can take some time.


In [15]:
sales_data = kafka.download_topic_data('buyOffer')
sales_data.head()


Out[15]:
   amount  consumer_id                                   http_code  left_in_stock  merchant_id                                   offer_id  price  product_id  quality  timestamp                uid
0  1       M+vLAIwHTf3zXqLi1rd/S9AjcOJFGHS2LwQN6p6V7nI=  200        29             YWKtz6tuqjA6hfYSSZ2Uf+9jmi0PrzlWNKQfoRv1oew=  6         35.0   1           1        2018-03-16 10:01:01.673  11
1  1       M+vLAIwHTf3zXqLi1rd/S9AjcOJFGHS2LwQN6p6V7nI=  200        28             YWKtz6tuqjA6hfYSSZ2Uf+9jmi0PrzlWNKQfoRv1oew=  6         35.0   1           1        2018-03-16 10:01:02.236  11
2  1       M+vLAIwHTf3zXqLi1rd/S9AjcOJFGHS2LwQN6p6V7nI=  200        27             YWKtz6tuqjA6hfYSSZ2Uf+9jmi0PrzlWNKQfoRv1oew=  6         35.0   1           1        2018-03-16 10:01:17.864  11
3  1       M+vLAIwHTf3zXqLi1rd/S9AjcOJFGHS2LwQN6p6V7nI=  200        26             YWKtz6tuqjA6hfYSSZ2Uf+9jmi0PrzlWNKQfoRv1oew=  6         35.0   1           1        2018-03-16 10:01:18.771  11
4  1       M+vLAIwHTf3zXqLi1rd/S9AjcOJFGHS2LwQN6p6V7nI=  200        25             YWKtz6tuqjA6hfYSSZ2Uf+9jmi0PrzlWNKQfoRv1oew=  6         35.0   1           1        2018-03-16 10:01:22.335  11

This method may return None if it was not possible to obtain the data. For example, this happens if the merchant doesn't have any sales.
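Because the result can be None, code that processes it right away should guard against that case. A minimal sketch, assuming you prefer an empty DataFrame over None; the helper name topic_or_empty is hypothetical and not part of the Kafka API:

```python
import pandas as pd


def topic_or_empty(data, columns):
    """Return data unchanged, or an empty DataFrame if data is None.

    Hypothetical helper for illustration. `columns` names the columns
    the empty frame should have so later column access does not fail.
    """
    if data is None:
        return pd.DataFrame(columns=columns)
    return data


# Possible usage with the call from this notebook:
# sales_data = topic_or_empty(
#     kafka.download_topic_data('buyOffer'),
#     columns=['amount', 'price', 'timestamp'])
```

With this guard, calls such as len(sales_data) or sales_data.head() also work for a merchant without any sales.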


In [16]:
len(sales_data)


Out[16]:
30
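As a first step towards estimating customer demand, the sales can be aggregated per price. The sketch below uses a small hand-made DataFrame with a subset of the buyOffer columns; its values are invented for illustration and are not real simulation output. The function name units_sold_per_price is hypothetical.

```python
import pandas as pd

# Invented sample with a subset of the buyOffer columns (not real data).
sample_sales = pd.DataFrame({
    'amount': [1, 1, 2, 1],
    'price': [35.0, 35.0, 30.0, 30.0],
    'timestamp': pd.to_datetime([
        '2018-03-16 10:01:00', '2018-03-16 10:01:30',
        '2018-03-16 10:02:00', '2018-03-16 10:03:00']),
})


def units_sold_per_price(sales):
    """Sum the sold amount for every price, ordered by price."""
    return sales.groupby('price')['amount'].sum().sort_index()


demand = units_sold_per_price(sample_sales)
print(demand)
```

The same function could be applied to sales_data from above. Note that these counts ignore how long each price was offered, so they are only a rough indication of demand.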

In [18]:
market_situations = kafka.download_topic_data('marketSituation')
print(len(market_situations))
market_situations.head()


5
Out[18]:
   amount  merchant_id                                   offer_id  price  prime  product_id  quality  shipping_time_prime  shipping_time_standard  timestamp                triggering_merchant_id                        uid
0  1953    79Qj3UKaNep4GpXXtKLFt8Y1hEMTH1KQd+p+wFwvt/I=  2         22.75  False  1           1        1                    5                       2018-03-16 10:00:37.917  YWKtz6tuqjA6hfYSSZ2Uf+9jmi0PrzlWNKQfoRv1oew=  11
1  20      D0lQAcvhYUhRvqDOPoP068oubQxr8Seo0/nMEcp0Ye4=  5         22.45  False  1           1        1                    5                       2018-03-16 10:00:37.917  YWKtz6tuqjA6hfYSSZ2Uf+9jmi0PrzlWNKQfoRv1oew=  11
2  30      YWKtz6tuqjA6hfYSSZ2Uf+9jmi0PrzlWNKQfoRv1oew=  6         35.00  False  1           1        2                    5                       2018-03-16 10:00:37.917  YWKtz6tuqjA6hfYSSZ2Uf+9jmi0PrzlWNKQfoRv1oew=  11
3  1934    9vLjL+h81Nql8ZLBBxnm70SDZZE98IAGAuMaj1JRmC8=  1         22.65  False  1           1        1                    5                       2018-03-16 10:00:37.917  YWKtz6tuqjA6hfYSSZ2Uf+9jmi0PrzlWNKQfoRv1oew=  11
4  20      8Ezxj8Q/GvFcwa0CT3zoNyr5Hg3ZuNIs+E/LbVn9R3U=  3         22.55  False  1           1        1                    5                       2018-03-16 10:00:37.917  YWKtz6tuqjA6hfYSSZ2Uf+9jmi0PrzlWNKQfoRv1oew=  11
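Each market situation contains one row per offer, including your own. To compare your price with the competition, you can filter out your own offers and take the lowest remaining price per timestamp. The sketch below uses an invented sample with a subset of the marketSituation columns and shortened, made-up merchant ids; the function name cheapest_competitor_price is hypothetical.

```python
import pandas as pd

# Invented sample with a subset of the marketSituation columns (not real data).
sample_situations = pd.DataFrame({
    'merchant_id': ['competitor_a', 'competitor_b', 'me'],
    'offer_id': [2, 5, 6],
    'price': [22.75, 22.45, 35.00],
    'timestamp': ['2018-03-16 10:00:37.917'] * 3,
})


def cheapest_competitor_price(market_situations, own_merchant_id):
    """Return the lowest competitor price for every timestamp."""
    is_competitor = market_situations['merchant_id'] != own_merchant_id
    competitors = market_situations[is_competitor]
    return competitors.groupby('timestamp')['price'].min()


cheapest = cheapest_competitor_price(sample_situations, own_merchant_id='me')
print(cheapest)
```

With the real data, own_merchant_id would be the merchant_id from the registration shown earlier.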