From 3ee84a96a8ed0fda85f5c06bd1dc93315e12c847 Mon Sep 17 00:00:00 2001 From: kalyanguru18 Date: Mon, 18 May 2026 12:02:20 +0000 Subject: [PATCH 1/3] feat(auth): add validate_registered_redirect_uri helper --- src/mcp/server/auth/routes.py | 31 ++++++++++++++-- tests/server/auth/test_routes.py | 61 ++++++++++++++++++++++++++++++-- 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/src/mcp/server/auth/routes.py b/src/mcp/server/auth/routes.py index a72e819477..60b54ddbb6 100644 --- a/src/mcp/server/auth/routes.py +++ b/src/mcp/server/auth/routes.py @@ -2,7 +2,7 @@ from typing import Any from urllib.parse import urlparse -from pydantic import AnyHttpUrl +from pydantic import AnyHttpUrl, AnyUrl from starlette.middleware.cors import CORSMiddleware from starlette.requests import Request from starlette.responses import Response @@ -18,7 +18,7 @@ from mcp.server.auth.provider import OAuthAuthorizationServerProvider from mcp.server.auth.settings import ClientRegistrationOptions, RevocationOptions from mcp.server.streamable_http import MCP_PROTOCOL_VERSION_HEADER -from mcp.shared.auth import OAuthMetadata, ProtectedResourceMetadata +from mcp.shared.auth import InvalidRedirectUriError, OAuthMetadata, ProtectedResourceMetadata def validate_issuer_url(url: AnyHttpUrl): @@ -42,6 +42,33 @@ def validate_issuer_url(url: AnyHttpUrl): raise ValueError("Issuer URL must not have a query string") +def validate_registered_redirect_uri(url: AnyUrl) -> None: + """Validate that a registered redirect_uri meets OAuth 2.0 + RFC 7591 requirements. + + Mirrors the policy that :func:`validate_issuer_url` applies to issuer URLs: + redirect URIs must use ``https``, except that ``http`` is permitted for + loopback hosts (``localhost``, ``127.0.0.1``, ``[::1]``) per RFC 8252 §7.3, + and they MUST NOT carry a fragment component per RFC 7591 §2. + + Args: + url: A registered redirect_uri value from + :class:`mcp.shared.auth.OAuthClientMetadata`. + + Raises: + InvalidRedirectUriError: If the URI uses a scheme other than ``https`` + or loopback ``http``, or if it contains a fragment. + """ + # RFC 9700 §4.1.1 (OAuth 2.0 Security BCP): https-only, with the RFC 8252 + # native-app loopback exception. + if url.scheme not in ("https", "http"): + raise InvalidRedirectUriError(f"redirect_uri must use https (or http for loopback); got scheme {url.scheme!r}") + if url.scheme == "http" and url.host not in ("localhost", "127.0.0.1", "[::1]"): + raise InvalidRedirectUriError(f"redirect_uri must use https for non-loopback hosts; got {str(url)!r}") + # RFC 7591 §2: redirect_uri MUST NOT contain a fragment component. + if url.fragment: + raise InvalidRedirectUriError(f"redirect_uri must not have a fragment; got {str(url)!r}") + + AUTHORIZATION_PATH = "/authorize" TOKEN_PATH = "/token" REGISTRATION_PATH = "/register" diff --git a/tests/server/auth/test_routes.py b/tests/server/auth/test_routes.py index 3d13b5ba53..202358d3a4 100644 --- a/tests/server/auth/test_routes.py +++ b/tests/server/auth/test_routes.py @@ -1,7 +1,8 @@ import pytest -from pydantic import AnyHttpUrl +from pydantic import AnyHttpUrl, AnyUrl -from mcp.server.auth.routes import validate_issuer_url +from mcp.server.auth.routes import validate_issuer_url, validate_registered_redirect_uri +from mcp.shared.auth import InvalidRedirectUriError def test_validate_issuer_url_https_allowed(): @@ -45,3 +46,59 @@ def test_validate_issuer_url_fragment_rejected(): def test_validate_issuer_url_query_rejected(): with pytest.raises(ValueError, match="query"): validate_issuer_url(AnyHttpUrl("https://example.com/path?q=1")) + + +def test_validate_registered_redirect_uri_https_allowed(): + validate_registered_redirect_uri(AnyUrl("https://example.com/cb")) + + +def test_validate_registered_redirect_uri_https_with_query_allowed(): + validate_registered_redirect_uri(AnyUrl("https://example.com/cb?foo=bar")) + + +def test_validate_registered_redirect_uri_http_localhost_allowed(): + validate_registered_redirect_uri(AnyUrl("http://localhost:8080/cb")) + + +def test_validate_registered_redirect_uri_http_127_0_0_1_allowed(): + validate_registered_redirect_uri(AnyUrl("http://127.0.0.1:8080/cb")) + + +def test_validate_registered_redirect_uri_http_ipv6_loopback_allowed(): + validate_registered_redirect_uri(AnyUrl("http://[::1]:8080/cb")) + + +def test_validate_registered_redirect_uri_javascript_scheme_rejected(): + with pytest.raises(InvalidRedirectUriError, match="must use https"): + validate_registered_redirect_uri(AnyUrl("javascript:alert(1)")) + + +def test_validate_registered_redirect_uri_data_scheme_rejected(): + with pytest.raises(InvalidRedirectUriError, match="must use https"): + validate_registered_redirect_uri(AnyUrl("data:text/html,x")) + + +def test_validate_registered_redirect_uri_file_scheme_rejected(): + with pytest.raises(InvalidRedirectUriError, match="must use https"): + validate_registered_redirect_uri(AnyUrl("file:///etc/passwd")) + + +def test_validate_registered_redirect_uri_ftp_scheme_rejected(): + with pytest.raises(InvalidRedirectUriError, match="must use https"): + validate_registered_redirect_uri(AnyUrl("ftp://attacker.example/cb")) + + +def test_validate_registered_redirect_uri_http_non_loopback_rejected(): + with pytest.raises(InvalidRedirectUriError, match="must use https for non-loopback"): + validate_registered_redirect_uri(AnyUrl("http://attacker.example/cb")) + + +def test_validate_registered_redirect_uri_http_127_prefix_domain_rejected(): + """A domain like 127.0.0.1.evil.com is NOT loopback.""" + with pytest.raises(InvalidRedirectUriError, match="must use https for non-loopback"): + validate_registered_redirect_uri(AnyUrl("http://127.0.0.1.evil.com/cb")) + + +def test_validate_registered_redirect_uri_fragment_rejected(): + with pytest.raises(InvalidRedirectUriError, match="must not have a fragment"): + validate_registered_redirect_uri(AnyUrl("https://example.com/cb#frag")) From 35df94476bcc981c4a6c0dee2251eee00ef20ca2 Mon Sep 17 00:00:00 2001 From: kalyanguru18 Date: Mon, 18 May 2026 12:02:20 +0000 Subject: [PATCH 2/3] =?UTF-8?q?fix(auth):=20reject=20empty-fragment=20redi?= =?UTF-8?q?rect=5Furi=20(RFC=207591=20=C2=A72)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/mcp/server/auth/routes.py | 2 +- tests/server/auth/test_routes.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/mcp/server/auth/routes.py b/src/mcp/server/auth/routes.py index 60b54ddbb6..09ed8d390f 100644 --- a/src/mcp/server/auth/routes.py +++ b/src/mcp/server/auth/routes.py @@ -65,7 +65,7 @@ def validate_registered_redirect_uri(url: AnyUrl) -> None: if url.scheme == "http" and url.host not in ("localhost", "127.0.0.1", "[::1]"): raise InvalidRedirectUriError(f"redirect_uri must use https for non-loopback hosts; got {str(url)!r}") # RFC 7591 §2: redirect_uri MUST NOT contain a fragment component. - if url.fragment: + if url.fragment is not None: raise InvalidRedirectUriError(f"redirect_uri must not have a fragment; got {str(url)!r}") diff --git a/tests/server/auth/test_routes.py b/tests/server/auth/test_routes.py index 202358d3a4..91f94c9469 100644 --- a/tests/server/auth/test_routes.py +++ b/tests/server/auth/test_routes.py @@ -102,3 +102,8 @@ def test_validate_registered_redirect_uri_http_127_prefix_domain_rejected(): def test_validate_registered_redirect_uri_fragment_rejected(): with pytest.raises(InvalidRedirectUriError, match="must not have a fragment"): validate_registered_redirect_uri(AnyUrl("https://example.com/cb#frag")) + + +def test_validate_registered_redirect_uri_empty_fragment_rejected(): + with pytest.raises(InvalidRedirectUriError, match="must not have a fragment"): + validate_registered_redirect_uri(AnyUrl("https://example.com/cb#")) From 759c9be25488abe2e052399afccb22841bc0549e Mon Sep 17 00:00:00 2001 From: kalyanguru18 Date: Mon, 18 May 2026 12:02:20 +0000 Subject: [PATCH 3/3] fix(auth): reject non-spec redirect_uris in DCR handler Extracts validate_registered_redirect_uri to mcp.server.auth._redirect_uri to avoid a circular import with handlers.register; re-exports from routes to preserve the public test import path. --- src/mcp/server/auth/_redirect_uri.py | 41 +++++++++++ src/mcp/server/auth/handlers/register.py | 18 ++++- src/mcp/server/auth/routes.py | 37 +++------- tests/server/auth/test_error_handling.py | 86 ++++++++++++++++++++++++ 4 files changed, 152 insertions(+), 30 deletions(-) create mode 100644 src/mcp/server/auth/_redirect_uri.py diff --git a/src/mcp/server/auth/_redirect_uri.py b/src/mcp/server/auth/_redirect_uri.py new file mode 100644 index 0000000000..05c42f49e8 --- /dev/null +++ b/src/mcp/server/auth/_redirect_uri.py @@ -0,0 +1,41 @@ +"""Internal helpers for validating registered OAuth redirect URIs. + +Lives outside :mod:`mcp.server.auth.routes` to avoid a circular import: +``routes`` imports :class:`~mcp.server.auth.handlers.register.RegistrationHandler`, +and the handler in turn needs the redirect-URI validator, so the validator +must sit in a module that neither side has to depend on transitively. +:mod:`mcp.server.auth.routes` re-exports :func:`validate_registered_redirect_uri` +so callers (including tests) keep the public import path. +""" + +from pydantic import AnyUrl + +from mcp.shared.auth import InvalidRedirectUriError + + +def validate_registered_redirect_uri(url: AnyUrl) -> None: + """Validate that a registered redirect_uri meets OAuth 2.0 + RFC 7591 requirements. + + Mirrors the policy that :func:`mcp.server.auth.routes.validate_issuer_url` + applies to issuer URLs: redirect URIs must use ``https``, except that + ``http`` is permitted for loopback hosts (``localhost``, ``127.0.0.1``, + ``[::1]``) per RFC 8252 §7.3, and they MUST NOT carry a fragment component + per RFC 7591 §2. + + Args: + url: A registered redirect_uri value from + :class:`mcp.shared.auth.OAuthClientMetadata`. + + Raises: + InvalidRedirectUriError: If the URI uses a scheme other than ``https`` + or loopback ``http``, or if it contains a fragment. + """ + # RFC 9700 §4.1.1 (OAuth 2.0 Security BCP): https-only, with the RFC 8252 + # native-app loopback exception. + if url.scheme not in ("https", "http"): + raise InvalidRedirectUriError(f"redirect_uri must use https (or http for loopback); got scheme {url.scheme!r}") + if url.scheme == "http" and url.host not in ("localhost", "127.0.0.1", "[::1]"): + raise InvalidRedirectUriError(f"redirect_uri must use https for non-loopback hosts; got {str(url)!r}") + # RFC 7591 §2: redirect_uri MUST NOT contain a fragment component. + if url.fragment is not None: + raise InvalidRedirectUriError(f"redirect_uri must not have a fragment; got {str(url)!r}") diff --git a/src/mcp/server/auth/handlers/register.py b/src/mcp/server/auth/handlers/register.py index 79eb0fb0c1..149a0299ba 100644 --- a/src/mcp/server/auth/handlers/register.py +++ b/src/mcp/server/auth/handlers/register.py @@ -8,11 +8,12 @@ from starlette.requests import Request from starlette.responses import Response +from mcp.server.auth._redirect_uri import validate_registered_redirect_uri from mcp.server.auth.errors import stringify_pydantic_error from mcp.server.auth.json_response import PydanticJSONResponse from mcp.server.auth.provider import OAuthAuthorizationServerProvider, RegistrationError, RegistrationErrorCode from mcp.server.auth.settings import ClientRegistrationOptions -from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata +from mcp.shared.auth import InvalidRedirectUriError, OAuthClientInformationFull, OAuthClientMetadata # this alias is a no-op; it's just to separate out the types exposed to the # provider from what we use in the HTTP handler @@ -45,6 +46,21 @@ async def handle(self, request: Request) -> Response: status_code=400, ) + # RFC 9700 §4.1.1 + RFC 7591 §2: Pydantic AnyUrl accepts non-OAuth + # schemes (javascript:, data:, file:, etc.) and URIs with fragments; + # reject those here per spec. + for uri in client_metadata.redirect_uris or []: + try: + validate_registered_redirect_uri(uri) + except InvalidRedirectUriError as e: + return PydanticJSONResponse( + content=RegistrationErrorResponse( + error="invalid_redirect_uri", + error_description=e.message, + ), + status_code=400, + ) + client_id = str(uuid4()) # If auth method is None, default to client_secret_post diff --git a/src/mcp/server/auth/routes.py b/src/mcp/server/auth/routes.py index 09ed8d390f..ccd847a83d 100644 --- a/src/mcp/server/auth/routes.py +++ b/src/mcp/server/auth/routes.py @@ -2,13 +2,14 @@ from typing import Any from urllib.parse import urlparse -from pydantic import AnyHttpUrl, AnyUrl +from pydantic import AnyHttpUrl from starlette.middleware.cors import CORSMiddleware from starlette.requests import Request from starlette.responses import Response from starlette.routing import Route, request_response # type: ignore from starlette.types import ASGIApp +from mcp.server.auth._redirect_uri import validate_registered_redirect_uri from mcp.server.auth.handlers.authorize import AuthorizationHandler from mcp.server.auth.handlers.metadata import MetadataHandler, ProtectedResourceMetadataHandler from mcp.server.auth.handlers.register import RegistrationHandler @@ -18,7 +19,12 @@ from mcp.server.auth.provider import OAuthAuthorizationServerProvider from mcp.server.auth.settings import ClientRegistrationOptions, RevocationOptions from mcp.server.streamable_http import MCP_PROTOCOL_VERSION_HEADER -from mcp.shared.auth import InvalidRedirectUriError, OAuthMetadata, ProtectedResourceMetadata +from mcp.shared.auth import OAuthMetadata, ProtectedResourceMetadata + +# Re-exported from ._redirect_uri to avoid a circular import with +# .handlers.register, which also needs this validator. External callers and +# tests should keep importing it from this module. +__all__ = ["validate_registered_redirect_uri"] def validate_issuer_url(url: AnyHttpUrl): @@ -42,33 +48,6 @@ def validate_issuer_url(url: AnyHttpUrl): raise ValueError("Issuer URL must not have a query string") -def validate_registered_redirect_uri(url: AnyUrl) -> None: - """Validate that a registered redirect_uri meets OAuth 2.0 + RFC 7591 requirements. - - Mirrors the policy that :func:`validate_issuer_url` applies to issuer URLs: - redirect URIs must use ``https``, except that ``http`` is permitted for - loopback hosts (``localhost``, ``127.0.0.1``, ``[::1]``) per RFC 8252 §7.3, - and they MUST NOT carry a fragment component per RFC 7591 §2. - - Args: - url: A registered redirect_uri value from - :class:`mcp.shared.auth.OAuthClientMetadata`. - - Raises: - InvalidRedirectUriError: If the URI uses a scheme other than ``https`` - or loopback ``http``, or if it contains a fragment. - """ - # RFC 9700 §4.1.1 (OAuth 2.0 Security BCP): https-only, with the RFC 8252 - # native-app loopback exception. - if url.scheme not in ("https", "http"): - raise InvalidRedirectUriError(f"redirect_uri must use https (or http for loopback); got scheme {url.scheme!r}") - if url.scheme == "http" and url.host not in ("localhost", "127.0.0.1", "[::1]"): - raise InvalidRedirectUriError(f"redirect_uri must use https for non-loopback hosts; got {str(url)!r}") - # RFC 7591 §2: redirect_uri MUST NOT contain a fragment component. - if url.fragment is not None: - raise InvalidRedirectUriError(f"redirect_uri must not have a fragment; got {str(url)!r}") - - AUTHORIZATION_PATH = "/authorize" TOKEN_PATH = "/token" REGISTRATION_PATH = "/register" diff --git a/tests/server/auth/test_error_handling.py b/tests/server/auth/test_error_handling.py index 7c5c435825..0b7fa41e8a 100644 --- a/tests/server/auth/test_error_handling.py +++ b/tests/server/auth/test_error_handling.py @@ -288,3 +288,89 @@ async def test_token_error_handling_refresh_token( data = refresh_response.json() assert data["error"] == "invalid_scope" assert data["error_description"] == "The requested scope is invalid" + + +@pytest.mark.anyio +async def test_register_rejects_javascript_scheme(client: httpx.AsyncClient): + """Registration must reject ``javascript:`` redirect URIs per RFC 9700 §4.1.1.""" + response = await client.post( + "/register", + json={ + "redirect_uris": ["javascript:alert(1)"], + "token_endpoint_auth_method": "none", + "grant_types": ["authorization_code"], + "response_types": ["code"], + }, + ) + assert response.status_code == 400, response.content + data = response.json() + assert data["error"] == "invalid_redirect_uri" + assert "https" in data["error_description"] + + +@pytest.mark.anyio +async def test_register_rejects_cleartext_http_non_loopback(client: httpx.AsyncClient): + """Registration must reject cleartext http for non-loopback hosts per RFC 8252 §7.3.""" + response = await client.post( + "/register", + json={ + "redirect_uris": ["http://attacker.example/cb"], + "token_endpoint_auth_method": "none", + "grant_types": ["authorization_code"], + "response_types": ["code"], + }, + ) + assert response.status_code == 400, response.content + data = response.json() + assert data["error"] == "invalid_redirect_uri" + assert "non-loopback" in data["error_description"] + + +@pytest.mark.anyio +async def test_register_rejects_fragment(client: httpx.AsyncClient): + """Registration must reject redirect URIs that include a fragment per RFC 7591 §2.""" + response = await client.post( + "/register", + json={ + "redirect_uris": ["https://example.com/cb#frag"], + "token_endpoint_auth_method": "none", + "grant_types": ["authorization_code"], + "response_types": ["code"], + }, + ) + assert response.status_code == 400, response.content + data = response.json() + assert data["error"] == "invalid_redirect_uri" + assert "fragment" in data["error_description"] + + +@pytest.mark.anyio +async def test_register_accepts_https_redirect_uri(client: httpx.AsyncClient): + """Registration must accept https redirect URIs.""" + response = await client.post( + "/register", + json={ + "redirect_uris": ["https://example.com/cb"], + "token_endpoint_auth_method": "none", + "grant_types": ["authorization_code"], + "response_types": ["code"], + }, + ) + assert response.status_code == 201, response.content + data = response.json() + assert "client_id" in data + + +@pytest.mark.anyio +async def test_register_accepts_loopback_redirect_uri(client: httpx.AsyncClient): + """Registration must accept http://localhost loopback redirect URIs per RFC 8252 §7.3.""" + response = await client.post( + "/register", + json={ + "redirect_uris": ["http://localhost:8080/cb"], + "token_endpoint_auth_method": "none", + "grant_types": ["authorization_code"], + "response_types": ["code"], + }, + ) + assert response.status_code == 201, response.content