[SPARK-44462][SS][CONNECT] Fix the session passed to foreachBatch in Spark Connect#55410
Open
LuciferYang wants to merge 3 commits into apache:master
Add a `dataFrameQueryIndex` map to `SessionHolder` that tracks the active cached DataFrame ID per streaming query. Before caching a new batch DataFrame in `dataFrameCachingWrapper`, check for any stale entry left over from a previous batch and remove it, logging a warning. This addresses TODO 1 in `StreamingForeachBatchHelper`.

Changes:
- `SessionHolder`: add a `dataFrameQueryIndex` `ConcurrentMap`.
- `StreamingForeachBatchHelper`: accept `queryIdRef` in `dataFrameCachingWrapper`, perform the sanity check, and clean up the mapping in a `finally` block. Update the `scalaForeachBatchWrapper` and `pythonForeachBatchWrapper` return types to include an `AtomicReference[String]`.
- `SparkConnectPlanner`: destructure the new return types and set the query ID on the `AtomicReference` after the query starts.
- `SparkConnectSessionHolderSuite`: update the test to destructure the 3-element tuple from `pythonForeachBatchWrapper`.
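The stale-entry sanity check described above can be sketched as follows. This is an illustrative Python sketch, not the actual change (which is Scala code in `SessionHolder` and `StreamingForeachBatchHelper`); the class and method names here are hypothetical stand-ins.

```python
# Hypothetical sketch of a per-query cached-DataFrame index with a stale-entry
# check before caching a new batch DataFrame. The real implementation is Scala.
import threading


class DataFrameCache:
    """Caches at most one batch DataFrame ID per streaming query."""

    def __init__(self):
        self._lock = threading.Lock()
        self._df_cache = {}      # df_id -> DataFrame
        self._query_index = {}   # query_id -> df_id currently cached

    def cache_batch_df(self, query_id, df_id, df):
        with self._lock:
            stale = self._query_index.get(query_id)
            if stale is not None:
                # A previous batch did not clean up: warn and evict.
                print(f"WARN: removing stale cached DataFrame {stale} "
                      f"for query {query_id}")
                self._df_cache.pop(stale, None)
            self._query_index[query_id] = df_id
            self._df_cache[df_id] = df

    def release(self, query_id):
        # Called in a finally block after the batch function returns.
        with self._lock:
            df_id = self._query_index.pop(query_id, None)
            if df_id is not None:
                self._df_cache.pop(df_id, None)
```

The index makes the invariant explicit: at any time a query maps to at most one live cached DataFrame, and a crash between batches leaves at most one stale entry, which the next batch evicts.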
…nHolder for foreachBatch

When a streaming query starts, `StreamExecution` clones the SparkSession. The DataFrame passed to foreachBatch carries this cloned session, but the `SessionHolder` still references the original session. This causes:

1. Session mismatch: batch DataFrames operate on a different session.
2. Session lifetime leak: the Python worker keeps the original session alive.
3. `CachedRemoteRelation` resolves against the wrong session.

This commit fixes all three issues by:
- Adding `registerExistingSession` to `SparkConnectSessionManager` to wrap an existing SparkSession (the stream clone) under a new session ID without creating another clone.
- Adding `ForeachBatchSessionManager`, which lazily creates a stream-level `SessionHolder` on the first batch invocation and reuses it for subsequent batches.
- Adding `ForeachBatchCleaner`, which closes both the Python runner and the stream `SessionHolder` on query termination.
- Sending the stream session ID per batch to the Python worker so it resolves `CachedRemoteRelation` against the correct session.
- Updating `foreach_batch_worker.py` to read the stream session ID and create or switch the SparkSession accordingly.

The stream `SessionHolder` uses `customInactiveTimeoutMs = -1` (never expire by inactivity); its lifecycle is managed by query termination via `CleanerCache`. Addresses TODO 2 and TODO 3 in `StreamingForeachBatchHelper`, removing all SPARK-44462 TODO comments.
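The lazy, reuse-on-subsequent-batches pattern described above can be sketched in Python. The real `ForeachBatchSessionManager` is Scala server code; `register_existing` below is a hypothetical stand-in for `registerExistingSession`, and `inactive_timeout_ms=-1` mirrors the "never expire by inactivity" setting.

```python
# Hypothetical sketch: create a stream-level session holder on the first batch
# and reuse it afterwards. Names are illustrative, not the real Scala APIs.
import threading
import uuid


class ForeachBatchSessionManager:
    """Lazily creates one session holder per streaming query."""

    def __init__(self, register_existing):
        # register_existing wraps an existing session under a new session ID
        # without cloning it again (the analogue of registerExistingSession).
        self._register_existing = register_existing
        self._lock = threading.Lock()
        self._holder = None

    def get_or_create(self, stream_session):
        with self._lock:
            if self._holder is None:
                # First batch: wrap the stream's cloned session under a fresh
                # ID. Inactivity timeout is disabled (-1) because the query's
                # termination, not idleness, ends the holder's lifecycle.
                session_id = str(uuid.uuid4())
                self._holder = self._register_existing(
                    session_id, stream_session, inactive_timeout_ms=-1)
            return self._holder
```

Subsequent batches hit the cached holder, so the session ID the Python worker sees stays stable across the life of the query.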
…chHelper

Run scalafmt to fix formatting violations detected by the CI linter.
What changes were proposed in this pull request?
Two fixes for foreachBatch session handling in Spark Connect:
1. DataFrame cache sanity check: Track which DataFrame is cached per streaming query in `SessionHolder`. When caching a new batch DataFrame, detect and remove any stale one left over from a previous batch. Pass the query ID into the foreachBatch wrappers so it can be set after the query starts.

2. Dedicated stream SessionHolder: On the first batch, lazily create a new `SessionHolder` wrapping `StreamExecution`'s cloned SparkSession (instead of the original). Register it with `SparkConnectSessionManager` with no inactivity timeout, since the streaming query manages its lifecycle. Clean up on query termination via `ForeachBatchCleaner`. On the Python side, each batch now receives `(dfId, batchId, streamSessionId)` from the server, and the worker creates a session bound to the stream session ID.

Why are the changes needed?
`SessionHolder` referenced the original SparkSession, not the `StreamExecution` clone (`sparkSessionForStream`) that batch DataFrames actually use. This caused: (1) batch DataFrames ran against the cloned session while `SessionHolder` pointed at the original, so session-level state was invisible to Connect; (2) the Python foreachBatch worker operated under the original session ID, keeping the parent session active and delaying cleanup after client disconnect; (3) nothing cleaned up stale cached DataFrames from a previous batch that did not exit cleanly.

Does this PR introduce any user-facing change?
No.
How was this patch tested?
- Updated `SparkConnectSessionHolderSuite` to destructure the new 3-tuple return from `pythonForeachBatchWrapper`.
- Added `test_nested_dataframes` in `StreamingForeachBatchParityTests`, which exercises both closured and batch DataFrames inside foreachBatch, using `saveAsTable` for cross-session visibility.

Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code
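The per-batch handoff described in the PR (the server sends `(dfId, batchId, streamSessionId)` for each batch, and the worker creates or switches to the session bound to the stream session ID) can be sketched as below. This is a hypothetical Python sketch; the real logic lives in `foreach_batch_worker.py`, and `create_session` / `lookup_cached_df` are illustrative stand-ins, not actual PySpark APIs.

```python
# Hypothetical sketch of the worker-side dispatch: one session is cached per
# stream session ID so CachedRemoteRelation(df_id) resolves against the
# correct (stream-cloned) session rather than the original one.
class BatchWorker:
    def __init__(self, create_session):
        self._create_session = create_session  # stream_session_id -> session
        self._sessions = {}

    def on_batch(self, df_id, batch_id, stream_session_id, user_fn):
        # Create the stream-bound session on first use, reuse it afterwards.
        session = self._sessions.get(stream_session_id)
        if session is None:
            session = self._create_session(stream_session_id)
            self._sessions[stream_session_id] = session
        # Resolve the cached batch DataFrame in that session, then invoke
        # the user's foreachBatch function with (df, batch_id).
        batch_df = session.lookup_cached_df(df_id)
        user_fn(batch_df, batch_id)
```

Because the session is keyed by the stream session ID rather than the client's original session ID, the worker no longer keeps the parent session alive, matching the lifetime fix described above.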