Hello HBase World from Spark World
First steps on writing PySpark applications that read from and write to HBase tables
Overview
When working with big data, choosing the right storage for your data is a key design decision in your data processing architecture. If you had to compare S3, Hive, and HBase:
S3 is best for durable, scalable, and cost-effective storage. Ideal for backup, archival, and storing large volumes of data with less frequent access
Hive is best for batch processing, data warehousing, and running complex queries using a SQL-like language. It is suitable for large-scale data analysis and transformation. Hive is often called the “warehouse” in the Hadoop ecosystem.
HBase is best for real-time read/write access, random access to data, and applications requiring low latency. Ideal for dynamic data with frequent updates. HBase is often called the “database” in the Hadoop ecosystem.
Spark + HBase: Spark, with its streaming, machine learning, and batch processing capabilities, can be used to transform and enrich data from various sources, including HBase, and store the results back into HBase. The two are powerful tools that complement each other for building scalable, robust data processing systems.
This article, written along with @djain13 and @surbhibakhtiyar27, shows the preliminary steps to connect from Apache Spark, using PySpark applications, to read and write data in HBase.
Scenario Overview
We are considering four scenarios here:
Scenario 1:
Remote Machine: HBase server running as a Docker container on a simple VM/system
Client Machine: Apache Spark 3.3 running on a simple VM/system
Scenario 2:
Remote Machine: HBase server running as a Docker container
Client: IBM Cloud Pak for Data — Analytics Engine Spark
Scenario 3:
Remote Machine: HBase server running as a Docker container
Client: WatsonX.Data (CPD) — Native Engine Spark
Scenario 4:
Remote Machine: HBase server running as a Docker container
Client: WatsonX.Data (SAAS) — Native Engine Spark
Start up HBase
docker pull dajobe/hbase
mkdir data
./start-hbase.sh
# Small modification within the start-hbase.sh script in order to make it
# accessible to a remote client:
# the -h parameter is changed to the hostname of the machine where the
# container is running
id=$(docker run -d -h api.sbakhtiy-501-cls2.cp.fyre.ibm.com
...
# ports 16020 and 2181 are exposed as well
-p 16020:16020 -p 2181:2181
Create a table using the HBase shell
Access the HBase shell:
docker ps -a  # copy the HBase container id
docker exec -it <docker-container-id> hbase shell
Create a table called “testspark”, which we will read from and write to using Spark, and add some initial sample data.
list
create 'testspark', 'cf'
put 'testspark','1','cf:name','raju'
put 'testspark','1','cf:city','hyderabad'
HBase configurations needed on the client side
From the HBase Docker container, get the contents of the file at /opt/hbase/conf/hbase-site.xml.
In our case, it looks like this:
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.zookeeper.quorum</name>
<value>api.sbakhtiy-501-cls2.cp.fyre.ibm.com</value>
</property>
<property>
<name>hbase.rootdir</name>
<value>file:////data/hbase</value>
</property>
<property>
<name>hbase.master.info.bindAddress</name>
<value>api.sbakhtiy-501-cls2.cp.fyre.ibm.com</value>
</property>
<property>
<name>hbase.regionserver.info.bindAddress</name>
<value>api.sbakhtiy-501-cls2.cp.fyre.ibm.com</value>
</property>
</configuration>
On your client machine where Spark is running, create a file called hbase-site.xml with the above contents.
Next, create a jar out of the XML file. You will need this jar file later, as shown below.
jar cf hbase-site.xml.jar hbase-site.xml
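Before wiring Spark to these settings, it can help to confirm from the client machine that the configuration points at a reachable HBase instance. The following is a small sanity-check sketch in Python (not part of the original setup): it reads the ZooKeeper quorum host from hbase-site.xml and probes the two ports exposed earlier (2181 for ZooKeeper, 16020 for the region server).
import socket
import xml.etree.ElementTree as ET

# Pull the ZooKeeper quorum host out of the hbase-site.xml created above
root = ET.parse("hbase-site.xml").getroot()
quorum = next(
    prop.findtext("value")
    for prop in root.findall("property")
    if prop.findtext("name") == "hbase.zookeeper.quorum"
)

# Probe the ports exposed by the HBase container
for port in (2181, 16020):
    try:
        with socket.create_connection((quorum, port), timeout=5):
            print(f"{quorum}:{port} is reachable")
    except OSError as err:
        print(f"{quorum}:{port} is NOT reachable: {err}")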
Scenario 1: Apache Spark + HBase
PySpark Application to Read from HBase
spark-submit command
JAR1=http://canali.web.cern.ch/res/hbase-spark-1.0.1-SNAPSHOT_spark331_hbase2415.jar
JAR2=http://canali.web.cern.ch/res/hbase-spark-protocol-shaded-1.0.1-SNAPSHOT_spark331_hbase2415.jar
JAR3=hbase-site.xml.jar
spark-submit \
--num-executors 1 \
--executor-cores 2 \
--jars $JAR1,$JAR2,$JAR3 \
--packages org.apache.hbase:hbase-shaded-mapreduce:2.4.15 \
read-hbase.py
read-hbase.py
from pyspark.sql import SparkSession

def init_spark():
    spark = SparkSession.builder.appName("demo-base").getOrCreate()
    sc = spark.sparkContext
    return spark, sc

def main():
    spark, sc = init_spark()
    df = spark.read.format("org.apache.hadoop.hbase.spark") \
        .option("hbase.columns.mapping",
                "id STRING :key, name STRING cf:name, city STRING cf:city") \
        .option("hbase.table", "testspark") \
        .option("hbase.spark.use.hbasecontext", False) \
        .load()
    df.show()

if __name__ == '__main__':
    main()
On the console, you will see the output of df.show(), listing the row that was added earlier from the HBase shell.
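Since the connector returns a regular Spark DataFrame, the usual DataFrame operations work on top of it. As a small illustrative sketch (not part of the original application), the following could be appended to main() in read-hbase.py to select a couple of columns and filter on the city:
    # Regular DataFrame operations on the HBase-backed DataFrame
    df.select("id", "name", "city") \
      .filter(df["city"] == "hyderabad") \
      .show()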
PySpark Application to Write to HBase
spark-submit command
JAR1=http://canali.web.cern.ch/res/hbase-spark-1.0.1-SNAPSHOT_spark331_hbase2415.jar
JAR2=http://canali.web.cern.ch/res/hbase-spark-protocol-shaded-1.0.1-SNAPSHOT_spark331_hbase2415.jar
JAR3=hbase-site.xml.jar
spark-submit \
--num-executors 1 \
--executor-cores 2 \
--jars $JAR1,$JAR2,$JAR3 \
--packages org.apache.hbase:hbase-shaded-mapreduce:2.4.15 \
write-hbase.py
write-hbase.py
from pyspark.sql import SparkSession

def init_spark():
    spark = SparkSession.builder.appName("demo-base").getOrCreate()
    sc = spark.sparkContext
    return spark, sc

def main():
    spark, sc = init_spark()
    data = [('2', 'Telangana', 'Chhetri'),
            ('3', 'Sikkim', 'Bhutia'),
            ('4', 'Hyderabad', 'Shabbir'),
            ('5', 'Kerala', 'Vijayan'),
            ('6', 'Mizoram', 'Lalpekhlua')]
    columns = ["id", "city", "name"]
    goalsDF = spark.createDataFrame(data=data, schema=columns)
    goalsDF.show()
    # write to HBase
    goalsDF.write.format("org.apache.hadoop.hbase.spark") \
        .option("hbase.columns.mapping",
                "id STRING :key, name STRING cf:name, city STRING cf:city") \
        .option("hbase.namespace", "default") \
        .option("hbase.table", "testspark") \
        .option("hbase.spark.use.hbasecontext", False) \
        .save()

if __name__ == '__main__':
    main()
Running this prints the DataFrame via goalsDF.show() and writes the five rows to the “testspark” table.
This time, if you run read-hbase.py once again, it will show the newly written rows.
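You can also verify the write from within the same application by reading the table back with the same column mapping right after the save() call. Here is a minimal sketch, reusing the options from read-hbase.py, that could be appended to main() in write-hbase.py:
    # Read the table back to confirm the new rows landed in HBase
    check_df = spark.read.format("org.apache.hadoop.hbase.spark") \
        .option("hbase.columns.mapping",
                "id STRING :key, name STRING cf:name, city STRING cf:city") \
        .option("hbase.table", "testspark") \
        .option("hbase.spark.use.hbasecontext", False) \
        .load()
    check_df.show()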
Scenario 2: IBM Cloud Pak for Data — Analytics Engine + HBase
For this scenario, we demonstrate the same set of PySpark applications, but this time running from IBM Cloud Pak for Data — Analytics Engine Spark.
In CPD, you can submit applications by following this documentation: https://www.ibm.com/docs/en/cloud-paks/cp-data/4.8.x?topic=applications-submitting-spark-jobs
Pay special attention to the application payload. Some additional jars are needed for the application to work, including hbase-site.xml.jar. All of these jars have been placed in a storage volume in CPD, and that volume is then mounted to the Spark application.
My storage volume holds all the jars, as well as the applications I need to run.
Note how that storage volume is mounted to the application in the payload below.
submit-read.json:
{
  "application_details": {
    "application": "/mnts/myjars/hbase-read.py",
    "conf": {
      "spark.driver.extraClassPath": "/mnts/myjars/hbase-site.xml.jar:/mnts/myjars/hbase-shaded-mapreduce-2.4.15.jar:/mnts/myjars/hbase-spark-protocol-shaded-1.0.1-SNAPSHOT_spark331_hbase2415.jar:/mnts/myjars/hbase-spark-1.0.1-SNAPSHOT_spark331_hbase2415.jar:/mnts/myjars/scala-parser-combinators_2.12-1.0.7.jar:/mnts/myjars/slf4j-simple-1.6.1.jar:/mnts/myjars/htrace-core4-4.2.0-incubating.jar",
      "spark.executor.extraClassPath": "/mnts/myjars/hbase-site.xml.jar:/mnts/myjars/hbase-shaded-mapreduce-2.4.15.jar:/mnts/myjars/hbase-spark-protocol-shaded-1.0.1-SNAPSHOT_spark331_hbase2415.jar:/mnts/myjars/hbase-spark-1.0.1-SNAPSHOT_spark331_hbase2415.jar:/mnts/myjars/scala-parser-combinators_2.12-1.0.7.jar:/mnts/myjars/slf4j-simple-1.6.1.jar:/mnts/myjars/htrace-core4-4.2.0-incubating.jar"
    }
  },
  "volumes": [
    {
      "name": "cpd-instance::mrmadira-data-jar-volume",
      "mount_path": "/mnts/myjars"
    }
  ]
}
Submit the application
curl -k -v -X POST https://cpd-cpd-instance.apps.mrudula-cls-1.cp.fyre.ibm.com/v4/analytics_engines/73f3c755-73d9-46ae-ac3f-124670e82a15/spark_applications --header "Authorization: Bearer $token" -H "content-type: application/json" -d @submit-read.json
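If you prefer submitting from a script instead of curl, the same REST call can be made from Python. Below is a minimal sketch (assuming the requests library is available) that mirrors the curl command above, including the -k flag via verify=False:
import json
import requests

# Same endpoint, token, and payload file as the curl command above
url = ("https://cpd-cpd-instance.apps.mrudula-cls-1.cp.fyre.ibm.com"
       "/v4/analytics_engines/73f3c755-73d9-46ae-ac3f-124670e82a15/spark_applications")
token = "<your CPD access token>"
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

with open("submit-read.json") as f:
    payload = json.load(f)

# verify=False mirrors curl -k (self-signed cluster certificate)
response = requests.post(url, headers=headers, json=payload, verify=False)
print(response.status_code, response.text)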
When you fetch the logs of the application, you will see the same table output as in Scenario 1.
Scenario 3: IBM WatsonX.Data (Native Engine) + HBase
To submit to the WXD Native Engine, you can use the same payload. The curl command to submit the application varies slightly.
curl -k -v -X POST https://cpd-cpd-instance.apps.mrudula-cls-1.cp.fyre.ibm.com/lakehouse/api/v2/spark_engines/spark948/applications --header "Authorization: Bearer $token" --header "LhInstanceId: 1717708120488280" -H "content-type: application/json" -d @submit-read.json
Scenario 4: IBM WatsonX.Data (Native Engine) SAAS + HBase
For SAAS WXD, there are additional steps. You first have to customize the instance by creating a library set with all the required jars.
Customization Script
curl -v -X POST https://us-south.lakehouse.cloud.ibm.com/lakehouse/api/v2/spark_engines/$instance_id/applications -H "AuthInstanceID: $crn" --header "Authorization: Bearer $token" -H "content-type: application/json" -d @customization-payload.json
where customization-payload.json is:
{
  "application_details": {
    "application": "/opt/ibm/customization-scripts/customize_instance_app.py",
    "arguments": ["{\"library_set\":{\"action\":\"add\",\"name\":\"my_hbase_library_set\",\"script\":{\"source\":\"py_files\",\"params\":[\"https://s3.us-south.cloud-object-storage.appdomain.cloud\",\"my-may-bucket\",\"d479ffd516ec4aeb9a41eb0cc4dccbc3\",\"e0e5f39d7e5570ff75b8f1242d4fe373cbe2ff19a89d07e3\",\"hbase-client-libs\",[\"hbase-site.xml.jar\",\"hbase-spark-1.0.1-SNAPSHOT_spark331_hbase2415.jar\",\"hbase-spark-protocol-shaded-1.0.1-SNAPSHOT_spark331_hbase2415.jar\",\"hbase-shaded-mapreduce-2.4.15.jar\",\"scala-parser-combinators_2.12-1.0.7.jar\",\"slf4j-simple-1.6.1.jar\"]]}}}"],
    "conf": {
      "spark.hive.metastore.client.plain.username": "ibmlhapikey",
      "spark.hive.metastore.client.plain.password": "<changeme>",
      "spark.hadoop.wxd.cas.apiKey": "Basic <changeme>",
      "spark.submit.pyFiles": "s3a://my-may-bucket/customization_script.py"
    }
  }
}
By submitting the above payload, you customize the instance with all the required jars and create a library set called my_hbase_library_set. For more details on customization, read https://cloud.ibm.com/docs/AnalyticsEngine?topic=AnalyticsEngine-create-lib-set and https://cloud.ibm.com/docs/AnalyticsEngine?topic=AnalyticsEngine-cust-script
After that, you submit the actual application that reads from HBase using the following command.
curl -v -X POST https://us-south.lakehouse.cloud.ibm.com/lakehouse/api/v2/spark_engines/$instance_id/applications -H "AuthInstanceID: $crn" --header "Authorization: Bearer $token" -H "content-type: application/json" -d @submit-read-chstest1-native.json
where submit-read-chstest1-native.json is:
{
  "application_details": {
    "application": "s3a://my-may-bucket/hbase-read.py",
    "jars": "s3a://my-may-bucket/hbase-client-libs/hbase-site.xml.jar",
    "conf": {
      "spark.hive.metastore.client.plain.username": "ibmlhapikey",
      "spark.hive.metastore.client.plain.password": "<changeme>",
      "spark.hadoop.wxd.cas.apiKey": "Basic <changeme>",
      "ae.spark.librarysets": "my_hbase_library_set",
      "spark.driver.extraClassPath": "/home/spark/shared/user-libs/my_hbase_library_set/custom/*",
      "spark.executor.extraClassPath": "/home/spark/shared/user-libs/my_hbase_library_set/custom/*"
    },
    "packages": "org.apache.hbase:hbase-shaded-mapreduce:2.2.4"
  }
}
Here you make use of the library set created earlier, so that the required jars are available to the HBase application.
More To Do
This was just a Hello World application to show the basics of connecting Spark and HBase. Of course, we would need to add authentication for a real-world application, but that's for another day, another blog.
Using the steps in this blog, you learned how to make Spark, in its various IBM Analytics Engine form factors, work together with HBase. Choose the Spark combination that best suits your use case.
References:
https://hub.docker.com/r/dajobe/hbase/
List of Jars needed
http://canali.web.cern.ch/res/hbase-spark-1.0.1-SNAPSHOT_spark331_hbase2415.jar
http://canali.web.cern.ch/res/hbase-spark-protocol-shaded-1.0.1-SNAPSHOT_spark331_hbase2415.jar
https://mvnrepository.com/artifact/org.apache.hbase/hbase-shaded-mapreduce/2.4.15
https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.12/1.0.7/scala-parser-combinators_2.12-1.0.7.jar
https://mvnrepository.com/artifact/org.slf4j/slf4j-simple/1.6.1
https://mvnrepository.com/artifact/org.apache.htrace/htrace-core4/4.2.0-incubating
plus the hbase-site.xml.jar created from hbase-site.xml as shown earlier.