Using Apache Hudi snapshot queries with PySpark

Vinay Badhan
3 min read · Jun 19, 2021

Problem statement:

In my current organisation, we ingest data from multiple sources into our platform, and we use Apache Hudi to store the processed data.

One of our Spark jobs runs daily: it reads the processed data from the Hudi tables, calls a separate microservice to assign refreshed ratings based on the new aggregated values, and writes the results back to the Hudi table.

Our writes started to fail recently. The rating assignment takes some time, and while it runs, the real-time pipeline keeps updating the Hudi table and rewriting its older parquet files. As a result, the list of files returned by S3's list-files API contained stale paths, and the subsequent reads for the other sources failed with the following error message:

org.apache.spark.sql.execution.datasources.FileDownloadException: Failed to download file path: s3://<hudi-table-read-path>=2021-05-24/source_type=<source-type>/29ed87ad-19fb-44cf-b5b6-d8d60c6949ca-0_3-841189-9985148_20210524053602.parquet

Those file paths no longer existed, so the S3 reads themselves failed, taking our job down with them.

Solution:

After some investigation, we found the root cause: the real-time pipeline was overwriting the older files, which made the S3 reads fail.

I first tried caching the initial read itself. Unfortunately, on that very day, one of the existing files was overwritten while the first read was still in progress, so the read itself failed.

It became clear that caching would not solve this problem in the long run and that we would keep running into the same issue. Going through the Apache Hudi documentation, I came across snapshot queries.

A snapshot query retrieves the table as of the present point in time. Note: the load path must be suffixed with a number of wildcard asterisks (/*) one greater than the number of partition levels. E.g., for a table at path “tablePath” partitioned by columns “a”, “b”, and “c”, the load path must be tablePath + "/*/*/*/*".
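To make the wildcard rule concrete, here is a minimal Python sketch; the helper build_snapshot_load_path is my own name for illustration, not part of the Hudi API:

def build_snapshot_load_path(table_path, num_partition_levels):
    # One more "/*" than the number of partition levels, per the note above.
    return table_path + "/*" * (num_partition_levels + 1)

# A table partitioned by "a", "b", and "c" (three levels) needs four wildcards:
assert build_snapshot_load_path("tablePath", 3) == "tablePath/*/*/*/*"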

Since we were using Python rather than Scala, the next challenge was to write the equivalent code so we could leverage the snapshot query feature provided by Apache Hudi.

After going through the Apache Hudi codebase, I changed our Spark job to use snapshot queries. The following is the code snippet for that:

Use snapshot queries with PySpark:

from pyspark.sql import functions as F

def read_processed_datums(spark, processed_datums_base_path, processed_datums_partition_pattern):
    """Processed datums with uid present and rating >= 0.

    Args:
        spark (pyspark.sql.session.SparkSession): SparkSession object for the app
        processed_datums_base_path (string): processed datums base path
        processed_datums_partition_pattern (string): processed datums partition pattern

    Returns:
        dataframe: processed datums with uid present and rating >= 0
    """
    return (
        spark.read.format("hudi")
        .option("hoodie.datasource.query.type", "snapshot")
        .load(f"{processed_datums_base_path}{processed_datums_partition_pattern}")
        .where(F.col("uid") != "")
        .where(F.col("rating") >= 0)
    )
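For illustration, the arguments might look like the following. These values are hypothetical and not from the original job; judging from the error path above, the table appears to be partitioned by a date column and source_type (two levels), hence three wildcards:

# Hypothetical values for illustration only. Two partition levels mean the
# pattern carries three wildcards: one more than the partition depth.
processed_datums_base_path = "s3://<hudi-table-read-path>"
processed_datums_partition_pattern = "/*/*/*"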

On top of this, we added a cache for the filtered dataframe to improve performance, so we don’t have to read the same data again and again 😉.

datums = read_processed_datums(spark, processed_datums_base_path, processed_datums_partition_pattern)
datums = get_filtered_datums(datums, start_time, end_time)
datums.cache()
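One caveat worth noting: cache() is lazy, so nothing is materialised until the first action runs. If you want to pin the snapshot in memory before the real-time pipeline rewrites more files, you can force evaluation up front; a minimal sketch:

# cache() only marks the dataframe for caching; an action materialises it.
# count() reads the snapshot once so that later stages reuse the cached data.
datums.count()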
