Streaming Applications on Serverless Spark

How to Push Data Events to a Kafka Topic, Read Them Using Spark Streaming, and Persist Them to Object Storage

Mrudula Madiraju
SelectFrom


Contents

  • Terminology Basics Brush Up
  • Create “Topic” in IBM Event Streams
  • Run a DataProducer Application That Generates Events
  • Create and Run Spark Streaming Application
  • Check Data Persisted on Bucket in IBM Cloud Object Storage
  • End Notes

Terminology Basics Brush Up

Event Streaming: “Event streaming is the practice of capturing data in real-time from event sources like databases, sensors, mobile devices, cloud services, and software applications in the form of streams of events; storing these event streams durably for later retrieval; manipulating, processing, and reacting to the event streams in real-time as well as retrospectively; and routing the event streams to different destination technologies as needed.”

Apache Kafka® is an event streaming platform — a distributed event store and stream-processing platform. The project provides a unified, high-throughput, low-latency platform for handling real-time data feeds.

Producers are those client applications that publish (write) events to Kafka.
Consumers are those that subscribe to (read and process) these events.

In Kafka, producers and consumers are fully decoupled and agnostic of each other, which is a key design element to achieve the high scalability that Kafka is known for. For example, producers never need to wait for consumers.

In this article, the producer is the standalone console producer that ships with the Kafka client download, and the consumer is the Apache Spark Streaming application running on IBM Cloud.

Topics: Events are organized and durably stored in topics. Very simplified, a topic is similar to a folder in a filesystem, and the events are the files in that folder. An example topic name could be “payments”. Topics in Kafka are always multi-producer and multi-subscriber: a topic can have zero, one, or many producers that write events to it, as well as zero, one, or many consumers that subscribe to these events. Events in a topic can be read as often as needed — unlike traditional messaging systems, events are not deleted after consumption. Instead, you define for how long Kafka should retain your events through a per-topic configuration setting, after which old events will be discarded. Kafka’s performance is effectively constant with respect to data size, so storing data for a long time is perfectly fine.

Source: kafka.apache.org

Create “Topic” in IBM Event Streams

IBM Event Streams on IBM Cloud is a fully managed service built on top of Apache Kafka. It additionally offers useful features such as Schema Registry, Geo-Replication, enterprise connectors, enterprise-grade security, a scalable REST API, and an awesome user experience. The UI allows you to visually look at topic/partition usage. Read more here.

For this tutorial, you can work with the free plan of an Event Streams instance. From the catalog (https://cloud.ibm.com/catalog/services/event-streams), select the Lite plan. Go to the “Topics” menu and create a topic with a name of your choice. You can go with the default options.

I created a topic named “stock” with a single partition and a retention time of one day.

My Event Streams instance and topic were set up in less than 5 minutes!

I then created a “Service Credential”, which provides the Kafka broker list, the user id (“token”), and the password that I use in the following steps to connect to the topic.
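For reference, the fields of a Service Credential that matter for this tutorial look roughly like this (abbreviated and illustrative; the broker hostnames are the ones that appear in the producer command later on):

{
  "kafka_brokers_sasl": [
    "kafka01-prod02.messagehub.services.us-south.bluemix.net:9093",
    "kafka02-prod02.messagehub.services.us-south.bluemix.net:9093"
  ],
  "user": "token",
  "password": "PASSWORD"
}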

Run a DataProducer Application That Generates Events

We only need the client part of the Kafka download. There’s a nifty, easy-to-use script called kafka-console-producer.sh that can connect to the topic created earlier and publish events/data to it.

  • Download and extract the binary release for Scala 2.12 from https://kafka.apache.org/downloads
  • Change directory to bin and create a text file named message-hub.config with the following contents.
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USER" password="PASSWORD";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
  • Replace USER and PASSWORD in your message-hub.config file with the user and password values seen in Service Credentials from the previous section.
  • From the bin directory, run the following command. Replace KAFKA_BROKERS_SASL with the kafka_brokers_sasl value seen in Service Credentials.
./kafka-console-producer.sh --broker-list KAFKA_BROKERS_SASL \
--producer.config message-hub.config --topic stock
After replacing: ./kafka-console-producer.sh --broker-list \
kafka04-prod02.messagehub.services.us-south.bluemix.net:9093,\
kafka05-prod02.messagehub.services.us-south.bluemix.net:9093,\
kafka02-prod02.messagehub.services.us-south.bluemix.net:9093,\
kafka01-prod02.messagehub.services.us-south.bluemix.net:9093,\
kafka03-prod02.messagehub.services.us-south.bluemix.net:9093 \
--producer.config message-hub.config --topic stock
  • The Kafka console tool is now awaiting input. Sample data can be pasted into the console, or an entire file can be piped into the producer script (for example, by appending < data.json to the command above).
  • Each line is treated as one event.
  • An example is given below:
{"GENDER": "M","AGE": 27,"MARITAL_STATUS": "Married","PROFESSION": "Teacher","IS_TENT": "TRUE","PRODUCT_LINE": "Camping Equipment","PURCHASE_AMOUNT": 144.78}
{"GENDER": "F","AGE": 37,"MARITAL_STATUS": "Single","PROFESSION": "Doctor","IS_TENT": "TRUE","PRODUCT_LINE": "Sports Shoes","PURCHASE_AMOUNT": 154.78}
{"GENDER": "M","AGE": 40,"MARITAL_STATUS": "Married","PROFESSION": "Software Engineer","IS_TENT": "TRUE","PRODUCT_LINE": "Swimwear","PURCHASE_AMOUNT": 164.78}
{"GENDER": "F","AGE": 58,"MARITAL_STATUS": "Single","PROFESSION": "Lawyer","IS_TENT": "TRUE","PRODUCT_LINE": "Hiking accessories","PURCHASE_AMOUNT": 174.78}

Create and Run Spark Streaming Application

a. Create Spark Streaming Application

The Spark Scala application is simple: it reads the data from the Kafka stream by connecting to the topic specified in the configuration, filters or transforms the data, and finally persists it to a bucket in Cloud Object Storage. The full source is linked in the End Notes.

Read more about Spark Streaming for Kafka here.
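As a rough sketch (not the exact code from the repository), a Structured Streaming version of such an application could look like the following. It assumes the sample event schema shown earlier and the configuration property names described in the submit step below; treat it as illustrative only.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

object KafkaToCosApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("KafkaToCosApp").getOrCreate()

    // These property names mirror the "conf" entries passed at submit time
    val brokers   = spark.conf.get("spark.kafka_bootstrap_servers")
    val topic     = spark.conf.get("spark.kafka_topic")
    val cosUrl    = spark.conf.get("spark.cos.url")   // e.g. cos://matrix.mycosservice/kafkadata
    val triggerMs = spark.conf.get("spark.trigger_time_ms").toLong

    // Schema of the sample purchase events pushed by the producer
    val schema = new StructType()
      .add("GENDER", StringType)
      .add("AGE", IntegerType)
      .add("MARITAL_STATUS", StringType)
      .add("PROFESSION", StringType)
      .add("IS_TENT", StringType)
      .add("PRODUCT_LINE", StringType)
      .add("PURCHASE_AMOUNT", DoubleType)

    // Read the stream from the Event Streams (Kafka) topic over SASL_SSL;
    // the JAAS login details come from the jaas.conf wired in via extraJavaOptions
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("subscribe", topic)
      .load()

    // The Kafka value is bytes; parse the JSON payload into typed columns
    val events = raw
      .select(from_json(col("value").cast("string"), schema).as("event"))
      .select("event.*")

    // Persist the parsed events to the COS bucket, micro-batch by micro-batch
    val query = events.writeStream
      .format("parquet")
      .option("path", cosUrl)
      .option("checkpointLocation", cosUrl + "/checkpoint")
      .trigger(Trigger.ProcessingTime(triggerMs))
      .start()

    query.awaitTermination()
  }
}

Whatever the exact output format, the overall flow stays the same: read from the Kafka topic, parse the JSON payload, and write micro-batches to a cos:// path.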

b. Create an instance of Analytics Engine — Serverless Spark plan

IBM Analytics Engine (Serverless Spark plan) is a managed service against which you can run Spark applications and get billed only for the resources that are consumed. Behind the scenes, a Spark cluster is created on the fly, against which your application runs. Learn more here.

You can create an instance here: https://cloud.ibm.com/catalog/services/analytics-engine

Here’s a nice “Getting Started” tutorial, if you want to get started with some basics: https://cloud.ibm.com/docs/AnalyticsEngine?topic=AnalyticsEngine-using-cli

c. Customize your Analytics Engine instance for jaas.conf

Some background info: https://cloud.ibm.com/docs/EventStreams?topic=EventStreams-kafka_java_using

For the Spark application to be able to connect to the Kafka brokers and read from the topic, we need to provide it the authentication information.

  • For this, create a jaas.conf file with the following content:
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  serviceName="kafka"
  username="CHANGEME"
  password="CHANGEME";
};
  • Upload the file to a bucket on your COS instance (in my case, a bucket named matrix).
  • Submit the customization request with the following API call:
curl -X POST https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/$instance_id/spark_applications --header "Authorization: Bearer $token" -H "content-type: application/json" -d @submit-customization.json

where submit-customization.json contains the customization payload described in the points below.

  • What this customization step does is create a “library set” (here called kafka_jaas_conf) containing the specified file (jaas.conf). This library set is made available to any subsequent Spark application that references the library set name.
  • Note that the customization itself is nothing but the submission of a Spark application; in this case, you are submitting a built-in application that comes as part of the AE Spark cluster. You just need to pass some specific arguments, such as the name of the library set, the bucket and file name to include in the library set, and of course the access credentials. If this is still a bit hazy, read the background link you skipped earlier :)
  • The library set file is made available to you in this path: /home/spark/shared/user-libs/

d. Submit the Scala Application to run on your IBM Analytics Engine instance

The submission payload (submit.json) packs a lot into one place. You might need to toggle between the source code and the payload to understand it completely.

  • It specifies all of the application configuration required by the Scala application, such as “spark.cos.url”, “spark.kafka_bootstrap_servers”, “spark.kafka_topic”, and “spark.trigger_time_ms”.
  • It contains the application information itself: “application” holds the location of the Scala jar file to be executed (always a COS location for AE Serverless) and the main “class” to be executed. It also uses the “packages” parameter required for the Kafka connectors to work. (Side note: this is all very similar to a spark-submit in the Spark world, so you can draw parallels. Read more here.)
  • It contains the endpoint, access key, and secret key required to access the application to be executed. In this case, it is the same bucket and endpoint that we also use within the application to persist the data. (Side note: we use the identifier “mycosservice”. You can use any string of your choice, but it must match the identifier specified in the COS URL, cos://bucketname.identifier/object.)
  • It contains the ae.spark.librarysets parameter, so that the library set created in the earlier customization step is accessible. The kafka_jaas_conf library set that contains the jaas.conf file can now be used by this application.
  • Finally, a word on extraJavaOptions. This is what the Spark application uses to authenticate and connect to the Kafka topic. Notice how jaas.conf is made available at the standard path for the AE custom library location:
/home/spark/shared/user-libs/kafka_jaas_conf/custom/jaas.conf
curl -X POST https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/$instance_id/spark_applications --header "Authorization: Bearer $token" -H "content-type: application/json"  -d @submit.json
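For orientation, submit.json could look roughly like the following. This is an illustrative reconstruction based on the points above, not the exact payload; the jar path, class name, connector package version, endpoint, and keys are placeholders, and the exact field names and placement should be checked against the Analytics Engine documentation and the repository linked in the End Notes.

{
  "application_details": {
    "application": "cos://matrix.mycosservice/streaming-app.jar",
    "class": "KafkaToCosApp",
    "packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
    "conf": {
      "spark.cos.url": "cos://matrix.mycosservice/kafkadata",
      "spark.kafka_bootstrap_servers": "KAFKA_BROKERS_SASL",
      "spark.kafka_topic": "stock",
      "spark.trigger_time_ms": "10000",
      "spark.hadoop.fs.cos.mycosservice.endpoint": "COS_ENDPOINT",
      "spark.hadoop.fs.cos.mycosservice.access.key": "COS_ACCESS_KEY",
      "spark.hadoop.fs.cos.mycosservice.secret.key": "COS_SECRET_KEY",
      "ae.spark.librarysets": "kafka_jaas_conf",
      "spark.driver.extraJavaOptions": "-Djava.security.auth.login.config=/home/spark/shared/user-libs/kafka_jaas_conf/custom/jaas.conf",
      "spark.executor.extraJavaOptions": "-Djava.security.auth.login.config=/home/spark/shared/user-libs/kafka_jaas_conf/custom/jaas.conf"
    }
  }
}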

Check Data Persisted on Bucket in IBM Cloud Object Storage

Once the application is running, it will start processing the data that you sent in earlier at the producer end and saving files to COS. You can check the data path specified in the application (in my case, “kafkadata”) to confirm that the data is being stored as expected in the COS bucket.
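If you prefer to check from code rather than the COS console, a quick spot check from a spark-shell or notebook configured with the same COS (Stocator) credentials could look like this, assuming the application writes Parquet as in the sketch above:

// `spark` is the SparkSession pre-defined in spark-shell / notebooks
val persisted = spark.read.parquet("cos://matrix.mycosservice/kafkadata")
persisted.show(5)           // peek at a few parsed events
println(persisted.count())  // number of events persisted so far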

End Notes

All of the source code for this tutorial can be found here: https://github.com/IBM-Cloud/IBM-Analytics-Engine/tree/master/solution-samples/streaming

Note that as long as your streaming application is running, it is consuming resources and you will be billed for them. If you need to stop the application, you can issue the stop-application API call.
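The stop call is a DELETE against the same spark_applications endpoint used above, with the id of the running application appended; something along these lines (check the Analytics Engine API reference for the exact form):

curl -X DELETE https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/$instance_id/spark_applications/$application_id --header "Authorization: Bearer $token"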

While this tutorial was based on the REST API, you can try all of this with the corresponding IBM Cloud CLI commands.

Finally, note that to allow for periodic security updates, Analytics Engine Serverless Spark places a limit on how long applications can run. So care needs to be taken to restart any applications that get evicted by the cleanup process.

The maximum time an application can run is 72 hours: https://cloud.ibm.com/docs/AnalyticsEngine?topic=AnalyticsEngine-limits

Additional Reference Article: Data Engine’s Streaming feature

Here’s an excellent blog tutorial that shows a fully managed method that lets you land JSON from Event Streams topics into COS as Parquet.

IBM Cloud Data Engine can consume an Event Streams topic and write the events to IBM Cloud Object Storage as Parquet objects. Later, you can use the super cool FLATTEN() function on the Parquet data to flatten it out into a CSV format! Multi-level nested JSON data can then be queried with familiar SQL-style syntax.

To learn more about Data Engine, read here.

And with that, you’ve crossed another level to becoming a boss coder. GG! 👏

I hope you found this article instructional and informative. If you have any feedback or queries, please let me know in the comments below. And follow SelectFrom for more tutorials and guides on topics like Big Data, Spark, and data warehousing.
