From b832933f9fd51d737701c6b33bab748b549f8d14 Mon Sep 17 00:00:00 2001 From: Ates Goral Date: Sun, 19 Apr 2026 11:54:03 -0400 Subject: [PATCH] Parse SSE responses in HTTP client via event_stream_parser The Streamable HTTP spec allows servers to respond with either `application/json` or `text/event-stream`. The client previously rejected the latter. Add SSE parsing via the optional `event_stream_parser` gem (users add it to their Gemfile when their server uses SSE), dispatch on the response Content-Type, and scan the event stream for the first JSON-RPC response message. Document the new optional dependency in both the README's HTTP Transport Layer section and docs/building-clients.md. No other Streamable HTTP features (202 handling, session IDs, `connect`, DELETE termination) are included here; those will land in follow-up PRs. Co-Authored-By: Claude Opus 4.7 --- Gemfile | 1 + README.md | 3 +- docs/building-clients.md | 3 +- lib/mcp/client/http.rb | 49 +++++++++++++-- test/mcp/client/http_test.rb | 112 +++++++++++++++++++++++++++++++++-- 5 files changed, 156 insertions(+), 12 deletions(-) diff --git a/Gemfile b/Gemfile index 5166516d..8b76222d 100644 --- a/Gemfile +++ b/Gemfile @@ -23,6 +23,7 @@ gem "yard", "~> 0.9" gem "yard-sorbet", "~> 0.9" if RUBY_VERSION >= "3.1" group :test do + gem "event_stream_parser", ">= 1.0" gem "faraday", ">= 2.0" gem "minitest", "~> 5.1", require: false gem "mocha" diff --git a/README.md b/README.md index 866c213a..6ef1c08c 100644 --- a/README.md +++ b/README.md @@ -1599,11 +1599,12 @@ The stdio transport automatically handles: Use the `MCP::Client::HTTP` transport to interact with MCP servers using simple HTTP requests. -You'll need to add `faraday` as a dependency in order to use the HTTP transport layer: +You'll need to add `faraday` as a dependency in order to use the HTTP transport layer. Add `event_stream_parser` as well if the server uses SSE (`text/event-stream`) responses: ```ruby gem 'mcp' gem 'faraday', '>= 2.0' +gem 'event_stream_parser', '>= 1.0' # optional, required only for SSE responses ``` Example usage: diff --git a/docs/building-clients.md b/docs/building-clients.md index 45c7e8dc..48352eed 100644 --- a/docs/building-clients.md +++ b/docs/building-clients.md @@ -51,11 +51,12 @@ stdio_transport.close ## HTTP Transport -Use `MCP::Client::HTTP` to interact with MCP servers over HTTP. Requires the `faraday` gem: +Use `MCP::Client::HTTP` to interact with MCP servers over HTTP. Requires the `faraday` gem, plus `event_stream_parser` if the server uses SSE (`text/event-stream`) responses: ```ruby gem 'mcp' gem 'faraday', '>= 2.0' +gem 'event_stream_parser', '>= 1.0' # optional, required only for SSE responses ``` ```ruby diff --git a/lib/mcp/client/http.rb b/lib/mcp/client/http.rb index 1637b0ee..8b135e37 100644 --- a/lib/mcp/client/http.rb +++ b/lib/mcp/client/http.rb @@ -18,8 +18,7 @@ def send_request(request:) params = request[:params] || request["params"] response = client.post("", request) - validate_response_content_type!(response, method, params) - response.body + parse_response_body(response, method, params) rescue Faraday::BadRequestError => e raise RequestHandlerError.new( "The #{method} request is invalid", @@ -92,14 +91,52 @@ def require_faraday! "See https://rubygems.org/gems/faraday for more details." end - def validate_response_content_type!(response, method, params) + def require_event_stream_parser! + require "event_stream_parser" + rescue LoadError + raise LoadError, "The 'event_stream_parser' gem is required to parse SSE responses. " \ + "Add it to your Gemfile: gem 'event_stream_parser', '>= 1.0'. " \ + "See https://rubygems.org/gems/event_stream_parser for more details." + end + + def parse_response_body(response, method, params) content_type = response.headers["Content-Type"] - return if content_type&.include?("application/json") + + if content_type&.include?("text/event-stream") + parse_sse_response(response.body, method, params) + elsif content_type&.include?("application/json") + response.body + else + raise RequestHandlerError.new( + "Unsupported Content-Type: #{content_type.inspect}. Expected application/json or text/event-stream.", + { method: method, params: params }, + error_type: :unsupported_media_type, + ) + end + end + + def parse_sse_response(body, method, params) + require_event_stream_parser! + + json_rpc_response = nil + parser = EventStreamParser::Parser.new + parser.feed(body.to_s) do |_type, data, _id| + next if data.empty? + + begin + parsed = JSON.parse(data) + json_rpc_response = parsed if parsed.is_a?(Hash) && (parsed.key?("result") || parsed.key?("error")) + rescue JSON::ParserError + next + end + end + + return json_rpc_response if json_rpc_response raise RequestHandlerError.new( - "Unsupported Content-Type: #{content_type.inspect}. This client only supports JSON responses.", + "No valid JSON-RPC response found in SSE stream", { method: method, params: params }, - error_type: :unsupported_media_type, + error_type: :parse_error, ) end end diff --git a/test/mcp/client/http_test.rb b/test/mcp/client/http_test.rb index 081f0e6a..89d3649b 100644 --- a/test/mcp/client/http_test.rb +++ b/test/mcp/client/http_test.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "test_helper" +require "event_stream_parser" require "faraday" require "webmock/minitest" require "mcp/client/http" @@ -25,6 +26,26 @@ def test_raises_load_error_when_faraday_not_available assert_includes(error.message, "Add it to your Gemfile: gem 'faraday', '>= 2.0'") end + def test_raises_load_error_when_event_stream_parser_not_available + stub_request(:post, url) + .to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: "data: {}\n\n", + ) + + HTTP.any_instance.stubs(:require).with("faraday").returns(true) + HTTP.any_instance.stubs(:require).with("event_stream_parser") + .raises(LoadError, "cannot load such file -- event_stream_parser") + + error = assert_raises(LoadError) do + client.send_request(request: { method: "tools/list" }) + end + + assert_includes(error.message, "The 'event_stream_parser' gem is required to parse SSE responses") + assert_includes(error.message, "Add it to your Gemfile: gem 'event_stream_parser', '>= 1.0'") + end + def test_headers_are_added_to_the_request headers = { "Authorization" => "Bearer token" } client = HTTP.new(url: url, headers: headers) @@ -267,7 +288,7 @@ def test_block_customizes_faraday_connection custom_client.send_request(request: request) end - def test_send_request_raises_error_for_non_json_response + def test_send_request_raises_error_for_unsupported_content_type request = { jsonrpc: "2.0", id: "test_id", @@ -278,8 +299,8 @@ def test_send_request_raises_error_for_non_json_response .with(body: request.to_json) .to_return( status: 200, - headers: { "Content-Type" => "text/event-stream" }, - body: "data: {}\n\n", + headers: { "Content-Type" => "text/html" }, + body: "", ) error = assert_raises(RequestHandlerError) do @@ -287,13 +308,96 @@ def test_send_request_raises_error_for_non_json_response end assert_equal( - 'Unsupported Content-Type: "text/event-stream". This client only supports JSON responses.', + 'Unsupported Content-Type: "text/html". Expected application/json or text/event-stream.', error.message, ) assert_equal(:unsupported_media_type, error.error_type) assert_equal({ method: "tools/list", params: nil }, error.request) end + def test_send_request_parses_sse_response + request = { + jsonrpc: "2.0", + id: "test_id", + method: "tools/list", + } + + sse_body = <<~SSE + : comment + data: {"jsonrpc":"2.0","method":"notifications/progress","params":{}} + + data: {"jsonrpc":"2.0","id":"test_id","result":{"tools":[{"name":"echo"}]}} + + SSE + + stub_request(:post, url) + .with(body: request.to_json) + .to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: sse_body, + ) + + response = client.send_request(request: request) + + assert_equal({ "tools" => [{ "name" => "echo" }] }, response["result"]) + end + + def test_send_request_parses_sse_error_response + request = { + jsonrpc: "2.0", + id: "test_id", + method: "tools/list", + } + + sse_body = <<~SSE + data: {"jsonrpc":"2.0","id":"test_id","error":{"code":-32600,"message":"Invalid request"}} + + SSE + + stub_request(:post, url) + .with(body: request.to_json) + .to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: sse_body, + ) + + response = client.send_request(request: request) + + assert_equal(-32600, response.dig("error", "code")) + assert_equal("Invalid request", response.dig("error", "message")) + end + + def test_send_request_raises_error_for_sse_without_response + request = { + jsonrpc: "2.0", + id: "test_id", + method: "tools/list", + } + + sse_body = <<~SSE + : just a comment + data: {"jsonrpc":"2.0","method":"notifications/progress","params":{}} + + SSE + + stub_request(:post, url) + .with(body: request.to_json) + .to_return( + status: 200, + headers: { "Content-Type" => "text/event-stream" }, + body: sse_body, + ) + + error = assert_raises(RequestHandlerError) do + client.send_request(request: request) + end + + assert_includes(error.message, "No valid JSON-RPC response found in SSE stream") + assert_equal(:parse_error, error.error_type) + end + private def stub_request(method, url)