We have observed that some jobs fail with java.lang.OutOfMemoryError: Java heap space when fetching blocks of a cached table. The exception stack is as follows:
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56) ~[spark-common-utils_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:112) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1275) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1219) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at scala.Option.orElse(Option.scala:447) ~[scala-library-2.12.18.jar:?]
at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1219) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:1165) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1384) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1480) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1446) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.gluten.execution.ColumnarInputRDDsWrapper.$anonfun$getIterators$1(WholeStageTransformer.scala:579) ~[gluten-package-1.3.0.jar:?]
at scala.collection.immutable.List.flatMap(List.scala:366) ~[scala-library-2.12.18.jar:?]
at org.apache.gluten.execution.ColumnarInputRDDsWrapper.getIterators(WholeStageTransformer.scala:570) ~[gluten-package-1.3.0.jar:?]
at org.apache.gluten.execution.WholeStageZippedPartitionsRDD.$anonfun$compute$1(WholeStageZippedPartitionsRDD.scala:48) ~[gluten-package-1.3.0.jar:?]
at org.apache.gluten.utils.Arm$.withResource(Arm.scala:25) ~[gluten-package-1.3.0.jar:?]
at org.apache.gluten.metrics.GlutenTimeMetric$.millis(GlutenTimeMetric.scala:37) ~[gluten-package-1.3.0.jar:?]
at org.apache.gluten.execution.WholeStageZippedPartitionsRDD.compute(WholeStageZippedPartitionsRDD.scala:46) ~[gluten-package-1.3.0.jar:?]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:106) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.scheduler.Task.run(Task.scala:143) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:640) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) [spark-common-utils_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) [spark-common-utils_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:98) [spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:643) [spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_112]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_112]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:87) ~[scala-library-2.12.18.jar:?]
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79) ~[scala-library-2.12.18.jar:?]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[scala-library-2.12.18.jar:?]
at scala.concurrent.Promise.complete(Promise.scala:53) ~[scala-library-2.12.18.jar:?]
at scala.concurrent.Promise.complete$(Promise.scala:52) ~[scala-library-2.12.18.jar:?]
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187) ~[scala-library-2.12.18.jar:?]
at scala.concurrent.Promise.failure(Promise.scala:104) ~[scala-library-2.12.18.jar:?]
at scala.concurrent.Promise.failure$(Promise.scala:104) ~[scala-library-2.12.18.jar:?]
at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:187) ~[scala-library-2.12.18.jar:?]
at org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:98) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.shuffle.BlockFetchingListener.onBlockTransferSuccess(BlockFetchingListener.java:37) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.BlockTransferService$$anon$1.onBlockTransferSuccess(BlockTransferService.scala:81) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferSuccess(RetryingBlockTransferor.java:266) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockFetchSuccess(RetryingBlockTransferor.java:302) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:287) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:172) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[netty-handler-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[netty-codec-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.96.Final.jar:4.1.96.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.96.Final.jar:4.1.96.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.96.Final.jar:4.1.96.Final]
... 1 more
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[?:1.8.0_112]
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[?:1.8.0_112]
at io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1685) ~[netty-buffer-4.1.96.Final.jar:4.1.96.Final]
at io.netty.buffer.AbstractDerivedByteBuf.nioBuffer(AbstractDerivedByteBuf.java:122) ~[netty-buffer-4.1.96.Final.jar:4.1.96.Final]
at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1231) ~[netty-buffer-4.1.96.Final.jar:4.1.96.Final]
at org.apache.spark.network.buffer.NettyManagedBuffer.nioByteBuffer(NettyManagedBuffer.java:46) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:94) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.shuffle.BlockFetchingListener.onBlockTransferSuccess(BlockFetchingListener.java:37) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.BlockTransferService$$anon$1.onBlockTransferSuccess(BlockTransferService.scala:81) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferSuccess(RetryingBlockTransferor.java:266) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockFetchSuccess(RetryingBlockTransferor.java:302) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:287) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:172) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[netty-handler-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[netty-codec-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
In BlockTransferService, when receiving a block via Netty, the current implementation allocates a heap ByteBuffer equal in size to the block and copies the data into it from the underlying Netty ByteBuf (code link).
However, if the underlying Netty buffer is a CompositeByteBuf with more than one component, calling .nioBuffer() forces Netty to internally allocate yet another contiguous ByteBuffer and copy every component into it (Netty code link).
As a result, fetching a large block can transiently allocate two buffers, each as large as the block itself, leading to severe heap pressure and eventually the OOM shown above.
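To make the cost concrete, here is a hypothetical, self-contained sketch (not actual Spark/Netty code; all names are stand-ins) of the two block-sized copies described above: one inside the composite-buffer consolidation and one in the fetch path itself.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration: simulate the two block-sized allocations that
// can occur when a multi-component buffer is consolidated and then copied.
public class DoubleCopySketch {

    // Stand-ins for the components of a CompositeByteBuf holding one block.
    static byte[][] components(int blockSize, int n) {
        byte[][] parts = new byte[n][];
        for (int i = 0; i < n; i++) {
            parts[i] = new byte[blockSize / n];
        }
        return parts;
    }

    // Roughly what CompositeByteBuf.nioBuffer() does when it has more than
    // one component: allocate one contiguous buffer of the full block size
    // and copy every component into it (1st block-sized allocation).
    static ByteBuffer consolidate(byte[][] parts) {
        int total = 0;
        for (byte[] p : parts) total += p.length;
        ByteBuffer merged = ByteBuffer.allocate(total);
        for (byte[] p : parts) merged.put(p);
        merged.flip();
        return merged;
    }

    // Roughly what the fetch path then does: allocate another ByteBuffer of
    // the same size and copy the data again (2nd block-sized allocation).
    static ByteBuffer copyOut(ByteBuffer src) {
        ByteBuffer dst = ByteBuffer.allocate(src.remaining());
        dst.put(src.duplicate());
        dst.flip();
        return dst;
    }

    public static void main(String[] args) {
        int blockSize = 8 * 1024 * 1024; // 8 MiB stand-in for a large block
        ByteBuffer merged = consolidate(components(blockSize, 4));
        ByteBuffer result = copyOut(merged);
        // Both block-sized buffers are live at this point, so the transient
        // heap cost is roughly 2x the block size.
        System.out.println("merged=" + merged.capacity()
            + " result=" + result.capacity());
    }
}
```

For a multi-gigabyte cached block, both of these buffers are reachable at the same moment, which matches the HeapByteBuffer allocation site in the OOM stack above.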
I noticed that ShuffleBlockFetcherIterator handles this much more efficiently: it exposes an input stream over the underlying Netty buffer directly, avoiding the allocation of a new, contiguous ByteBuffer altogether (code link).
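The streaming idea can be sketched in plain JDK terms (again hypothetical names, no Netty dependency): expose the components as one InputStream and read with a small fixed-size buffer, so no single allocation ever needs to match the full block size.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Vector;

// Hypothetical sketch of the streaming alternative: chain the components
// into a single stream instead of consolidating them into one big buffer.
public class StreamingSketch {

    static InputStream asStream(byte[][] parts) {
        Vector<InputStream> streams = new Vector<>();
        for (byte[] p : parts) streams.add(new ByteArrayInputStream(p));
        return new SequenceInputStream(streams.elements());
    }

    public static void main(String[] args) throws IOException {
        byte[][] parts = { {1, 2, 3}, {4, 5} };
        InputStream in = asStream(parts);
        byte[] buf = new byte[1024]; // small, fixed-size read buffer
        int total = 0;
        int n;
        while ((n = in.read(buf)) != -1) total += n;
        // The consumer drains the block incrementally; peak extra heap is
        // bounded by the read buffer, not the block size.
        System.out.println("read " + total + " bytes");
    }
}
```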
I understand that the current implementation simplifies memory lifecycle management (i.e., once the data is copied into a Java ByteBuffer, we no longer have to track the Netty ByteBuf and call release() on it). Aside from the buffer-release concern, are there any other architectural constraints that prevent us from optimizing this path?
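On the lifecycle concern specifically, one conceivable approach (a sketch only, with a plain Runnable standing in for ByteBuf.release()) would be to tie the release of the reference-counted buffer to close(), so it runs exactly once when the consumer finishes reading:

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch: wrap the streamed block so the underlying
// reference-counted buffer is released exactly once, on close().
// `release` stands in for something like ByteBuf.release().
public class ReleaseOnCloseStream extends FilterInputStream {
    private final Runnable release;
    private boolean released = false;

    public ReleaseOnCloseStream(InputStream in, Runnable release) {
        super(in);
        this.release = release;
    }

    @Override
    public void close() throws IOException {
        try {
            super.close();
        } finally {
            if (!released) {
                released = true;
                release.run(); // release the Netty buffer exactly once
            }
        }
    }
}
```

This shifts the obligation from "copy then forget" to "the consumer must close the stream", which may be exactly the kind of constraint the question is asking about.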
Any insights or discussions would be greatly appreciated!