Apache Hudi: From Zero To One
Over the course of the last eight posts, I've explored many topics and internal designs of Hudi, including its storage layout, read and write operations, indexing, table services, and concurrency control mechanisms. The timing now feels right to broaden our perspective and start implementing some practical pipelines to streamline data flow into Hudi. In this post, my focus will shift to Hudi Streamer, a comprehensive data ingestion tool designed for deploying production-grade pipelines for Hudi tables. Given its versatility, a topic I will delve into further within this blog, I frequently liken it to a "Swiss Army knife" for importing data into Lakehouses.
Overview
Hudi Streamer is a Spark application1 designed to offer a wide range of customizable interfaces for managing the write process to Hudi tables. It enables users to configure source data, define schemas, schedule table services, keep data catalogs in sync, and so on. The diagram below presents a high-level view of Hudi Streamer's components and their workflow.
The workflow illustrates an ingestion pipeline, typically configured step by step by users. With Hudi Streamer, the setup process will be greatly simplified with its rich set of options. The key to master this tool is to understand what the options are meant for and how to configure them properly. Below, we provide explanations for some of the foundational options.
The "--table-type" (CoW or MoR), "--table-name" (for identifying the table), and "--target-base-path" (physical location of the table) are three required properties for writing to a Hudi table.
The "--continuous" flag indicates whether Hudi Streamer should operate in an ongoing manner or execute for once. If the flag is present, the application will keep fetching source data and writing to storage in a loop. Without the flag, Hudi Streamer performs one-time data fetching and writing before terminating. The "continuous" mode is ideal when there is a steady stream of data from upstream sources, whereas the "run-once" mode is tailored for batch or bootstrap use cases.
The "--min-sync-interval-seconds" works with the "continuous" mode, specifying the shortest allowable interval in seconds between ingestion cycles. For instance, if an ingestion operation requires 40 seconds to complete and the min-sync-interval is configured to 60 seconds, Hudi Streamer will pause for 20 seconds before initiating the subsequent ingestion cycle. This pause ensures the interval adheres to the minimum set duration. Conversely, if the ingestion duration extends to 70 seconds, surpassing the minimum interval, the application immediately proceeds to the next cycle without any delay. This functionality is crucial for ensuring that sufficient data accumulates at the upstream source for processing, thereby preventing the inefficiency of handling numerous small-scale writes.
The "--op" option represents the type of operation to be executed by Hudi Streamer, which fundamentally serves as another Hudi writer. It supports three write operations: UPSERT (default), INSERT, and BULK_INSERT. For a comprehensive review of write operations, please revisit post 3.
The "--filter-dupes" flag corresponds to the write client configuration
hoodie.combine.before.insert=false|true
. This setting allows users to pre-combine records by keys within the incoming batch, effectively reduce the amount of data to process. The flag is applicable when the write operation is set to INSERT or BULK_INSERT, however it should not be present when "--op" is set to UPSERT, since we don't want to lose potential updates before merging them with on-storage versions.The "--props" and "--hoodie-conf" options offer flexible ways to take in arbitrary Hudi properties. The former points to a file containing a collection of properties, and the latter accepts a single configuration in the format of "key=value". It is important to note that the properties specified through "--hoodie-conf" take precedence over those extracted via "--props".
In the forthcoming sections, we will delve into the major components depicted in the workflow diagram, offering a detailed exploration of additional options.
Source
Source is an abstraction for providing upstream source data for Hudi Streamer. Its primary responsibility is fetching data from the source system as an input batch for processing and writing. By extending the Source abstract class, Hudi Streamer can be seamlessly integrated with a wide range of data systems. Designed with a platform vision from day one, Hudi currently offers more than a dozen of Source implementations off-the-shelf, as shown in the following picture.
To use a Source for Hudi Streamer, set its fully qualified class name to "--source-class" and configure some Source-specific properties where applicable. For example, a KafkaSource would require setting hoodie.streamer.source.kafka.topic
. You may consult the configurations page for more details. Additionally, the "--source-limit" option sets an upper limit on the data amount to read during each fetch, enhancing control over the ingestion process.
Transformer
Upon retrieving incoming data from the Source, it often becomes necessary to perform lightweight transformations, such as adding or dropping specific columns or flattening the schema. The transformer interface facilitates these modifications in a straightforward yet effective manner. It processes a Spark Dataset and outputs the transformed version of the Dataset, enabling seamless data manipulation to meet the requirements of the ingestion pipeline.
The "--transformer-class" option takes in one or many class names of Transformer implementations. When multiple Transformers are given, they are applied sequentially, i.e., the output of one serving as the input for the next. This chained approach provides flexibility and facilitates code maintenance.
Run Table Services
Table services, as introduced in post 5, can be easily managed by Hudi Streamer alongside data writing. When configured as "async", compaction and clustering will be scheduled inline by the Hudi write client internal to Hudi Streamer, and will be executed asynchronously by HoodieAsyncTableService
, which uses a thread pool to submit and control table service jobs.
While async table service jobs are running, it might not always be desirable to write new data, for instance, the same cluster that is executing the table services may not have enough resources to perform ingestion. Furthermore, it's sometimes advisable to avoid running too many concurrent compaction or clustering jobs to prevent resource contention. Use "--max-pending-compactions" and "--max-pending-clustering" to limit the outstanding table service operations, and when the limits are reached, no new ingestion job will be scheduled.
When running ingestion jobs and table service jobs concurrently within the same Spark application, it's crucial to appropriately allocate the cluster's resources to ensure optimal performance and efficiency. Hudi Streamer facilitates this by enabling users to input scheduling configurations through specific options. These configurations play a key role in managing how resources are distributed between the ingestion, compaction, and clustering processes.
# for ingestion
--delta-sync-scheduling-weight
--delta-sync-scheduling-minshare
# for compaction
--compact-scheduling-weight
--compact-scheduling-minshare
# for clustering
--cluster-scheduling-weight
--cluster-scheduling-minshare
The properties shown will be used to generate an XML file, which is then referenced by the Spark property spark.scheduler.allocation.file
. To activate these settings, users should set spark.scheduler.mode=FAIR
for the Spark application.2 For more explanation on the scheduling mechanism, please consult this Spark documentation page.
Sync to Catalogs
Data catalogs play a crucial role in the data ecosystem, and Hudi supports multi-catalog sync out-of-the-box via its SyncTool classes. Hudi Streamer can integrate with SyncTools through the "--sync-tool-classes" option, which takes in a list of SyncTool class names:
# for AWS Glue Data Catalog
org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool
# for Google BigQuery
org.apache.hudi.gcp.bigquery.BigQuerySyncTool
# for Hive Metastore
org.apache.hudi.hive.HiveSyncTool
# for DataHub
org.apache.hudi.sync.datahub.DataHubSyncTool
After each write, if the catalog sync is enabled using the "--enable-sync" flag, each of the configured SyncTools will run synchronously in sequence to upload metadata to the target data catalog. For example, if the write created some new partitions and added a new column to the table, the AwsGlueCatalogSyncTool
will update the partition list and the schema stored in the catalog table.
For SyncTools to function properly, users should supply additional SyncTool-specific properties through the "--props" or "--hoodie-conf" options. For detailed configurations, please refer to this section of the documentation page.
Other Notable Features
The Schema Provider, specified through "--schemaprovider-class", serves the schema for reading from the Source and writing to the target table. A notable implementation of this is the SchemaRegistryProvider
, which proves particularly useful when integrating with a KafkaSource
. This implementation enables Hudi Streamer to access Kafka's schema registry, ensuring that data ingested from Kafka is accurately interpreted and processed.
The "--checkpoint" and "--initial-checkpoint-provider" facilitate pausing and resuming data fetching from the Source, avoiding data loss or duplication. The "--post-write-termination-strategy-class" allows for a graceful shutdown of Hudi Streamer in the "continuous" mode. The "--run-bootstrap" flag instructs the Hudi Streamer to perform a one-time bootstrap operation for a new Hudi table.
Recap
In this post, we've provided an overview of Hudi Streamer's workflow, followed by an in-depth exploration of its diverse options in Hudi Streamer. These discussions aim to highlight Hudi's rich platform capabilities in building an end-to-end ingestion pipeline. Please feel free to share your feedback and suggest content in the comments section.
Apache Hudi has a thriving community - come and engage with us via Slack, GitHub, LinkedIn, X (Twitter), and YouTube!
For running with Flink, Hudi offers a similar utility tool HoodieFlinkStreamer
.
To activate the scheduling options, Hudi Streamer also needs to be running in the "continuous" mode and the target table type should be MoR.