[multistage] Fix stream-mode stats handling for plugin-defined operator types (follow-up to #18458) #18736
…or types

Follow-up to apache#18458. While exercising the OperatorTypeDescriptor SPI from an out-of-tree plugin extension we found that plugin-defined operator types (ids >= 256) do not work end to end:
- The broker rendered stageStats by pairing the flat per-stage stats list against the stage's PlanNode tree, assuming built-in types at every position. With plugin types, visitMailboxSend read PARALLELISM through the built-in StatMap key class (an EnumMap with a different key class returns 0) and every division by parallelism threw ArithmeticException, replacing the whole stageStats tree with an error object. Even without the division, the pairing silently dropped every plugin-typed node.
- OperatorTypeRegistry discovered descriptors only via the context-classpath ServiceLoader, so descriptors packaged in PluginManager-isolated plugin jars were never registered and the broker marked their stages mergeFailed.

Fixes:
- Render stream-mode stages directly from the decoded StageStatsTreeNode (the explicit shape is exactly what the new wire format carries) instead of re-deriving the shape from PlanNodes: MultiStageStatsTreeBuilder gains an explicit-tree mode, QueryDispatcher.QueryResult carries the per-stage trees, and sender stages are nested under mailbox-receive nodes resolved by plan-node id (the broker reproduces the same pre-order numbering the servers use). PIPELINE_BREAKER nodes are collapsed so the JSON shape stays identical to the legacy renderer; nodes now also expose planNodeIds.
- OperatorTypeRegistry walks PluginManager.get().getPluginClassLoaders() like PinotRuleSet, deduplicating providers by class name.
- OpChainExecutionContext.getPlanNodeIdMap() lets plugin operators register synthetic stats operators for plan-node-id resolution.
- InStageStatsTreeBuilder (legacy path) only reads PARALLELISM when the stats at the send position really are the built-in MAILBOX_SEND ones, and guards the division.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
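To illustrate the division failure and the guard described in this commit message, here is a minimal sketch. It is not the Pinot code: `ParallelismGuard`, `BuiltInKey`, `PluginKey`, `getLong` and `clockTimeMs` are hypothetical stand-ins, and a plain `EnumMap` is assumed in place of `StatMap`. It only shows why a lookup with a key from a different enum class yields 0 and how clamping the divisor avoids the `ArithmeticException`.

```java
import java.util.EnumMap;

public class ParallelismGuard {
  // Hypothetical stand-ins for a built-in and a plugin-defined stat key enum.
  enum BuiltInKey { PARALLELISM, EXECUTION_TIME_MS }
  enum PluginKey { ROWS_SCANNED }

  /** Reads a long stat; returns 0 when the key is absent or belongs to another enum class. */
  static long getLong(EnumMap<?, Long> stats, Enum<?> key) {
    // EnumMap.get returns null for a key whose enum type differs from the map's key type.
    Long value = stats.get(key);
    return value == null ? 0L : value;
  }

  /** Computes executionTimeMs / parallelism with the divisor clamped to at least 1. */
  static long clockTimeMs(EnumMap<?, Long> stats) {
    long executionTimeMs = getLong(stats, BuiltInKey.EXECUTION_TIME_MS);
    long parallelism = Math.max(1L, getLong(stats, BuiltInKey.PARALLELISM));
    return executionTimeMs / parallelism;
  }
}
```

With a plugin-keyed map, both lookups return 0, so without the clamp the division would be by zero.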
Codecov Report
@@ Coverage Diff @@
## master #18736 +/- ##
============================================
+ Coverage 64.52% 64.78% +0.25%
- Complexity 1305 1309 +4
============================================
Files 3380 3380
Lines 209642 209630 -12
Branches 32776 32822 +46
============================================
+ Hits 135277 135801 +524
+ Misses 63503 62893 -610
- Partials 10862 10936 +74
yashmayya
left a comment
Reviewed the plugin-type rendering fix (follow-up to #18458). The plugin /0 fix, the registry classloader walk, the legacy visitMailboxSend defensive guard, and cross-stage plan-node-id determinism all check out, and the top-level mergeInto aggregation + broker-side plugin StatMap.Key deserialization are already plugin-safe. One parity gap inline.
    }
    JsonNode executionTimeMs = json.get("executionTimeMs");
    if (executionTimeMs != null) {
      json.put("clockTimeMs", executionTimeMs.asLong(0) / parallelism);
The explicit-tree renderer drops the self* stat fields that the legacy renderer emits, so turning on stream stats changes the per-stage JSON — which the PR description says this mode must not do ("bit-compatible … external tooling parses this format").
InStageStatsTreeBuilder.selfNode adds selfExecutionTimeMs/selfClockTimeMs (addClockTimeMs), selfAllocatedMB (addSelfAllocatedBytes) and selfGcTimeMs (addSelfGcTime) — parent-minus-children, when non-zero. jsonFromStatsTreeNode emits clockTimeMs here but none of the self* fields.
It's visible side-by-side in a single stream-mode response (semi-join, stream stats on): the root stage 0 still falls through to the legacy renderer (no StageStatsTreeNode for the broker-local stage) and carries selfExecutionTimeMs/selfClockTimeMs, but every server-reported stage rendered here does not:
MAILBOX_RECEIVE (stage 0, legacy-rendered)   selfExecutionTimeMs:223, selfClockTimeMs:223 (no planNodeIds)
  MAILBOX_SEND (stage 1)   planNodeIds:[0] (no self*)
    LEAF mytable   planNodeIds:[1,2,3,4] (no self*)
      MAILBOX_RECEIVE ...
        MAILBOX_SEND (stage 2)   planNodeIds:[0] (no self*)
          LEAF daysOfWeek   planNodeIds:[1,2] (no self*)
So within one response the broker-local root has the self-time breakdown and the server stages don't, and vs legacy mode every node would have it. These feed the controller stage-stats / flamegraph self-time view, so it's a real (if non-fatal) parity regression — and one the tests don't catch (MultiStageStatsTreeBuilderTest / StreamStatsReportingIntegrationTest assert types/planNodeIds/nesting shape, never the self* fields).
jsonFromStatsTreeNode already builds the child JSON above this point, so the parent-minus-children self* (and unconditional clockTimeMs) can be computed the same way addClockTimeMs/addSelfAllocatedBytes/addSelfGcTime do. (If dropping them is intentional, worth softening the "bit-compatible" wording instead.) Either way a test asserting a server stage's node carries self* would lock the parity down.
FWIW I also checked the cross-stage nesting for the dynamic-broadcast build side (whether the sender stage gets dropped from the tree) — that one's fine: the LEAF's own planNodeIds include the pipeline-breaker MailboxReceiveNode, so the sender stage (daysOfWeek above) nests correctly. Only the self* fields diverge.
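For reference, the parent-minus-children computation suggested above could look roughly like the sketch below. It is a simplified illustration, not the actual jsonFromStatsTreeNode: `SelfStatsSketch`, `Node` and `render` are hypothetical, a plain `Map` stands in for the JSON node, and deriving selfClockTimeMs by dividing the self time by the node's parallelism is an assumption. Only the "emit self* when non-zero" rule and the unconditional clockTimeMs come from the comment.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SelfStatsSketch {
  /** Hypothetical simplified stats node: cumulative execution time plus children. */
  static final class Node {
    final String type;
    final long executionTimeMs;
    final int parallelism;
    final List<Node> children;

    Node(String type, long executionTimeMs, int parallelism, List<Node> children) {
      this.type = type;
      this.executionTimeMs = executionTimeMs;
      this.parallelism = parallelism;
      this.children = children;
    }
  }

  /** Renders a node, deriving self time as the cumulative value minus the children's sum. */
  static Map<String, Object> render(Node node) {
    Map<String, Object> json = new LinkedHashMap<>();
    int parallelism = Math.max(1, node.parallelism); // guard the divisor
    json.put("type", node.type);
    json.put("executionTimeMs", node.executionTimeMs);
    json.put("clockTimeMs", node.executionTimeMs / parallelism); // always emitted

    long childrenMs = 0;
    List<Map<String, Object>> children = new ArrayList<>();
    for (Node child : node.children) {
      childrenMs += child.executionTimeMs;
      children.add(render(child));
    }
    long selfMs = node.executionTimeMs - childrenMs;
    if (selfMs != 0) { // self* fields are emitted only when non-zero
      json.put("selfExecutionTimeMs", selfMs);
      json.put("selfClockTimeMs", selfMs / parallelism); // assumption: same divisor
    }
    if (!children.isEmpty()) {
      json.put("children", children);
    }
    return json;
  }
}
```

A test along these lines (a parent with two children, asserting selfExecutionTimeMs on the parent) would be one way to pin the parity for a server-reported stage.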
The jsonFromStatsTreeNode path emitted clockTimeMs but omitted the selfExecutionTimeMs, selfClockTimeMs, selfAllocatedMB and selfGcTimeMs derived fields that InStageStatsTreeBuilder produces via parent-minus-children subtraction. This caused a parity regression when stream-stats mode was active: any stage rendered via the explicit-tree path would be missing these fields in the broker response JSON, breaking tooling that parses the stageStats output.

Adds the same parent-minus-children computation with a sumChildrenStat helper that mirrors the PIPELINE_BREAKER-collapse logic already present in the children-rendering loop, so the derived fields are computed over the same logical set of children as the legacy renderer.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
yashmayya
left a comment
Re-reviewed the new commit (Add self* derived fields to explicit-tree stats renderer). It addresses the prior self* parity gap — the fields are back and match the legacy renderer for the common cases (non-pipeline-breaker nodes), and the computation correctly does not subtract the cross-stage sender stage at a MAILBOX_RECEIVE (matching legacy, where the sender is appended after selfNode).
LGTM — approving. One minor, non-blocking parity nit inline (the pipeline-breaker leaf case over-subtracts self*); worth a quick follow-up but not a blocker.
    private static long sumChildrenStat(StageStatsTreeNode node, String statKey) {
      long sum = 0;
      for (StageStatsTreeNode child : node.getChildren()) {
        if (child.getType().getId() == MultiStageOperator.Type.PIPELINE_BREAKER.getId()) {
Minor / non-blocking (parity nit): sumChildrenStat collapses the PIPELINE_BREAKER and subtracts its receive's stats from the LEAF — but the legacy renderer deliberately does not subtract the breaker. InStageStatsTreeBuilder renders a PB-bearing LEAF via selfNode(..., adjustWithChildren=false), so the leaf's self* stays its full value (the breaker ran pre-stage, so its time isn't part of the leaf's cumulative time).
Net effect: for a semi-join / lookup-join leaf, selfExecutionTimeMs / selfClockTimeMs / selfAllocatedMB / selfGcTimeMs come out smaller than the legacy renderer by the breaker's contribution. I confirmed it with a unit test — a LEAF with executionTimeMs=100 over a PB whose receive has executionTimeMs=30 renders selfExecutionTimeMs=70, where legacy gives 100.
Small fix: skip PIPELINE_BREAKER children in sumChildrenStat (don't recurse into the grandchildren) to mirror adjustWithChildren=false. Everything else in the self* computation matches legacy (including correctly not subtracting the cross-stage sender at a MAILBOX_RECEIVE). Diagnostic-field only, so not a blocker — flagging for a follow-up since it's the one case this renderer was extended for.
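A rough sketch of the suggested fix, reproducing the 100 vs. 30 case from the unit test mentioned above. This is illustrative only: `SumChildrenSketch`, its `Node` class and `selfStat` are hypothetical simplifications (string type names and a plain `Map` of stats instead of StageStatsTreeNode and StatMap); only the "skip PIPELINE_BREAKER children, do not recurse" behaviour is taken from the comment.

```java
import java.util.List;
import java.util.Map;

public class SumChildrenSketch {
  /** Hypothetical simplified tree node with a type name and cumulative stats. */
  static final class Node {
    final String type;
    final Map<String, Long> stats;
    final List<Node> children;

    Node(String type, Map<String, Long> stats, List<Node> children) {
      this.type = type;
      this.stats = stats;
      this.children = children;
    }
  }

  /** Sums a stat over direct children, ignoring pipeline breakers and everything below them. */
  static long sumChildrenStat(Node node, String statKey) {
    long sum = 0;
    for (Node child : node.children) {
      if ("PIPELINE_BREAKER".equals(child.type)) {
        continue; // the breaker ran pre-stage, so it is not part of the parent's cumulative time
      }
      sum += child.stats.getOrDefault(statKey, 0L);
    }
    return sum;
  }

  /** Self value = the node's cumulative value minus its (non-breaker) children's values. */
  static long selfStat(Node node, String statKey) {
    return node.stats.getOrDefault(statKey, 0L) - sumChildrenStat(node, statKey);
  }
}
```

With a LEAF at executionTimeMs=100 over a PIPELINE_BREAKER whose receive has executionTimeMs=30, `selfStat` returns 100, which is the legacy value; for an ordinary child with 30 it returns 70.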
sumChildrenStat was recursing into PIPELINE_BREAKER grandchildren and subtracting their stats from the parent's self* fields. The legacy renderer (InStageStatsTreeBuilder) uses adjustWithChildren=false for LEAF nodes with a pipeline-breaker child, so the breaker's cumulative time is never subtracted and the leaf's self* stays its full value.

Fix: skip PIPELINE_BREAKER children entirely in sumChildrenStat, matching legacy semantics.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
#18458 introduced the OperatorTypeDescriptor SPI so plugin jars can register custom MSE operator types (ids ≥ 256) and report their stats over the SubmitWithStream channel. While exercising that SPI from an out-of-tree plugin extension we found the plugin path does not work end to end: every query whose stages report plugin-typed stats returned stageStats: {"error": "... java.lang.ArithmeticException: / by zero"}, and on classloader-isolated deployments the descriptors were never registered at all.

Root causes
- The broker renders stageStats by pairing the flat per-stage stats list positionally against the stage's PlanNode tree (InStageStatsTreeBuilder), assuming built-in types at every position. visitMailboxSend casts the send-position StatMap to the built-in key class; for a plugin type the EnumMap lookup returns 0, so Context(parallelism=0) makes cpuTimeMs / parallelism throw. Independently, the type-mismatch branch silently drops every plugin-typed node, so even without the exception the plugin stats would never appear.
- OperatorTypeRegistry discovers plugin descriptors with the plain context-classpath ServiceLoader, unlike PinotRuleSet, which also walks the PluginManager plugin classloaders.

Fixes
- Render stream-mode stages directly from the decoded StageStatsTreeNode: the explicit tree shape is exactly what the new wire format carries, so no shape re-derivation is needed and plugin types render faithfully. Cross-stage nesting (sender stage under its mailbox-receive node) is resolved through plan-node ids; the broker reproduces the same pre-order numbering the servers use. PIPELINE_BREAKER nodes are collapsed so the JSON shape stays bit-compatible with the legacy renderer (external tooling parses this format). Nodes additionally expose planNodeIds.
- OperatorTypeRegistry now walks PluginManager.get().getPluginClassLoaders() (deduplicated by provider class name), matching the PinotRuleSet pattern.
- New OpChainExecutionContext#getPlanNodeIdMap() accessor so plugin operator trees can register synthetic stats operators for id resolution via the existing recordPlanNodesForOperator.
- The legacy visitMailboxSend only reads PARALLELISM from genuine built-in MAILBOX_SEND stats and clamps the divisor to ≥ 1.

Test plan
- MultiStageStatsTreeBuilderTest: explicit-tree rendering with plugin-only operator types carrying a plugin-defined StatMap.Key enum (cross-stage nesting by plan-node id, PIPELINE_BREAKER collapse parity, legacy-path /0 regression)
- StreamStatsReportingIntegrationTest: 7/7 (including the pipeline-breaker shape tests, which pin JSON parity of the new renderer)
- MultiStageStatsTreeEncoderTest / MultiStageStatsTreeDecoderTest / StreamingQuerySessionTest / OpChainSchedulerServiceTest / OperatorTypeRegistryTest: all green
- stageStats with correct cross-stage nesting and no errors

🤖 Generated with Claude Code
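As a rough illustration of the registry fix described under Fixes (walking several classloaders and deduplicating providers by class name), the following sketch uses only the JDK's `java.util.ServiceLoader`. `ProviderDiscoverySketch` and `discover` are hypothetical names, and the list of classloaders is passed in by the caller rather than obtained from PluginManager, whose API is not reproduced here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;

public class ProviderDiscoverySketch {
  /**
   * Loads providers of the given SPI from every classloader in order,
   * keeping only the first provider seen for each implementation class name.
   */
  static <T> List<T> discover(Class<T> spi, List<ClassLoader> loaders) {
    Map<String, T> byClassName = new LinkedHashMap<>();
    for (ClassLoader loader : loaders) {
      // stream() exposes the provider type without instantiating it, so
      // duplicates can be skipped before calling get().
      ServiceLoader.load(spi, loader).stream().forEach(provider -> {
        String className = provider.type().getName();
        if (!byClassName.containsKey(className)) {
          byClassName.put(className, provider.get());
        }
      });
    }
    return new ArrayList<>(byClassName.values());
  }
}
```

Deduplicating by class name rather than by instance matters because the same provider class can be visible through more than one classloader (for example, through a parent-delegating plugin classloader and the context classloader).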