Background

Open table formats like Apache Iceberg have enhanced the ability to store structured/semi-structured data in data lakes. However, when building a Lakehouse architecture based on Apache Iceberg, users inevitably face the challenge of efficiently maintaining all Iceberg tables.

Apache Amoro (Incubating) is a Lakehouse management system that automates the maintenance of data lake tables, including Apache Iceberg tables. It automatically detects tables that require optimization, manages execution resources, schedules optimization tasks, and ensures all data lake tables achieve optimal performance with minimal resource overhead. The project has heavily optimized file compaction, delivering over 10x performance improvement compared to the Iceberg Spark RewriteFiles Procedure. This article explains the principles behind Apache Amoro's efficient compaction of Iceberg tables and validates them with practical test cases.

Compaction Comparison

To objectively compare the compaction efficiency of Apache Amoro and the Apache Iceberg Spark RewriteFiles Procedure, we designed a set of test cases. Under identical write workloads and compaction resources, we compared the execution time of compaction tasks and the query performance before and after compaction.

The test covers two primary scenarios: the append-only scenario and the upsert scenario.

Comparison Plan

Given that streaming write scenarios demand more frequent file compaction and better highlight efficiency differences, we used an Apache Flink streaming job to continuously write data to an Iceberg table. The two compaction methods were applied concurrently, and Apache Spark was used to query the table data.


Table structure:

CREATE TABLE orders (
  user_id BIGINT COMMENT 'User ID',
  user_city STRING COMMENT 'User city location',
  user_gender INT COMMENT 'User gender, 0: female, 1: male',
  user_age INT COMMENT 'User age',
  order_id BIGINT NOT NULL COMMENT 'Order ID',
  order_status INT COMMENT 'Order status, 0: completed, 1: canceled, 2: ordered',
  order_duration INT COMMENT 'Product duration in days',
  buy_times INT COMMENT 'Purchase count (which time to buy)',
  payment_way STRING COMMENT 'Payment method',
  product_id STRING COMMENT 'Product ID',
  product_prices DOUBLE COMMENT 'Product price',
  product_discount DOUBLE COMMENT 'Product discount',
  product_color STRING COMMENT 'Product color',
  product_tags STRING COMMENT 'Product tags',
  delivery_id BIGINT COMMENT 'Delivery tracking number',
  order_date BIGINT COMMENT 'Order date in timestamp',
  feedback_level INT COMMENT 'Rating stars (1-5)')
USING iceberg

Query used to validate query efficiency before and after compaction:

SELECT order_id FROM orders ORDER BY order_id LIMIT 1;

Append-Only Scenario

In this scenario, the Flink task generates a commit every minute. With a write parallelism of 8, each commit produces 8 files, each averaging 6 KB in size. Frequent small file commits necessitate timely compaction. The target compacted file size is 128 MB.
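For context, the one-minute commit cadence comes from Flink checkpointing: the Iceberg sink commits once per checkpoint, and each of the 8 parallel writers produces a data file. Below is a minimal Flink SQL sketch of such a write job; the datagen_orders source is a hypothetical stand-in for the actual workload generator, which is not shown here.

-- One Iceberg commit per Flink checkpoint, i.e. every minute.
SET 'execution.checkpointing.interval' = '60 s';
-- 8 parallel writers => 8 data files per commit.
SET 'parallelism.default' = '8';

-- datagen_orders is a hypothetical source standing in for the real workload.
INSERT INTO orders SELECT * FROM datagen_orders;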

Resource Allocation:

  • Flink write job: 8 cores, 16 GB memory
  • Spark compaction job: 1 core, 2 GB memory
  • Amoro compaction job: 1 core, 2 GB memory
  • Spark query job: 4 cores, 8 GB memory

Spark RewriteFiles Procedure Results

The compaction frequency significantly impacts efficiency. We tested intervals of 1 minute, 10 minutes, and 60 minutes.
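The baseline compaction was driven by Iceberg's rewrite_data_files Spark procedure, invoked on a fixed schedule. The following is a minimal sketch of such a call; the catalog and database names (spark_catalog, db) are placeholders, and the scheduling itself is handled externally.

-- Invoked by an external scheduler at 1-, 10-, or 60-minute intervals.
CALL spark_catalog.system.rewrite_data_files(
  table => 'db.orders',
  options => map('target-file-size-bytes', '134217728')  -- 128 MB target size
);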

Below are the compaction results with one compaction performed every minute, where the writing process lasted for one hour (partial results have been trimmed):


The total compaction time for the entire process is 394 seconds, and the average query time before compaction is 0.7 seconds.

Excessively frequent compaction leads to severe write amplification from the rewrite process, but it keeps query latency on the table low.

Below are the results of performing a compaction every 10 minutes, with the writing process lasting for one hour:


The total compaction time for the entire process is 75.3 seconds, and the average query time before compaction is 1.7 seconds.

Reducing the compaction frequency significantly alleviates the write amplification issue, but it also leads to a notable increase in query time on the table.

Below is the result of performing a compaction every hour, with the writing process lasting for one hour:


The total compaction time for the entire process is 23.8 seconds, and the average query time before compaction is 7.0 seconds.

By further reducing the compaction frequency, the write amplification caused by compaction continues to decrease. However, query time on the table increases significantly due to the accumulation of small files.

Apache Amoro Results

Apache Amoro automatically detects file conditions in tables, generates reasonable compaction plans, and schedules resources efficiently.

Below are the results of self-optimizing by Apache Amoro:


The total compaction time for the entire process is 90 seconds, and the average query time before compaction is 1.0 seconds.

Apache Amoro monitors the accumulation of fragment files in tables in real time (with a trigger threshold of 40 files in this test scenario) and schedules compaction tasks at five-minute intervals. It first merges fragment files (< 16 MB) into medium-sized files (> 16 MB), and then progressively consolidates these medium-sized files up to the target size. This approach strikes a balance between compaction resource overhead and performance optimization.
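For readers who want to reproduce this behavior, Amoro reads its self-optimizing settings from Iceberg table properties. The sketch below mirrors the thresholds described above; the property keys follow the Amoro documentation, so treat the exact names and values as assumptions to verify against your Amoro version.

ALTER TABLE orders SET TBLPROPERTIES (
  'self-optimizing.enabled' = 'true',
  'self-optimizing.target-size' = '134217728',        -- 128 MB target file size
  'self-optimizing.fragment-ratio' = '8',             -- fragment threshold = 128 MB / 8 = 16 MB
  'self-optimizing.minor.trigger.file-count' = '40'   -- fragment-file count used in this test
);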

Comparison of Results

In the append-only scenario, selecting the appropriate compaction frequency becomes crucial for balancing compaction efficiency and query latency. Apache Amoro automatically determines the optimal time to trigger compaction based on the small-file situation on the table. Compared to the Spark RewriteFiles procedure, Amoro achieves over 3x higher compaction efficiency (per the first set of comparison data) and up to 8x better query performance (per the third set of data).

Furthermore, in real-world scenarios where write traffic often fluctuates, Amoro's adaptive compaction strategy delivers significantly greater efficiency improvements compared to fixed-interval compaction schemes.


Upsert Scenario

The upsert scenario simulates CDC writes. We initialized the table with 500 million records (230 GB). The Flink write job commits every minute, producing 16–24 files per commit (8 Insert Files, 8 Equality Delete Files, and optionally 8 Position Delete Files). Frequent small files and Delete Files demand highly efficient compaction. The target file size is 128 MB.
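As background, Flink only produces Equality Delete Files and Position Delete Files in this pattern when writing to an Iceberg format v2 table in upsert mode. The sketch below shows one way such a table is typically configured (Spark SQL with the Iceberg SQL extensions); the exact setup used in the test is not shown in this article, so treat this as an assumption.

ALTER TABLE orders SET TBLPROPERTIES (
  'format-version' = '2',           -- row-level deletes require Iceberg format v2
  'write.upsert.enabled' = 'true'   -- upsert writes emit Equality Delete Files for changed keys
);
-- Declare the upsert key; requires the Iceberg Spark SQL extensions.
ALTER TABLE orders SET IDENTIFIER FIELDS order_id;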

Resource Allocation:

  • Flink write job: 8 cores, 16 GB memory
  • Spark compaction job: 32 cores, 64 GB memory
  • Amoro compaction job: 32 cores, 64 GB memory
  • Spark query job: 32 cores, 64 GB memory

Spark RewriteFiles Procedure Results

Below are the results of performing a compaction every 5 minutes, with the writing process lasting for 30 minutes:


From the results, as writing operations continue, query latency increases progressively and does not decrease despite the execution of compaction tasks. Investigation reveals that this is because a large number of Delete Files are not being merged with historical insert files. As Delete Files accumulate, query performance on the table deteriorates significantly.

We added the parameter delete-file-threshold = 10 to the compaction job so that compaction is triggered when Delete Files accumulate on the table, thereby improving query performance.
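A sketch of how this option can be passed to the rewrite procedure is shown below; as before, the catalog and database names are placeholders.

CALL spark_catalog.system.rewrite_data_files(
  table => 'db.orders',
  options => map(
    'delete-file-threshold', '10',          -- rewrite data files referenced by 10 or more delete files
    'target-file-size-bytes', '134217728'
  )
);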

Below are the results after adding the parameter, where compaction is executed every 10 minutes over a 1-hour period:


The entire compaction process took 2596 seconds. The average query time before compaction is 36.8 seconds, and after compaction, it is 29 seconds.

Apache Amoro Results

Below are the results of self-optimizing by Apache Amoro:


The entire compaction process took 583 seconds. The average query time before compaction is 32.8 seconds, and after compaction, it is 30.6 seconds. In Amoro's statistics, Delete Files that are repeatedly read during the compaction process are counted multiple times; therefore, the number of files and the total file size before compaction appear larger than those measured by Spark's RewriteFiles method.

Comparison of Results

In Upsert scenarios, a large number of Delete Files are generated, especially when using Flink for real-time writes, which simultaneously produce both Position Delete Files and Equality Delete Files. Among these, Equality Delete Files significantly impact the analytical performance of the table. The Spark RewriteFiles procedure, by default, does not merge these Delete Files, leading to progressively worse analytical performance on the table. This situation necessitates the use of the delete-file-threshold parameter to promptly merge the Delete Files; however, the merging process nearly rewrites the entire table, resulting in enormous overhead.

Amoro's self-optimizing process employs minor optimizing to promptly convert Equality Delete Files into Position Delete Files. This not only avoids the negative impact of Equality Delete Files on query performance but also substantially reduces the cost of compaction. As a result, Amoro achieves nearly 10x higher efficiency per compaction run and nearly 5x higher overall compaction efficiency, while maintaining comparable query performance.

Compared to the Spark RewriteFiles procedure, Amoro significantly reduces the amount of data read and written per run, which in turn lowers request bandwidth to object storage and mitigates the cost amplification of storing new snapshots.


The Principles of Apache Amoro's Self-Optimizing

From the test results of the above two scenarios, we observed that in the Append-Only scenario, Apache Amoro can automatically trigger compaction jobs based on the file status within the table. This avoids the write amplification problem caused by excessively frequent compactions, while also preventing the performance degradation that would occur if compactions were never performed.

In the Upsert scenario, Amoro promptly converts the Equality Delete Files, which significantly impact query performance, into Position Delete Files. This approach avoids the write amplification that would arise from frequently compacting historical files.

The compaction principles of Apache Amoro can be summarized by two key characteristics: automatic triggering and layered compaction.

Automatic triggering


The core components of Amoro's architecture include:

  • Amoro Management Service (AMS): The central management component responsible for receiving read/write metrics reported by computing engines for Iceberg tables. Based on these metrics, it generates compaction jobs, dispatches them to Optimizers for execution, and receives the compaction results. It also exposes REST APIs to provide configuration and inspection interfaces for table maintenance.
  • Optimizer: Execution nodes that pull compaction tasks from the AMS, execute them, and report the results back to the AMS.
  • Metrics Reporter: A plugin installed within computing engines that collects read/write events on Iceberg tables and reports them to the AMS.
  • Meta Storage: The metadata storage system, typically a relational database.

The execution of an automatic optimization task for a table involves the following steps:

  1. A write operation occurs on the table within a computing engine.
  2. The Metrics Reporter generates a table write event and reports it to the AMS.
  3. The AMS uses the metrics to determine if the file fragmentation on the table has reached the compaction threshold.
  4. The AMS generates a compaction task and splits it into multiple subtasks.
  5. Optimizers pull the compaction tasks from the AMS and execute them.
  6. The AMS receives the execution results of all tasks and commits the compaction results.

A critical step in the above process is determining whether a table should be compacted and how the compaction should be performed. This is key to ensuring the efficiency of table compaction. We refer to this compaction algorithm as Layered Compaction.

Layered compaction

To ensure the efficiency of table compaction, we have categorized the compaction tasks for Iceberg tables into three types.


They are:

  • Minor Optimizing: Focuses on quickly merging fragment files and converting Equality Delete Files into Position Delete Files. It is typically executed every few minutes in streaming write scenarios.
  • Major Optimizing: Merges medium-sized files up to the target size and, when too many Position Delete Files accumulate, merges them with their corresponding Data Files. It is usually executed hourly.
  • Full Optimizing: Rewrites the entire table or specific partitions, often including a full-table sort. It is generally performed daily.
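The cadence of the three optimizing types can also be tuned per table. The sketch below lists a few of the relevant properties; the keys and example values are taken from the Amoro documentation and may differ between versions, so treat them as assumptions rather than a definitive configuration.

ALTER TABLE orders SET TBLPROPERTIES (
  'self-optimizing.minor.trigger.interval' = '300000',       -- minor: at most every 5 minutes (ms)
  'self-optimizing.major.trigger.duplicate-ratio' = '0.1',   -- major: when deletes cover ~10% of the data
  'self-optimizing.full.trigger.interval' = '86400000'       -- full: roughly once a day (ms)
);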

Let's further compare the differences between Amoro and the Spark RewriteFiles procedure to understand why they lead to such variations in compaction efficiency.


Comparing Spark RewriteFiles with Amoro Self-Optimizing in the Append-Only scenario: Spark RewriteFiles merges every file that does not meet the target size in each run, whereas Amoro first consolidates fragment files into medium-sized files and then merges those medium-sized files into files of the target size. By splitting the process into two steps, Amoro effectively reduces the repeated reading and writing of larger files.


In Upsert scenarios, Spark RewriteFiles requires frequent rewriting of almost all Insert Files, a process that leads to significant write amplification. In contrast, the Minor Optimizing step within Amoro's Self-Optimizing mechanism promptly converts Equality Delete Files in the table into Position Delete Files. This avoids the write amplification associated with existing Insert Files. Converting Equality Delete Files into Position Delete Files only requires reading the upsert keys from the Insert Files, which greatly improves compaction efficiency. This improvement is positively correlated with the number of fields in the table: the more columns a table has, the greater the performance gain.