Access Data and Metadata across Workloads

Serverless Spark SQL

Working With External Metastore

Overview

Spark SQL uses a Hive metastore to manage the metadata of a user's application tables, columns, and partition information. By default, the database powering this metastore is an embedded Derby instance that comes with the Spark cluster. You can choose to move this metastore database to a relational database instance outside the Spark cluster so that you can reference your application tables across workloads. This, in combination with storing your data in object storage, helps persist both data and metadata across workloads.

This article walks through how to achieve this and access data across multiple workloads using Serverless Spark.

Shared Data and Metadata across Spark Workloads

Steps — Overview

  • STEP 0: CREATE EXTERNAL DATABASE INSTANCE: This article uses a Databases for PostgreSQL instance as the example. Note the host, port, database, user, password, and certificate information from its credentials.
  • STEP 1: UPLOAD POSTGRES CERTIFICATE TO COS: Get the base64-decoded certificate from the service credentials of the PostgreSQL instance and upload the file (call it, say, postgres.cert) to a specific location in a COS bucket. Because Databases for PostgreSQL uses self-signed certificates, the certificate must be stored in the Spark cluster to connect to the database from Spark.
  • STEP 2: CREATE A SERVERLESS SPARK INSTANCE: Create a serverless Spark instance with specific default Spark configurations that enable the use of the PostgreSQL database as the metastore.
  • STEP 3: CUSTOMIZE INSTANCE FOR CERTIFICATE USE: Customize your Serverless Spark instance to associate the PostgreSQL self-signed certificate with the instance. This way, for every workload that you run against the instance, the certificate is accessible in the internal Spark clusters that get spun up.
  • STEP 4: SET UP HIVE META STORE SCHEMA: Create the Hive metastore schema in the PostgreSQL database. This is a one-time activity that sets up the Hive metadata tables that support the user's application tables.
  • STEP 5: TEST “CREATE TABLE” FLOW: Create an external table that uses the PostgreSQL metastore and has its data located in IBM COS. This example creates a Parquet table using Spark SQL with data located in COS.
  • STEP 6: TEST “SELECT TABLE” FLOW: Select the data from the table created previously.
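
STEP 1 above asks for the base64-decoded certificate from the service credentials. A minimal sketch of that decoding follows. The nesting of the credentials JSON (connection, postgres, certificate, certificate_base64) and the credentials file name are assumptions, so check them against your own service credentials before relying on this.

```python
import base64
import json
from pathlib import Path


def decode_certificate(credentials: dict) -> bytes:
    """Return the decoded certificate bytes from a service-credentials dict.

    Assumed layout (verify against your own credentials JSON):
    credentials["connection"]["postgres"]["certificate"]["certificate_base64"]
    """
    certificate = credentials["connection"]["postgres"]["certificate"]
    return base64.b64decode(certificate["certificate_base64"])


def write_certificate(credentials_file: str, out_file: str = "postgres.cert") -> Path:
    """Decode the certificate found in credentials_file and write it to out_file."""
    credentials = json.loads(Path(credentials_file).read_text())
    out_path = Path(out_file)
    out_path.write_bytes(decode_certificate(credentials))
    return out_path
```

Calling write_certificate("credentials.json") (a hypothetical file name) would produce the postgres.cert file that is then uploaded to the COS bucket.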

Steps In Detail

Assuming that STEP 0 and STEP 1 have already been performed, let's start with STEP 2: CREATE A SERVERLESS SPARK INSTANCE with PostgreSQL configs.

The payload for creating a serverless Spark instance specifies the COS credentials that will be used in the Spark applications. It also specifies the PostgreSQL connection details (the database host, port, user name, and password) and a few other Hive metastore parameters.

The ConnectionURL and ae.spark.librarysets parameters reference the postgres.cert file that is set up later in STEP 3.

{
  "name": "my-serverless-spark-instance",
  "resource_plan_id": "8afde05e-5fd8-4359-a597-946d8432dd45",
  "resource_group": "<CHANGEME>",
  "target": "us-south",
  "parameters": {
    "default_runtime": {
      "spark_version": "3.1"
    },
    "default_config": {
      "spark.hadoop.fs.cos.mycosservice.endpoint": "https://s3.direct.<CHANGEME>.cloud-object-storage.appdomain.cloud",
      "spark.hadoop.fs.cos.mycosservice.access.key": "<CHANGEME>",
      "spark.hadoop.fs.cos.mycosservice.secret.key": "<CHANGEME>",
      "spark.hadoop.javax.jdo.option.ConnectionDriverName": "org.postgresql.Driver",
      "spark.hadoop.javax.jdo.option.ConnectionUserName": "ibm_cloud_<CHANGEME>",
      "spark.hadoop.javax.jdo.option.ConnectionPassword": "<CHANGEME>",
      "spark.sql.catalogImplementation": "hive",
      "spark.hadoop.hive.metastore.schema.verification": "false",
      "spark.hadoop.hive.metastore.schema.verification.record.version": "false",
      "spark.hadoop.datanucleus.schema.autoCreateTables": "true",
      "spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:postgresql://CHANGEME.databases.appdomain.CHANGEME/ibmclouddb?sslmode=verify-ca&sslrootcert=/home/spark/shared/user-libs/customize_integration_custom_lib/custom/postgres.cert&socketTimeout=30",
      "ae.spark.librarysets": "customize_integration_custom_lib"
    },
    "instance_home": {
      "provider": "ibm-cos",
      "type": "objectstore",
      "region": "us-south",
      "endpoint": "s3.direct.CHANGEME.cloud-object-storage.appdomain.cloud",
      "hmac_access_key": "<CHANGEME>",
      "hmac_secret_key": "<CHANGEME>"
    }
  }
}
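
In this payload, the certificate path inside ConnectionURL contains the same library set name that ae.spark.librarysets declares, so the two values have to stay in sync. The sketch below derives both from one name. The helper names and the example host are hypothetical; the path layout and query parameters are copied from the payload above.

```python
from urllib.parse import urlencode

# Directory under which library sets appear, as used in the payload above.
USER_LIBS_ROOT = "/home/spark/shared/user-libs"


def certificate_path(library_set: str, cert_file: str = "postgres.cert") -> str:
    """Path of the certificate inside the Spark cluster for a given library set."""
    return f"{USER_LIBS_ROOT}/{library_set}/custom/{cert_file}"


def metastore_connection_url(host: str, database: str, library_set: str,
                             socket_timeout: int = 30) -> str:
    """Build the JDBC URL with the same SSL options as the payload above."""
    query = urlencode(
        {
            "sslmode": "verify-ca",
            "sslrootcert": certificate_path(library_set),
            "socketTimeout": socket_timeout,
        },
        safe="/",  # keep the slashes of the certificate path readable
    )
    return f"jdbc:postgresql://{host}/{database}?{query}"


def metastore_config(host: str, database: str, library_set: str) -> dict:
    """Return the two default_config entries that must agree on the library set."""
    return {
        "spark.hadoop.javax.jdo.option.ConnectionURL":
            metastore_connection_url(host, database, library_set),
        "ae.spark.librarysets": library_set,
    }
```

For example, metastore_config("db.example.com:5432", "ibmclouddb", "customize_integration_custom_lib") returns entries that can be merged into default_config; the host shown here is a placeholder, not a real endpoint.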

STEP 3: CUSTOMIZE INSTANCE FOR CERTIFICATE USE

Prereq Reading: https://cloud.ibm.com/docs/AnalyticsEngine?topic=AnalyticsEngine-cust-script

  • Upload the exact customization_script.py from the page above to a COS bucket.
  • That script assumes that a file named postgres.cert is located in a COS bucket (from STEP 1).
  • Now run the Spark submit REST API to submit a Spark application using this payload:
postgres-cert-customization-submit.json
{
  "application_details": {
    "application": "/opt/ibm/customization-scripts/customize_instance_app.py",
    "arguments": ["{\"library_set\":{\"action\":\"add\",\"name\":\"customize_integration_custom_lib\",\"script\":{\"source\":\"py_files\",\"params\":[\"https://s3.direct.<CHANGEME>.cloud-object-storage.appdomain.cloud\",\"<CHANGEME_BUCKET_NAME>\",\"postgres.cert\",\"<CHANGEME_ACCESS_KEY>\",\"<CHANGEME_SECRET_KEY>\"]}}}"],
    "py-files": "cos://CHANGEME_BUCKET_NAME.mycosservice/customization_script.py"
  }
}
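
The arguments entry in this payload is itself a JSON document escaped into a string, which is easy to get wrong by hand. A possible way to generate it is sketched below, letting json.dumps do the escaping. The function name and its parameters are illustrative; the field names and values mirror the payload above.

```python
import json


def customization_payload(cos_endpoint: str, bucket: str, cert_file: str,
                          access_key: str, secret_key: str,
                          library_set: str = "customize_integration_custom_lib",
                          cos_service: str = "mycosservice") -> dict:
    """Build the customization submit payload without hand-escaped JSON."""
    library_set_spec = {
        "library_set": {
            "action": "add",
            "name": library_set,
            "script": {
                "source": "py_files",
                # Same order as in the payload above.
                "params": [cos_endpoint, bucket, cert_file, access_key, secret_key],
            },
        }
    }
    return {
        "application_details": {
            "application": "/opt/ibm/customization-scripts/customize_instance_app.py",
            # The nested spec is passed as a single JSON-encoded string argument.
            "arguments": [json.dumps(library_set_spec)],
            "py-files": f"cos://{bucket}.{cos_service}/customization_script.py",
        }
    }
```

Serializing the result with json.dumps(payload, indent=2) yields a file equivalent to postgres-cert-customization-submit.json, with the inner quotes escaped automatically.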

STEP 4: SET UP HIVE META STORE SCHEMA

Before executing this step, there will be no tables in the public schema of the PostgreSQL database.

Note: This is just a dummy Python file that invokes a metastore flow so that the Hive metastore schema tables get created.

postgres-create-schema.py
from pyspark.sql import SparkSession
import time

def init_spark():
    spark = SparkSession.builder.appName("postgres-create-schema").getOrCreate()
    sc = spark.sparkContext
    return spark, sc

def create_schema(spark, sc):
    # Any metastore query is enough to trigger creation of the schema tables.
    tablesDF = spark.sql("SHOW TABLES")
    tablesDF.show()
    time.sleep(30)

def main():
    spark, sc = init_spark()
    create_schema(spark, sc)

if __name__ == '__main__':
    main()

Upload this python file to COS and execute it by submitting an application using the following payload:

postgres-create-schema.json
{
  "application_details": {
    "application": "cos://CHANGEME.mycosservice/postgres-create-schema.py"
  }
}
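
The article submits each payload through the Spark submit REST API without showing the call itself. A rough sketch using only the standard library follows. The endpoint pattern, region, instance ID, and IAM bearer token are all assumptions here, so confirm them against the Analytics Engine API reference for your instance.

```python
import json
import urllib.request

# Assumed v3 endpoint pattern; verify it for your region and instance.
API_TEMPLATE = (
    "https://api.{region}.ae.cloud.ibm.com"
    "/v3/analytics_engines/{instance_id}/spark_applications"
)


def build_submit_request(region: str, instance_id: str, iam_token: str,
                         payload: dict) -> urllib.request.Request:
    """Prepare (but do not send) the POST request that submits an application."""
    return urllib.request.Request(
        url=API_TEMPLATE.format(region=region, instance_id=instance_id),
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {iam_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def submit(request: urllib.request.Request) -> dict:
    """Send the request and return the parsed JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())
```

Keeping request construction separate from sending makes it possible to inspect the URL and body before anything goes over the network. The same two functions would work for the other payloads in this article.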

After the application runs, the Hive metadata tables are created in the public schema of the PostgreSQL database.
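
One way to confirm this from outside Spark is to list the tables in the public schema before and after the step. The helper below is a sketch that works with any DB-API 2.0 connection. The choice of driver and the connection details are assumptions that the article does not cover.

```python
# Standard information_schema query; returns every table in the public schema.
LIST_PUBLIC_TABLES_SQL = """
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = 'public'
    ORDER BY table_name
"""


def list_public_tables(connection) -> list:
    """Return the table names in the public schema.

    `connection` is any DB-API 2.0 connection to the PostgreSQL database.
    """
    cursor = connection.cursor()
    try:
        cursor.execute(LIST_PUBLIC_TABLES_SQL)
        return [row[0] for row in cursor.fetchall()]
    finally:
        cursor.close()
```

If psycopg2 is available (an assumption), a connection opened with the host, port, database, user, and password from STEP 0, plus sslmode="verify-ca" and sslrootcert pointing at a local copy of postgres.cert, could be passed to list_public_tables. An empty list before STEP 4 and a non-empty list afterwards would indicate that the schema was created.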

STEP 5: TEST “CREATE TABLE” FLOW

This script creates a Parquet table with its metadata in the PostgreSQL database and its data in COS.

postgres-parquet-table-create.py
from pyspark.sql import SparkSession
import time

def init_spark():
    spark = SparkSession.builder.appName("postgres-create-parquet-table-test").getOrCreate()
    sc = spark.sparkContext
    return spark, sc

def generate_and_store_data(spark, sc):
    data = [("1", "Romania", "Bucharest", "81"), ("2", "France", "Paris", "78"), ("3", "Lithuania", "Vilnius", "60"), ("4", "Sweden", "Stockholm", "58"), ("5", "Switzerland", "Bern", "51")]
    columns = ["Ranking", "Country", "Capital", "BroadBandSpeed"]
    df = spark.createDataFrame(data, columns)
    # Write the sample rows to COS as Parquet files.
    df.write.parquet("cos://<CHANGEME-BUCKET>.mycosservice/bbspd")

def create_table_from_data(spark, sc):
    # Register a table in the metastore over the Parquet files already in COS.
    spark.sql("CREATE TABLE MYPARQUETBBSPEED (Ranking STRING, Country STRING, Capital STRING, BroadBandSpeed STRING) STORED AS PARQUET location 'cos://<CHANGEME-BUCKET>.mycosservice/bbspd/'")
    df2 = spark.sql("SELECT * from MYPARQUETBBSPEED")
    df2.show()

def main():
    spark, sc = init_spark()
    print("myparquetbb1")
    generate_and_store_data(spark, sc)
    print("myparquetbb2")
    create_table_from_data(spark, sc)
    print("myparquetbb3")
    time.sleep(30)

if __name__ == '__main__':
    main()
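
In the script above, the column names appear twice: once for the DataFrame and once inside the CREATE TABLE statement. A small helper can generate the statement from a single column list so the two cannot drift apart. This is only a sketch; the helper name and the optional IF NOT EXISTS clause (useful when the application is re-run) are additions that are not part of the original script.

```python
COLUMNS = ["Ranking", "Country", "Capital", "BroadBandSpeed"]


def create_table_ddl(table: str, columns: list, location: str,
                     column_type: str = "STRING",
                     if_not_exists: bool = False) -> str:
    """Build a CREATE TABLE ... STORED AS PARQUET statement for Spark SQL."""
    column_defs = ", ".join(f"{name} {column_type}" for name in columns)
    guard = "IF NOT EXISTS " if if_not_exists else ""
    return (
        f"CREATE TABLE {guard}{table} ({column_defs}) "
        f"STORED AS PARQUET LOCATION '{location}'"
    )
```

Inside the Spark application, the result would be passed to spark.sql(...), and the same COLUMNS list would be passed to spark.createDataFrame(data, COLUMNS).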

Upload the script to COS and submit the application using the following payload:

postgres-parquet-table-create-submit.json
{
  "application_details": {
    "application": "cos://CHANGEME.mycosservice/postgres-parquet-table-create.py"
  }
}
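
The cos:// URIs in these payloads and scripts follow the pattern cos://<bucket>.<service>/<path>, where the service name (mycosservice in this article) selects the spark.hadoop.fs.cos.<service>.* settings from the instance's default_config. The sketch below splits a URI and lists the config keys it depends on. The function names are hypothetical.

```python
from urllib.parse import urlparse


def parse_cos_uri(uri: str) -> tuple:
    """Split cos://<bucket>.<service>/<path> into (bucket, service, path)."""
    parsed = urlparse(uri)
    if parsed.scheme != "cos":
        raise ValueError(f"not a cos:// URI: {uri}")
    # The service name is whatever follows the last dot of the authority part.
    bucket, separator, service = parsed.netloc.rpartition(".")
    if not separator or not bucket or not service:
        raise ValueError(f"expected cos://<bucket>.<service>/..., got: {uri}")
    return bucket, service, parsed.path.lstrip("/")


def required_config_keys(service: str) -> list:
    """Spark config keys that must be defined for the given service name."""
    suffixes = ("endpoint", "access.key", "secret.key")
    return [f"spark.hadoop.fs.cos.{service}.{suffix}" for suffix in suffixes]
```

If a URI uses a service name that has no matching keys in default_config, the application has no endpoint or credentials for that location, so this is a quick sanity check to run before submitting.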

STEP 6: TEST “SELECT TABLE” FLOW

Select from the table created earlier, this time from another Spark workload.

postgres-parquet-table-select.py
from pyspark.sql import SparkSession
import time

def init_spark():
    spark = SparkSession.builder.appName("postgres-select-parquet-table-test").getOrCreate()
    sc = spark.sparkContext
    return spark, sc

def select_data_from_table(spark, sc):
    # The table definition comes from the shared metastore, not from this application.
    df = spark.sql("SELECT * from MYPARQUETBBSPEED")
    df.show()

def main():
    spark, sc = init_spark()
    print("myparquetbbspeed-show")
    select_data_from_table(spark, sc)
    time.sleep(60)

if __name__ == '__main__':
    main()

Upload the application file to COS and run the Spark application using the payload below:

postgres-parquet-table-select-submit.json
{
  "application_details": {
    "application": "cos://CHANGEME.mycosservice/postgres-parquet-table-select.py"
  }
}

You can see that the table created by the earlier Spark application is accessible from a different application.

Conclusion

Executing Spark SQL and referencing tables across different Spark workloads becomes easy once you customize the instance-level configurations for data and metadata access.

This tutorial covered specifying default Spark configs, customizing an instance, creating and querying Parquet data in DataFrames, and creating and querying Parquet-based external tables at a COS location.

Happy Learning!

(With inputs from Dharmesh K Jain)
