Files
LinedanceAfspiller/venv/lib/python3.12/site-packages/pydantic_settings/sources/providers/gcp.py
2026-04-09 21:54:18 +02:00

242 lines
9.7 KiB
Python

from __future__ import annotations as _annotations
import warnings
from collections.abc import Iterator, Mapping
from functools import cached_property
from typing import TYPE_CHECKING, Any
from pydantic.fields import FieldInfo
from ..types import SecretVersion
from .env import EnvSettingsSource
if TYPE_CHECKING:
from google.auth import default as google_auth_default
from google.auth.credentials import Credentials
from google.cloud.secretmanager import SecretManagerServiceClient
from pydantic_settings.main import BaseSettings
else:
Credentials = None
SecretManagerServiceClient = None
google_auth_default = None
def import_gcp_secret_manager() -> None:
global Credentials
global SecretManagerServiceClient
global google_auth_default
try:
from google.auth import default as google_auth_default
from google.auth.credentials import Credentials
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=FutureWarning)
from google.cloud.secretmanager import SecretManagerServiceClient
except ImportError as e: # pragma: no cover
raise ImportError(
'GCP Secret Manager dependencies are not installed, run `pip install pydantic-settings[gcp-secret-manager]`'
) from e
class GoogleSecretManagerMapping(Mapping[str, str | None]):
_loaded_secrets: dict[str, str | None]
_secret_client: SecretManagerServiceClient
def __init__(self, secret_client: SecretManagerServiceClient, project_id: str, case_sensitive: bool) -> None:
self._loaded_secrets = {}
self._secret_client = secret_client
self._project_id = project_id
self._case_sensitive = case_sensitive
@property
def _gcp_project_path(self) -> str:
return self._secret_client.common_project_path(self._project_id)
def _select_case_insensitive_secret(self, lower_name: str, candidates: list[str]) -> str:
if len(candidates) == 1:
return candidates[0]
# Sort to ensure deterministic selection (prefer lowercase / ASCII last)
candidates.sort()
winner = candidates[-1]
warnings.warn(
f"Secret collision: Found multiple secrets {candidates} normalizing to '{lower_name}'. "
f"Using '{winner}' for case-insensitive lookup.",
UserWarning,
stacklevel=2,
)
return winner
@cached_property
def _secret_name_map(self) -> dict[str, str]:
mapping: dict[str, str] = {}
# Group secrets by normalized name to detect collisions
normalized_groups: dict[str, list[str]] = {}
secrets = self._secret_client.list_secrets(parent=self._gcp_project_path)
for secret in secrets:
name = self._secret_client.parse_secret_path(secret.name).get('secret', '')
mapping[name] = name
if not self._case_sensitive:
lower_name = name.lower()
if lower_name not in normalized_groups:
normalized_groups[lower_name] = []
normalized_groups[lower_name].append(name)
if not self._case_sensitive:
for lower_name, candidates in normalized_groups.items():
mapping[lower_name] = self._select_case_insensitive_secret(lower_name, candidates)
return mapping
@property
def _secret_names(self) -> list[str]:
return list(self._secret_name_map.keys())
def _secret_version_path(self, key: str, version: str = 'latest') -> str:
return self._secret_client.secret_version_path(self._project_id, key, version)
def _get_secret_value(self, gcp_secret_name: str, version: str = 'latest') -> str | None:
try:
return self._secret_client.access_secret_version(
name=self._secret_version_path(gcp_secret_name, version)
).payload.data.decode('UTF-8')
except Exception:
return None
def __getitem__(self, key: str) -> str | None:
if key in self._loaded_secrets:
return self._loaded_secrets[key]
gcp_secret_name = self._secret_name_map.get(key)
if gcp_secret_name is None and not self._case_sensitive:
gcp_secret_name = self._secret_name_map.get(key.lower())
if gcp_secret_name:
self._loaded_secrets[key] = self._get_secret_value(gcp_secret_name)
else:
raise KeyError(key)
return self._loaded_secrets[key]
def __len__(self) -> int:
return len(self._secret_names)
def __iter__(self) -> Iterator[str]:
return iter(self._secret_names)
class GoogleSecretManagerSettingsSource(EnvSettingsSource):
_credentials: Credentials
_secret_client: SecretManagerServiceClient
_project_id: str
def __init__(
self,
settings_cls: type[BaseSettings],
credentials: Credentials | None = None,
project_id: str | None = None,
env_prefix: str | None = None,
env_parse_none_str: str | None = None,
env_parse_enums: bool | None = None,
secret_client: SecretManagerServiceClient | None = None,
case_sensitive: bool | None = True,
) -> None:
# Import Google Packages if they haven't already been imported
if SecretManagerServiceClient is None or Credentials is None or google_auth_default is None:
import_gcp_secret_manager()
# If credentials or project_id are not passed, then
# try to get them from the default function
if not credentials or not project_id:
_creds, _project_id = google_auth_default()
# Set the credentials and/or project id if they weren't specified
if credentials is None:
credentials = _creds
if project_id is None:
if isinstance(_project_id, str):
project_id = _project_id
else:
raise AttributeError(
'project_id is required to be specified either as an argument or from the google.auth.default. See https://google-auth.readthedocs.io/en/master/reference/google.auth.html#google.auth.default'
)
self._credentials: Credentials = credentials
self._project_id: str = project_id
if secret_client:
self._secret_client = secret_client
else:
self._secret_client = SecretManagerServiceClient(credentials=self._credentials)
super().__init__(
settings_cls,
case_sensitive=case_sensitive,
env_prefix=env_prefix,
env_ignore_empty=False,
env_parse_none_str=env_parse_none_str,
env_parse_enums=env_parse_enums,
)
def get_field_value(self, field: FieldInfo, field_name: str) -> tuple[Any, str, bool]:
"""Override get_field_value to get the secret value from GCP Secret Manager.
Look for a SecretVersion metadata field to specify a particular SecretVersion.
Args:
field: The field to get the value for
field_name: The declared name of the field
Returns:
A tuple of (value, key, value_is_complex), where `key` is the identifier used
to populate the model (either the field name or an alias, depending on
configuration).
"""
secret_version = next((m.version for m in field.metadata if isinstance(m, SecretVersion)), None)
# If a secret version is specified, try to get that specific version of the secret from
# GCP Secret Manager via the GoogleSecretManagerMapping. This allows different versions
# of the same secret name to be retrieved independently and cached in the GoogleSecretManagerMapping
if secret_version and isinstance(self.env_vars, GoogleSecretManagerMapping):
for field_key, env_name, value_is_complex in self._extract_field_info(field, field_name):
gcp_secret_name = self.env_vars._secret_name_map.get(env_name)
if gcp_secret_name is None and not self.case_sensitive:
gcp_secret_name = self.env_vars._secret_name_map.get(env_name.lower())
if gcp_secret_name:
env_val = self.env_vars._get_secret_value(gcp_secret_name, secret_version)
if env_val is not None:
# If populate_by_name is enabled, return field_name to allow multiple fields
# with the same alias but different versions to be distinguished
if self.settings_cls.model_config.get('populate_by_name'):
return env_val, field_name, value_is_complex
return env_val, field_key, value_is_complex
# If a secret version is specified but not found, we should not fall back to "latest" (default behavior)
# as that would be incorrect. We return None to indicate the value was not found.
return None, field_name, False
val, key, is_complex = super().get_field_value(field, field_name)
# If populate_by_name is enabled, we need to return the field_name as the key
# without this being enabled, you cannot load two secrets with the same name but different versions
if self.settings_cls.model_config.get('populate_by_name') and val is not None:
return val, field_name, is_complex
return val, key, is_complex
def _load_env_vars(self) -> Mapping[str, str | None]:
return GoogleSecretManagerMapping(
self._secret_client, project_id=self._project_id, case_sensitive=self.case_sensitive
)
def __repr__(self) -> str:
return f'{self.__class__.__name__}(project_id={self._project_id!r}, env_nested_delimiter={self.env_nested_delimiter!r})'
__all__ = ['GoogleSecretManagerSettingsSource', 'GoogleSecretManagerMapping']