Let us introduce our new series of blog posts, in which you will discover how to build a business intelligence platform consisting of streaming, real-time analysis, a predictive component, batch processing, and reporting. We will dive into our approach to a new challenge in which sales data plays the leading part.
After winning the Croatia osiguranje & Bird data challenge, we started looking for another challenge that we could learn from and have some fun with. We targeted challenges with larger, real-world datasets, preferably from industries like supply chain, retail, insurance, and finance. Applicability and meaningfulness were important factors: the challenge had to have a purpose and the potential to turn into a product. After some research, we found the M5 data challenge, which consists of two related competitions.
The competition is very popular: this edition attracted more than 7,000 competitors.
In this series of blog posts, we will talk about our spin-off project based on this data competition, covering both technology and business topics.
The original competition and spin-off
The task of the original competition is to estimate the unit sales of 3,049 products in 10 Walmart stores, based on 5+ years of sales data. The test set contains 28 days, i.e. 10 × 3,049 × 28 = 853,720 predictions.
We have modified the task to get the most out of the dataset. The first thing that came to mind was streaming the sales events: each sale can be represented by an event in Kafka. The dataset is static (a CSV file), so we need some kind of generator process that a Kafka producer can read from. The sales data is aggregated by date; nevertheless, we can simulate the real world by applying an appropriate intraday distribution to the sales and generating events according to that distribution.
The stream of sales events can be analyzed and visualized in real time by Apache Spark or Kafka Streams applications and visualization tools. There is also room for real-time notifications and alerts; for instance, if sales of an item are high, we can recommend restocking.
The original dataset does not contain warehouse data, but it could be generated in order to implement warehouse management features. We have some experience with data generation in the insurance domain, so we know it is tricky and takes a lot of time to do properly. That's why this is on our nice-to-have list.
Streamed data is persisted in a data warehouse where additional batch analysis, reporting, and predictions (the original task!) can be done. Here we can use Apache Spark, Apache Airflow, Python ML libraries, and visualization tools.
You can see how many additional tasks we can get from this dataset. It's exactly what we were looking for!
The purpose of this section is not to analyze the data in detail but to give you a quick overview so you can follow the rest of the post easily.
The data is organized into the following files:
- calendar.csv – contains information about the dates on which the products are sold;
- sales_train_evaluation.csv – contains the historical daily unit sales data per product and store for 1,941 days. It contains the same data as sales_train_validation.csv, with an additional 28 days used for testing purposes;
- sell_prices.csv – contains information about the price of the products sold per store and date.
The figure shows the hierarchical organization of the dataset which is divided into levels: state – store – category – department – item.
The calendar contains a list of all 1,941 dates and information about special events like sports and cultural events, national holidays, and religious holidays, all of which can have an effect on sales. This is important because some holidays change dates from year to year, so forecasting algorithms cannot rely on the date alone. For instance, Christmas trees and decorations sell in the few weeks before the 25th of December each year, but chocolate Easter bunnies have their peak sales in different weeks, or even months, depending on the year.
The sales data contains daily sales of each item for all stores. The figure below shows the histogram of sales at the store level. The highest sales numbers occur in the CA_3 store, which also has the largest sales variance. We can see that stores in Texas and Wisconsin have lower sales variance and that those stores hit comparable sales numbers.
The data can be analyzed at a more detailed level, for example per category. The following figure shows the sales of food products. Each line represents a year, and you can clearly see the seasonality in the data.
The sell prices data contains weekly average prices for each product and store. Prices for the same item vary depending on the store, which makes sense, as some states have a higher cost of living than others and some items have higher shipping costs to certain areas. Here is an example for the first item in the dataset, whose price varies not only between states but also within a state (look at the WI_3 store).
For some items, we detected outliers that we marked as bad data. For instance, there are products whose price drops to $0.01. That is too cheap for any item, and even if a $0.01 item existed, there is another clue: for most of these records the sales numbers also drop to zero, so we conclude that they represent an out-of-stock situation. However, there are cases where the sales numbers do not drop to zero, which confuses us a bit; those could be bad data caused by errors in the process.
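This kind of check is easy to automate. Below is a minimal pandas sketch of the idea; the column names and the tiny in-memory frame are illustrative, not the actual M5 files, and the $0.01 threshold follows the observation above.

```python
import pandas as pd

# Illustrative weekly price/sales rows (column names are our own, not
# taken verbatim from the M5 CSVs).
df = pd.DataFrame({
    "item_id":    ["A",  "A",  "A",  "B"],
    "sell_price": [4.98, 0.01, 4.98, 0.01],
    "units_sold": [12,   0,    9,    3],
})

# Flag suspicious $0.01 prices and split them into the two cases we saw:
# out-of-stock (sales drop to zero) vs. likely bad data (sales continue).
suspicious = df[df["sell_price"] <= 0.01]
out_of_stock = suspicious[suspicious["units_sold"] == 0]
bad_data = suspicious[suspicious["units_sold"] > 0]

print(len(out_of_stock), len(bad_data))  # 1 1
```

On the real dataset, the same filter applied per item and week separates the explainable out-of-stock rows from the ones worth excluding as bad data.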
In this section, we'll briefly explain the components, the data flow, and the main tasks.
We divided the work into three main components:
- Data source preparation
- Streaming and stream processing
- Batch processing
The raw files are uploaded to AWS S3 for easier access from servers and local machines, after which they are loaded into a relational database that we named the Source database. The raw files are not normalized, so we needed to design tables in a normal form and do some transformations on the data. The picture below shows the ER diagram of the database. The database enables easier access to the data, easier selection and filtering, and serves as a playground for SQL analysis. It is used as the source for the Streaming component explained below.
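To make one of these transformations concrete, here is a minimal sketch of a Python/pandas ETL step that turns the wide, denormalized sales file (one column per day, as in the M5 CSVs) into a long table. The target table name and the in-memory sample data are our own illustrative choices, and SQLite stands in for the Postgres Source database.

```python
import sqlite3
import pandas as pd

# Tiny stand-in for sales_train_evaluation.csv: one column per day (d_1, d_2, ...).
raw = pd.DataFrame({
    "item_id":  ["FOODS_1_001", "FOODS_1_002"],
    "store_id": ["CA_1", "CA_1"],
    "d_1": [3, 0],
    "d_2": [1, 2],
})

# melt(): wide day columns -> one normalized row per (item, store, day).
long = raw.melt(
    id_vars=["item_id", "store_id"],
    var_name="d", value_name="units_sold",
)

# Load into a relational table; in our setup this would be Postgres.
conn = sqlite3.connect(":memory:")
long.to_sql("sales", conn, index=False)
print(conn.execute("SELECT COUNT(*) FROM sales").fetchone()[0])  # 4
```

The same melt-then-load pattern scales to the full file by reading it in chunks instead of all at once.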
The second relational database is the data warehouse, which serves as the stream sink. For some data (for instance, the calendar) there is no point in streaming, so we decided to load it directly into the data warehouse. For loading both databases, we compared AWS Glue to Python and Pandas ETL jobs. Our conclusion is that for simpler ETL jobs, and for developers who have data engineering knowledge, it makes more sense to use Python. AWS Glue is more suitable for data scientists and similar roles without much data engineering knowledge or experience, as it offers a visual interface and premade components for loading and processing data. However, it requires some preparation (for instance, setting up a data catalog).

The data generator process decouples the data generation logic from the producer process. This way, we can switch to another streaming technology without rewriting the data generation code (reading data, formatting data, generating data according to a distribution, and so on). The data generator generates events from a starting point, which is the first day by default. It generates one record per individual item sold, meaning that if the daily sales of item X were 10 units, it will generate 10 events. The data generator is implemented as a Python generator object with __iter__ and __next__ methods, which saves memory: we do not need to load the whole dataset, but can read and generate records on demand. For instance, a join between sales data and price data requires calendar information, and if you join the sales and calendar data you'll end up with almost 50 million records.
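The generator idea can be sketched as follows. This is an illustrative simplification, not our production code: the field names are our own, and `daily_rows` stands in for a lazy cursor over the Source database.

```python
class SalesEventGenerator:
    """Lazily turn daily aggregated sales rows into one event per unit sold.

    `daily_rows` is any iterable of (item_id, store_id, date, units) tuples,
    e.g. a server-side cursor over the Source database, so the full dataset
    never has to fit in memory.
    """

    def __init__(self, daily_rows):
        self._rows = iter(daily_rows)
        self._current = None   # (item_id, store_id, date) being emitted
        self._remaining = 0    # units left to emit for the current row

    def __iter__(self):
        return self

    def __next__(self):
        # Advance to the next row with a non-zero sales count; next() raises
        # StopIteration when the underlying rows are exhausted.
        while self._remaining == 0:
            item_id, store_id, date, units = next(self._rows)
            self._current = (item_id, store_id, date)
            self._remaining = units
        self._remaining -= 1
        item_id, store_id, date = self._current
        return {"item_id": item_id, "store_id": store_id, "date": date}


daily = [
    ("FOODS_1_001", "CA_1", "2016-01-01", 3),
    ("FOODS_1_002", "CA_1", "2016-01-01", 0),
    ("FOODS_1_001", "TX_1", "2016-01-01", 2),
]
events = list(SalesEventGenerator(daily))
print(len(events))  # 5 events: 3 + 0 + 2
```

Spreading each day's events over intraday timestamps according to a chosen distribution would slot into `__next__` when the event dict is built.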
Streaming and stream processing
The Kafka producer uses the generator and streams data into a sales topic that is partitioned into 10 partitions, one for each store. Messages are consumed by real-time analysis Apache Spark processes, which save their results to a separate Kafka topic to be consumed by the real-time visualization process.
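The store-to-partition mapping can be made explicit on the producer side. Here is a sketch assuming the kafka-python client; the store ordering and the `send_sale` helper are our own illustrative choices, and the `producer` argument is expected to be a `kafka.KafkaProducer` instance (not constructed here, since that needs a live broker).

```python
import json

# Fixed mapping from store_id to one of the 10 partitions of the "sales"
# topic; the ordering below is an illustrative assumption.
STORES = ["CA_1", "CA_2", "CA_3", "CA_4",
          "TX_1", "TX_2", "TX_3",
          "WI_1", "WI_2", "WI_3"]
PARTITION_FOR_STORE = {store: i for i, store in enumerate(STORES)}


def send_sale(producer, event):
    """Send one sale event to its store's partition.

    `producer` is assumed to be a kafka.KafkaProducer (kafka-python);
    its send() accepts an explicit partition argument.
    """
    producer.send(
        "sales",
        value=json.dumps(event).encode("utf-8"),
        key=event["store_id"].encode("utf-8"),
        partition=PARTITION_FOR_STORE[event["store_id"]],
    )


print(PARTITION_FOR_STORE["WI_3"])  # 9
```

Keying messages by store also keeps per-store ordering guarantees, since all events for one store land on one partition.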
Some examples of real-time KPIs are:
- the number of items sold,
- average/minimum/maximum/median value of items sold, and
- total value of items sold
for the current day/week/month/year and comparison to previous periods. The KPIs are calculated for levels: item, department, store, state, and all stores.
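Independent of the streaming engine, the per-store KPI arithmetic itself is simple. Here is a pure-Python sketch over one batch of events; the field names are illustrative, and in the real pipeline this logic runs inside the Spark processes over windowed micro-batches.

```python
from collections import defaultdict
from statistics import mean, median


def store_kpis(events):
    """Compute per-store KPIs for one batch of sale events.

    Each event is a dict with "store_id" and "price" (unit price).
    Returns {store_id: {count, total, avg, min, max, median}}.
    """
    prices = defaultdict(list)
    for e in events:
        prices[e["store_id"]].append(e["price"])
    return {
        store: {
            "count": len(p),
            "total": sum(p),
            "avg": mean(p),
            "min": min(p),
            "max": max(p),
            "median": median(p),
        }
        for store, p in prices.items()
    }


batch = [{"store_id": "CA_1", "price": 2.0},
         {"store_id": "CA_1", "price": 4.0},
         {"store_id": "TX_1", "price": 1.5}]
print(store_kpis(batch)["CA_1"]["avg"])  # 3.0
```

The same aggregation, grouped by item, department, or state instead of store, gives the other KPI levels listed above.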
With these KPIs, we can answer more complex questions, such as what the correlation between price and sales numbers is:
- What is the optimal price range for certain products?
- Does the price have any influence on the sales of individual SKUs?
- Does price movement through time influence sales for that period?
- Do SNAP events have an effect on sales of items?
There are many other questions to answer (product sales trends based on location, market share analysis, and so on). We'll cover those in a separate blog post.
A user gets those answers through a dashboard with KPIs and visualizations. Based on the simple metrics and SKU analysis we can send notifications to a separate topic and present them to users.
Finally, Kafka Connect helps us sink the data into a data warehouse for permanent storage and further analysis.
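As a sketch of what that sink could look like, here is a Kafka Connect configuration assuming the Confluent JDBC sink connector; the connector name, topic, and connection URL are illustrative placeholders for our setup.

```json
{
  "name": "sales-warehouse-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "sales",
    "connection.url": "jdbc:postgresql://warehouse-host:5432/warehouse",
    "insert.mode": "insert",
    "auto.create": "true"
  }
}
```

Posting this JSON to the Connect REST API would create the connector; in practice the credentials and schema handling need more configuration than shown here.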
Streamed data is stored on the Kafka brokers, which have a retention policy based on message size and a time period. For historical analysis, predictions, and reporting, we decided to store the data in a data warehouse. The plan is to implement batch analysis jobs for reporting purposes and for forecasting future sales. The jobs will be orchestrated and scheduled with Apache Airflow.
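Such a retention policy is a per-topic setting in Kafka. The command below is a sketch using Kafka's own admin CLI; the topic name, broker address, and the 7-day/1 GiB limits are illustrative values, not our exact configuration.

```shell
# Illustrative: cap the "sales" topic at ~7 days or ~1 GiB per partition,
# whichever limit is hit first.
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name sales \
  --add-config retention.ms=604800000,retention.bytes=1073741824
```

Whichever of the two limits is reached first triggers log segment deletion for that partition.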
All infrastructure is set up on AWS. Our choice is a combination of EC2 instances and Docker containers. You can use AWS services like RDS, AWS EMR, and AWS MSK for convenience, but those come with a cost. The estimated cost of an m5a.2xlarge EC2 instance, which is more than enough to host several services, is $40/month if used 4 hours a day, 5 days a week. For the same usage and instance type, AWS RDS costs $100/month. AWS MSK with 3 brokers and 10 GB of storage costs roughly $700/month, and an AWS EMR m5a.2xlarge instance costs $50/month but only hosts Spark processes. As this is an internal project, we can shut down the infrastructure when it is not needed. We expect to have several m5a.2xlarge EC2 instances hosting the Kafka cluster, the Apache Spark cluster, the source database, the data warehouse, and other processes. You can estimate your own costs with the official AWS calculator.
We installed Kafka and Zookeeper through the Bitnami Docker images. The images come with a default configuration in the docker-compose.yml file, but it requires some editing and additional settings. We will cover the specifics of the Kafka setup and some common issues in the following blog posts.
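To give a feel for the kind of editing involved, here is a trimmed docker-compose sketch based on the Bitnami images; the `<ec2-public-dns>` placeholder and the plaintext-listener settings are illustrative assumptions for a development setup, not a production configuration.

```yaml
version: "2"
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      # Advertise the EC2 host's address so clients outside the Docker
      # network can reach the broker; replace the placeholder.
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://<ec2-public-dns>:9092
```

The advertised listener is the setting that most often needs editing: without it, external producers and consumers receive the container-internal address and fail to connect.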
Apache Spark is also installed through the Bitnami Docker image. In our experience, it requires less configuration than Kafka and is easier to set up. However, there are a few things to watch out for when running Spark jobs, which we will cover in later blog posts. For instance, Spark's standalone mode does not support cluster deploy mode for Python applications, so a PySpark driver always runs on the machine that submits the job; if you submit from your local machine, the cluster will try to connect back to it and fail, because the IP of the local machine is a local IP. You therefore have to run the job from the EC2 instance.
For the source database and the data warehouse, we decided to use PostgreSQL, installed through the official Docker image.
In this post, we presented our idea for the project and gave an overview of the main components, tasks, and dataset. In the following posts, we'll focus on the business value of retail sales data analysis, give technical details, and share useful tips about technologies like Kafka and Spark. We'll also cover interesting aspects of data streaming and processing, and of forecasting sales data.
Share your thoughts about the project with us in the comment section and stay tuned for more!
Read all our posts in the series: