Spark: Doing a Coalesce and foreachpartitions in spark directly on an iceberg table is leaking memory heavy iterators #13297

Description

@jkolash

Apache Iceberg version

1.5.0

Query engine

Spark

Please describe the bug 🐞

Summary

Doing the following should not leak any significant amount of memory.

sparkSession.sql("select * from icebergcatalog.db.table").coalesce(4).foreachPartition((iterator) -> {
    while (iterator.hasNext()) iterator.next();
});

A workaround is to use repartition() instead; however, this requires more resources to handle the shuffle, spilling, and so on.

Spark version: Spark 3.4.X

Details

The code below can be run on a sufficiently large Iceberg table.

    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.spark.sql.SparkSession;

    // Per-JVM counter that labels each foreachPartition call in the output.
    static AtomicInteger partitionCounter = new AtomicInteger(0);

    static void reproduceBug(SparkSession sparkSession, String table) {

        sparkSession.sql("select * from " + table).coalesce(4).foreachPartition((iterator) -> {
            int partition = partitionCounter.getAndIncrement();
            // A plain local counter is enough: each call is consumed by a single thread.
            long rowCounter = 0;

            // Drain the iterator without retaining any rows; log progress every 100,000 rows.
            while (iterator.hasNext()) {
                iterator.next();
                if (rowCounter++ % 100000 == 0) {
                    System.out.println(partition + " " + rowCounter);
                }
            }
        });
    }

The following image shows me running the reproduceBug method over a sufficiently large table in our environment with ~500 columns.

Image

The following image shows the "Dominators" report in VisualVM: org.apache.spark.TaskContextImpl

Image

Digging deeper, we see that onCallbacks retains an instance of an anonymous class defined inside org.apache.spark.sql.execution.datasources.v2.DataSourceRDD, which in turn holds a reference to org.apache.iceberg.spark.source.RowDataReader.

I believe this callback is added here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala#L90
We also see the org.apache.iceberg.util.Filter iterator holding a heavy reference.
Image
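
The retention pattern described above can be sketched with the Java standard library alone. FakeTaskContext and FakeReader are hypothetical stand-ins, not Spark or Iceberg classes. The sketch assumes that with coalesce() a single task reads several input partitions in sequence, and that each reader registers a completion callback that only runs when the whole task ends.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class CallbackRetentionSketch {

    /** Hypothetical stand-in for a task context: callbacks run only at task end. */
    static final class FakeTaskContext {
        final List<Runnable> onCompleteCallbacks = new ArrayList<>();

        void addCompletionListener(Runnable callback) {
            onCompleteCallbacks.add(callback);
        }

        void markTaskCompleted() {
            onCompleteCallbacks.forEach(Runnable::run);
            onCompleteCallbacks.clear();
        }
    }

    /** Hypothetical reader; the byte array stands in for heavy per-file state. */
    static final class FakeReader implements Iterator<Integer> {
        private byte[] state = new byte[1024];
        private int remaining;

        FakeReader(int rows) {
            this.remaining = rows;
        }

        @Override
        public boolean hasNext() {
            return remaining > 0;
        }

        @Override
        public Integer next() {
            if (remaining <= 0) {
                throw new NoSuchElementException();
            }
            return remaining--;
        }

        void close() {
            state = null; // state is released only when close() finally runs
        }
    }

    /**
     * Drains `partitions` readers inside one simulated task and returns how many
     * callbacks (each capturing its reader) are still registered before task end.
     */
    static int retainedReadersBeforeTaskEnd(int partitions, int rowsPerPartition) {
        FakeTaskContext context = new FakeTaskContext();
        for (int p = 0; p < partitions; p++) {
            FakeReader reader = new FakeReader(rowsPerPartition);
            context.addCompletionListener(reader::close); // callback captures the reader
            while (reader.hasNext()) {
                reader.next(); // reader is exhausted here but stays reachable
            }
        }
        int retained = context.onCompleteCallbacks.size();
        context.markTaskCompleted(); // only now is every reader closed and released
        return retained;
    }
}
```

In this sketch, retainedReadersBeforeTaskEnd(4, 100) returns 4: every exhausted reader remains reachable through its callback until the task completes, which matches the shape of the heap dump above.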

Exploring the problem

Is this inherently a bug in org.apache.spark.sql.execution.datasources.v2.DataSourceRDD? Or should iterators release state they no longer need once they have been advanced to the end? Is the iterator even exhausted at that point? Once an iterator is exhausted, nothing needs to keep referencing its internal state. However, releasing on exhaustion sits awkwardly with the concept of a CloseableIterator, which has an explicit close(). The alternative is an implicit close: detect that hasNext() is false, auto-close, and then ignore a later duplicate close() because iterator exhaustion already handled it.

I believe an iterator that accumulates hundreds of megabytes of state breaks the implicit "expected" contract of an iterator as a streaming sequence of objects. There might also be a distinction between closing an iterator and simply holding onto large object references.
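
To make the implicit-close idea concrete, here is a minimal sketch using only java.util.Iterator and java.io.Closeable. AutoClosingIterator is a hypothetical name, not an Iceberg or Spark class, and this is not a proposed patch. It closes its resource and drops its references as soon as hasNext() turns false, and treats any later close() as a no-op.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical wrapper that closes and releases its delegate on exhaustion. */
final class AutoClosingIterator<T> implements Iterator<T>, Closeable {
    private Iterator<T> delegate; // null once released
    private Closeable resource;   // null once released

    AutoClosingIterator(Iterator<T> delegate, Closeable resource) {
        this.delegate = delegate;
        this.resource = resource;
    }

    @Override
    public boolean hasNext() {
        if (delegate == null) {
            return false; // already closed, explicitly or implicitly
        }
        if (delegate.hasNext()) {
            return true;
        }
        close(); // implicit close on exhaustion
        return false;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return delegate.next();
    }

    @Override
    public void close() {
        if (delegate == null) {
            return; // duplicate close() is ignored
        }
        Closeable toClose = resource;
        // Drop references first so heavy state becomes collectable even if close() fails.
        delegate = null;
        resource = null;
        try {
            toClose.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** True once the wrapper no longer references the delegate. */
    boolean isReleased() {
        return delegate == null;
    }
}
```

With this shape, a completion callback that still references the wrapper keeps only a small empty shell alive, and calling close() again at task end is harmless. Whether the real readers can safely be closed before the task ends is a separate question this sketch does not answer.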

Digging deeper, I see items = org.apache.iceberg.parquet.ParquetReader$FileIterator#6 holding onto a model reference. It might be possible to null out the model reference when hasNext() is false.

    @Override
    public boolean hasNext() {
      boolean hasNext = valuesRead < totalValues;
      if (!hasNext) {
        // Drop the model reference once all values have been read.
        this.model = null;
      }
      return hasNext;
    }
Image

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time

Labels: bug, stale
