Skip to content

Flink: Add equality delete conversion API and integration tests#16948

Open
mxm wants to merge 4 commits into
apache:mainfrom
mxm:resolve-equality-deletes.breakdown6
Open

Flink: Add equality delete conversion API and integration tests#16948
mxm wants to merge 4 commits into
apache:mainfrom
mxm:resolve-equality-deletes.breakdown6

Conversation

@mxm

@mxm mxm commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Flink's upsert mode writes equality deletes, which are cheap to produce but force every reader to join the data against the delete files (merge-on-read). ConvertEqualityDeletes is a maintenance task that rewrites them into deletion vectors on a target branch, so reads apply deletes by position. The writer keeps appending equality deletes to a staging branch while the conversion runs in the background.

Example:

    TableMaintenance.forTable(env, tableLoader)
        .add(
            ConvertEqualityDeletes.builder()
                .stagingBranch("staging")
                .targetBranch("main")
                .equalityFieldColumns(List.of("id"))
                .scheduleOnEqDeleteFileCount(1)
        .append();

The table must be V3 for deletion vectors, the equality field columns must match the writer's, and every partition column must be an equality field, because the converter keys rows on equality values alone and would otherwise resolve a delete against the wrong partition. The conversion task has checks in place to ensure this.

This PR is factored out of #15996.

Flink's upsert mode writes equality deletes, which are cheap to produce but
force every reader to join the data against the delete files (merge-on-read).
ConvertEqualityDeletes is a maintenance task that rewrites them into deletion
vectors on a target branch, so reads apply deletes by position. The writer keeps
appending equality deletes to a staging branch while the conversion runs in the
background.

Example:
```
    TableMaintenance.forTable(env, tableLoader)
        .add(
            ConvertEqualityDeletes.builder()
                .stagingBranch("staging")
                .targetBranch("main")
                .equalityFieldColumns(ImmutableList.of("id"))
                .scheduleOnEqDeleteFileCount(1)
        .append();
```

The table must be V3 for deletion vectors, the equality field columns must match
the writer's, and every partition column must be an equality field, because the
converter keys rows on equality values alone and would otherwise resolve a
delete against the wrong partition. The conversion task has checks in place to
ensure this.
@github-actions github-actions Bot added the flink label Jun 24, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants