[SPARK-57478][SQL] Read text files from tar archives#56527
[SPARK-57478][SQL] Read text files from tar archives#56527akshatshenoi-db wants to merge 2 commits into
Conversation
Extend the text data source to read .tar/.tar.gz/.tgz archives when spark.sql.files.archive.reader.enabled is set. Each entry is streamed through the text reader (one row per line, or a whole-entry row under wholeText), never unpacked to disk; the whole archive is one non-splittable unit. Text has a fixed value STRING schema, so there is no inference. V1-only; DSv2 untouched.
There was a problem hiding this comment.
0 blocking, 1 non-blocking, 0 nits.
Clean, minimal port of the archive-read pattern to text; readArchive faithfully mirrors readToUnsafeMem. One non-blocking cross-path consistency note on the (intentionally untouched) V2 text source.
Design / architecture (1)
TextFileFormat/ v2 text source: witharchiveFormatEnabled=true, a.tarread through the V1 source works correctly (this PR), but the V2 text source has no archive awareness —TextPartitionReaderFactoryreads viaHadoopFileLinesReader, so it scans the raw tar bytes as text lines and silently returns garbage rows. This is consistent with how V2 behaves for the sibling formats today rather than a new divergence: JSON has no archive support on master yet, and V2 CSV only fails loudly (UNABLE_TO_INFER_SCHEMA) while inferring — given an explicit.schema(...)the guard inCSVTable.inferSchemais bypassed (FileTable.dataSchemaprefers the user schema) and V2 CSV silently mis-reads too. Text has no inference step at all, so there is nowhere to hang that guard. Not a regression (text never supported archives on any path before this PR) and the V2 source is intentionally out of scope here, so non-blocking — but worth a follow-up to give the V2 path a uniform archive guard across formats.
Verification
Traced readArchive against the per-file readToUnsafeMem: identical value-column rows — an empty required schema yields the shared emptyUnsafeRow, otherwise unsafeRowWriter.reset()/write(0, bytes, 0, length)/getRow() with length-aware Text handling (getBytes+getLength, not bytes.length); wholeText reads the whole entry as one row. The single reused UnsafeRowWriter/Text buffer is safe across entries because ArchiveReader advances to the next entry only after the current entry's iterator is exhausted (parse-before-advance), so no live row aliases a buffer about to be overwritten. isSplitable=false makes the whole archive one split; empty and corrupt (ignoreCorruptFiles) archives are covered by the suite.
PR description suggestions
- Note the V2 limitation: with the flag enabled, the DSv2 text path silently reads archive bytes as text (it has no archive guard). This matches the other formats' V2 behavior today — JSON has no archive support yet, and V2 CSV's loud
UNABLE_TO_INFER_SCHEMAonly applies when inferring, not with an explicit schema.
cloud-fan
left a comment
There was a problem hiding this comment.
0 blocking, 2 non-blocking, 0 nits.
Clean, faithful port of the archive-read pattern to the V1 text source; readArchive mirrors readToUnsafeMem and the CSV analogue, and the new suite is thorough.
Design / architecture (1)
- V2 text source has no archive awareness: with the flag on and text forced through DSv2 (non-default), an archive is silently read as raw tar bytes (garbage rows), where v2 CSV/JSON raise
UNABLE_TO_INFER_SCHEMA. Not a regression and V2 is scoped out, so non-blocking, but worth a follow-up ticket. This concurs with the existing review on this PR; not re-flagging.
Suggestions (1)
- TextTarArchiveReadSuite.scala:173:
ignoreMissingFilesis named in the design description but onlyignoreCorruptFilesis tested — see inline.
Verification
Traced readArchive against the per-file readToUnsafeMem: identical value-column rows — an empty required schema yields the shared emptyUnsafeRow, otherwise length-aware writes into the single value column (getBytes + getLength, not bytes.length); wholeText reads the whole entry as one row. The single reused UnsafeRowWriter/Text buffer is safe across entries because ArchiveReader advances to the next entry only after the current one's iterator is exhausted, so no returned row aliases a buffer about to be overwritten. isSplitable=false makes the archive one split. BOM handling matches the non-archive path in the default config: the archive line path strips the UTF-8 BOM via ArchiveReader.lineIterator, as does the non-archive path via HadoopLineRecordReader (default on).
PR description suggestions
- Document the V2 limitation: with the flag enabled, the DSv2 text path silently reads archive bytes as text rather than raising
UNABLE_TO_INFER_SCHEMAlike CSV/JSON, because text has no schema-inference guard.
| } | ||
|
|
||
| Seq(true, false).foreach { ignoreCorrupt => | ||
| test(s"ignoreCorruptFiles=$ignoreCorrupt controls whether a corrupt archive is skipped") { |
There was a problem hiding this comment.
Optional: the design description names ignoreMissingFiles alongside ignoreCorruptFiles, but only the corrupt case is covered here. Consider a parameterized ignoreMissingFiles case mirroring this test (a missing archive path skipped vs. erroring). Low-value since the behavior is inherited from FileScanRDD and isn't archive-specific, but it's the one behavior named in the description that has no test.
Mirror the ignoreCorruptFiles parameterized test: a missing archive path is skipped under ignoreMissingFiles and errors otherwise. Covers the one behavior named in the PR description that previously had no test.
What changes were proposed in this pull request?
SPARK-57135 added reading CSV files packed in tar archives (
.tar/.tar.gz/.tgz) and SPARK-57321 added schema inference for them; SPARK-57419 extended both to JSON. All are gated byspark.sql.files.archive.reader.enabled. This extends archive reading to the text data source.When the flag is enabled, the V1 text data source reads a tar archive as if it were a directory of its entries: each entry is streamed through
ArchiveReader(never unpacked to disk) and read exactly like a standalone text file -- one row per line, or a single row holding the whole entry whenwholeTextis set (TextFileFormat.readArchive). The whole archive is one non-splittable unit (isSplitablereturns false for an archive path), and a corrupt/missing archive is skipped as a unit underignoreCorruptFiles/ignoreMissingFiles.Text has a fixed
value STRINGschema, so there is no schema inference. Archive scanning is wired into the V1 file source only; the DSv2 reader is left untouched.Why are the changes needed?
To let text ingestion read tar archives without unpacking them to disk, matching the CSV and JSON behavior already in Spark.
Does this PR introduce any user-facing change?
Yes. With
spark.sql.files.archive.reader.enabled=true(default false), the text data source can read.tar/.tar.gz/.tgzfiles.How was this patch tested?
New
TextTarArchiveReadSuite: reads of multi-entry archives across all three extensions, parity with a directory read of the same files,wholeTextand a custom line separator, empty and corrupt archives, the single-partition guarantee, and an archive mixed with loose files in the same directory.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code