Data Ingestion and Table Management in the Lakehouse using Presto, Iceberg and Spark

Mrudula Madiraju
9 min read · Sep 28, 2023

This blog (co-written with Dharmesh Jain, with inputs from Shrinivas Kulkarni) explains why and how to get Presto, Iceberg and Spark to work in tandem for data ingestion and table management in a typical lakehouse scenario. We demonstrate this with IBM watsonx.data.

https://cloud.ibm.com/watsonxdata

This blog is divided into three main sections:
1. Provision & Configure an instance of IBM watsonx.data
2. Data Ingestion through Spark and Query the data using Presto
3. Table Management using Spark

Overview

There’s enough material out there on the lakehouse; still, just to set the context for this blog:

Lakehouse = Data Lake + Warehouse

A lakehouse combines the best of both worlds:
a) Data Lake (think “unstructured”, “low cost”, “flexibility”, “S3 object storage”) and
b) Warehouse (think “ACID”, “CRUD”, “ETL”)

IBM watsonx.data is the lakehouse solution from IBM. It lets you explore, load, transform and query data across a number of data sources, irrespective of where your data sets reside, in a governed, secure, scalable manner.

IBM watsonx.data, like a typical lakehouse, has several layers, with open components in each layer that work together and offer complementary features to help you deal with large data sets. Let’s list them below.

- Presto is used as the engine for executing your SQL-based queries. Presto is an open source SQL query engine that can work with data in several different data sources. You can also use it to load and ingest small amounts of data in IBM watsonx.data. At the time of writing this article, Presto is the only choice of query engine for watsonx.data. (Watch out for more engine options in upcoming releases.) Read more here.

- Apache Iceberg is a powerful table format for managing data in a table. Note that this is different from a data format such as Parquet or CSV. Iceberg’s features, such as data snapshots, schema evolution and data compaction, are really powerful constructs to manage and maintain your huge data sets.
☝️ And here’s the key thing to note: at the time of writing, in watsonx.data these operations are supported only through Spark!
As a side note, Iceberg also offers ACID transactions, previously unheard of in the data lake world. You can update and delete rows in tables managed through Iceberg.

- Hive Metastore: The Iceberg runtime needs an associated backend system to maintain, manage and catalog the table’s metadata. In IBM watsonx.data, this backend is implemented through a Hive Metastore (HMS).

- Apache Spark: We already mentioned above how Spark is crucial for leveraging key Iceberg operations for table maintenance. Similarly, there are other use cases where Spark’s exclusive capabilities are the need of the hour.
-> For example, you may need to do some data processing (filtering or transforming your raw data) that is easier to do natively with Spark procedural code (as opposed to, say, SQL). After the data is processed, you would store it in an S3-compatible object storage that you can later query through Presto.
-> Conversely, there may be data in watsonx.data, say in Db2, ingested through other sources, that can best be analyzed using Spark procedural code rather than SQL.
-> Yet another use case for Spark is the varied options for data ingestion (think streaming, DataFrames, RDDs and the supported data formats).

Each component in the watsonx.data stack brings unique capabilities to the table, and in IBM watsonx.data these components work in tandem to provide the best data management capability for your analytics workloads.

1. Provision & Configure watsonx.data

On cloud.ibm.com, search for watsonx.data in the catalog. Go with the default options if you are just trying things out.

Once the instance is provisioned, you will see its instance page.

Next, you need to configure it with (a) an engine, (b) a catalog and (c) the bucket to be associated with the catalog. There’s a nice intuitive UI wizard that walks you through the steps. 😃

First Step: Configure an S3/COS bucket

This object store bucket will be used by IBM watsonx.data to store data and metadata. Here you can choose either to have a bucket created automatically for you or to provide a pre-created bucket. The picture below shows the latter, where we have specified a bucket, in this case “lh-mrmadira”.

Second Step: Choose the Catalog

Choose Iceberg for the purpose of this blog’s demonstration.

Third Step: Choose the query engine

Here we choose the default option, Presto, for now.

2. Data Ingestion through Spark and Query using Presto

First, let’s see the buckets involved in this exercise. For the sake of simplicity we have placed both buckets in the same IBM COS instance. These buckets can be in different IBM COS instances, and even on EMR S3 storage as well.

“lh-mrmadira”: This is the bucket that we mapped to the catalog while setting it up earlier.

“lh-mrmadira-data”: This is the bucket containing the datasets that we read from to create tables in the lakehouse.

Buckets in IBM COS

Now, within the test data bucket, we have uploaded the data following the steps in the documentation.

Spend some time reading and understanding the sample PySpark application file. It demonstrates several important concepts.

References: https://iceberg.apache.org/docs/latest/getting-started/

Concept 1: Create a Database/Schema in watsonx.data

Here, using Spark, you create a schema under the catalog created in the previous steps. Within that schema, you will create a few tables and load data into them.
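As a rough illustration of this step, here is a minimal PySpark sketch. It assumes you already have a SparkSession (passed in as spark) that is configured with an Iceberg catalog. The catalog name "lakehouse" and the schema name "demodb" are hypothetical; the bucket is the catalog bucket from the setup above.

```python
def create_schema(spark, catalog="lakehouse", schema="demodb", bucket="lh-mrmadira"):
    """Create a schema (database) in the Iceberg catalog, located in the catalog bucket."""
    # 'spark' is an existing SparkSession; the catalog and schema names are illustrative.
    statement = (
        f"CREATE DATABASE IF NOT EXISTS {catalog}.{schema} "
        f"LOCATION 's3a://{bucket}/'"
    )
    return spark.sql(statement)
```

Calling create_schema(spark) with the defaults would create demodb under the lakehouse catalog, if it does not exist yet.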

Concept 2: Data Ingestion using INSERT: Create a simple table and insert data

Note the clause “using iceberg”: Iceberg is the table format specified.
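A minimal sketch of what such a create-and-insert step could look like in PySpark is shown here. The table name and its columns are made up for illustration, and spark is assumed to be a SparkSession with an Iceberg catalog (here called "lakehouse", a hypothetical name).

```python
def create_and_insert(spark, table="lakehouse.demodb.sample_trips"):
    """Create a small Iceberg table, insert two rows and read them back."""
    # USING iceberg selects the table format; table and column names are illustrative.
    spark.sql(
        f"CREATE TABLE IF NOT EXISTS {table} "
        "(id INT, vendor STRING, fare DOUBLE) USING iceberg"
    )
    spark.sql(f"INSERT INTO {table} VALUES (1, 'A', 12.5), (2, 'B', 7.0)")
    return spark.sql(f"SELECT * FROM {table}")
```

With a real session, create_and_insert(spark).show() would print the two inserted rows.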

Concept 3: Data Ingestion of parquet data: Create a Parquet format table managed by Iceberg table format

Note that the table format is Iceberg and the data format is Parquet.

Here, we are reading data from the test bucket and storing it back into the lakehouse bucket specified in the earlier step.

We have not explicitly specified the Iceberg table format here, but it is implied by the catalog type chosen in the earlier step.
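To make this concrete, here is a small PySpark sketch of a Parquet ingestion along these lines. It assumes spark is a SparkSession with an Iceberg catalog configured. The file name and table name in the example call are hypothetical; only the bucket names come from this post.

```python
def ingest_parquet(spark, source_path, table):
    """Read a Parquet file from the data bucket and write it out as a new table."""
    df = spark.read.parquet(source_path)
    # No explicit table format here: the Iceberg catalog behind the table name decides it.
    df.writeTo(table).create()
    return df

# Example call (file and table names are hypothetical):
# ingest_parquet(
#     spark,
#     "s3a://lh-mrmadira-data/yellow_tripdata.parquet",
#     "lakehouse.demodb.yellow_taxi",
# )
```

The writeTo(...).create() call fails if the table already exists, which is a reasonable guard for a first load; appending later batches would need a different write call.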

Concept 4: Data Ingestion of CSV data: Create a CSV format table managed by Iceberg table format

Here too we read a CSV file from the test bucket, then use CTAS (Create Table As Select) to create a new table that explicitly uses Iceberg.
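A CSV ingestion with CTAS could be sketched as follows. Again, spark is assumed to be a SparkSession with an Iceberg catalog; the temporary view name is an arbitrary choice, and the source path and table name are supplied by the caller.

```python
def ingest_csv_ctas(spark, source_path, table, view="tmp_csv_view"):
    """Load a CSV file and materialize it as an Iceberg table with CTAS."""
    # With only the header option set, every column is read as a string.
    df = spark.read.option("header", True).csv(source_path)
    # Expose the DataFrame to SQL through a temporary view (view name is illustrative).
    df.createOrReplaceTempView(view)
    return spark.sql(
        f"CREATE TABLE IF NOT EXISTS {table} USING iceberg AS SELECT * FROM {view}"
    )
```

Going through a temporary view keeps the table creation in plain SQL, which is where the explicit “using iceberg” clause lives.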

Query the data from Presto

Here you can see all the tables created by the Spark application under the associated catalog, and you can query them from the watsonx.data query interface.

Query from the watsonx.data Query Workspace

You can also query these tables from a standalone Presto CLI using the following command:

./presto \
--server https://<watsonx.data_MetastoreHost:port> \
--catalog iceberg_data \
--schema default \
--user ibmlhapikey_<your-username> \
--password

Query from Presto CLI

Now, at the end of the data ingestion exercise, you can see the data files and the metadata files in the bucket associated with the watsonx.data catalog.

In this case it was the bucket “lh-mrmadira”, and here you can see the files associated with the table “yellow_taxi-2023”.

3. Table Management using Spark

Table management is one of the key features of the Iceberg + Spark combo and is essential in the big data world. When you deal with large datasets, you constantly need to take care of the upkeep of your tables for efficient storage and queries. Several interesting concepts coming up below…

Concept 1: Table Management: Compacting data

Compacting data files using Iceberg feature from Spark
https://iceberg.apache.org/docs/latest/spark-structured-streaming/#compacting-data-files

In this code, the first query (on the .files metadata table) shows that there are several small data files, which can result in inefficient queries.

Before Compaction

The next part of the code calls the Iceberg procedure rewrite_data_files, instructing it to rewrite the data into file chunks of about 200 MB. The result of this operation shows the number of files rewritten and the bytes rewritten.

Compact information

And finally, when you check the file metadata again, it shows that there are fewer files, with roughly 200 MB of data each.

After Compaction
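The three steps described above (list the files, compact, list again) can be sketched roughly like this. The catalog and table names are hypothetical, and spark is assumed to be a SparkSession with the Iceberg SQL extensions enabled so that CALL statements work. The rewrite_data_files procedure and its target-file-size-bytes option come from Iceberg itself.

```python
def compact_table(spark, catalog="lakehouse", table="demodb.yellow_taxi", target_mb=200):
    """List data files, compact them toward target_mb per file, and list them again."""
    # The .files metadata table exposes one row per data file of the table.
    files_query = f"SELECT file_path, file_size_in_bytes FROM {catalog}.{table}.files"
    before = spark.sql(files_query)

    # The option expects bytes, so convert the megabyte target first.
    target_bytes = target_mb * 1024 * 1024
    result = spark.sql(
        f"CALL {catalog}.system.rewrite_data_files("
        f"table => '{table}', "
        f"options => map('target-file-size-bytes', '{target_bytes}'))"
    )

    after = spark.sql(files_query)
    return before, result, after
```

With a real session you would call .show() on each of the three returned DataFrames to compare the file listing before and after compaction.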

Concept 2: Table Management: Expiring Snapshots

Expiring snapshots using Iceberg from Spark

A really cool feature of the Iceberg table format is that “each write to an Iceberg table creates a new snapshot, or version, of a table. Snapshots can be used for time-travel queries, or the table can be rolled back to any valid snapshot.”

https://iceberg.apache.org/docs/latest/spark-structured-streaming/#expire-old-snapshots

Here we first query all the snapshot information, then expire all snapshots except the latest one, and query once again to confirm. We make use of the Iceberg procedure expire_snapshots.

All snapshots before calling the expire_snapshots procedure
Only the latest snapshot remains after the expire_snapshots procedure executes
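A rough PySpark sketch of this query, expire, query sequence is given here. The catalog and table names are hypothetical, and spark is assumed to be a SparkSession with the Iceberg SQL extensions enabled. The caller passes a cutoff timestamp; snapshots older than it are expired, while retain_last keeps at least the most recent one.

```python
def expire_old_snapshots(spark, older_than, catalog="lakehouse", table="demodb.yellow_taxi"):
    """Show snapshots, expire those older than the cutoff (keeping the latest), show again."""
    snapshots_query = (
        f"SELECT committed_at, snapshot_id, operation FROM {catalog}.{table}.snapshots"
    )
    before = spark.sql(snapshots_query)

    # older_than is a timestamp string such as '2023-09-28 00:00:00.000'.
    spark.sql(
        f"CALL {catalog}.system.expire_snapshots("
        f"table => '{table}', "
        f"older_than => TIMESTAMP '{older_than}', "
        f"retain_last => 1)"
    )

    after = spark.sql(snapshots_query)
    return before, after
```

Passing the current time as the cutoff would leave only the latest snapshot, which matches the before and after views described above.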

Concept 3: Table Management: Delete Orphan Files

Calls the Iceberg remove_orphan_files procedure from Spark
No more orphan files!
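For illustration, a call to remove_orphan_files from PySpark might look like the sketch below. The catalog and table names are hypothetical, and spark is assumed to be a SparkSession with the Iceberg SQL extensions enabled. The sketch defaults to a dry run, which only lists candidate files, as a cautious starting point.

```python
def remove_orphans(spark, catalog="lakehouse", table="demodb.yellow_taxi", dry_run=True):
    """Call Iceberg's remove_orphan_files; with dry_run=True files are listed, not deleted."""
    # Spark SQL wants a lowercase boolean literal.
    flag = "true" if dry_run else "false"
    return spark.sql(
        f"CALL {catalog}.system.remove_orphan_files("
        f"table => '{table}', dry_run => {flag})"
    )
```

When no older_than argument is given, Iceberg applies its own default age threshold, so very recent files are left alone.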

Concept 4: Table Management: Rewrite Manifest files

Calls the Iceberg rewrite_manifests procedure from Spark
Result of rewrite manifests procedure
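As a sketch, rewriting manifests from PySpark could be done along these lines. The catalog and table names are hypothetical, and spark is assumed to be a SparkSession with the Iceberg SQL extensions enabled. The .manifests metadata table is queried first so that the manifest list can be compared before and after.

```python
def rewrite_table_manifests(spark, catalog="lakehouse", table="demodb.yellow_taxi"):
    """List the current manifests, then ask Iceberg to rewrite them."""
    # One row per manifest file, with its path and size in bytes.
    manifests = spark.sql(f"SELECT path, length FROM {catalog}.{table}.manifests")
    result = spark.sql(
        f"CALL {catalog}.system.rewrite_manifests(table => '{table}')"
    )
    return manifests, result
```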

Concept 5: Table Management: Schema Evolution

Schema evolution is another really cool feature of Iceberg: you can add, drop, rename, update or reorder columns, just as you can in a regular DB. Without Iceberg, doing these kinds of operations on big data is a cumbersome process with a lot of rewrites. Read more here: https://iceberg.apache.org/docs/latest/evolution/#schema-evolution

New column added: “fare_per_mile” at the end
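Adding such a column could look roughly like the following PySpark sketch. The table name is hypothetical and the column type (double) is an assumption; spark is assumed to be a SparkSession with an Iceberg catalog. The new column is added as metadata only, so existing rows simply read it as null until it is populated.

```python
def add_fare_per_mile(spark, table="lakehouse.demodb.yellow_taxi"):
    """Add a fare_per_mile column to the table and return the updated schema."""
    # Iceberg handles this as a metadata change; no data files are rewritten.
    spark.sql(f"ALTER TABLE {table} ADD COLUMNS (fare_per_mile double)")
    return spark.sql(f"DESCRIBE TABLE {table}")
```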

A word on … configuring Analytics Engine (Serverless Spark)

All of the examples shown in the PySpark application run on IBM Analytics Engine, which provides the Spark runtime. At the time of writing this article, you need to provision an instance of Analytics Engine Spark and configure it to work with the Hive Metastore from watsonx.data. Read more here. In future this will be a more seamless experience. Keep watching this space!
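To give a feel for what that configuration involves, here is a sketch of the generic open source Iceberg settings that point a Spark catalog at a Hive Metastore. The catalog name "lakehouse" and the placeholder metastore URI are assumptions. Anything specific to watsonx.data or Analytics Engine (metastore credentials, COS endpoints and keys) is not shown here and should come from the IBM documentation.

```python
def iceberg_catalog_conf(catalog, metastore_uri):
    """Return generic Spark settings for an Iceberg catalog backed by a Hive Metastore."""
    return {
        # Enables Iceberg SQL extensions such as CALL procedures.
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        # Registers a Spark catalog under the given name (the name is illustrative).
        f"spark.sql.catalog.{catalog}": "org.apache.iceberg.spark.SparkCatalog",
        f"spark.sql.catalog.{catalog}.type": "hive",
        f"spark.sql.catalog.{catalog}.uri": metastore_uri,
    }

# Placeholder URI: replace with the metastore endpoint of your own instance.
conf = iceberg_catalog_conf("lakehouse", "thrift://<metastore-host>:<port>")
for key, value in conf.items():
    print(f"{key}={value}")
```

These key and value pairs would typically be supplied as Spark configuration when the application is submitted, or set on the SparkSession builder.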

A note on … the “ibm-lh” tool

All of the table maintenance operations described above using the PySpark application can also be done through a convenient tool, ibm-lh, after some minimal configuration. Read more here.

A note on … watsonx.data Infrastructure Manager

The picture below shows the associations between the engines, catalogs and buckets. As you can see, multiple catalogs can work with one engine. You can also have more than one engine within the watsonx.data instance. Within each catalog, you can have multiple DBs or schemas, and tables within those DBs/schemas.

When you have a large organization with multiple data sources and projects working together, this visualization helps you understand the bigger picture. It helps you mix and match different engines and catalogs to come up with the solution that best suits each dataset.

Infrastructure Manager view of the engines, catalogs and associated buckets

Conclusion

Hope this was a good introduction to how the stack of open engines works in tandem in watsonx.data for complementary data management use cases. Let us know through comments or likes, or reach out to us through IBM Support. We shall come up with more topics in this series, like CPD, AWS and so on. Watch this space for more.

Mrudula Madiraju

Dealing with Data, Cloud, Compliance and sharing tidbits of epiphanies along the way.