Skip to content
Merged
2 changes: 2 additions & 0 deletions src/fides/api/app_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
is_rate_limit_enabled,
)
from fides.api.util.saas_config_updater import update_saas_configs
from fides.api.util.security_headers import SecurityHeadersMiddleware
from fides.config import CONFIG
from fides.config.config_proxy import ConfigProxy

Expand Down Expand Up @@ -87,6 +88,7 @@ def create_fides_app(
fastapi_app.state.limiter = fides_limiter
# Starlette bug causing this to fail mypy
fastapi_app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # type: ignore
fastapi_app.add_middleware(SecurityHeadersMiddleware)
for handler in ExceptionHandlers.get_handlers():
# Starlette bug causing this to fail mypy
fastapi_app.add_exception_handler(RedisNotConfigured, handler) # type: ignore
Expand Down
101 changes: 101 additions & 0 deletions src/fides/api/util/security_headers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import re
from dataclasses import dataclass

from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware

from fides.config import CONFIG

apply_recommended_headers = CONFIG.security.headers_mode == "recommended"


def is_exact_match(matcher: re.Pattern[str], path_name: str) -> bool:
matched_content = re.fullmatch(matcher, path_name)
return matched_content is not None


HeaderDefinition = tuple[str, str]


@dataclass
class HeaderRule:
matcher: re.Pattern[str]
headers: list[HeaderDefinition]


recommended_csp_header_value = re.sub(
r"\s{2,}",
" ",
"""
default-src 'self';
script-src 'self' 'unsafe-inline';
style-src 'self' 'unsafe-inline';
connect-src 'self';
img-src 'self' blob: data:;
font-src 'self';
object-src 'none';
base-uri 'self';
form-action 'self';
frame-ancestors 'self';
upgrade-insecure-requests;
""",
).strip()

recommended_headers: list[HeaderRule] = [
HeaderRule(
matcher=re.compile(r"/.*"),
headers=[
("X-Content-Type-Options", "nosniff"),
("Strict-Transport-Security", "max-age=31536000"),
],
),
HeaderRule(
matcher=re.compile(r"^/((?!api|health).*)"),
headers=[
(
"Content-Security-Policy",
recommended_csp_header_value,
),
("X-Frame-Options", "SAMEORIGIN"),
],
),
]


def get_applicable_header_rules(
path: str, header_rules: list[HeaderRule]
) -> list[HeaderDefinition]:
header_names: set[str] = set()
header_definitions: list[HeaderDefinition] = []

for rule in header_rules:
if is_exact_match(rule.matcher, path):
for header in rule.headers:
[header_name, _] = header
if header_name not in header_names:
header_names.add(header_name)
header_definitions.append(header)

return header_definitions


def apply_headers_to_response(
headers: list[HeaderRule], request: Request, response: Response
) -> None:
applicable_headers = get_applicable_header_rules(request.url.path, headers)
for [header_name, header_value] in applicable_headers:
response.headers.append(header_name, header_value)


class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""
Controls what security headers are included in Fides API responses
"""

async def dispatch(self, request: Request, call_next): # type: ignore
response = await call_next(request)

if apply_recommended_headers:
apply_headers_to_response(recommended_headers, request, response)

return response
6 changes: 5 additions & 1 deletion src/fides/config/security_settings.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""This module handles finding and parsing fides configuration files."""

# pylint: disable=C0115,C0116, E0213
from typing import List, Optional, Pattern, Tuple, Union
from typing import List, Literal, Optional, Pattern, Tuple, Union

from pydantic import Field, SerializeAsAny, ValidationInfo, field_validator
from pydantic_settings import SettingsConfigDict
Expand Down Expand Up @@ -97,6 +97,10 @@ class SecuritySettings(FidesSettings):
default=None,
description="The header used to determine the client IP address for rate limiting. If not set or set to empty string, rate limiting will be disabled.",
)
headers_mode: Union[Literal["none"], Literal["recommended"]] = Field(
default="none",
description="Controls what security headers are included in Fides server responses.",
)
request_rate_limit: str = Field(
default="2000/minute",
description="The number of requests from a single IP address allowed to hit an endpoint within a rolling 60 second period.",
Expand Down
110 changes: 110 additions & 0 deletions tests/api/util/test_security_headers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import re
from unittest import mock

import pytest
from fastapi import Request, Response

from fides.api.util.security_headers import (
HeaderRule,
apply_headers_to_response,
get_applicable_header_rules,
is_exact_match,
recommended_csp_header_value,
recommended_headers,
)


class TestSecurityHeaders:
@pytest.mark.parametrize(
"pattern,path,expected",
[
(r"\/example-path", "/example-path", True),
(r"\/example-path", "/example-path/with-more-content", False),
(r"\/example-path/?(.*)", "/example-path/with-more-content", True),
(r"\/example-path", "/anti-example-path", False),
(r"\/example-path", "/completely-disparate-no-match", False),
],
)
def test_is_exact_match(self, pattern, path, expected):
assert (is_exact_match(re.compile(pattern), path)) is expected

def test_get_applicable_header_rules_returns_first_matching_rule_for_path(self):
expected_headers: tuple[str, str] = ("header-1", "value-1")
headers: list[HeaderRule] = [
HeaderRule(re.compile(r"\/a"), [expected_headers]),
HeaderRule(re.compile(r"\/a"), [("header-1", "value-2")]),
]

assert get_applicable_header_rules("/a", headers) == [expected_headers]

def test_get_applicable_header_rules_returns_disparate_headers(self):
header1 = "header-1"
header2 = "header-2"
headers1: tuple[str, str] = (header1, "value-1")
headers2: tuple[str, str] = (header2, "value-2")
headers: list[HeaderRule] = [
HeaderRule(re.compile(r"\/a-path"), [headers1]),
HeaderRule(re.compile(r"\/a-path"), [headers2]),
]

assert get_applicable_header_rules("/a-path", headers) == [headers1, headers2]

def test_apply_headers_to_response(self):
header = ("header-1", "value-1")
header_rules: list[HeaderRule] = [HeaderRule(re.compile(r".*"), [header])]

mock_request = mock.Mock(spec=Request)
mock_request.url.path = "/any-path"

response = Response()

apply_headers_to_response(header_rules, mock_request, response)

assert header in response.headers.items()

@pytest.mark.parametrize(
"path,expected",
[
(
"/api/foo",
[
("X-Content-Type-Options", "nosniff"),
("Strict-Transport-Security", "max-age=31536000"),
],
),
(
"/health",
[
("X-Content-Type-Options", "nosniff"),
("Strict-Transport-Security", "max-age=31536000"),
],
),
(
"/privacy-requests",
[
("X-Content-Type-Options", "nosniff"),
("Strict-Transport-Security", "max-age=31536000"),
(
"Content-Security-Policy",
recommended_csp_header_value,
),
("X-Frame-Options", "SAMEORIGIN"),
],
),
(
"/",
[
("X-Content-Type-Options", "nosniff"),
("Strict-Transport-Security", "max-age=31536000"),
(
"Content-Security-Policy",
recommended_csp_header_value,
),
("X-Frame-Options", "SAMEORIGIN"),
],
),
],
)
def test_recommended_headers_api_route(self, path, expected):
applicable_rules = get_applicable_header_rules(path, recommended_headers)
assert applicable_rules == expected
Loading