diff --git a/src/mcp/client/auth/oauth2.py b/src/mcp/client/auth/oauth2.py index 72309f5775..66ad5b3c79 100644 --- a/src/mcp/client/auth/oauth2.py +++ b/src/mcp/client/auth/oauth2.py @@ -35,6 +35,7 @@ handle_token_response_scopes, is_valid_client_metadata_url, should_use_client_metadata_url, + union_scopes, ) from mcp.client.streamable_http import MCP_PROTOCOL_VERSION from mcp.shared.auth import ( @@ -624,20 +625,25 @@ async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx. error = extract_field_from_www_auth(response, "error") # Step 2: Check if we need to step-up authorization - if error == "insufficient_scope": # pragma: no branch + if error == "insufficient_scope": try: - # Step 2a: Update the required scopes - self.context.client_metadata.scope = get_client_metadata_scopes( + # Step 2a: Union previously requested scopes with the + # step-up challenge so prior grants survive (SEP-2350). + challenge_scopes = get_client_metadata_scopes( extract_scope_from_www_auth(response), self.context.protected_resource_metadata, self.context.oauth_metadata, self.context.client_metadata.grant_types, ) + self.context.client_metadata.scope = union_scopes( + self.context.client_metadata.scope, + challenge_scopes, + ) # Step 2b: Perform (re-)authorization and token exchange token_response = yield await self._perform_authorization() await self._handle_token_response(token_response) - except Exception: # pragma: no cover + except Exception: logger.exception("OAuth flow error") raise diff --git a/src/mcp/client/auth/utils.py b/src/mcp/client/auth/utils.py index d75324f2f0..bdc8200286 100644 --- a/src/mcp/client/auth/utils.py +++ b/src/mcp/client/auth/utils.py @@ -95,6 +95,22 @@ def build_protected_resource_metadata_discovery_urls(www_auth_url: str | None, s return urls +def union_scopes(existing: str | None, incoming: str | None) -> str | None: + """Union two space-separated OAuth scope strings, preserving order.""" + if existing is None: + return incoming + if incoming is None: + return existing + + seen: set[str] = set() + merged: list[str] = [] + for scope in existing.split() + incoming.split(): + if scope not in seen: + seen.add(scope) + merged.append(scope) + return " ".join(merged) + + def get_client_metadata_scopes( www_authenticate_scope: str | None, protected_resource_metadata: ProtectedResourceMetadata | None, diff --git a/tests/client/test_auth.py b/tests/client/test_auth.py index bb0bce4c92..d6915f998a 100644 --- a/tests/client/test_auth.py +++ b/tests/client/test_auth.py @@ -1,6 +1,7 @@ """Tests for refactored OAuth client authentication implementation.""" import base64 +import logging import time from unittest import mock from urllib.parse import parse_qs, quote, unquote, urlparse @@ -25,6 +26,7 @@ handle_registration_response, is_valid_client_metadata_url, should_use_client_metadata_url, + union_scopes, ) from mcp.server.auth.routes import build_metadata from mcp.server.auth.settings import ClientRegistrationOptions, RevocationOptions @@ -1320,96 +1322,6 @@ async def test_token_exchange_accepts_201_status( assert oauth_provider.context.current_tokens.access_token == "new_access_token" assert oauth_provider.context.token_expiry_time is not None - @pytest.mark.anyio - async def test_403_insufficient_scope_updates_scope_from_header( - self, - oauth_provider: OAuthClientProvider, - mock_storage: MockTokenStorage, - valid_tokens: OAuthToken, - ): - """Test that 403 response correctly updates scope from WWW-Authenticate header.""" - # Pre-store valid tokens and client info - client_info = OAuthClientInformationFull( - client_id="test_client_id", - client_secret="test_client_secret", - redirect_uris=[AnyUrl("http://localhost:3030/callback")], - ) - await mock_storage.set_tokens(valid_tokens) - await mock_storage.set_client_info(client_info) - oauth_provider.context.current_tokens = valid_tokens - oauth_provider.context.token_expiry_time = time.time() + 1800 - oauth_provider.context.client_info = client_info - oauth_provider._initialized = True - - # Original scope - assert oauth_provider.context.client_metadata.scope == "read write" - - redirect_captured = False - captured_state = None - - async def capture_redirect(url: str) -> None: - nonlocal redirect_captured, captured_state - redirect_captured = True - # Verify the new scope is included in authorization URL - assert "scope=admin%3Awrite+admin%3Adelete" in url or "scope=admin:write+admin:delete" in url.replace( - "%3A", ":" - ).replace("+", " ") - # Extract state from redirect URL - parsed = urlparse(url) - params = parse_qs(parsed.query) - captured_state = params.get("state", [None])[0] - - oauth_provider.context.redirect_handler = capture_redirect - - # Mock callback - async def mock_callback() -> tuple[str, str | None]: - return "auth_code", captured_state - - oauth_provider.context.callback_handler = mock_callback - - test_request = httpx.Request("GET", "https://api.example.com/mcp") - auth_flow = oauth_provider.async_auth_flow(test_request) - - # First request - request = await auth_flow.__anext__() - - # Send 403 with new scope requirement - response_403 = httpx.Response( - 403, - headers={"WWW-Authenticate": 'Bearer error="insufficient_scope", scope="admin:write admin:delete"'}, - request=request, - ) - - # Trigger step-up - should get token exchange request - token_exchange_request = await auth_flow.asend(response_403) - - # Verify scope was updated - assert oauth_provider.context.client_metadata.scope == "admin:write admin:delete" - assert redirect_captured - - # Complete the flow with successful token response - token_response = httpx.Response( - 200, - json={ - "access_token": "new_token_with_new_scope", - "token_type": "Bearer", - "expires_in": 3600, - "scope": "admin:write admin:delete", - }, - request=token_exchange_request, - ) - - # Should get final retry request - final_request = await auth_flow.asend(token_response) - - # Send success response - flow should complete - success_response = httpx.Response(200, request=final_request) - try: - await auth_flow.asend(success_response) - pytest.fail("Should have stopped after successful response") # pragma: no cover - except StopAsyncIteration: - pass # Expected - @pytest.mark.parametrize( ( @@ -2618,3 +2530,570 @@ async def callback_handler() -> tuple[str, str | None]: await auth_flow.asend(final_response) except StopAsyncIteration: pass + + +class TestUnionScopes: + """Unit tests for the scope union helper.""" + + @pytest.mark.parametrize( + ("existing", "incoming", "expected"), + [ + (None, None, None), + (None, "a b", "a b"), + ("a b", None, "a b"), + ("a b", "c d", "a b c d"), + ("a b", "b c", "a b c"), + ("a b", "a b", "a b"), + ("a b", "c", "a b c"), + ("admin", "read", "admin read"), + ("", "a", "a"), + ("a", "", "a"), + ], + ) + def test_union_scopes(self, existing: str | None, incoming: str | None, expected: str | None): + assert union_scopes(existing, incoming) == expected + + +class TestStepUpAuthorization: + """End-to-end tests for the step-up authorization flow.""" + + @pytest.mark.anyio + async def test_step_up_retries_request_with_new_access_token( + self, + oauth_provider: OAuthClientProvider, + mock_storage: MockTokenStorage, + valid_tokens: OAuthToken, + ): + """After step-up succeeds, the retried request carries the new access token.""" + client_info = OAuthClientInformationFull( + client_id="test_client_id", + client_secret="test_client_secret", + redirect_uris=[AnyUrl("http://localhost:3030/callback")], + ) + await mock_storage.set_tokens(valid_tokens) + await mock_storage.set_client_info(client_info) + oauth_provider.context.current_tokens = valid_tokens + oauth_provider.context.token_expiry_time = time.time() + 1800 + oauth_provider.context.client_info = client_info + oauth_provider._initialized = True + + captured_state: str | None = None + + async def capture_redirect(url: str) -> None: + nonlocal captured_state + captured_state = parse_qs(urlparse(url).query).get("state", [None])[0] + + oauth_provider.context.redirect_handler = capture_redirect + + async def mock_callback() -> tuple[str, str | None]: + return "auth_code", captured_state + + oauth_provider.context.callback_handler = mock_callback + + # 1. Original request goes out with the existing access token. + auth_flow = oauth_provider.async_auth_flow(httpx.Request("GET", "https://api.example.com/mcp")) + initial_request = await auth_flow.__anext__() + assert initial_request.headers["Authorization"] == "Bearer test_access_token" + + # 2. Server replies 403 insufficient_scope → flow yields a token-exchange request. + token_request = await auth_flow.asend( + httpx.Response( + 403, + headers={"WWW-Authenticate": 'Bearer error="insufficient_scope", scope="admin"'}, + request=initial_request, + ) + ) + + # 3. Token endpoint returns the stepped-up token → flow retries the original request. + retry_request = await auth_flow.asend( + httpx.Response( + 200, + json={ + "access_token": "stepped_up_token", + "token_type": "Bearer", + "expires_in": 3600, + "scope": "read write admin", + }, + request=token_request, + ) + ) + # The retry MUST use the new access token, not the one that just got rejected. + assert retry_request.headers["Authorization"] == "Bearer stepped_up_token" + + # 4. Server accepts the retry → flow completes. + with pytest.raises(StopAsyncIteration): + await auth_flow.asend(httpx.Response(200, request=retry_request)) + + @pytest.mark.anyio + async def test_403_step_up_accumulates_across_multiple_step_ups( + self, + oauth_provider: OAuthClientProvider, + mock_storage: MockTokenStorage, + valid_tokens: OAuthToken, + ): + """Two consecutive 403 step-ups accumulate scopes via union, not replace.""" + client_info = OAuthClientInformationFull( + client_id="test_client_id", + client_secret="test_client_secret", + redirect_uris=[AnyUrl("http://localhost:3030/callback")], + ) + oauth_provider.context.client_metadata.scope = "read" + await mock_storage.set_tokens(valid_tokens) + await mock_storage.set_client_info(client_info) + oauth_provider.context.current_tokens = valid_tokens + oauth_provider.context.token_expiry_time = time.time() + 1800 + oauth_provider.context.client_info = client_info + oauth_provider._initialized = True + + captured_state: str | None = None + + async def capture_redirect(url: str) -> None: + nonlocal captured_state + parsed = urlparse(url) + captured_state = parse_qs(parsed.query).get("state", [None])[0] + + oauth_provider.context.redirect_handler = capture_redirect + + async def mock_callback() -> tuple[str, str | None]: + return "auth_code", captured_state + + oauth_provider.context.callback_handler = mock_callback + + assert oauth_provider.context.client_metadata.scope == "read" + + # First step-up: starting from "read", challenge "write" → "read write". + flow_1 = oauth_provider.async_auth_flow(httpx.Request("GET", "https://api.example.com/mcp")) + request_1 = await flow_1.__anext__() + token_request_1 = await flow_1.asend( + httpx.Response( + 403, + headers={"WWW-Authenticate": 'Bearer error="insufficient_scope", scope="write"'}, + request=request_1, + ) + ) + assert oauth_provider.context.client_metadata.scope == "read write" + + # Complete flow_1 so client_metadata.scope is committed for the next request. + final_request_1 = await flow_1.asend( + httpx.Response( + 200, + json={ + "access_token": "token_1", + "token_type": "Bearer", + "expires_in": 3600, + "scope": "read write", + }, + request=token_request_1, + ) + ) + with pytest.raises(StopAsyncIteration): + await flow_1.asend(httpx.Response(200, request=final_request_1)) + + # Second step-up: starting from "read write" (carried over), challenge "admin" + # → "read write admin". This pins the union-not-replace behavior across requests. + flow_2 = oauth_provider.async_auth_flow(httpx.Request("GET", "https://api.example.com/mcp")) + request_2 = await flow_2.__anext__() + token_request_2 = await flow_2.asend( + httpx.Response( + 403, + headers={"WWW-Authenticate": 'Bearer error="insufficient_scope", scope="admin"'}, + request=request_2, + ) + ) + assert oauth_provider.context.client_metadata.scope == "read write admin" + + final_request_2 = await flow_2.asend( + httpx.Response( + 200, + json={ + "access_token": "token_2", + "token_type": "Bearer", + "expires_in": 3600, + "scope": "read write admin", + }, + request=token_request_2, + ) + ) + with pytest.raises(StopAsyncIteration): + await flow_2.asend(httpx.Response(200, request=final_request_2)) + + @pytest.mark.anyio + async def test_step_up_unions_with_requested_not_granted_when_they_diverge( + self, + oauth_provider: OAuthClientProvider, + mock_storage: MockTokenStorage, + ): + """Union source is the requested scope set, not the granted scope set. + + When the AS grants a strict subset of what was requested (e.g. user declines + a scope at consent), a subsequent step-up must still union with the previously + *requested* scope set so previously-declined scopes are re-requested rather + than silently dropped. + """ + client_info = OAuthClientInformationFull( + client_id="test_client_id", + client_secret="test_client_secret", + redirect_uris=[AnyUrl("http://localhost:3030/callback")], + ) + # Requested: read write admin. Granted: only read write (user declined admin). + oauth_provider.context.client_metadata.scope = "read write admin" + granted_tokens = OAuthToken( + access_token="granted_token", + token_type="Bearer", + expires_in=3600, + refresh_token="granted_refresh", + scope="read write", + ) + await mock_storage.set_tokens(granted_tokens) + await mock_storage.set_client_info(client_info) + oauth_provider.context.current_tokens = granted_tokens + oauth_provider.context.token_expiry_time = time.time() + 1800 + oauth_provider.context.client_info = client_info + oauth_provider._initialized = True + + captured_scope: str | None = None + captured_state: str | None = None + + async def capture_redirect(url: str) -> None: + nonlocal captured_scope, captured_state + parsed = urlparse(url) + params = parse_qs(parsed.query) + captured_scope = params["scope"][0] + captured_state = params.get("state", [None])[0] + + oauth_provider.context.redirect_handler = capture_redirect + + async def mock_callback() -> tuple[str, str | None]: + return "auth_code", captured_state + + oauth_provider.context.callback_handler = mock_callback + + assert oauth_provider.context.client_metadata.scope == "read write admin" + assert oauth_provider.context.current_tokens is not None + assert oauth_provider.context.current_tokens.scope == "read write" + + # 1. Original request goes out. + auth_flow = oauth_provider.async_auth_flow(httpx.Request("GET", "https://api.example.com/mcp")) + request = await auth_flow.__anext__() + + # 2. 403 challenges only "delete" → step-up unions with REQUESTED scopes. + token_request = await auth_flow.asend( + httpx.Response( + 403, + headers={"WWW-Authenticate": 'Bearer error="insufficient_scope", scope="delete"'}, + request=request, + ) + ) + + # Union is with the requested set ("read write admin"), not the granted set + # ("read write"): "admin" must survive even though the AS didn't grant it last time. + assert oauth_provider.context.client_metadata.scope == "read write admin delete" + assert captured_scope is not None + assert set(captured_scope.split()) == {"read", "write", "admin", "delete"} + + # 3. Token endpoint returns the new token → flow retries the original request. + final_request = await auth_flow.asend( + httpx.Response( + 200, + json={ + "access_token": "new_token", + "token_type": "Bearer", + "expires_in": 3600, + "scope": "read write admin delete", + }, + request=token_request, + ) + ) + # 4. Server accepts the retry → flow completes. + with pytest.raises(StopAsyncIteration): + await auth_flow.asend(httpx.Response(200, request=final_request)) + + @pytest.mark.anyio + async def test_403_step_up_with_no_scope_in_challenge_falls_back_to_prm( + self, + oauth_provider: OAuthClientProvider, + mock_storage: MockTokenStorage, + valid_tokens: OAuthToken, + ): + """403 with no scope= attribute falls back to PRM scopes_supported, then unioned. + + Confirms the priority-ladder fallback in ``get_client_metadata_scopes`` still + works when the challenge omits ``scope=``. + """ + client_info = OAuthClientInformationFull( + client_id="test_client_id", + client_secret="test_client_secret", + redirect_uris=[AnyUrl("http://localhost:3030/callback")], + ) + # Pre-seed a scope ("admin") absent from PRM so the union result differs + # from the priority-ladder result and exercises the union codepath. + oauth_provider.context.client_metadata.scope = "admin" + oauth_provider.context.protected_resource_metadata = ProtectedResourceMetadata( + resource=AnyHttpUrl("https://api.example.com/v1/mcp"), + authorization_servers=[AnyHttpUrl("https://auth.example.com")], + scopes_supported=["read", "write"], + ) + await mock_storage.set_tokens(valid_tokens) + await mock_storage.set_client_info(client_info) + oauth_provider.context.current_tokens = valid_tokens + oauth_provider.context.token_expiry_time = time.time() + 1800 + oauth_provider.context.client_info = client_info + oauth_provider._initialized = True + + captured_state: str | None = None + + async def capture_redirect(url: str) -> None: + nonlocal captured_state + captured_state = parse_qs(urlparse(url).query).get("state", [None])[0] + + oauth_provider.context.redirect_handler = capture_redirect + + async def mock_callback() -> tuple[str, str | None]: + return "auth_code", captured_state + + oauth_provider.context.callback_handler = mock_callback + + assert oauth_provider.context.client_metadata.scope == "admin" + + # 1. Original request goes out. + auth_flow = oauth_provider.async_auth_flow(httpx.Request("GET", "https://api.example.com/mcp")) + request = await auth_flow.__anext__() + + # 2. 403 with no scope= attribute → priority ladder falls back to PRM scopes_supported. + token_request = await auth_flow.asend( + httpx.Response( + 403, + headers={"WWW-Authenticate": 'Bearer error="insufficient_scope"'}, + request=request, + ) + ) + + # Existing "admin" unioned with PRM-derived "read write" → "admin read write". + assert oauth_provider.context.client_metadata.scope == "admin read write" + + # 3. Token endpoint returns the new token → flow retries the original request. + final_request = await auth_flow.asend( + httpx.Response( + 200, + json={ + "access_token": "new_token", + "token_type": "Bearer", + "expires_in": 3600, + "scope": "admin read write", + }, + request=token_request, + ) + ) + # 4. Server accepts the retry → flow completes. + with pytest.raises(StopAsyncIteration): + await auth_flow.asend(httpx.Response(200, request=final_request)) + + @pytest.mark.anyio + async def test_403_with_non_insufficient_scope_error_is_no_op( + self, + oauth_provider: OAuthClientProvider, + mock_storage: MockTokenStorage, + valid_tokens: OAuthToken, + ): + """403 with an error other than insufficient_scope does not trigger step-up. + + Only insufficient_scope errors initiate re-authorization. Other 403 errors + (e.g. invalid_token) leave ``client_metadata.scope`` unchanged and retry the + request with the existing token so the caller sees the original 403. + """ + await mock_storage.set_tokens(valid_tokens) + oauth_provider.context.current_tokens = valid_tokens + oauth_provider.context.token_expiry_time = time.time() + 1800 + oauth_provider._initialized = True + + original_scope = oauth_provider.context.client_metadata.scope + + # 1. Original request goes out with the existing access token. + auth_flow = oauth_provider.async_auth_flow(httpx.Request("GET", "https://api.example.com/mcp")) + request = await auth_flow.__anext__() + + # 2. 403 with error=invalid_token (not insufficient_scope) → no step-up; flow yields the same request. + retry_request = await auth_flow.asend( + httpx.Response( + 403, + headers={"WWW-Authenticate": 'Bearer error="invalid_token"'}, + request=request, + ) + ) + + # Scope is untouched; the retry carries the same (now-rejected) access token. + assert oauth_provider.context.client_metadata.scope == original_scope + assert retry_request.headers["Authorization"] == "Bearer test_access_token" + + # 3. Server returns 403 again → flow completes; the caller sees the original failure. + with pytest.raises(StopAsyncIteration): + await auth_flow.asend(httpx.Response(403, request=retry_request)) + + @pytest.mark.anyio + async def test_401_replaces_scope_on_reauth( + self, + oauth_provider: OAuthClientProvider, + mock_storage: MockTokenStorage, + ): + """A fresh 401 (no valid tokens) replaces ``client_metadata.scope`` with the challenge. + + Down-scoping invariant: 401 uses replace semantics, so a re-login after step-up + accumulation is the natural opportunity to drop scopes the user no longer needs. + """ + # Simulate prior step-up accumulation; pre-seed client_info to skip DCR. + client_info = OAuthClientInformationFull( + client_id="test_client_id", + client_secret="test_client_secret", + redirect_uris=[AnyUrl("http://localhost:3030/callback")], + ) + oauth_provider.context.client_metadata.scope = "read write admin" + oauth_provider.context.current_tokens = None + oauth_provider.context.token_expiry_time = None + oauth_provider.context.client_info = client_info + oauth_provider._initialized = True + + oauth_provider._perform_authorization_code_grant = mock.AsyncMock( + return_value=("test_auth_code", "test_code_verifier") + ) + + assert oauth_provider.context.client_metadata.scope == "read write admin" + + test_request = httpx.Request("GET", "https://api.example.com/mcp") + auth_flow = oauth_provider.async_auth_flow(test_request) + + # 1. Original request goes out without an auth header (no current tokens). + request = await auth_flow.__anext__() + assert "Authorization" not in request.headers + + # 2. 401 with a narrower scope challenge → flow yields a PRM discovery request. + discovery_request = await auth_flow.asend( + httpx.Response( + 401, + headers={ + "WWW-Authenticate": ( + 'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource", ' + 'scope="read"' + ) + }, + request=test_request, + ) + ) + + # 3. PRM response → flow yields an OASM (auth-server metadata) discovery request. + oauth_metadata_request = await auth_flow.asend( + httpx.Response( + 200, + content=( + b'{"resource": "https://api.example.com/v1/mcp", ' + b'"authorization_servers": ["https://auth.example.com"]}' + ), + request=discovery_request, + ) + ) + + # 4. OASM response → flows yields a token request. + token_request = await auth_flow.asend( + httpx.Response( + 200, + content=( + b'{"issuer": "https://auth.example.com", ' + b'"authorization_endpoint": "https://auth.example.com/authorize", ' + b'"token_endpoint": "https://auth.example.com/token", ' + b'"registration_endpoint": "https://auth.example.com/register"}' + ), + request=oauth_metadata_request, + ) + ) + + # By the time we're exchanging tokens, the 401 branch has already chosen scope. + # The accumulated "read write admin" must be replaced by the challenge's "read". + assert oauth_provider.context.client_metadata.scope == "read" + + # 5. Token endpoint returns the new token → flow retries the original request with it. + final_request = await auth_flow.asend( + httpx.Response( + 200, + content=b'{"access_token": "new_token", "token_type": "Bearer", "expires_in": 3600}', + request=token_request, + ) + ) + assert final_request.headers["Authorization"] == "Bearer new_token" + + # 6. Server accepts the retry → flow completes. + with pytest.raises(StopAsyncIteration): + await auth_flow.asend(httpx.Response(200, request=final_request)) + + @pytest.mark.anyio + async def test_happy_path_does_not_mutate_scope( + self, + oauth_provider: OAuthClientProvider, + mock_storage: MockTokenStorage, + valid_tokens: OAuthToken, + ): + """A successful request with a valid token leaves ``client_metadata.scope`` untouched. + + ``client_metadata.scope`` is mutated only on the 401 (replace) and 403 step-up + (union) branches of ``async_auth_flow``. + """ + oauth_provider.context.client_metadata.scope = "read write admin" + await mock_storage.set_tokens(valid_tokens) + oauth_provider.context.current_tokens = valid_tokens + oauth_provider.context.token_expiry_time = time.time() + 1800 + oauth_provider._initialized = True + + assert oauth_provider.context.client_metadata.scope == "read write admin" + + # 1. Original request goes out with the existing access token. + auth_flow = oauth_provider.async_auth_flow(httpx.Request("GET", "https://api.example.com/test")) + request = await auth_flow.__anext__() + assert request.headers["Authorization"] == "Bearer test_access_token" + + # 2. Server accepts → flow completes without ever entering the 401 or 403 branches. + with pytest.raises(StopAsyncIteration): + await auth_flow.asend(httpx.Response(200, request=request)) + + # Scope is unchanged: only 401 (replace) and 403 step-up (union) mutate it. + assert oauth_provider.context.client_metadata.scope == "read write admin" + + @pytest.mark.anyio + async def test_step_up_surfaces_errors_with_log( + self, + oauth_provider: OAuthClientProvider, + mock_storage: MockTokenStorage, + valid_tokens: OAuthToken, + caplog: pytest.LogCaptureFixture, + ): + """Errors raised during step-up are logged and re-raised, not silently swallowed.""" + client_info = OAuthClientInformationFull( + client_id="test_client_id", + client_secret="test_client_secret", + redirect_uris=[AnyUrl("http://localhost:3030/callback")], + ) + await mock_storage.set_tokens(valid_tokens) + await mock_storage.set_client_info(client_info) + oauth_provider.context.current_tokens = valid_tokens + oauth_provider.context.token_expiry_time = time.time() + 1800 + oauth_provider.context.client_info = client_info + oauth_provider._initialized = True + + # Make _perform_authorization fail so the step-up exception handler runs. + boom = RuntimeError("step-up boom") + oauth_provider._perform_authorization = mock.AsyncMock(side_effect=boom) + + # 1. Original request goes out. + auth_flow = oauth_provider.async_auth_flow(httpx.Request("GET", "https://api.example.com/mcp")) + request = await auth_flow.__anext__() + + # 2. 403 insufficient_scope → step-up runs, _perform_authorization raises, + # handler logs and re-raises so the caller sees the original error. + with caplog.at_level(logging.ERROR, logger="mcp.client.auth.oauth2"): + with pytest.raises(RuntimeError, match="step-up boom"): + await auth_flow.asend( + httpx.Response( + 403, + headers={"WWW-Authenticate": 'Bearer error="insufficient_scope", scope="admin"'}, + request=request, + ) + ) + + assert "OAuth flow error" in caplog.text