
Fetching blocks of a cached table may cause high memory pressure #55387

@jiangjiangtian

Description


We have observed that some jobs fail with java.lang.OutOfMemoryError: Java heap space when fetching blocks of a cached table. The exception stack is as follows:

org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56) ~[spark-common-utils_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:112) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1275) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1219) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at scala.Option.orElse(Option.scala:447) ~[scala-library-2.12.18.jar:?]
	at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1219) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:1165) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1384) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1480) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1446) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.gluten.execution.ColumnarInputRDDsWrapper.$anonfun$getIterators$1(WholeStageTransformer.scala:579) ~[gluten-package-1.3.0.jar:?]
	at scala.collection.immutable.List.flatMap(List.scala:366) ~[scala-library-2.12.18.jar:?]
	at org.apache.gluten.execution.ColumnarInputRDDsWrapper.getIterators(WholeStageTransformer.scala:570) ~[gluten-package-1.3.0.jar:?]
	at org.apache.gluten.execution.WholeStageZippedPartitionsRDD.$anonfun$compute$1(WholeStageZippedPartitionsRDD.scala:48) ~[gluten-package-1.3.0.jar:?]
	at org.apache.gluten.utils.Arm$.withResource(Arm.scala:25) ~[gluten-package-1.3.0.jar:?]
	at org.apache.gluten.metrics.GlutenTimeMetric$.millis(GlutenTimeMetric.scala:37) ~[gluten-package-1.3.0.jar:?]
	at org.apache.gluten.execution.WholeStageZippedPartitionsRDD.compute(WholeStageZippedPartitionsRDD.scala:46) ~[gluten-package-1.3.0.jar:?]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:106) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.scheduler.Task.run(Task.scala:143) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:640) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) [spark-common-utils_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) [spark-common-utils_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:98) [spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:643) [spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_112]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_112]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: java.util.concurrent.ExecutionException: Boxed Error
	at scala.concurrent.impl.Promise$.resolver(Promise.scala:87) ~[scala-library-2.12.18.jar:?]
	at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79) ~[scala-library-2.12.18.jar:?]
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[scala-library-2.12.18.jar:?]
	at scala.concurrent.Promise.complete(Promise.scala:53) ~[scala-library-2.12.18.jar:?]
	at scala.concurrent.Promise.complete$(Promise.scala:52) ~[scala-library-2.12.18.jar:?]
	at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187) ~[scala-library-2.12.18.jar:?]
	at scala.concurrent.Promise.failure(Promise.scala:104) ~[scala-library-2.12.18.jar:?]
	at scala.concurrent.Promise.failure$(Promise.scala:104) ~[scala-library-2.12.18.jar:?]
	at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:187) ~[scala-library-2.12.18.jar:?]
	at org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:98) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.shuffle.BlockFetchingListener.onBlockTransferSuccess(BlockFetchingListener.java:37) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.BlockTransferService$$anon$1.onBlockTransferSuccess(BlockTransferService.scala:81) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferSuccess(RetryingBlockTransferor.java:266) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockFetchSuccess(RetryingBlockTransferor.java:302) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:287) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:172) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[netty-handler-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[netty-codec-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.96.Final.jar:4.1.96.Final]
	... 1 more
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[?:1.8.0_112]
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[?:1.8.0_112]
	at io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1685) ~[netty-buffer-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.buffer.AbstractDerivedByteBuf.nioBuffer(AbstractDerivedByteBuf.java:122) ~[netty-buffer-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1231) ~[netty-buffer-4.1.96.Final.jar:4.1.96.Final]
	at org.apache.spark.network.buffer.NettyManagedBuffer.nioByteBuffer(NettyManagedBuffer.java:46) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:94) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.shuffle.BlockFetchingListener.onBlockTransferSuccess(BlockFetchingListener.java:37) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.BlockTransferService$$anon$1.onBlockTransferSuccess(BlockTransferService.scala:81) ~[spark-core_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferSuccess(RetryingBlockTransferor.java:266) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockFetchSuccess(RetryingBlockTransferor.java:302) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:287) ~[spark-network-shuffle_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:172) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[netty-handler-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[netty-codec-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102) ~[spark-network-common_2.12-3.5.1-mt-1.0.0-rc1-SNAPSHOT.jar:3.5.1-mt-1.0.0-rc1-SNAPSHOT]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.96.Final.jar:4.1.96.Final]

In BlockTransferService, when a block is received via Netty, the current implementation allocates a heap ByteBuffer the size of the entire block and copies the data out of the underlying Netty ByteBuf (code link).
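For readers skimming past the linked code, the pattern in question can be modeled with plain java.nio types. This is a simplified, hypothetical sketch of the copy in the fetch success callback, not the actual Spark source; the class and method names are illustrative.

```java
import java.nio.ByteBuffer;

public class CopyOnFetchSketch {
    // Simplified stand-in for the onBlockFetchSuccess path: a heap ByteBuffer
    // the size of the whole block is allocated up front, then the incoming
    // data is copied into it, so the block is briefly resident twice.
    static ByteBuffer copyIntoHeapBuffer(ByteBuffer incoming) {
        ByteBuffer target = ByteBuffer.allocate(incoming.remaining()); // full block size
        target.put(incoming);  // second copy of the block on the Java heap
        target.flip();
        return target;
    }

    public static void main(String[] args) {
        ByteBuffer incoming = ByteBuffer.wrap("block-bytes".getBytes());
        ByteBuffer copy = copyIntoHeapBuffer(incoming);
        System.out.println(copy.remaining()); // prints 11
    }
}
```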

However, if the underlying Netty buffer is a CompositeByteBuf containing more than one component, calling .nioBuffer() forces Netty to allocate another contiguous ByteBuffer internally and copy all separate components into it (Netty code link).
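What Netty does internally in that case can also be modeled with stdlib types. This is a hypothetical sketch of the merge that CompositeByteBuf.nioBuffer() performs when the composite holds more than one component; the real implementation lives in Netty, and the names here are illustrative.

```java
import java.nio.ByteBuffer;
import java.util.List;

public class CompositeMergeSketch {
    // Rough model of CompositeByteBuf.nioBuffer() with multiple components:
    // a single contiguous NIO buffer cannot be a view over several components,
    // so one full-size buffer is allocated and every component is copied in.
    static ByteBuffer mergeComponents(List<ByteBuffer> components) {
        int total = components.stream().mapToInt(ByteBuffer::remaining).sum();
        ByteBuffer merged = ByteBuffer.allocate(total); // extra full-block allocation
        for (ByteBuffer c : components) {
            merged.put(c.duplicate()); // copy, component by component
        }
        merged.flip();
        return merged;
    }
}
```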

As a result, when fetching a large block, this process may allocate two temporary buffers, each the size of the block, leading to severe memory pressure and eventually an OutOfMemoryError.
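Back-of-envelope, assuming both copies are live at the same moment (the merged buffer from nioBuffer() and the destination buffer it is copied into), the transient heap demand is roughly twice the block size, on top of the original components:

```java
public class PeakHeapSketch {
    // Rough accounting: fetching a block of size B through this path needs
    // B bytes for the contiguous copy made by CompositeByteBuf.nioBuffer()
    // plus B bytes for the destination heap ByteBuffer, i.e. about 2*B of
    // transient heap beyond the composite's own components.
    static long transientHeapBytes(long blockSizeBytes) {
        long nettyMergedCopy = blockSizeBytes;  // nioBuffer() on multi-component composite
        long destinationCopy = blockSizeBytes;  // ByteBuffer.allocate(block size)
        return nettyMergedCopy + destinationCopy;
    }

    public static void main(String[] args) {
        long oneGiB = 1L << 30;
        System.out.println(transientHeapBytes(oneGiB)); // prints 2147483648
    }
}
```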

I noticed that ShuffleBlockFetcherIterator handles this much more efficiently: it exposes the input stream of the underlying Netty buffer directly, entirely avoiding the allocation of a new contiguous ByteBuffer to receive the data (code link).
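The streaming idea can be sketched with stdlib types: chaining the per-chunk data behind a single InputStream, analogous to reading the Netty buffer as a stream, so no full-block contiguous buffer is ever allocated and peak extra heap is just one small read buffer. This is an illustrative sketch, not Spark's API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class StreamingFetchSketch {
    // Instead of merging chunks into one contiguous buffer, chain them behind
    // a single InputStream so the consumer reads incrementally; each chunk can
    // be released as soon as it has been consumed.
    static InputStream asStream(List<byte[]> chunks) {
        List<InputStream> streams = chunks.stream()
            .map(ByteArrayInputStream::new)
            .collect(Collectors.toList());
        return new SequenceInputStream(Collections.enumeration(streams));
    }

    // Test helper: drain the stream into a String.
    static String readAll(InputStream in) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            int b;
            while ((b = in.read()) != -1) out.write(b);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }
}
```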

I understand that the current implementation simplifies memory lifecycle management (i.e., we don't have to manually track and release() the Netty ByteBuf once it's copied to a Java ByteBuffer). Aside from the buffer release concern, are there any other architectural constraints that prevent us from optimizing this?

Any insights or discussion would be greatly appreciated!
