
[SPARK-44462][SS][CONNECT] Fix the session passed to foreachBatch in Spark Connect #55410

Open
LuciferYang wants to merge 3 commits into apache:master from LuciferYang:SPARK-44462-foreachBatch-session-fix

Conversation

@LuciferYang
Contributor

What changes were proposed in this pull request?

Two fixes for foreachBatch session handling in Spark Connect:

  1. DataFrame cache sanity check: Track which DataFrame is cached per streaming query in SessionHolder. When caching a new batch DataFrame, detect and remove any stale one left over from a previous batch. Pass the query ID into the foreachBatch wrappers so it can be set after the query starts.

  2. Dedicated stream SessionHolder: On the first batch, lazily create a new SessionHolder wrapping StreamExecution's cloned SparkSession (instead of the original). Register it with SparkConnectSessionManager with no inactivity timeout since the streaming query manages its lifecycle. Clean up on query termination via ForeachBatchCleaner. On the Python side, each batch now receives (dfId, batchId, streamSessionId) from the server and the worker creates a session bound to the stream session ID.
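The cache sanity check in fix 1 can be illustrated with a toy Python sketch. The names here (`DataFrameCache`, `cache_batch_df`, `release`) are hypothetical; the real logic lives in Scala's `SessionHolder` and `StreamingForeachBatchHelper`.

```python
import threading

class DataFrameCache:
    """Toy model of the per-query cache bookkeeping: one DataFrame
    cache plus an index of the active cached DataFrame per query."""

    def __init__(self):
        self._lock = threading.Lock()
        self._cached = {}       # df_id -> DataFrame
        self._query_index = {}  # query_id -> active df_id

    def cache_batch_df(self, query_id, df_id, df):
        with self._lock:
            stale = self._query_index.get(query_id)
            if stale is not None and stale != df_id:
                # A previous batch did not exit cleanly: evict its entry.
                print(f"warning: removing stale DataFrame {stale} "
                      f"for query {query_id}")
                self._cached.pop(stale, None)
            self._query_index[query_id] = df_id
            self._cached[df_id] = df

    def release(self, query_id, df_id):
        # Runs in the wrapper's finally block in the real code.
        with self._lock:
            self._cached.pop(df_id, None)
            if self._query_index.get(query_id) == df_id:
                del self._query_index[query_id]
```

Each batch caches its DataFrame before invoking the user function; if the previous batch's entry is still present, it is removed with a warning rather than left to accumulate.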

Why are the changes needed?

SessionHolder referenced the original SparkSession, not the StreamExecution clone (sparkSessionForStream) that batch DataFrames actually use:

// StreamExecution.scala
protected[sql] val sparkSessionForStream: SparkSession = sparkSession.cloneSession()

This caused: (1) batch DataFrames ran against the cloned session but SessionHolder pointed at the original, so session-level state was invisible to Connect; (2) the Python foreachBatch worker operated under the original session ID, keeping the parent session active and delaying cleanup after client disconnect; (3) nothing cleaned up stale cached DataFrames from a previous batch that did not exit cleanly.
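The visibility problem in (1) can be shown with a minimal stand-in (plain Python, not Spark; the `Session` class is purely illustrative): state set through the clone is invisible through a reference that still points at the original.

```python
class Session:
    """Toy stand-in for a SparkSession with per-session config state."""
    def __init__(self):
        self.conf = {}

    def clone(self):
        # Independent copy of session state, like cloneSession().
        c = Session()
        c.conf = dict(self.conf)
        return c

original = Session()
stream_clone = original.clone()  # what StreamExecution does at query start

# Batch work runs against the clone...
stream_clone.conf["spark.sql.shuffle.partitions"] = "4"

# ...but a holder still pointing at the original never sees the change.
print("spark.sql.shuffle.partitions" in original.conf)  # prints: False
```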

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • Updated SparkConnectSessionHolderSuite to destructure the new 3-tuple return from pythonForeachBatchWrapper.
  • Added test_nested_dataframes in StreamingForeachBatchParityTests, which exercises both closure-captured and batch DataFrames inside foreachBatch, using saveAsTable for cross-session visibility.
  • Passed GitHub Actions.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

Add a `dataFrameQueryIndex` map to `SessionHolder` that tracks the
active cached DataFrame ID per streaming query. Before caching a new
batch DataFrame in `dataFrameCachingWrapper`, we check for any stale
entry from a previous batch and remove it, logging a warning.

This addresses TODO 1 in StreamingForeachBatchHelper.

Changes:
- `SessionHolder`: Add `dataFrameQueryIndex` ConcurrentMap
- `StreamingForeachBatchHelper`: Accept `queryIdRef` in
  `dataFrameCachingWrapper`, perform sanity check, clean up mapping
  in finally block. Update `scalaForeachBatchWrapper` and
  `pythonForeachBatchWrapper` return types to include
  `AtomicReference[String]`.
- `SparkConnectPlanner`: Destructure new return types, set query id
  on the AtomicReference after query starts.
- `SparkConnectSessionHolderSuite`: Update test to destructure
  3-element tuple from `pythonForeachBatchWrapper`.
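The `AtomicReference` piece exists because the wrapper is built before the query starts, so the query ID is unknown at wrap time and must be published afterwards. A hypothetical Python analogue (names invented for illustration):

```python
class AtomicReference:
    """Minimal stand-in for java.util.concurrent.atomic.AtomicReference."""
    def __init__(self, value=None):
        self._value = value

    def set(self, value):
        self._value = value

    def get(self):
        return self._value

def make_foreach_batch_wrapper(user_fn):
    query_id_ref = AtomicReference()  # filled in after the query starts

    def wrapper(df, batch_id):
        query_id = query_id_ref.get()
        # The real code caches df under (query_id, df_id) and runs the
        # stale-entry sanity check here before calling the user function.
        user_fn(query_id, df, batch_id)

    return wrapper, query_id_ref

# The planner wires it up in two steps: build wrapper, start query, publish id.
wrapper, ref = make_foreach_batch_wrapper(lambda qid, df, b: print(qid, b))
ref.set("query-123")
wrapper("batch-df", 0)  # prints: query-123 0
```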
…nHolder for foreachBatch

When a streaming query starts, `StreamExecution` clones the SparkSession.
The DataFrame passed to foreachBatch has this cloned session, but the
`SessionHolder` still references the original session. This causes:

1. Session mismatch: batch DataFrames operate on a different session
2. Session lifetime leak: Python worker keeps original session alive
3. CachedRemoteRelation resolves against the wrong session

This commit fixes all three issues by:

- Adding `registerExistingSession` to `SparkConnectSessionManager` to
  wrap an existing SparkSession (the stream clone) under a new session
  ID without creating another clone.
- Adding `ForeachBatchSessionManager` that lazily creates a stream-level
  `SessionHolder` on the first batch invocation and reuses it for
  subsequent batches.
- Adding `ForeachBatchCleaner` that closes both the Python runner and
  the stream `SessionHolder` on query termination.
- Sending the stream session ID per-batch to the Python worker so it
  resolves `CachedRemoteRelation` against the correct session.
- Updating `foreach_batch_worker.py` to read the stream session ID and
  create/switch the SparkSession accordingly.

The stream `SessionHolder` uses `customInactiveTimeoutMs = -1` (never
expire by inactivity); its lifecycle is managed by query termination
via `CleanerCache`.

Addresses TODO 2 and TODO 3 in StreamingForeachBatchHelper, removing
all SPARK-44462 TODO comments.
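The lazy get-or-create plus termination cleanup described above can be sketched as follows (hypothetical Python names; the real code is the Scala `ForeachBatchSessionManager` / `ForeachBatchCleaner` pair):

```python
import threading

class StreamSessionManager:
    """Toy sketch: create the stream-level session holder once, on the
    first batch, then reuse it; close it when the query terminates."""

    def __init__(self, create_holder):
        self._create = create_holder  # wraps the stream's cloned session
        self._holder = None
        self._lock = threading.Lock()

    def get_or_create(self, stream_session):
        with self._lock:
            if self._holder is None:
                # First batch: register the clone under a new session ID.
                self._holder = self._create(stream_session)
            return self._holder

    def on_query_terminated(self):
        # Invoked by the cleaner; the holder never expires by inactivity,
        # so this is its only teardown path.
        with self._lock:
            if self._holder is not None:
                self._holder.close()
                self._holder = None
```

The holder is deliberately exempt from inactivity expiry because the streaming query, not client traffic, owns its lifetime.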
@HyukjinKwon
Member

@heyihong @HeartSaVioR FYI

@HyukjinKwon HyukjinKwon changed the title [SPARK-44462][CONNECT] Fix the session passed to foreachBatch in Spark Connect [SPARK-44462][SS][CONNECT] Fix the session passed to foreachBatch in Spark Connect Apr 19, 2026
…chHelper

Run scalafmt to fix formatting violations detected by CI linter.