Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions src/mcp/server/auth/_redirect_uri.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Internal helpers for validating registered OAuth redirect URIs.

Lives outside :mod:`mcp.server.auth.routes` to avoid a circular import:
``routes`` imports :class:`~mcp.server.auth.handlers.register.RegistrationHandler`,
and the handler in turn needs the redirect-URI validator, so the validator
must sit in a module that neither side has to depend on transitively.
:mod:`mcp.server.auth.routes` re-exports :func:`validate_registered_redirect_uri`
so callers (including tests) keep the public import path.
"""

from pydantic import AnyUrl

from mcp.shared.auth import InvalidRedirectUriError


def validate_registered_redirect_uri(url: AnyUrl) -> None:
"""Validate that a registered redirect_uri meets OAuth 2.0 + RFC 7591 requirements.

Mirrors the policy that :func:`mcp.server.auth.routes.validate_issuer_url`
applies to issuer URLs: redirect URIs must use ``https``, except that
``http`` is permitted for loopback hosts (``localhost``, ``127.0.0.1``,
``[::1]``) per RFC 8252 §7.3, and they MUST NOT carry a fragment component
per RFC 7591 §2.

Args:
url: A registered redirect_uri value from
:class:`mcp.shared.auth.OAuthClientMetadata`.

Raises:
InvalidRedirectUriError: If the URI uses a scheme other than ``https``
or loopback ``http``, or if it contains a fragment.
"""
# RFC 9700 §4.1.1 (OAuth 2.0 Security BCP): https-only, with the RFC 8252
# native-app loopback exception.
if url.scheme not in ("https", "http"):
raise InvalidRedirectUriError(f"redirect_uri must use https (or http for loopback); got scheme {url.scheme!r}")
if url.scheme == "http" and url.host not in ("localhost", "127.0.0.1", "[::1]"):
raise InvalidRedirectUriError(f"redirect_uri must use https for non-loopback hosts; got {str(url)!r}")
# RFC 7591 §2: redirect_uri MUST NOT contain a fragment component.
if url.fragment is not None:
raise InvalidRedirectUriError(f"redirect_uri must not have a fragment; got {str(url)!r}")
18 changes: 17 additions & 1 deletion src/mcp/server/auth/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
from starlette.requests import Request
from starlette.responses import Response

from mcp.server.auth._redirect_uri import validate_registered_redirect_uri
from mcp.server.auth.errors import stringify_pydantic_error
from mcp.server.auth.json_response import PydanticJSONResponse
from mcp.server.auth.provider import OAuthAuthorizationServerProvider, RegistrationError, RegistrationErrorCode
from mcp.server.auth.settings import ClientRegistrationOptions
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata
from mcp.shared.auth import InvalidRedirectUriError, OAuthClientInformationFull, OAuthClientMetadata

# this alias is a no-op; it's just to separate out the types exposed to the
# provider from what we use in the HTTP handler
Expand Down Expand Up @@ -45,6 +46,21 @@ async def handle(self, request: Request) -> Response:
status_code=400,
)

# RFC 9700 §4.1.1 + RFC 7591 §2: Pydantic AnyUrl accepts non-OAuth
# schemes (javascript:, data:, file:, etc.) and URIs with fragments;
# reject those here per spec.
for uri in client_metadata.redirect_uris or []:
try:
validate_registered_redirect_uri(uri)
except InvalidRedirectUriError as e:
return PydanticJSONResponse(
content=RegistrationErrorResponse(
error="invalid_redirect_uri",
error_description=e.message,
),
status_code=400,
)

client_id = str(uuid4())

# If auth method is None, default to client_secret_post
Expand Down
6 changes: 6 additions & 0 deletions src/mcp/server/auth/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from starlette.routing import Route, request_response # type: ignore
from starlette.types import ASGIApp

from mcp.server.auth._redirect_uri import validate_registered_redirect_uri
from mcp.server.auth.handlers.authorize import AuthorizationHandler
from mcp.server.auth.handlers.metadata import MetadataHandler, ProtectedResourceMetadataHandler
from mcp.server.auth.handlers.register import RegistrationHandler
Expand All @@ -20,6 +21,11 @@
from mcp.server.streamable_http import MCP_PROTOCOL_VERSION_HEADER
from mcp.shared.auth import OAuthMetadata, ProtectedResourceMetadata

# Re-exported from ._redirect_uri to avoid a circular import with
# .handlers.register, which also needs this validator. External callers and
# tests should keep importing it from this module.
__all__ = ["validate_registered_redirect_uri"]


def validate_issuer_url(url: AnyHttpUrl):
"""Validate that the issuer URL meets OAuth 2.0 requirements.
Expand Down
86 changes: 86 additions & 0 deletions tests/server/auth/test_error_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,89 @@ async def test_token_error_handling_refresh_token(
data = refresh_response.json()
assert data["error"] == "invalid_scope"
assert data["error_description"] == "The requested scope is invalid"


@pytest.mark.anyio
async def test_register_rejects_javascript_scheme(client: httpx.AsyncClient):
"""Registration must reject ``javascript:`` redirect URIs per RFC 9700 §4.1.1."""
response = await client.post(
"/register",
json={
"redirect_uris": ["javascript:alert(1)"],
"token_endpoint_auth_method": "none",
"grant_types": ["authorization_code"],
"response_types": ["code"],
},
)
assert response.status_code == 400, response.content
data = response.json()
assert data["error"] == "invalid_redirect_uri"
assert "https" in data["error_description"]


@pytest.mark.anyio
async def test_register_rejects_cleartext_http_non_loopback(client: httpx.AsyncClient):
"""Registration must reject cleartext http for non-loopback hosts per RFC 8252 §7.3."""
response = await client.post(
"/register",
json={
"redirect_uris": ["http://attacker.example/cb"],
"token_endpoint_auth_method": "none",
"grant_types": ["authorization_code"],
"response_types": ["code"],
},
)
assert response.status_code == 400, response.content
data = response.json()
assert data["error"] == "invalid_redirect_uri"
assert "non-loopback" in data["error_description"]


@pytest.mark.anyio
async def test_register_rejects_fragment(client: httpx.AsyncClient):
"""Registration must reject redirect URIs that include a fragment per RFC 7591 §2."""
response = await client.post(
"/register",
json={
"redirect_uris": ["https://example.com/cb#frag"],
"token_endpoint_auth_method": "none",
"grant_types": ["authorization_code"],
"response_types": ["code"],
},
)
assert response.status_code == 400, response.content
data = response.json()
assert data["error"] == "invalid_redirect_uri"
assert "fragment" in data["error_description"]


@pytest.mark.anyio
async def test_register_accepts_https_redirect_uri(client: httpx.AsyncClient):
"""Registration must accept https redirect URIs."""
response = await client.post(
"/register",
json={
"redirect_uris": ["https://example.com/cb"],
"token_endpoint_auth_method": "none",
"grant_types": ["authorization_code"],
"response_types": ["code"],
},
)
assert response.status_code == 201, response.content
data = response.json()
assert "client_id" in data


@pytest.mark.anyio
async def test_register_accepts_loopback_redirect_uri(client: httpx.AsyncClient):
"""Registration must accept http://localhost loopback redirect URIs per RFC 8252 §7.3."""
response = await client.post(
"/register",
json={
"redirect_uris": ["http://localhost:8080/cb"],
"token_endpoint_auth_method": "none",
"grant_types": ["authorization_code"],
"response_types": ["code"],
},
)
assert response.status_code == 201, response.content
66 changes: 64 additions & 2 deletions tests/server/auth/test_routes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pytest
from pydantic import AnyHttpUrl
from pydantic import AnyHttpUrl, AnyUrl

from mcp.server.auth.routes import validate_issuer_url
from mcp.server.auth.routes import validate_issuer_url, validate_registered_redirect_uri
from mcp.shared.auth import InvalidRedirectUriError


def test_validate_issuer_url_https_allowed():
Expand Down Expand Up @@ -45,3 +46,64 @@ def test_validate_issuer_url_fragment_rejected():
def test_validate_issuer_url_query_rejected():
with pytest.raises(ValueError, match="query"):
validate_issuer_url(AnyHttpUrl("https://example.com/path?q=1"))


def test_validate_registered_redirect_uri_https_allowed():
validate_registered_redirect_uri(AnyUrl("https://example.com/cb"))


def test_validate_registered_redirect_uri_https_with_query_allowed():
validate_registered_redirect_uri(AnyUrl("https://example.com/cb?foo=bar"))


def test_validate_registered_redirect_uri_http_localhost_allowed():
validate_registered_redirect_uri(AnyUrl("http://localhost:8080/cb"))


def test_validate_registered_redirect_uri_http_127_0_0_1_allowed():
validate_registered_redirect_uri(AnyUrl("http://127.0.0.1:8080/cb"))


def test_validate_registered_redirect_uri_http_ipv6_loopback_allowed():
validate_registered_redirect_uri(AnyUrl("http://[::1]:8080/cb"))


def test_validate_registered_redirect_uri_javascript_scheme_rejected():
with pytest.raises(InvalidRedirectUriError, match="must use https"):
validate_registered_redirect_uri(AnyUrl("javascript:alert(1)"))


def test_validate_registered_redirect_uri_data_scheme_rejected():
with pytest.raises(InvalidRedirectUriError, match="must use https"):
validate_registered_redirect_uri(AnyUrl("data:text/html,x"))


def test_validate_registered_redirect_uri_file_scheme_rejected():
with pytest.raises(InvalidRedirectUriError, match="must use https"):
validate_registered_redirect_uri(AnyUrl("file:///etc/passwd"))


def test_validate_registered_redirect_uri_ftp_scheme_rejected():
with pytest.raises(InvalidRedirectUriError, match="must use https"):
validate_registered_redirect_uri(AnyUrl("ftp://attacker.example/cb"))


def test_validate_registered_redirect_uri_http_non_loopback_rejected():
with pytest.raises(InvalidRedirectUriError, match="must use https for non-loopback"):
validate_registered_redirect_uri(AnyUrl("http://attacker.example/cb"))


def test_validate_registered_redirect_uri_http_127_prefix_domain_rejected():
"""A domain like 127.0.0.1.evil.com is NOT loopback."""
with pytest.raises(InvalidRedirectUriError, match="must use https for non-loopback"):
validate_registered_redirect_uri(AnyUrl("http://127.0.0.1.evil.com/cb"))


def test_validate_registered_redirect_uri_fragment_rejected():
with pytest.raises(InvalidRedirectUriError, match="must not have a fragment"):
validate_registered_redirect_uri(AnyUrl("https://example.com/cb#frag"))


def test_validate_registered_redirect_uri_empty_fragment_rejected():
with pytest.raises(InvalidRedirectUriError, match="must not have a fragment"):
validate_registered_redirect_uri(AnyUrl("https://example.com/cb#"))
Loading