Apache Hudi: From Zero To One (3/10)
Understand write flows and operations
In the previous post, we discussed Hudi query types and their integration with Spark. In this post, we will delve into the other aspect: write flows, with Spark as the example engine. There are numerous configurations and settings you can adjust when writing data, so this post does not aim to serve as a complete usage guide. Instead, my primary goal is to present the internal data flows and break down the steps involved. This will give readers a deeper understanding of running and fine-tuning Hudi applications. For practical usage examples, please consult Hudi’s official documentation page.
Overall Write Flow
The picture below illustrates the typical high-level steps involved in a Hudi write operation within the context of an execution engine. I will provide a brief introduction to each step in this section.
Create write client
A Hudi write client serves as the entry point for write operations, and Hudi write support is achieved by creating an engine-compatible write client instance. For instance, Spark utilizes the SparkRDDWriteClient, Flink employs the HoodieFlinkWriteClient, and Kafka Connect generates the HoodieJavaWriteClient. Typically, this step involves reconciling user-provided configurations with the existing Hudi table properties and subsequently passing the final configuration set to the client.
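As a rough illustration of the user-provided side of that configuration, the sketch below builds a minimal set of write options for the Spark datasource. The table name and column names (trips, uuid, city, ts) are made up for illustration. The option keys are ones documented for Hudi's Spark datasource, but please verify them against the release you run.

```python
def hudi_write_options(table_name, operation="upsert"):
    """Return a minimal, illustrative set of Hudi write options."""
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
        # Hypothetical column names for this example only.
        "hoodie.datasource.write.recordkey.field": "uuid",
        "hoodie.datasource.write.partitionpath.field": "city",
        "hoodie.datasource.write.precombine.field": "ts",
    }

opts = hudi_write_options("trips")
# With a SparkSession and a DataFrame `df` available, the write would look like:
# df.write.format("hudi").options(**opts).mode("append").save("/tmp/hudi/trips")
```

The write client itself is created by the datasource behind the scenes, so application code normally only supplies options like these.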
Before a write client processes the input data, several transformations occur, including the construction of HoodieRecords and schema reconciliation. Let's delve deeper into the HoodieRecord, as it is a fundamental model in the write paths.
Hudi identifies unique records using the HoodieKey model, which consists of "recordKey" and "partitionPath". These values are populated by implementing the KeyGenerator API. This API offers flexibility in extracting and transforming custom fields into the key based on the input schema. For usage examples, please refer to the documentation page.
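To make the key model concrete, here is a minimal Python sketch. It is a simplified stand-in, not Hudi's Java KeyGenerator API: the function name simple_key_generator and the dict-shaped record are assumptions for illustration.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class HoodieKey:
    record_key: str
    partition_path: str

def simple_key_generator(record, key_field, partition_field):
    """Build a key from two columns of a dict-shaped record (hypothetical helper)."""
    return HoodieKey(str(record[key_field]), str(record[partition_field]))

row = {"uuid": "a1", "city": "sf", "fare": 12.5}
key = simple_key_generator(row, "uuid", "city")
```

A custom generator would follow the same shape: read whatever fields it needs from the input record and return the two key components.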
Both "currentLocation" and "newLocation" consist of a Hudi Timeline's action timestamp and a FileGroup's ID. Recalling the logical FileGroup and FileSlice concepts from post 1, the timestamp points to a FileSlice within a specific FileGroup. The "location" properties are employed to locate physical files using logical information. If "currentLocation" is not null, it indicates where a record with the same key exists in the table, while "newLocation" specifies where the incoming record should be written.
The "data" field is a generic type that contains the actual bytes for the record, also known as the payload. Typically, this property implements HoodieRecordPayload, which guides engines on how to merge an old record with a new one. Starting from release 0.13.0, a new experimental interface, HoodieRecordMerger, has been introduced to replace HoodieRecordPayload and serve as the unified merging API.
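The pieces of the record model described above can be sketched in a few lines of Python. This is a simplified model, not Hudi's classes: the field names mirror the ones in the text, and the "latest ordering value wins" merge policy is just one assumed policy chosen for illustration.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class HoodieRecordLocation:
    instant_time: str   # action timestamp on the Timeline
    file_id: str        # FileGroup ID

@dataclass
class HoodieRecord:
    record_key: str
    partition_path: str
    data: dict                                   # the payload
    current_location: Optional[HoodieRecordLocation] = None
    new_location: Optional[HoodieRecordLocation] = None

def merge_latest_wins(old, new, ordering_field="ts"):
    """Keep whichever payload has the larger ordering value (ties go to new)."""
    return new if new[ordering_field] >= old[ordering_field] else old

merged = merge_latest_wins({"ts": 1, "fare": 10.0}, {"ts": 2, "fare": 12.5})
```

A different merge policy would only change the body of the merge function; the record model stays the same.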
At this step, a write client always checks if there are any failed actions remaining on the table's Timeline and performs a rollback accordingly before initiating the write operation by creating a "requested" commit action on the Timeline.
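The following toy model illustrates the idea. It assumes a single writer, so every action that never reached the completed state is treated as failed, and it represents the Timeline as a plain list of tuples. Real rollbacks also clean up partially written files and are recorded on the Timeline, which this sketch leaves out.

```python
def start_commit(timeline, new_instant):
    """timeline: list of (instant_time, state) tuples, oldest first.

    Drops every action that never completed, then adds a "requested" action.
    Returns the new timeline and the instants that were rolled back.
    """
    rolled_back = [t for t, state in timeline if state != "completed"]
    cleaned = [(t, s) for t, s in timeline if s == "completed"]
    cleaned.append((new_instant, "requested"))
    return cleaned, rolled_back

timeline = [("001", "completed"), ("002", "inflight")]
timeline, rolled_back = start_commit(timeline, "003")
```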
HoodieRecords may optionally undergo deduplication and indexing based on user configurations and the operation type. If deduplication is necessary, records with the same key will be merged into one. If indexing is required, the "currentLocation" will be populated if the record exists.
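A minimal sketch of these two optional steps, under simplifying assumptions: records are plain dicts, the merge keeps the record with the largest value of a hypothetical ordering field ts, the index is an in-memory dict, and a location is reduced to a FileGroup ID.

```python
def deduplicate(records, ordering_field="ts"):
    """Merge records sharing (key, partition), keeping the largest ordering value."""
    latest = {}
    for rec in records:
        k = (rec["key"], rec["partition"])
        if k not in latest or rec[ordering_field] >= latest[k][ordering_field]:
            latest[k] = rec
    return list(latest.values())

def tag_location(records, index):
    """Attach the current FileGroup ID (or None) found in a dict-based index."""
    return [
        {**rec, "current_location": index.get((rec["key"], rec["partition"]))}
        for rec in records
    ]

incoming = [
    {"key": "a1", "partition": "sf", "ts": 1, "fare": 10.0},
    {"key": "a1", "partition": "sf", "ts": 2, "fare": 12.5},
    {"key": "b2", "partition": "ny", "ts": 1, "fare": 7.0},
]
index = {("a1", "sf"): "fg-1"}   # key -> FileGroup ID of the existing record
prepared = tag_location(deduplicate(incoming), index)
```

After this step, a record with a non-null location is an update, and a record without one is an insert.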
The topic of indexing logic with various index types is crucial and warrants a dedicated post. For the purpose of understanding write flows, it is important to remember that an index is responsible for locating physical files for the given records.
This is an essential pre-write step that determines which record goes into which FileGroup and, ultimately, which physical file. Incoming records will be assigned to update buckets and insert buckets, implying different strategies for subsequent file writing. Each bucket represents one RDD partition for distributed processing, as is the case with Spark.
Write to storage
This is when the actual I/O operations occur. Physical data files are either created or appended to using file writing handles. Before that, marker files may also be created in the .hoodie/.temp/ directory to indicate the type of write operation that will be performed for the corresponding data files. This is valuable for efficient rollback and conflict resolution scenarios.
After data is written to disk, there may be a need to immediately update the index data to ensure read/write correctness. This applies specifically to index types that are not synchronously updated during writing, such as the HBase index hosted in an HBase server.
In this final step, the write client will undertake multiple tasks to correctly conclude the transactional write. For example, it may run pre-commit validation if configured, check for conflicts with concurrent writers, save commit metadata to the Timeline, reconcile WriteStatus with marker files, and so on.
Upserting data is a common scenario in Lakehouse pipelines. In this section, we will delve into the Upsert flow for CoW table in detail, followed by a brief overview of all other supported write operations.
Write client starts the commit and creates the "requested" action on Timeline.
Input records undergo the preparation step: duplicates are merged, and target file locations are populated by the index. At this point in the process, we have the exact records to be written and know which of those exist in the table, along with their respective locations (FileGroups).
Prepared records are categorized into "update" and "insert" buckets. Initially, a WorkloadProfile is constructed to gather information on the number of updates and inserts in the relevant physical partitions. This data is then serialized into an "inflight" action on the Timeline. Subsequently, based on the WorkloadProfile, buckets are generated to hold the records. For updates, each updating FileGroup is assigned as an update bucket. In the case of inserts, the small-file handling logic comes into play: any BaseFile smaller than a specified threshold (determined by hoodie.parquet.small.file.limit) becomes a candidate for accommodating the inserts, with its enclosing FileGroup being designated as an update bucket. If no such BaseFile exists, insert buckets will be allocated, and new FileGroups will be created for them later.
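The bucket assignment for a single partition can be sketched as follows. This is a simplified model rather than Hudi's partitioner: the way capacity is computed from a maximum file size and an average record size, and the spilling of leftover inserts into new FileGroups, are assumptions made to keep the example small.

```python
import math

def assign_buckets(num_inserts, update_file_ids, base_file_sizes,
                   small_file_limit, max_file_size, avg_record_size):
    """Return a list of (bucket_type, file_id, num_insert_records) for one partition.

    base_file_sizes maps FileGroup ID -> size in bytes of its latest BaseFile.
    """
    # Every FileGroup receiving updates becomes an update bucket.
    buckets = {fid: 0 for fid in update_file_ids}
    remaining = num_inserts
    # Small files absorb inserts until they would reach the maximum file size.
    for fid, size in sorted(base_file_sizes.items(), key=lambda kv: kv[1]):
        if remaining == 0:
            break
        if size < small_file_limit:
            capacity = (max_file_size - size) // avg_record_size
            take = min(capacity, remaining)
            if take > 0:
                buckets[fid] = buckets.get(fid, 0) + take
                remaining -= take
    result = [("UPDATE", fid, n) for fid, n in buckets.items()]
    # Whatever is left goes to insert buckets, i.e. new FileGroups.
    per_file = max_file_size // avg_record_size
    for i in range(math.ceil(remaining / per_file)):
        n = min(per_file, remaining - i * per_file)
        result.append(("INSERT", f"new-fg-{i}", n))
    return result

buckets = assign_buckets(
    num_inserts=250,
    update_file_ids=["fg-1"],
    base_file_sizes={"fg-1": 900, "fg-2": 400},
    small_file_limit=500,
    max_file_size=1000,
    avg_record_size=10,
)
```

In this toy run, fg-2 is below the small-file limit and takes 60 of the inserts as an update bucket, while the remaining 190 inserts are spread over two new FileGroups.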
The bucketized records are then processed through file-writing handles for actual persistence to storage. In the case of records in the update buckets, "merge" handles are used, resulting in the creation of new FileSlices within the existing FileGroups (achieved by merging with data from the old FileSlices). For records in the insert buckets, "create" handles are utilized, leading to the creation of entirely new FileGroups. This process is done by HoodieExecutors, which employ a producer-consumer pattern for reading and writing records.
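The difference between the two handle types can be sketched with in-memory dicts standing in for files. The function names merge_handle and create_handle are illustrative, and the producer-consumer execution is left out entirely; the point is only that an update rewrites a full new FileSlice while the old one stays untouched.

```python
def merge_handle(old_slice, updates, new_instant):
    """Copy-on-write merge: rewrite the whole slice with updates applied."""
    merged = dict(old_slice["records"])      # untouched records are carried over
    merged.update(updates)                   # updated or added keys replace old values
    return {"instant": new_instant, "records": merged}

def create_handle(inserts, new_instant):
    """Write the first FileSlice of a brand-new FileGroup."""
    return {"instant": new_instant, "records": dict(inserts)}

# A FileGroup modeled as a list of FileSlices, oldest first.
file_group = [{"instant": "001", "records": {"a1": 10.0, "b2": 7.0}}]
file_group.append(merge_handle(file_group[-1], {"a1": 12.5}, "002"))
new_file_group = [create_handle({"c3": 3.0}, "002")]
```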
Once all data has been written, the file-writing handles return collections of WriteStatus that contain metadata about the writes, including the number of errors, the number of inserts performed, the total written size in bytes, and more. This information is sent back to the Spark driver for aggregation. If no errors have occurred, the write client will generate commit metadata and persist it as a completed action on the Timeline.
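As a sketch of that aggregation step, the model below uses a reduced WriteStatus with only a handful of the fields mentioned above. The try_commit helper and the shape of the returned metadata are assumptions for illustration, not Hudi's commit metadata format.

```python
from dataclasses import dataclass

@dataclass
class WriteStatus:
    file_id: str
    num_inserts: int
    num_updates: int
    num_errors: int
    bytes_written: int

def try_commit(statuses):
    """Aggregate statuses; return commit metadata, or None if any write failed."""
    if any(s.num_errors > 0 for s in statuses):
        return None
    return {
        "files": [s.file_id for s in statuses],
        "total_inserts": sum(s.num_inserts for s in statuses),
        "total_updates": sum(s.num_updates for s in statuses),
        "total_bytes": sum(s.bytes_written for s in statuses),
    }

statuses = [WriteStatus("fg-1", 0, 3, 0, 2048), WriteStatus("fg-9", 5, 0, 0, 4096)]
metadata = try_commit(statuses)
```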
Upserting to a MoR table follows a very similar flow, with a different set of conditions to determine the types of file-writing handles used for both updates and inserts.
Insert & Bulk Insert
The Insert flow is very similar to Upsert, with the key difference being the absence of an indexing step. This makes the entire writing process faster (and faster still if deduplication is turned off), but it may result in duplicates in the table.
Bulk Insert follows the same semantics as Insert, meaning it can also result in duplicates due to the absence of indexing. However, the distinction lies in the absence of small-file handling for Bulk Insert. The records partitioning strategy is determined by setting BulkInsertSortMode or can be customized by implementing BulkInsertPartitioner. Bulk Insert also enables row-writing mode by default for Spark, bypassing Avro data model conversion at the "transform input" step and working directly with the engine-native data model Row. This mode gives even more efficient writes.
Overall, Bulk Insert is generally more performant than Insert but may require additional configuration tuning to address small-file issues.
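For reference, the two operations might be selected with options along these lines. The helper names are made up, and the sort mode shown is just one of the documented choices. The option keys are taken from Hudi's configuration reference as I understand it, so treat them as something to double-check for your version.

```python
def insert_options(table_name, dedupe=False):
    """Illustrative options for the Insert operation."""
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.operation": "insert",
        # Controls whether incoming records are deduplicated before writing.
        "hoodie.combine.before.insert": str(dedupe).lower(),
    }

def bulk_insert_options(table_name, sort_mode="PARTITION_SORT"):
    """Illustrative options for the Bulk Insert operation."""
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.operation": "bulk_insert",
        "hoodie.bulkinsert.sort.mode": sort_mode,
        "hoodie.datasource.write.row.writer.enable": "true",
    }

bulk_opts = bulk_insert_options("trips")
# df.write.format("hudi").options(**bulk_opts).mode("append").save("/tmp/hudi/trips")
```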
The Delete flow can be viewed as a special case of the Upsert flow. The primary difference is that, during the "transform input" step, input records are transformed into HoodieKeys and passed on to subsequent stages, as these are the minimum required data for identifying the records to be deleted. It's important to note that this process results in a hard delete, meaning that the target records will not exist in the new FileSlices of the corresponding FileGroups.
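A small sketch of the hard-delete behavior, again with dicts standing in for FileSlices. The helpers to_keys and hard_delete are hypothetical names; the sketch only shows that the new FileSlice omits the deleted keys while the older slice is left as it was.

```python
def to_keys(records):
    """Reduce input records to (record key, partition path) pairs."""
    return [(r["key"], r["partition"]) for r in records]

def hard_delete(old_slice, keys, new_instant):
    """Write a new FileSlice that omits the keys targeted in this partition."""
    doomed = {k for k, p in keys if p == old_slice["partition"]}
    kept = {k: v for k, v in old_slice["records"].items() if k not in doomed}
    return {"partition": old_slice["partition"],
            "instant": new_instant,
            "records": kept}

old_slice = {"partition": "sf", "instant": "001",
             "records": {"a1": 10.0, "b2": 7.0}}
keys = to_keys([{"key": "a1", "partition": "sf", "fare": 10.0}])
new_slice = hard_delete(old_slice, keys, "002")
```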
Delete Partition follows a completely different flow compared to those introduced above. Instead of input records, it takes a list of physical partition paths, which is configured via hoodie.datasource.write.partitions.to.delete. Because there are no input records, processes such as indexing, partitioning, and writing to storage do not apply. Delete Partition saves all the FileGroup IDs of the target partition paths in a .replacecommit action on the Timeline, ensuring that subsequent writers and readers treat them as deleted.
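The effect of such a replace action can be modeled as pure metadata bookkeeping. In this sketch the table layout is a dict from partition path to FileGroup IDs, and the action structure is an assumption for illustration, not the actual replacecommit file format.

```python
def delete_partitions(file_groups, partitions_to_delete, instant):
    """file_groups maps partition path -> list of FileGroup IDs."""
    replaced = {p: list(file_groups.get(p, [])) for p in partitions_to_delete}
    return {"instant": instant, "action": "replacecommit",
            "replaced_file_ids": replaced}

def visible_file_groups(file_groups, replace_actions):
    """Hide every FileGroup that some replace action marked as replaced."""
    hidden = {fid
              for action in replace_actions
              for ids in action["replaced_file_ids"].values()
              for fid in ids}
    return {p: [f for f in ids if f not in hidden]
            for p, ids in file_groups.items()}

table = {"sf": ["fg-1", "fg-2"], "ny": ["fg-3"]}
action = delete_partitions(table, ["sf"], "003")
visible = visible_file_groups(table, [action])
```

No data file is touched here, which matches the point above that writing to storage does not apply to this operation.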
Insert Overwrite & Insert Overwrite Table
Insert Overwrite completely rewrites partitions with the provided records. This flow can be effectively seen as a combination of Delete Partition and Bulk Insert: it extracts affected partition paths from the input records, marks all existing FileGroups in those partitions as deleted, and simultaneously creates new FileGroups to store the incoming records.
Insert Overwrite Table is a variation of Insert Overwrite. Instead of extracting affected partition paths from input records, it fetches all partition paths of the table for the purpose of overwriting.
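Both variants can be sketched with one function that differs only in how the affected partitions are chosen. As before, this is a metadata-only toy model: the one-new-FileGroup-per-partition rule and the generated FileGroup IDs are assumptions for illustration.

```python
def insert_overwrite(file_groups, records, instant, whole_table=False):
    """file_groups maps partition path -> list of existing FileGroup IDs."""
    incoming = sorted({r["partition"] for r in records})
    # Insert Overwrite Table replaces every partition, not just the touched ones.
    affected = sorted(file_groups) if whole_table else incoming
    replaced = {p: list(file_groups.get(p, [])) for p in affected}
    created = {p: [f"{p}-fg-{instant}"] for p in incoming}
    return {"instant": instant,
            "replaced_file_ids": replaced,
            "new_file_ids": created}

table = {"sf": ["fg-1"], "ny": ["fg-3"]}
records = [{"key": "a1", "partition": "sf"}]
overwrite = insert_overwrite(table, records, "004")
overwrite_table = insert_overwrite(table, records, "004", whole_table=True)
```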
In this post, we have explored the common high-level steps in Hudi write paths, delved into the CoW Upsert flow with a detailed explanation of record partitioning logic, and introduced all other write operations. Please feel free to share your feedback and suggest content in the comments section.