If you’ve read the previous blog post about this project, you already know its main idea: we’re trying to estimate the unit sales of more than 3,000 distinct items in 10 different Walmart stores. The dataset gives us more than five years of history, including the dates on which each product was sold, its selling price, and the daily unit sales. Our idea is to simulate a real-life scenario by streaming the sales data, and that’s where Kafka comes in.
What is Apache Kafka?
Apache Kafka is a popular open-source platform for event streaming and storage.
It is basically a system with two main roles:
- producers, which create and publish new messages
- consumers, which subscribe to and read those messages
On top of moving messages around, Kafka also stores data streams and can process them.
In Kafka, a server is called a broker. It receives messages from producers and writes them to disk, and it reads messages from disk and sends them to consumers. Multiple brokers can form a group, also known as a cluster.
Kafka messages are organized into topics, and each topic is further broken down into partitions. To illustrate, a topic can be thought of as a bookcase, each partition as an individual shelf inside it, and the books on the shelves as the messages.
Infrastructure
We are running all components of the project on virtual servers inside Amazon Elastic Compute Cloud (EC2), one of the Amazon Web Services. These virtual servers are called EC2 instances. There is a large number of instance types to choose from, combining various CPU, memory, storage, and networking capacity options. We picked an m5a.2xlarge instance running the Amazon Linux 2 Kernel 5.10 AMI, which provides 32 GiB of memory and 8 vCPUs.
Docker is a very popular software platform designed to make building, testing, and deploying applications easier and faster. It lets you package an application into an isolated environment called a container, which makes it easier to run the code remotely or on multiple machines. All the services we use for data streaming are packed into their own Docker containers.
Furthermore, in order to use Kafka, we needed to install another Apache service: Apache ZooKeeper, an open-source server used to coordinate distributed applications. It essentially offers a hierarchical key-value store that can be used in many different ways. Given that we are generating a stream from a static dataset, we use ZooKeeper to store the current offset, so the producer knows where to resume in case it crashes.
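As an illustration, here is a minimal sketch of how such an offset could be stored and read back with the kazoo Python client for ZooKeeper. The znode path and the per-store layout are our own assumptions, not a prescribed convention:

from kazoo.client import KazooClient

# Hypothetical znode path holding one store's stream position.
OFFSET_PATH = "/m5/producer/store_1/offset"

zk = KazooClient(hosts="localhost:2181")
zk.start()

# Read the last stored offset, defaulting to 0 on the first run.
if zk.exists(OFFSET_PATH):
    data, _ = zk.get(OFFSET_PATH)
    offset = int(data.decode())
else:
    zk.ensure_path(OFFSET_PATH)
    offset = 0

# ... produce messages starting from `offset` ...

# Persist the new position so a restarted producer can resume from here.
zk.set(OFFSET_PATH, str(offset).encode())
zk.stop()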
Finally, we need a schema registry to enable message serialization. For serialization we use Apache Avro, Apache’s data serialization format, together with the Confluent Schema Registry. Every serialization schema defined and used in the producer is automatically added to the registry. One of the best things about this setup is that the consumer can figure out on its own which registered schema was used to produce a given message, without the schema having to be specified in the consumer.
Kafka setup
First, we need to launch an EC2 instance. Details about the instance we chose have already been discussed in the Infrastructure section.
The second step is to download and install Docker, which can be done in two ways:
- Docker Desktop (Docker with UI client) – official instructions
- Docker without UI client – instructions vary depending on your OS
After that, we can install Kafka via the Bitnami Kafka Docker image. It comes with a docker-compose.yml file that already contains the ZooKeeper dependency. However, that file sets up Kafka with a single broker and without the ability to connect external producers and consumers to it. External producers and consumers are those running outside of the Docker containers and outside the EC2 instance. In other words, if the machine on which a producer or consumer runs has a different IP address from the EC2 instance, it will not be able to connect to the broker. You would probably want producers and consumers hosted on EC2 instances inside the same VPC as Kafka, but while developing and testing it is convenient to start the producer or consumer from your local machine.
That means we have to make some changes to the docker-compose.yml file to make Kafka work the way we want. The most important ones are adding a service entry for each broker and editing the KAFKA_CFG_ADVERTISED_LISTENERS variable. This is crucial because of the way connections are handled in Kafka. When a client starts, it connects to a broker and asks for the address of the partition leader broker, since it has to connect to the leader in order to write or read messages. By default, the address the client gets back is a localhost address, so an external client might not be able to reach the partition leader. That’s why it’s important to advertise the public IP address and a reachable port.
We also need to configure the schema-registry service in the docker-compose.yml file, which is shown below.
After taking care of everything aforementioned, this is what the final version of our docker-compose.yml file looks like:
version: "2"
services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    restart: always
  kafka1:
    image: docker.io/bitnami/kafka:3.2
    ports:
      - "9093:9093"
    volumes:
      - "kafka1_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka1:9092,EXTERNAL://<host public ip>:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
    restart: always
  kafka2:
    image: docker.io/bitnami/kafka:3.2
    ports:
      - "9094:9094"
    volumes:
      - "kafka2_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka2:9092,EXTERNAL://<host public ip>:9094
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
    restart: always
  kafka3:
    image: docker.io/bitnami/kafka:3.2
    ports:
      - "9095:9095"
    volumes:
      - "kafka3_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_BROKER_ID=2
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9095
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka3:9092,EXTERNAL://<host public ip>:9095
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
    restart: always
  schema-registry:
    image: confluentinc/cp-schema-registry:7.1.2
    hostname: schema-registry
    depends_on:
      - zookeeper
      - kafka1
      - kafka2
      - kafka3
    restart: always
    container_name: schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
volumes:
  zookeeper_data:
    driver: local
  kafka1_data:
    driver: local
  kafka2_data:
    driver: local
  kafka3_data:
    driver: local
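Once the stack is up, a quick way to check that the external listeners work is to request cluster metadata from a machine outside the EC2 instance. Below is a minimal sketch using confluent_kafka's AdminClient; replace the placeholder with your instance's public IP:

from confluent_kafka.admin import AdminClient

# Bootstrap against the EXTERNAL listener exposed in docker-compose.yml.
admin = AdminClient({"bootstrap.servers": "<host public ip>:9093"})

# list_topics() fetches cluster metadata; a timeout here usually means the
# advertised listeners or the security group rules are misconfigured.
metadata = admin.list_topics(timeout=10)
print("Brokers:", metadata.brokers)
print("Topics:", list(metadata.topics))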
Implementation details
Python library
All of our code is written in Python. At first, we planned to use the kafka-python library, but later we switched to confluent_kafka. The reason for the change was that the kafka-python library doesn’t support asynchronous message sending, which means we wouldn’t be able to define callback functions.
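For illustration, here is a minimal sketch of asynchronous sending with confluent_kafka and a delivery callback; the topic name, key, and payload are placeholders rather than our actual producer code:

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "<host public ip>:9093"})

def delivery_report(err, msg):
    # Called once per message after the broker acknowledges it (or fails).
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

# produce() is asynchronous: it only enqueues the message locally.
producer.produce("sales", key="store_1", value=b"...", callback=delivery_report)

# poll() serves the delivery callbacks; flush() waits for outstanding messages.
producer.poll(0)
producer.flush()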
Streaming
Given that we are trying to stream static data, we had to simulate a data source somehow. We did that by creating a data generator that sequentially creates the messages that are eventually loaded and processed by the producer. The interesting thing is that in the original dataset one entry represents the sales of one item across all available days (the daily sales are stored in columns), while our generator creates a message for a single day.
We then run one data generator process for each store. This makes our simulated streaming very similar to a real-life scenario, where we could expect data coming from multiple stores.
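As a rough sketch (not our actual generator), the wide-to-long transformation could look like the snippet below. The column names follow the M5 dataset; the per-store filtering and the message shape are simplifying assumptions, and sell_price is left out of this sketch:

import pandas as pd

def generate_messages(csv_path: str, store_id: str):
    """Yield one message dict per item per day for a single store."""
    sales = pd.read_csv(csv_path)
    sales = sales[sales["store_id"] == store_id]

    # Daily sales columns in the M5 dataset are named d_1, d_2, ...
    day_cols = [c for c in sales.columns if c.startswith("d_")]

    for day_col in day_cols:                 # stream day by day
        day_no = int(day_col.split("_")[1])
        for _, row in sales.iterrows():      # one message per item
            yield {
                "item_id": row["item_id"],
                "dept_id": row["dept_id"],
                "cat_id": row["cat_id"],
                "store_id": row["store_id"],
                "state_id": row["state_id"],
                "day": day_no,
                "sold": int(row[day_col]),
            }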
Partitioning
Furthermore, since the dataset consists of sales data for 10 different stores, we created a single topic named sales that is divided into 10 partitions, one for each store. The idea was to come as close as possible to the real world and simulate what could happen in an environment comparable to ours.
All stores have comparable sales numbers, so the number of events will also be comparable across partitions. This matters for partition balancing and for the load on the leader brokers. If that were not the case, we could categorize stores by size (area, items in the warehouse, average number of buyers), for instance into small, medium, and large stores, or even create a separate topic with multiple partitions for each store if needed. This depends on the performance of the consumer process and the data volume.
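Here is a minimal sketch of how such a topic could be created and how a producer could pin each store to its own partition with confluent_kafka; the store-to-partition mapping and the replication factor are our own illustrative choices:

from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic

BOOTSTRAP = "<host public ip>:9093"

# Create the "sales" topic with one partition per store.
# create_topics() is asynchronous; real code would wait on the returned futures.
admin = AdminClient({"bootstrap.servers": BOOTSTRAP})
admin.create_topics([NewTopic("sales", num_partitions=10, replication_factor=3)])

# Illustrative store -> partition mapping (store IDs follow the M5 naming).
STORE_PARTITIONS = {f"CA_{i}": i - 1 for i in range(1, 5)}  # CA_1..CA_4 -> 0..3
# ... extended similarly for the TX and WI stores ...

producer = Producer({"bootstrap.servers": BOOTSTRAP})
producer.produce(
    "sales",
    value=b"<serialized sales record>",
    partition=STORE_PARTITIONS["CA_1"],  # pin each store to its own partition
)
producer.flush()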
Serialization schema
This is the Apache Avro schema used to serialize messages that our producers produce and consumers consume:
{
  "namespace": "m5.forecasting.avro",
  "type": "record",
  "name": "SalesRecord",
  "fields": [
    {
      "name": "item_id",
      "type": "string",
      "doc": "ID of the sold item."
    },
    {
      "name": "dept_id",
      "type": "string",
      "doc": "Department of the sold item."
    },
    {
      "name": "cat_id",
      "type": "string",
      "doc": "Category of the sold item."
    },
    {
      "name": "store_id",
      "type": "string",
      "doc": "Store where the item was sold."
    },
    {
      "name": "state_id",
      "type": "string",
      "doc": "State in which the item was sold."
    },
    {
      "name": "day",
      "type": "int",
      "doc": "Day no."
    },
    {
      "name": "sold",
      "type": "int",
      "doc": "Quantity of items sold."
    },
    {
      "name": "sell_price",
      "type": ["null", "float"],
      "doc": "Item price."
    }
  ]
}
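To tie the schema and the registry together, here is a minimal sketch of Avro serialization with confluent_kafka; the schema file name, the example record values, and the connection details are assumptions for illustration:

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Load the schema shown above (hypothetical file name).
with open("sales_record.avsc") as f:
    schema_str = f.read()

schema_registry = SchemaRegistryClient({"url": "http://<host public ip>:8081"})
# The serializer registers the schema with the registry automatically on first use.
avro_serializer = AvroSerializer(schema_registry, schema_str)

record = {
    "item_id": "HOBBIES_1_001", "dept_id": "HOBBIES_1", "cat_id": "HOBBIES",
    "store_id": "CA_1", "state_id": "CA", "day": 1, "sold": 3, "sell_price": None,
}

producer = Producer({"bootstrap.servers": "<host public ip>:9093"})
producer.produce(
    "sales",
    value=avro_serializer(record, SerializationContext("sales", MessageField.VALUE)),
)
producer.flush()

On the consumer side, an AvroDeserializer built with the same SchemaRegistryClient can decode these messages without being given the schema explicitly, because the schema ID travels with every serialized message.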
Summary
In this blog post, we briefly explained what Apache Kafka is and gave a short overview of the infrastructure used in this project. We also shared some guidelines on how to set up Apache Kafka for the project and, finally, highlighted a few implementation details specific to it.
We’ll delve even deeper into the project in upcoming posts, so stay tuned for the next part in this series!
Read all our posts in this series:
Intro: How to turn SKU analysis to your advantage
Part 1: Approach for building a business intelligence platform for retail companies