Spark / SPARK-52516

Memory leak with coalesce, foreachPartition, and v2 data sources


Details

    Description

      Running the following should not leak a significant amount of memory:

      sparkSession.sql("select * from icebergcatalog.db.table")
          .coalesce(4)
          .foreachPartition((ForeachPartitionFunction<Row>) iterator -> {
              // Drain every row; user code retains nothing.
              while (iterator.hasNext()) iterator.next();
          });

      Some details are discussed in this Iceberg issue thread:

      https://github.com/apache/iceberg/issues/13297

      In summary, there is a bug where capturing a heavy reference in a context.addTaskCompletionListener callback can lead to an OOM, because the callback prevents those heavy references from being garbage collected. Coalescing makes this worse: a single coalesced task reads several parent partitions ("sub-tasks") in turn, each of which registers its own listener, so none of their heavy objects can be released until the whole coalesced task completes.
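The retention pattern can be modeled in plain Java, outside Spark. This is a minimal sketch under stated assumptions: FakeTaskContext is a hypothetical, simplified stand-in that keeps completion listeners in a list until the task finishes (loosely mirroring listeners registered through addTaskCompletionListener), and HeavyReader is a hypothetical reader owning a large buffer. Neither is a Spark class. It shows that when one task reads several parent partitions, every reader stays open and reachable from the context until the task completes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a partition reader that owns a large buffer.
class HeavyReader implements AutoCloseable {
    final byte[] buffer;
    boolean closed = false;

    HeavyReader(int bytes) {
        this.buffer = new byte[bytes];
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical, simplified task context: listeners (and everything they
// capture) stay strongly reachable until markTaskCompleted() runs them.
class FakeTaskContext {
    private final List<Runnable> listeners = new ArrayList<>();

    void addTaskCompletionListener(Runnable listener) {
        listeners.add(listener);
    }

    int listenerCount() {
        return listeners.size();
    }

    void markTaskCompleted() {
        for (Runnable listener : listeners) {
            listener.run();
        }
        listeners.clear();
    }
}

class CoalesceLeakDemo {
    // Simulates one coalesced task reading several parent partitions in turn.
    // Each partition registers a listener that captures its reader, so the
    // reader stays reachable from the context after its rows are consumed.
    // The returned list exists only so the demo can inspect the readers.
    static List<HeavyReader> readCoalesced(FakeTaskContext context, int parentPartitions, int bufferBytes) {
        List<HeavyReader> opened = new ArrayList<>();
        for (int i = 0; i < parentPartitions; i++) {
            HeavyReader reader = new HeavyReader(bufferBytes);
            context.addTaskCompletionListener(reader::close); // captures reader
            opened.add(reader);
            // ... rows of parent partition i would be consumed here ...
        }
        return opened;
    }

    // Total buffer size of readers that have not been closed yet.
    static long openBytes(List<HeavyReader> readers) {
        long total = 0;
        for (HeavyReader r : readers) {
            if (!r.closed) total += r.buffer.length;
        }
        return total;
    }

    public static void main(String[] args) {
        FakeTaskContext context = new FakeTaskContext();
        List<HeavyReader> readers = readCoalesced(context, 4, 1 << 20);
        System.out.println("listeners before completion: " + context.listenerCount());
        System.out.println("open bytes before completion: " + openBytes(readers));
        context.markTaskCompleted();
        System.out.println("open bytes after completion: " + openBytes(readers));
    }
}
```

Memory grows with the number of parent partitions per coalesced task rather than with the size of a single partition, which is why coalescing a large table down to a few partitions can exhaust the heap.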

      The same issue manifests in two different Scala classes:

      sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
      sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala 

      Iceberg is affected by the first; jobs using the v2 Parquet readers are affected by the second.

      The proposed solution is to use a delegate class that dereferences the heavy objects when the iterator is exhausted or closed. This requires only changes local to those two classes, with no public API changes.
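The delegate idea can be sketched in Java as follows. This is an illustration under assumptions, not the actual patch: ReleasingIterator and CountingResource are hypothetical names, and the heavy state is modeled as a plain Iterator plus an AutoCloseable resource. The completion listener captures only the small delegate, and the delegate nulls out its references to the heavy objects as soon as the iterator is exhausted or closed, so they become eligible for garbage collection before the coalesced task finishes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical delegate: holds the heavy iterator and its resource only
// until exhaustion or close, then drops both references.
final class ReleasingIterator<T> implements Iterator<T>, AutoCloseable {
    private Iterator<T> delegate;
    private AutoCloseable resource;

    ReleasingIterator(Iterator<T> delegate, AutoCloseable resource) {
        this.delegate = delegate;
        this.resource = resource;
    }

    @Override
    public boolean hasNext() {
        if (delegate == null) return false;
        if (delegate.hasNext()) return true;
        close(); // exhausted: release the heavy objects right away
        return false;
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        return delegate.next();
    }

    // Idempotent: safe to call on exhaustion and again from a completion listener.
    @Override
    public void close() {
        AutoCloseable r = resource;
        delegate = null;
        resource = null;
        if (r != null) {
            try {
                r.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    boolean isReleased() {
        return delegate == null && resource == null;
    }
}

// Hypothetical heavy resource that counts how often it is closed.
class CountingResource implements AutoCloseable {
    final byte[] buffer = new byte[1 << 20]; // stands in for heavy reader state
    int closeCalls = 0;

    @Override
    public void close() {
        closeCalls++;
    }
}

class ReleasingIteratorDemo {
    public static void main(String[] args) {
        // The local 'resource' reference exists only so the demo can inspect it.
        CountingResource resource = new CountingResource();
        ReleasingIterator<String> it =
            new ReleasingIterator<>(List.of("a", "b", "c").iterator(), resource);
        List<Runnable> listeners = new ArrayList<>();
        listeners.add(it::close); // listener captures only the small delegate
        while (it.hasNext()) it.next();
        System.out.println("released after exhaustion: " + it.isReleased());
        listeners.forEach(Runnable::run); // task completion: second close is a no-op
        System.out.println("close calls: " + resource.closeCalls);
    }
}
```

Making close() idempotent lets the same cleanup run both on exhaustion and from the task completion listener, which remains as a safety net for partitions that are not fully consumed.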

      The proposed changes were tested on Spark 3.4.x but not on 4.0.0, but I believe 4.0.0 is likely affected as well.

            People

              viirya L. C. Hsieh
              jkolash Joshua Kolash
              Votes: 0
              Watchers: 1