After setting up Apache Kafka, the next step in our Retail Business Intelligence Platform project was to set up Spark, another widely used software solution from the Apache workshop. Apache Spark, the real spark necessary to ignite our project and turn it into a true stream processing arrangement, is probably the most famous streaming engine available in the modern IT world. Spark was initially developed at the University of California, Berkeley in 2009 as a research project and, as years went by, it has grown into an open-source project that is developed and maintained by a large and collaborative community.
Just like all other components of this project, Spark has been configured to run inside Amazon Elastic Compute Cloud. We created two EC2 instances, both of the m5a.2xlarge type: one that serves as a master node and one that serves as a worker node. It’s important to mention that the default amount of storage on the worker node wasn’t enough, so we had to increase it to 40 GiB. Of course, the optimal amount of storage is different for each individual project and it’s often very hard to estimate beforehand. We recommend starting with the default amount of storage and, if necessary, upgrading it later.
Before configuring Spark to run on EC2 instances, we installed it locally for testing purposes, which was simple and straightforward. It is enough to download and extract Spark to a folder, install Java and, given that we are writing our code in Python, install PySpark via the pip package installer. After that, all that is left to do is to run the job using the following command:
spark-submit --packages <NECESSARY_PACKAGE_NAMES_SEPARATED_BY_COMMA> <PATH_TO_THE_PROGRAM>
Back to the real stuff, let’s talk a little more about our in-cloud setup. First of all, when working with EC2 instances, one should be aware of the differences between the available AMIs and of what each of them offers. For example, we explicitly needed to work with Python 3.9, but the Amazon Linux 2 AMI, the default AMI for EC2 instances, comes with Python 3.7 preinstalled at the time of writing. One would think: ‘Piece of cake, I just need to upgrade the Python version from 3.7 to 3.9, that’s a few minutes of work.’ Unfortunately, for a number of reasons that are outside the scope of this blog post, doing this properly isn’t that simple. Although it is possible, we strongly recommend that you simply choose another AMI if you need Python 3.9. For instance, the Red Hat AMI doesn’t come with Python preinstalled at all, which makes it simple to manually install whatever version is necessary (sudo yum install python39 in this case).
Just as we installed Spark, Java, and PySpark on our local machine, we had to do it all again on the Spark EC2 instances. Furthermore, it was necessary to generate an SSH key pair in order to use the launch scripts in Spark’s sbin directory. The public key had to be copied to each worker node in the cluster. Finally, in order to use Spark’s launch scripts, we needed to edit the conf/slaves file on the master node (named conf/workers since Spark 3.1) and fill in the workers’ hostnames.
After everything mentioned is taken care of, the cluster can be started by running the start-all.sh script in the sbin directory. However, since Spark’s standalone cluster manager does not support cluster deploy mode for Python applications, only client mode, the job has to be submitted in client mode from one of the machines inside the cluster. We confirmed this with a little experiment: we executed the job from our local PC and, as expected, the driver couldn’t communicate with the cluster. The reason is that, in client mode, the driver process runs on the machine that executed the spark-submit command.
Generally, after the driver is started, it tries to communicate with the nodes inside the cluster: first with the master node, in order to get an allocation of workers, and then with those workers. It is crucial that both the master and the workers can reach the driver, and vice versa.
In this case, they couldn’t communicate with the driver that was being run inside our local network. The IP address shown in Figure 1 confirms that the driver was indeed inside our local network.
Figure 1: Screenshot of a Spark Application UI
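One direction of this reachability requirement can be checked with a few lines of standard-library Python. The sketch below is only an illustration, not part of our project code: the helper name can_connect is made up, and it tests nothing more than whether a TCP connection from the current machine to a given host and port can be opened (for example, the master on its cluster port). It does not prove that the master and workers can connect back to the driver.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


# Demo against a throwaway local listener, so the sketch runs anywhere.
server = socket.socket()
server.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
server.listen(1)
port = server.getsockname()[1]
reachable = can_connect("127.0.0.1", port)
server.close()
print(reachable)
```

On a real setup, the same helper would be called with the master’s hostname and port from the machine that is going to run the driver.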
To conclude, the job had to be run in client mode from either the master or a worker node in order for everything to work smoothly. We chose to run it from the master node, which is why we installed Git there. This allows us to keep the code up to date efficiently and effortlessly.
At last, we’ve arrived at the final piece of the puzzle, a command to get the job going:
spark-submit --master spark://<HOST:PORT> --packages <NECESSARY_PACKAGE_NAMES_SEPARATED_BY_COMMA> --py-files <PATH_TO_THE_PYTHON_LIBRARIES> <PATH_TO_THE_PROGRAM>
The port for the cluster master is 7077 by default. The --py-files argument takes a comma-separated list of .zip, .egg, or .py files and allows us to distribute the required Python libraries and dependencies to the cluster.
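To keep the job running if the SSH connection suddenly closes, a trailing & is not enough on its own: it only moves the job to the background, and the job can still receive a hangup signal when the session ends. Prefix the command with nohup and add & at the end, or run the command inside a terminal multiplexer such as tmux or screen.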
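If you prefer to assemble the submit command programmatically, the following minimal sketch shows how its pieces fit together. The function name, the master hostname, the file names, and the package versions are all assumptions made for illustration; substitute whatever your own job needs. The point it demonstrates is that both --packages and --py-files expect comma-separated values.

```python
import shlex


def build_spark_submit(master_url, packages, py_files, program):
    """Return the spark-submit command as a list of arguments."""
    return [
        "spark-submit",
        "--master", master_url,
        # Multiple Maven coordinates are joined with commas.
        "--packages", ",".join(packages),
        # Multiple dependency files are joined with commas as well.
        "--py-files", ",".join(py_files),
        program,
    ]


# Hypothetical values, for illustration only.
command = build_spark_submit(
    master_url="spark://master-host:7077",
    packages=[
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2",
        "org.apache.spark:spark-avro_2.12:3.1.2",
    ],
    py_files=["dependencies.zip"],
    program="job.py",
)
print(shlex.join(command))
```

The resulting list can be passed as is to subprocess.run, which avoids shell quoting problems.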
Our use case
In case you haven’t been following this series, here’s a little recap of what had been done before Spark came into play. We handled preliminary data analysis, data source preparation, and architecture planning. We also set up Kafka in order to store our messages in different topics and partitions. Now the time has come to take the raw data stored inside Kafka and turn it into useful insights. The sales data we’re dealing with can be transformed into various key performance indicators, commonly abbreviated as KPIs, that could help us gain a much better understanding of what is really going on in the world of retail. We are storing the calculated KPIs back in Kafka because that will allow us to easily implement real-time notifications later in the project. Apart from implementing notifications somewhere down the road, our main goal is to turn these KPIs into interesting and attractive visualizations.
The KPIs we are calculating vary from tremendously simple to slightly complicated. For example, a simple one would be revenue, calculated by multiplying quantity by price per item for each sale and summing the results. On the other hand, an example of a complex one would be price elasticity of demand, which is the ratio between the percentage change in the quantity demanded and the percentage change in price. All of them are listed in the table below.
| Key Performance Indicator (KPI) | Description |
| Revenue | The total earnings produced by a certain item, department, store, or state. |
| Market share | Percentage of the total revenue that each SKU makes up. |
| Price elasticity of demand | Measure of how sensitive the quantity demanded is to the price of each SKU. |
| Sales growth | How revenue changes over a fixed period of time. |
| Price premium | How much more customers are willing to pay for a certain product than for similar products. |
Table 1: Commonly used KPIs in SKU analysis
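To make the two KPIs described in the text concrete, here is a small plain-Python sketch of revenue and price elasticity of demand. It is not our Spark implementation: the function names and the sample numbers are made up, and the elasticity uses simple percentage changes relative to the old values, which is only one of several conventions.

```python
def revenue(sales):
    """Sum quantity * price over an iterable of (quantity, price) pairs."""
    return sum(quantity * price for quantity, price in sales)


def price_elasticity(q_old, q_new, p_old, p_new):
    """Percentage change in quantity divided by percentage change in price."""
    pct_quantity = (q_new - q_old) / q_old
    pct_price = (p_new - p_old) / p_old
    return pct_quantity / pct_price


# Made-up numbers: two sales, then a 10% price rise with a 20% drop in demand.
total = revenue([(2, 3.0), (1, 4.5)])
elasticity = price_elasticity(q_old=100, q_new=80, p_old=10, p_new=11)
print(total, elasticity)
```

With these sample inputs, demand falls twice as fast as the price rises, so the elasticity comes out at about -2, which would indicate elastic demand.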
Unfortunately, we couldn’t make it all happen with Spark, as it has some limitations regarding the combination of multiple streaming input sources and multiple levels of aggregation. To calculate some of the KPIs we wanted, the data had to be grouped several times, once for each level of aggregation we defined, which made those KPIs impossible to calculate using Spark alone. The levels and periods of aggregation that we used are defined in Tables 2 and 3.
| Level of aggregation | Key example |
Table 2: Levels of aggregation
| Aggregation period | Key example |
Table 3: Aggregation periods
Dealing with multiple queries
As already mentioned, we combined different levels and periods of aggregation which resulted in us having a large number of streaming queries. More specifically, we used 4 levels of aggregation and 6 periods of aggregation. In addition to that, we also included one additional level and one additional period of aggregation, the ones with no grouping at all. That brings us to the final number of 5 * 7 = 35 different streaming queries. Although that’s not a problem on its own, there are some things you should be on the lookout for.
For example, if multiple queries have the same source, many people would assume it’s perfectly safe to write the reading part of the queries only once and share it across all of them. Well, it is and it isn’t safe at the same time. It is safe because sharing the definition does not share the read: behind the scenes, Spark reads the source separately for each query once that query is started. On the other hand, a problem may arise if the source can generate the data only once or if the data generation is random or nondeterministic. In the first case, only the query that was started first will get any input. In the second case, each query might end up with different inputs.
Furthermore, if you follow the good programming practice of reusing code as much as you can, you would probably extract the repeated code, such as the already mentioned readStream part of the query, into reusable functions. And as long as you’re extracting the readStream part of the query, it makes sense to extract the writeStream part, too. However, you need to be aware that this code will then be executed by multiple different queries. If you’re writing the data back to Kafka, just as we did, you need to make sure the output of each query goes to the right topic. Also, each of the queries must have its own individual checkpoint directory. Remember, one query equals one checkpoint directory.
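One way to keep topics and checkpoint directories apart is to derive both from a unique query name. The sketch below is a hedged illustration in plain Python: the level and period names, the kpi_ topic prefix, and the /checkpoints root are all hypothetical, since only the counts (5 levels and 7 periods, including the ones with no grouping) come from our setup.

```python
from itertools import product

# Hypothetical names; only the counts (4 + 1 levels, 6 + 1 periods) are real.
LEVELS = ["item", "department", "store", "state", "all"]
PERIODS = [
    "period_1", "period_2", "period_3",
    "period_4", "period_5", "period_6",
    "all_time",
]


def query_configs(levels, periods, checkpoint_root="/checkpoints"):
    """Build one config per (level, period) pair with its own topic and checkpoint."""
    configs = []
    for level, period in product(levels, periods):
        name = f"{level}_{period}"
        configs.append({
            "name": name,
            "topic": f"kpi_{name}",                   # output topic for this query
            "checkpoint": f"{checkpoint_root}/{name}",  # never shared between queries
        })
    return configs


configs = query_configs(LEVELS, PERIODS)
print(len(configs))  # 5 * 7 = 35
```

A shared writeStream helper would then take one of these dictionaries and pass its topic and checkpoint values to the 'topic' and 'checkpointLocation' options, so no two queries can end up sharing either of them.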
Simple Kafka-Spark-Kafka example
Let’s check out how Spark can help you by looking at a simple example of a Spark data stream that both reads from and writes to Kafka.
First, a simple code snippet that demonstrates how the data should be read from Kafka into Spark.
input = (
    spark.readStream  # spark is an existing SparkSession
    .format('kafka')
    .option(
        'kafka.bootstrap.servers',
        ', '.join(constants.KafkaConstants.BOOTSTRAP_SERVERS)
    )
    .option('subscribe', constants.KafkaConstants.TOPIC)
    # The consumer group is set with 'kafka.group.id' (Spark 3.0+); 'group_id' is not a recognized option.
    .option('kafka.group.id', 'SparkStreamingExampleConsumerGroup')
    .option('startingOffsets', 'earliest')
    .load()
)
After loading the data, and before the actual job is done, we have to deserialize it from Avro ourselves. The Kafka source hands Spark the message value as raw bytes and, unfortunately, this step isn’t done for us automatically in Python.
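from pyspark.sql import functions as F
from pyspark.sql.avro import functions as avro_functions

stream_df = input.withColumn(
    'value',
    avro_functions.from_avro(
        # Skip the first 5 bytes of each message; substring positions are 1-based.
        F.substring(F.col('value'), pos=6, len=2147483647),
        schema  # the Avro schema as a JSON string
    )
)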
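The pos=6 in the snippet above skips the first five bytes of every message. That is consistent with the Confluent Schema Registry wire format, in which each message starts with one magic byte followed by a four-byte big-endian schema ID, but treat this as an assumption about the producer rather than something the snippet itself guarantees. The plain-Python sketch below, with a made-up function name and a fake payload, shows what that header looks like and why the Avro payload starts at the sixth byte.

```python
import struct


def split_confluent_message(raw):
    """Split raw bytes into (schema_id, avro_payload), assuming Confluent framing."""
    # '>bI' = big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema ID).
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != 0:
        raise ValueError("unexpected magic byte, message is not Confluent-framed")
    # raw[5:] in 0-based Python slicing matches substring(value, 6, ...) in Spark.
    return schema_id, raw[5:]


# Fake message: magic byte 0, schema ID 42, then a stand-in for the Avro bytes.
message = b"\x00" + struct.pack(">I", 42) + b"avro-bytes"
schema_id, payload = split_confluent_message(message)
print(schema_id, payload)
```

If your producer writes plain Avro without such a header, the substring call is not needed and the value column can be passed to from_avro directly.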
Finally, the part of the code where the magic happens. Here we calculate a few basic values: revenue, total number of items sold, average price of an item, and both the minimum and maximum price. Given that the results have to be stored back to Kafka, it was necessary to convert them to a key-value pair. Once again, a reminder that, in case there are multiple queries running the same code, each query needs to have its own checkpoint directory.
(
    stream_df
    .groupBy(F.col('item_id'))
    .agg(
        F.sum(F.col('sold') * F.col('sell_price')).alias('revenue'),
        F.sum(F.col('sold')).alias('total_items_sold'),
        # Unweighted mean of the per-item price across sales.
        F.avg(F.col('sell_price')).alias('average_price'),
        F.min(F.col('sell_price')).alias('lowest_price'),
        F.max(F.col('sell_price')).alias('highest_price')
    )
    .select(
        F.struct('item_id').alias('key'),
        F.struct(
            'revenue',  # must match the alias defined in agg() above
            'total_items_sold',
            'average_price',
            'lowest_price',
            'highest_price'
        ).alias('value')
    )
    .selectExpr('CAST(key as STRING)', 'CAST(value as STRING)')
    .writeStream
    .format('kafka')
    .option(
        'kafka.bootstrap.servers',
        ', '.join(constants.KafkaConstants.BOOTSTRAP_SERVERS)
    )
    # In practice this should be an output topic, distinct from the topic being read.
    .option('topic', constants.KafkaConstants.TOPIC)
    .outputMode('update')
    # One checkpoint directory per query.
    .option('checkpointLocation', '/checkpoint_directory_' + unique_id)
    .start()
    .awaitTermination()
)
A few words about KStreams
While we were working on the problem of joining a dynamic stream with static data, we came across KStreams (Kafka Streams), Kafka’s own library designed for working with streaming data. It’s a Java client library and, unfortunately, it can’t be used with Python.
KStreams can process data from Kafka via topics or streams and write the results back to Kafka topics. While Spark works with data in micro-batches, KStreams deals with events in real time, one by one as they arrive. As a result, KStreams provides much lower latency than Spark. If you need a low-latency, event-at-a-time stream processing engine and using Python isn’t a requirement, then it would make sense to use KStreams instead of Spark. In any other case, Spark will do, and it will probably be a better solution because of its active and immense community.
Potential for improvements
Real-time notifications would be a very powerful tool for retailers and corporate salespeople. Actually, a system that helps people to prevent unwanted and potentially money-losing situations would be interesting to practically anyone, regardless of their field of work. We strive to create a capable and dynamic stream processing system powered by Kafka and Spark that will make people’s jobs and, more importantly, lives much easier.
For example, imagine a scenario where sales of a certain product suddenly skyrocket, contrary to the known trends, without anyone noticing. That can lead to a stock shortage and leave someone in a hurry to resupply. Usually, whenever something has to be done quickly, there is very little room for error and a lot of pressure on the people responsible for these tasks. This often results in their stress levels rising and may ultimately even drive them to reconsider their career and job-related decisions. Fortunately, this whole situation can be avoided with a real-time notification system that detects a sudden deviation from the current trend and tells them to resupply in time. That’s why we made it our priority to implement this system while developing the project further.
In this blog post, the fourth in this series, we talked mainly about Spark and how we used it in the course of our Retail Business Intelligence Platform project. For starters, we talked about the infrastructure we run it on and how we got it up and running in the first place. After that, we discussed the specifics of our use case and how multiple queries in Spark can be trickier than one might expect. Finally, we mentioned KStreams and when it can be used as an alternative to Spark, as well as the next big thing we have planned for this project. That’s all for today, but we’ll be back soon with more interesting posts about this project!