Skip to content

Support firstWithTime/lastWithTime aggregation types for MergeRollupTask#18724

Open
xiangfu0 wants to merge 1 commit into
apache:masterfrom
xiangfu0:support-first-last-aggregation-merge-rollup
Open

Support firstWithTime/lastWithTime aggregation types for MergeRollupTask#18724
xiangfu0 wants to merge 1 commit into
apache:masterfrom
xiangfu0:support-first-last-aggregation-merge-rollup

Conversation

@xiangfu0

@xiangfu0 xiangfu0 commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

Description

Adds firstWithTime and lastWithTime as accepted aggregationType values for MergeRollupTask (and RealtimeToOfflineSegmentsTask), e.g.:

"MergeRollupTask": {
  "1day.mergeType": "rollup",
  "1day.bucketTimePeriod": "1d",
  "1day.roundBucketTimePeriod": "1d",
  "gaugeMetric.aggregationType": "lastWithTime"
}

The rollup picks the metric value with the earliest/latest event time within each rollup group, which is useful for gauge-style metrics where summing makes no sense and the first/last reading per bucket should be kept. The values map to the existing FIRSTWITHTIME/LASTWITHTIME AggregationFunctionType constants (no new enum values, SQL surface untouched).

Design

A naive first/last on rollup would be non-deterministic: rows within a rollup group are ordered by an unstable quicksort, and the time column is rounded in place during the map phase, destroying the original ordering before reduce.

Instead, when an order-sensitive aggregation is configured:

  • SegmentMapper/EpochTimeHandler preserve the original (pre-rounding) epoch-millis time in a hidden column ($originalTimeMs$) appended as the last sort field of the intermediate generic row files. The presence of the hidden field is carried as an explicit hasOriginalTimeField flag on GenericRowFileManager (set by the mapper that appends the field, validated against the field layout) rather than inferred from the column name, so a schema column that happens to be named $originalTimeMs$ can never be mistaken for it.
  • RollupReducer compares group keys on all sort fields except the hidden one, so rows within each group arrive sorted by original time; the first/last aggregators then simply keep the accumulated value / take the new value. Null values are skipped consistently with the other aggregators (first/last non-null value).
  • The hidden column is stripped before output segments are built.

Semantics note

The ordering is based on the time column values of the input segments of each rollup task. Within a single rollup pass the ordering is exact (pre-rounding time). Across multiple passes (multi-level merges, or re-merging with late arriving data), the ordering is based on the already-rounded time of the earlier pass, so it is approximate at the granularity of the previous round bucket. Rows with identical time values within a group are ordered arbitrarily, so the pick among exact ties is non-deterministic, including across task retries.

Validation

  • MergeRollupTaskGenerator.validateTaskConfigs now validates *.aggregationType values: the type must parse and have an available value aggregator, and firstWithTime/lastWithTime additionally requires the table to have a time column and the column to be a METRIC in schema. Invalid or unsupported aggregation type values now fail at table config validation instead of only at task runtime. The first/last prerequisite checks are shared between both task generators via MergeTaskUtils.validateOrderSensitiveAggregation.
  • The AVAILABLE_CORE_VALUE_AGGREGATORS allowlist moved from RealtimeToOfflineSegmentsTask to the shared MergeTask constants (binary/source compatible — RealtimeToOfflineSegmentsTask extends MergeTask) so both generators validate against the same set, with FIRSTWITHTIME/LASTWITHTIME added.
  • Same metric-column check added to RealtimeToOfflineSegmentsTaskGenerator.

Upgrade note

If firstWithTime/lastWithTime is configured while old minions are still running, those minions fail the task loudly (ValueAggregatorFactory rejects the unsupported aggregation type — no silent wrong results). Configure the new aggregation types after minions are upgraded.

Testing

  • ReducerTest: ordering correctness with shuffled input, null handling for both first and last, failure when the time column is missing, a regression test that a real schema column named $originalTimeMs$ (without first/last configured) is treated as a normal group column, and the flag/layout consistency check.
  • BaseSegmentProcessorFrameworkTest#testRollupWithFirstLastAggregation: end-to-end map/reduce/segment-build with time rounding, partitioning, and out-of-order input.
  • MergeRollupFirstLastTaskExecutorTest: minion executor end-to-end with firstWithTime/lastWithTime in task config, verifying time-order (not input-order) semantics across segments.
  • MergeTaskUtilsTest / MergeRollupTaskGeneratorTest / RealtimeToOfflineSegmentsTaskGeneratorTest: parsing and config validation.

@xiangfu0 xiangfu0 added the feature New functionality label Jun 10, 2026
@codecov-commenter

codecov-commenter commented Jun 10, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 90.21739% with 9 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.66%. Comparing base (ca5c042) to head (8cb7115).

Files with missing lines Patch % Lines
.../org/apache/pinot/core/common/MinionConstants.java 0.00% 2 Missing ⚠️
...t/processing/framework/SegmentProcessorConfig.java 71.42% 1 Missing and 1 partial ⚠️
...gment/processing/timehandler/EpochTimeHandler.java 66.66% 2 Missing ⚠️
...t/processing/genericrow/GenericRowFileManager.java 87.50% 0 Missing and 1 partial ⚠️
...nt/processing/genericrow/GenericRowFileReader.java 66.66% 0 Missing and 1 partial ⚠️
...cessing/genericrow/GenericRowFileRecordReader.java 50.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18724      +/-   ##
============================================
- Coverage     64.67%   64.66%   -0.02%     
- Complexity     1309     1319      +10     
============================================
  Files          3381     3383       +2     
  Lines        209820   209891      +71     
  Branches      32804    32825      +21     
============================================
+ Hits         135700   135723      +23     
- Misses        63220    63259      +39     
- Partials      10900    10909       +9     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 64.66% <90.21%> (-0.02%) ⬇️
temurin 64.66% <90.21%> (-0.02%) ⬇️
unittests 64.66% <90.21%> (-0.02%) ⬇️
unittests1 56.97% <85.71%> (-0.01%) ⬇️
unittests2 37.17% <31.52%> (-0.02%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@xiangfu0 xiangfu0 requested review from Jackie-Jiang and Copilot June 10, 2026 04:25
@xiangfu0 xiangfu0 added the query Related to query processing label Jun 10, 2026

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR extends Pinot’s minion merge/rollup pipeline to support order-sensitive “first”/“last” metric aggregation (mapped to existing FIRSTWITHTIME/LASTWITHTIME) by preserving the pre-rounding event time for deterministic selection within each rollup group.

Changes:

  • Add "first"/"last" (and existing *WithTime) parsing + validation for minion task configs (MergeRollup + RealtimeToOfflineSegments).
  • Update the segment processing rollup pipeline to preserve original epoch-millis time in a hidden $originalTimeMs$ sort field and exclude it from the rollup group key/output.
  • Add FirstWithTimeValueAggregator/LastWithTimeValueAggregator support plus unit/integration-style tests validating ordering and null-handling.

Reviewed changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java Adds coverage for "first"/"last" alias parsing to FIRSTWITHTIME/LASTWITHTIME.
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java Adds validation tests ensuring first/last require time column + metric field.
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupFirstLastTaskExecutorTest.java New executor e2e test validating time-order (not input-order) semantics across segments.
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java Validates aggregationType values via shared parsing + adds metric-only constraint for first/last-with-time.
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java Introduces shared getAggregationFunctionType() helper with "first"/"last" aliases.
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java Adds early validation for misconfigured aggregation types + first/last prerequisites.
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/ReducerTest.java Adds reducer-level tests for ordering correctness, null handling, and failure without original-time field.
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/BaseSegmentProcessorFrameworkTest.java Adds end-to-end rollup test with rounding/partitioning verifying hidden column is not emitted.
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessorUtils.java Adds logic to include $originalTimeMs$ as a hidden last sort field when needed.
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerFactory.java Passes “store original time” flag into EpochTimeHandler based on rollup config.
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandler.java Defines the reserved hidden column name $originalTimeMs$.
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java Preserves pre-rounding millis into the hidden column when enabled.
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java Excludes hidden original-time column from group-key comparisons and strips it from output.
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java Requests original-time field inclusion when order-sensitive aggregations are configured.
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileRecordReader.java Adds compare overload to compare only the first N fields (group key excluding hidden time).
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java Adds compare overload supporting “first N fields” comparison.
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java Enables FIRSTWITHTIME/LASTWITHTIME in value-aggregator factory.
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/LastWithTimeValueAggregator.java New value-aggregator implementation for “last by time” (order-dependent).
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/FirstWithTimeValueAggregator.java New value-aggregator implementation for “first by time” (order-dependent).
pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java Adds "first"/"last" constants and allows FIRST/LASTWITHTIME for RealtimeToOffline value aggregation.

@xiangfu0 xiangfu0 left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found one high-signal issue; see inline comment.

@xiangfu0 xiangfu0 changed the title Support first/last aggregation types for MergeRollupTask Support firstWithTime/lastWithTime aggregation types for MergeRollupTask Jun 11, 2026
@xiangfu0 xiangfu0 force-pushed the support-first-last-aggregation-merge-rollup branch from a4c6cf5 to eeb10d1 Compare June 11, 2026 20:22
The segment mapper appends a hidden $originalTimeMs$ column carrying the
original (pre-rounding) time as the last sort field, so the rollup reducer
can pick the first/last metric value within each rollup group by the
original time order. The hidden field is carried as an explicit flag on
GenericRowFileManager and stripped before output segments are built.

- FirstWithTime/LastWithTimeValueAggregator created by ValueAggregatorFactory;
  the shared order-sensitivity predicate ValueAggregatorFactory.isOrderSensitive
  is used by RollupReducer, MergeTaskUtils and
  SegmentProcessorConfig#requiresOriginalTimeOrdering
- MergeRollup/RTO generators validate aggregation type configs: types must
  parse and be in the shared AVAILABLE_CORE_VALUE_AGGREGATORS allowlist;
  first/last additionally requires a metric column and a resolvable DateTime
  time column
- Tests: reducer ordering and null handling, framework e2e (millis and
  seconds time formats), minion executor single and multi pass, generator
  validation
@xiangfu0 xiangfu0 force-pushed the support-first-last-aggregation-merge-rollup branch from eeb10d1 to 8cb7115 Compare June 11, 2026 20:43
@xiangfu0 xiangfu0 requested a review from Copilot June 16, 2026 01:25

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 24 out of 24 changed files in this pull request and generated 3 comments.

Comment on lines +516 to +518
if (entry.getKey().endsWith(MergeTask.AGGREGATION_TYPE_KEY_SUFFIX)) {
String column = StringUtils.removeEnd(entry.getKey(), MergeTask.AGGREGATION_TYPE_KEY_SUFFIX);
try {
Comment on lines +154 to +157
FieldSpec fieldSpec = schema.getFieldSpecFor(column);
Preconditions.checkState(fieldSpec != null && fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC,
"Aggregation type: %s on column: %s requires the column to be a metric column in schema!", aggregationType,
column);
Comment on lines +98 to +101
public int compare(int rowId1, int rowId2, int numFieldsToCompare) {
assert _sortedRowIds != null;
return _fileReader.compare(_sortedRowIds[rowId1], _sortedRowIds[rowId2], numFieldsToCompare);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature New functionality query Related to query processing

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants