diff --git a/src/oci-recovery-service-mcp-server/LICENSE.txt b/src/oci-recovery-service-mcp-server/LICENSE.txt new file mode 100644 index 00000000..8dc7c070 --- /dev/null +++ b/src/oci-recovery-service-mcp-server/LICENSE.txt @@ -0,0 +1,35 @@ +Copyright (c) 2025 Oracle and/or its affiliates. + +The Universal Permissive License (UPL), Version 1.0 + +Subject to the condition set forth below, permission is hereby granted to any +person obtaining a copy of this software, associated documentation and/or data +(collectively the "Software"), free of charge and under any and all copyright +rights in the Software, and any and all patent rights owned or freely +licensable by each licensor hereunder covering either (i) the unmodified +Software as contributed to or provided by such licensor, or (ii) the Larger +Works (as defined below), to deal in both + +(a) the Software, and +(b) any piece of software and/or hardware listed in the lrgrwrks.txt file if +one is included with the Software (each a "Larger Work" to which the Software +is contributed by such licensors), + +without restriction, including without limitation the rights to copy, create +derivative works of, display, perform, and distribute the Software and make, +use, sell, offer for sale, import, export, have made, and have sold the +Software and the Larger Work(s), and to sublicense the foregoing rights on +either these or other terms. + +This license is subject to the following condition: +The above copyright notice and either this complete permission notice or at +a minimum a reference to the UPL must be included in all copies or +substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/src/oci-recovery-service-mcp-server/README.md b/src/oci-recovery-service-mcp-server/README.md new file mode 100644 index 00000000..feb8ae39 --- /dev/null +++ b/src/oci-recovery-service-mcp-server/README.md @@ -0,0 +1,40 @@ +# OCI Recovery Service MCP Server + +## Overview + +This server provides tools to interact with the OCI Recovery Service resources. +It includes tools to help with reporting on backups utilizing the Recovery Service. + +## Running the server + +```sh +uv run oracle.oci-recovery-service-mcp-server +``` + +## Tools + +| Tool Name | Description | +| --- | --- | +| list_compartments | List compartments | +| list_databases_using_recovery_service | List databases using recovery service using compartment OCID | +| list_recovery_service_subnets | List registered recovery service subnets using compartment OCID | +| get_tenancy_cost_summary | get the cost and usage information for recovery service usage | + + +⚠️ **NOTE**: All actions are performed with the permissions of the configured OCI CLI profile. We advise least-privilege IAM setup, secure credential management, safe network practices, secure logging, and warn against exposing secrets. + +## Third-Party APIs + +Developers choosing to distribute a binary implementation of this project are responsible for obtaining and providing all required licenses and copyright notices for the third-party code used in order to ensure compliance with their respective open source licenses. + +## Disclaimer + +Users are responsible for their local environment and credential safety. Different language model selections may yield different results and performance. + +## License + +Copyright (c) 2025 Oracle and/or its affiliates. + +Released under the Universal Permissive License v1.0 as shown at +. + diff --git a/src/oci-recovery-service-mcp-server/__init__.py b/src/oci-recovery-service-mcp-server/__init__.py new file mode 100644 index 00000000..d1880c10 --- /dev/null +++ b/src/oci-recovery-service-mcp-server/__init__.py @@ -0,0 +1,8 @@ +""" +Copyright (c) 2025, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +__project__ = "oracle.oci-recovery-service-mcp-server" +__version__ = "1.0.1" diff --git a/src/oci-recovery-service-mcp-server/models.py b/src/oci-recovery-service-mcp-server/models.py new file mode 100644 index 00000000..2051e5b8 --- /dev/null +++ b/src/oci-recovery-service-mcp-server/models.py @@ -0,0 +1,324 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + + +class Compartment(BaseModel): + """Represents a compartment entry from list_compartments().""" + + compartment_ocid: str = Field( + ..., + description="OCID of the compartment returned by OCI Identity.", + ) + name: str = Field( + ..., + description="Human-readable name of the compartment.", + ) + lifecycle_state: Optional[str] = Field( + None, + description="Lifecycle state of the compartment (e.g. ACTIVE, DELETING, DELETED).", + ) + is_accessible: Optional[bool] = Field( + None, + description="Whether the current principal has ACCESSIBLE visibility into the compartment.", + ) + + @classmethod + def from_oci(cls, c: Any) -> "Compartment": + """Create a Compartment model from an OCI SDK Compartment object.""" + return cls( + compartment_ocid=c.id, + name=c.name, + lifecycle_state=getattr(c, "lifecycle_state", None), + is_accessible=getattr(c, "is_accessible", None), + ) + + +class ProtectedDatabase(BaseModel): + """Flattened view of a protected database from Recovery Service.""" + + # Identity / location + protected_database_id: Optional[str] = Field( + None, + description="OCID of the protected database resource in Recovery Service.", + ) + display_name: Optional[str] = Field( + None, + description="Human-readable display name of the protected database.", + ) + db_unique_name: Optional[str] = Field( + None, + description="Oracle Database unique name for the protected database.", + ) + database_id: Optional[str] = Field( + None, + description="OCID of the source/original database (ADB, DB System, or Exadata Cloud Service).", + ) + compartment_id: Optional[str] = Field( + None, + description="OCID of the compartment that contains the protected database.", + ) + + # Health / lifecycle + health: Optional[str] = Field( + None, + description="High-level health status of the database backups in Recovery Service.", + ) + health_details: Optional[str] = Field( + None, + description="Detailed explanation of backup health or any detected issues.", + ) + lifecycle_state: Optional[str] = Field( + None, + description="Lifecycle state of the protected database resource (e.g. ACTIVE, UPDATING, DELETING, DELETED, FAILED).", + ) + time_created: Optional[datetime] = Field( + None, + description="Timestamp when the database was registered with Recovery Service.", + ) + + # Metrics (flattened) + database_size_in_gbs: Optional[float] = Field( + None, + description="Approximate size of the source database, in GiB.", + ) + backup_space_used_in_gbs: Optional[float] = Field( + None, + description="Total backup space currently consumed for this database in ARS, in GiB.", + ) + backup_space_estimate_in_gbs: Optional[float] = Field( + None, + description="Estimated full backup space required to satisfy the configured recovery window, in GiB.", + ) + retention_period_in_days: Optional[int] = Field( + None, + description="Configured backup retention period in days.", + ) + current_retention_period_in_seconds: Optional[int] = Field( + None, + description="Current effective retention window length, in seconds.", + ) + minimum_recovery_needed: Optional[float] = Field( + None, + description=( + "Minimum recovery needed (derived from minimum_recovery_needed_in_days in metrics). " + "Represents the minimum amount of redo that must be applied for recovery." + ), + ) + unprotected_window_in_seconds: Optional[int] = Field( + None, + description=( + "Estimated time window of unprotected data (recovery gap) in seconds. " + "This effectively corresponds to the Recovery Point Objective (RPO)." + ), + ) + is_redo_logs_enabled: Optional[bool] = Field( + None, + description="Indicates whether redo log shipping to Recovery Service is enabled.", + ) + + # Protection policy / vault + protection_policy_id: Optional[str] = Field( + None, + description="OCID of the protection policy currently applied to this protected database.", + ) + protection_policy_name: Optional[str] = Field( + None, + description="Display name of the protection policy applied to this database, if resolvable.", + ) + policy_locked_date_time: Optional[datetime] = Field( + None, + description=( + "Timestamp until which backups are immutable/locked by the protection policy. " + "Represents the 'lock until' time for backup immutability." + ), + ) + recovery_service_vault_id: Optional[str] = Field( + None, + description="OCID of the Recovery Service vault that stores this database's backups.", + ) + + # Other flags + is_redo_transmission_enabled: Optional[bool] = Field( + None, + description="Whether redo data is transmitted to Recovery Service in near real-time.", + ) + description: Optional[str] = Field( + None, + description="Optional free-form description for the protected database resource.", + ) + + @classmethod + def from_oci(cls, p: Any, policy_name: Optional[str] = None) -> "ProtectedDatabase": + """ + Create a ProtectedDatabase model from an OCI SDK ProtectedDatabaseSummary + (or similar) object plus an optional resolved protection policy name. + """ + metrics = getattr(p, "metrics", None) + + return cls( + protected_database_id=getattr(p, "id", None), + display_name=getattr(p, "display_name", None), + db_unique_name=getattr(p, "db_unique_name", None), + database_id=getattr(p, "database_id", None), + compartment_id=getattr(p, "compartment_id", None), + health=getattr(p, "health", None), + health_details=getattr(p, "health_details", None), + lifecycle_state=getattr(p, "lifecycle_state", None), + time_created=getattr(p, "time_created", None), + database_size_in_gbs=getattr(metrics, "db_size_in_gbs", None), + backup_space_used_in_gbs=getattr(metrics, "backup_space_used_in_gbs", None), + minimum_recovery_needed=getattr( + metrics, "minimum_recovery_needed_in_days", None + ), + unprotected_window_in_seconds=getattr( + metrics, "unprotected_window_in_seconds", None + ), + retention_period_in_days=getattr( + metrics, "retention_period_in_days", None + ), + current_retention_period_in_seconds=getattr( + metrics, "current_retention_period_in_seconds", None + ), + backup_space_estimate_in_gbs=getattr( + metrics, "backup_space_estimate_in_gbs", None + ), + protection_policy_id=getattr(p, "protection_policy_id", None), + policy_locked_date_time=getattr(p, "policy_locked_date_time", None), + protection_policy_name=policy_name, + recovery_service_vault_id=getattr(p, "recovery_service_vault_id", None), + is_redo_transmission_enabled=getattr( + p, "is_redo_transmission_enabled", None + ), + description=getattr(p, "description", None), + is_redo_logs_enabled=getattr(p, "is_redo_logs_enabled", None), + ) + +# models.py + +class RecoveryServiceSubnets(BaseModel): + """Flattened view of registered subnets from Recovery Service.""" + + # Identity / location + id: Optional[str] = Field( + None, + description="OCID of the recovery service subnet resource.", + ) + display_name: Optional[str] = Field( + None, + description="Display name of the recovery service subnet resource.", + ) + compartment_id: Optional[str] = Field( + None, + description="OCID of the compartment that contains the recovery service subnet.", + ) + + # VCN information + vcn_id: Optional[str] = Field( + None, + description="OCID of the VCN containing the recovery service subnet.", + ) + vcn_name: Optional[str] = Field( + None, + description="Human-readable display name of the VCN (resolved via VirtualNetworkClient).", + ) + + # Subnet information + subnet_id: Optional[str] = Field( + None, + description="OCID of the underlying OCI subnet associated with the recovery service subnet.", + ) + subnet_name: Optional[str] = Field( + None, + description="Display name of the underlying OCI subnet (resolved via VirtualNetworkClient).", + ) + + # Lifecycle + lifecycle_state: Optional[str] = Field( + None, + description="Lifecycle state of the recovery service subnet (e.g. ACTIVE, UPDATING, DELETING, DELETED, FAILED).", + ) + time_created: Optional[datetime] = Field( + None, + description="Timestamp when the recovery service subnet was created.", + ) + + @classmethod + def from_oci( + cls, + s: Any, + subnet_name: Optional[str] = None, + vcn_name: Optional[str] = None, + ) -> "RecoveryServiceSubnets": + """ + Create a RecoveryServiceSubnets model from an OCI SDK RecoveryServiceSubnetSummary + (or similar) object plus optional resolved VCN and Subnet names. + """ + return cls( + id=getattr(s, "id", None), + display_name=getattr(s, "display_name", None), + compartment_id=getattr(s, "compartment_id", None), + lifecycle_state=getattr(s, "lifecycle_state", None), + time_created=getattr(s, "time_created", None), + vcn_id=getattr(s, "vcn_id", None), + subnet_id=getattr(s, "subnet_id", None), + subnet_name=subnet_name, + vcn_name=vcn_name, + ) + +class TenancyCostSummary(BaseModel): + """Summary view of cost information from Usage API.""" + + start: str = Field( + ..., + description="Start of the usage window in ISO-8601 format (UTC, with trailing 'Z').", + ) + end: str = Field( + ..., + description="End of the usage window in ISO-8601 format (UTC, with trailing 'Z').", + ) + granularity: str = Field( + ..., + description="Aggregation granularity used for summarization (e.g. DAILY or MONTHLY).", + ) + total_computed_amount: float = Field( + ..., + description="Total cost (e.g. in the tenancy's billing currency) over the window.", + ) + total_computed_usage: float = Field( + ..., + description="Total usage quantity over the window (units depend on the service and metric).", + ) + items: List[Dict[str, Any]] = Field( + default_factory=list, + description=( + "Raw Usage API summarized usage line items as dictionaries, " + "one per resourceId/interval/grouping." + ), + ) + + @classmethod + def from_usage_api( + cls, + start: datetime, + end: datetime, + granularity: str, + rows: List[Dict[str, Any]], + ) -> "TenancyCostSummary": + """ + Build a TenancyCostSummary from raw Usage API rows and time bounds. + """ + total_cost = sum((r.get("computed_amount", 0) or 0) for r in rows) + total_usage = sum((r.get("computed_quantity", 0) or 0) for r in rows) + + return cls( + start=start.isoformat() + "Z", + end=end.isoformat() + "Z", + granularity=granularity, + total_computed_amount=total_cost, + total_computed_usage=total_usage, + items=rows, + ) diff --git a/src/oci-recovery-service-mcp-server/pyproject.toml b/src/oci-recovery-service-mcp-server/pyproject.toml new file mode 100644 index 00000000..e04d1c87 --- /dev/null +++ b/src/oci-recovery-service-mcp-server/pyproject.toml @@ -0,0 +1,52 @@ +[project] +name = "oracle.oci-recovery-service-mcp-server" +version = "1.1.0" +description = "OCI Recovery Service MCP server" +readme = "README.md" +requires-python = ">=3.13" +license = "UPL-1.0" +license-files = ["LICENSE.txt"] +authors = [ + {name = "Oracle MCP", email = "237432095+oracle-mcp@users.noreply.github.com"}, +] +dependencies = [ + "fastmcp==2.13.0", + "oci==2.160.0", + "pydantic==2.12.3", +] + +classifiers = [ + "License :: OSI Approved :: Universal Permissive License (UPL)", + "Operating System :: OS Independent", + "Programming Language :: Python", + "Programming Language :: Python :: 3.13", +] + +[project.scripts] +"oracle.oci-recovery-service-mcp-server" = "oracle.oci_recovery_service_mcp_server.server:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["oracle"] + +[dependency-groups] +dev = [ + "pytest>=8.4.2", + "pytest-asyncio>=1.2.0", + "pytest-cov>=7.0.0", +] + +[tool.coverage.run] +omit = [ + "**/__init__.py", + "**/tests/*", + "dist/*", + ".venv/*", +] + +[tool.coverage.report] +precision = 2 +fail_under = 69.64 diff --git a/src/oci-recovery-service-mcp-server/server.py b/src/oci-recovery-service-mcp-server/server.py new file mode 100644 index 00000000..d8f17f5e --- /dev/null +++ b/src/oci-recovery-service-mcp-server/server.py @@ -0,0 +1,406 @@ +#!/usr/bin/env python3 +""" +OCI MCP Server +- Tools for Compute / DB / Object Storage discovery and simple actions +- Resource providers (e.g., compartments) +- A prompt for summarizing findings + +Transports: stdio (default) +""" + +from __future__ import annotations + +import json +import logging +import os +from datetime import datetime, timedelta, time +from typing import Any, Dict, List, Optional + +from dotenv import load_dotenv + +# MCP (official Python SDK) +from mcp.server.fastmcp import FastMCP + +# OCI SDK +import oci +from oci.util import to_dict + +# Local models +from models import Compartment, ProtectedDatabase, TenancyCostSummary, RecoveryServiceSubnets + +# ---------- Logging & env ---------- +load_dotenv() +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() +logging.basicConfig( + level=LOG_LEVEL, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", +) +log = logging.getLogger("oci-mcp") + + +# ---------- OCI helper ---------- + +class OCIManager: + """Simple manager to create OCI clients using ~/.oci/config or env-based auth.""" + + def __init__(self) -> None: + self.config = self._load_config() + self.signer = None # for instance principals etc. + + def _load_config(self) -> Dict[str, Any]: + # Prefer config file if present + cfg_file = os.getenv("OCI_CONFIG_FILE", os.path.expanduser("~/.oci/config")) + profile = os.getenv("OCI_CONFIG_PROFILE", "DEFAULT") + if os.path.exists(cfg_file): + log.info(f"Using OCI config file: {cfg_file} [{profile}]") + return oci.config.from_file(cfg_file, profile_name=profile) + + # Else try explicit env vars + env_keys = ( + "OCI_USER_OCID", + "OCI_FINGERPRINT", + "OCI_TENANCY_OCID", + "OCI_REGION", + "OCI_KEY_FILE", + ) + if all(os.getenv(k) for k in env_keys): + cfg = { + "user": os.environ["OCI_USER_OCID"], + "fingerprint": os.environ["OCI_FINGERPRINT"], + "tenancy": os.environ["OCI_TENANCY_OCID"], + "region": os.environ["OCI_REGION"], + "key_file": os.environ["OCI_KEY_FILE"], + } + log.info("Using explicit OCI env var configuration") + return cfg + + # Finally, try instance principals (for servers running on OCI) + try: + self.signer = oci.auth.signers.get_resource_principals_signer() + region = os.getenv("OCI_REGION", "ap-ashburn-1") + cfg = {"region": region, "tenancy": os.getenv("OCI_TENANCY_OCID", "")} + log.info("Using instance/resource principals signer") + return cfg + except Exception: + raise RuntimeError( + "No OCI credentials found. Run `oci setup config` or set env vars " + "(OCI_USER_OCID, OCI_FINGERPRINT, OCI_TENANCY_OCID, OCI_REGION, OCI_KEY_FILE)." + ) + + def get_client(self, service: str): + """Return an OCI service client bound to configured region/signer.""" + service = service.lower() + kwargs: Dict[str, Any] = {} + if self.signer: + kwargs["signer"] = self.signer + + if service in ("identity", "iam"): + return oci.identity.IdentityClient(self.config, **kwargs) + if service in ("recovery", "ars", "rcv", "zrcv"): + return oci.recovery.DatabaseRecoveryClient(self.config, **kwargs) + if service in ("virtual_network", "vcn", "subnet"): + return oci.core.VirtualNetworkClient(self.config, **kwargs) + if service in ("usage", "usage_api", "cost"): + try: + return oci.usage_api.UsageapiClient(self.config, **kwargs) # type: ignore[attr-defined] + except Exception as e: + raise RuntimeError( + "Usage API client not available; check OCI SDK version." + ) from e + + raise ValueError(f"Unknown OCI service: {service}") + + +oci_manager = OCIManager() + + +# ---------- Utility helpers ---------- + +def _default_compartment() -> Optional[str]: + """Return default compartment (env override or tenancy OCID).""" + return os.getenv("DEFAULT_COMPARTMENT_OCID") or oci_manager.config.get("tenancy") + + +def _to_clean_dict(x: Any) -> Any: + """Safe dict conversion for OCI models/collections.""" + try: + return to_dict(x) + except Exception: + return json.loads(json.dumps(x, default=str)) + + +# ---------- MCP server ---------- + +mcp = FastMCP("oci-mcp-server") + + +@mcp.tool() +def list_compartments() -> List[Compartment]: + """List accessible compartments in the tenancy (including subtrees).""" + identity = oci_manager.get_client("identity") + tenancy_id = oci_manager.config["tenancy"] + + comps = oci.pagination.list_call_get_all_results( + identity.list_compartments, + compartment_id=tenancy_id, + compartment_id_in_subtree=True, + access_level="ACCESSIBLE", + ).data + + # Let the model handle mapping + return [Compartment.from_oci(c) for c in comps] + + +@mcp.tool() +def list_databases_using_recovery_service( + compartment_ocid: str, + lifecycle_state: Optional[str] = None, +) -> List[ProtectedDatabase]: + """List databases that are protected by Autonomous Recovery Service (ARS). + + Args: + compartment_ocid: Compartment OCID (defaults to tenancy if omitted) + lifecycle_state: Optional filter (e.g., ACTIVE, CREATING, DELETING, DELETED, FAILED) + """ + comp = compartment_ocid or _default_compartment() + if not comp: + raise ValueError( + "No compartment OCID available. Pass compartment_ocid explicitly " + "or set DEFAULT_COMPARTMENT_OCID." + ) + + recovery = oci_manager.get_client("recovery") + + # 1) Fetch protected databases + protected = oci.pagination.list_call_get_all_results( + recovery.list_protected_databases, + compartment_id=comp, + ).data + + # Optional lifecycle filter (case-insensitive) + if lifecycle_state: + want = lifecycle_state.upper() + protected = [ + p + for p in protected + if getattr(p, "lifecycle_state", "").upper() == want + ] + + # 2) Collect unique policy IDs so we can resolve their names once + policy_ids = {getattr(p, "protection_policy_id", None) for p in protected} + policy_ids.discard(None) + + policy_name_by_id: Dict[str, str] = {} + for pid in policy_ids: + try: + pol = recovery.get_protection_policy(pid).data + pname = ( + getattr(pol, "display_name", None) + or getattr(pol, "name", None) + or pid + ) + policy_name_by_id[pid] = pname + except Exception: + policy_name_by_id[pid] = pid # still useful + + # 3) Normalize output into ProtectedDatabase models using the model helper + items: List[ProtectedDatabase] = [] + for p in protected: + policy_name = policy_name_by_id.get( + getattr(p, "protection_policy_id", None) + ) + items.append(ProtectedDatabase.from_oci(p, policy_name=policy_name)) + + return items + + + +@mcp.tool() +def list_recovery_service_subnets( + compartment_ocid: str, + lifecycle_state: Optional[str] = None, +) -> List[RecoveryServiceSubnets]: + """List Recovery Service Subnets used by Autonomous Recovery Service (ARS). + This can be used to help determine the OCID needed for configuring Cloud Protect + + Args: + compartment_ocid: Compartment OCID (defaults to tenancy if omitted) + lifecycle_state: Optional filter (e.g., ACTIVE, CREATING, DELETING, DELETED, FAILED) + """ + comp = compartment_ocid or _default_compartment() + if not comp: + raise ValueError( + "No compartment OCID available. Pass compartment_ocid explicitly " + "or set DEFAULT_COMPARTMENT_OCID." + ) + + recovery = oci_manager.get_client("recovery") + vcn_client = oci_manager.get_client("virtual_network") + + # 1) Fetch Recovery Service subnets + subnets = oci.pagination.list_call_get_all_results( + recovery.list_recovery_service_subnets, + compartment_id=comp, + ).data + + # Optional lifecycle filter (case-insensitive) + if lifecycle_state: + want = lifecycle_state.upper() + subnets = [ + s + for s in subnets + if getattr(s, "lifecycle_state", "").upper() == want + ] + + # 2) Resolve subnet names once per unique subnet_id + subnet_ids = {getattr(s, "subnet_id", None) for s in subnets} + subnet_ids.discard(None) + + subnet_name_by_id: Dict[str, str] = {} + for sid in subnet_ids: + try: + sub = vcn_client.get_subnet(sid).data + sname = ( + getattr(sub, "display_name", None) + or getattr(sub, "name", None) + or sid + ) + subnet_name_by_id[sid] = sname + except Exception: + subnet_name_by_id[sid] = sid # still at least return OCID + + # 3) Resolve VCN names once per unique vcn_id + vcn_ids = {getattr(s, "vcn_id", None) for s in subnets} + vcn_ids.discard(None) + + vcn_name_by_id: Dict[str, str] = {} + for vid in vcn_ids: + try: + vcn_info = vcn_client.get_vcn(vid).data + vname = ( + getattr(vcn_info, "display_name", None) + or getattr(vcn_info, "name", None) + or vid + ) + vcn_name_by_id[vid] = vname + except Exception: + vcn_name_by_id[vid] = vid + + # 4) Normalize into RecoveryServiceSubnets models + items: List[RecoveryServiceSubnets] = [] + for s in subnets: + subnet_name = subnet_name_by_id.get(getattr(s, "subnet_id", None)) + vcn_name = vcn_name_by_id.get(getattr(s, "vcn_id", None)) + items.append( + RecoveryServiceSubnets.from_oci( + s, + subnet_name=subnet_name, + vcn_name=vcn_name, + ) + ) + + return items + +@mcp.tool() +def get_tenancy_cost_summary( + start_time_iso: Optional[str] = None, + end_time_iso: Optional[str] = None, + granularity: str = "DAILY", +) -> TenancyCostSummary: + """Summarize tenancy costs using Usage API (requires permissions). + + Args: + start_time_iso: ISO8601 start date (UTC). If provided, time component is ignored and truncated to midnight. + end_time_iso: ISO8601 end date (UTC). If provided, time component is ignored and truncated to midnight. + granularity: DAILY or MONTHLY + """ + try: + usage = oci_manager.get_client("usage_api") + except Exception as e: + raise RuntimeError( + "Usage API not available; upgrade OCI SDK and permissions." + ) from e + + # --- Normalize start/end to midnight UTC (required by Usage API) --- + + if not end_time_iso: + # default: today, but truncated to date boundary + end_date = datetime.utcnow().date() + else: + end_dt = datetime.fromisoformat(end_time_iso.replace("Z", "")) + end_date = end_dt.date() + + if not start_time_iso: + # default: 7 days before end_date + start_date = end_date - timedelta(days=7) + else: + start_dt = datetime.fromisoformat(start_time_iso.replace("Z", "")) + start_date = start_dt.date() + + # Usage API requires 00:00:00, so combine with midnight + start = datetime.combine(start_date, time(0, 0, 0)) + end = datetime.combine(end_date, time(0, 0, 0)) + + + tenant_id = oci_manager.config["tenancy"] + details = oci.usage_api.models.RequestSummarizedUsagesDetails( + tenant_id=tenant_id, + time_usage_started=start, + time_usage_ended=end, + granularity=granularity, + query_type="USAGE", + filter={ + "operator": "AND", + "dimensions": [{"key": "service", "value": "AUTONOMOUS_RECOVERY"}], + }, + group_by=["resourceId"], + ) + + resp = usage.request_summarized_usages( + request_summarized_usages_details=details + ) + rows = ( + [to_dict(x) for x in resp.data.items] + if getattr(resp.data, "items", None) + else [] + ) + + # Let the model compute totals and shape the response + return TenancyCostSummary.from_usage_api( + start=start, + end=end, + granularity=granularity, + rows=rows, + ) + + +# ----------- Resources ----------- + +@mcp.resource("oci://compartments") +def resource_compartments() -> Dict[str, Any]: + """Resource listing compartments (compartment_ocid, name).""" + comps = [c.dict() for c in list_compartments()] + return {"compartments": comps} + + +# ----------- Prompts ----------- + +@mcp.prompt("oci_analysis_prompt") +def oci_analysis_prompt() -> str: + """A helper prompt to analyze OCI state returned by the tools.""" + return ( + "You are an expert Oracle Cloud architect. Given the JSON outputs from tools like " + "`list_compartments`, `list_databases_using_recovery_service`,`list_recovery_service_subnets` and `get_tenancy_cost_summary`, " + "produce a concise assessment covering cost, usage, backup protection and reliability. " + "You can also return return information on Recovery Service sbubnets including the VCN and subnet that is registered " + "Highlight RPO and RTO for databases and note any missing monitoring/alerts." + ) + + +def main() -> None: + # Start stdio transport + mcp.run() + + +if __name__ == "__main__": + main() diff --git a/src/oci-recovery-service-mcp-server/tests/changes.txt b/src/oci-recovery-service-mcp-server/tests/changes.txt new file mode 100644 index 00000000..4f2c8823 --- /dev/null +++ b/src/oci-recovery-service-mcp-server/tests/changes.txt @@ -0,0 +1,43 @@ +Oracle#71 Add MCP server for Recovery service + +# Description + +This is a new MCP server that can be used to retrieve information on Protected databases. +- List of databases in a compartment (compartment OCID is required) +- List of recovery service subnets in a compartment (compartment OCID is required) +- Space usage for protected databases. + +Fixes # 71 (issue) + +## Type of change + + +- [ ] New feature (non-breaking change which adds functionality) + + +# How Has This Been Tested? + +Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration + +- [ ] Test A - List compartments +- [ ] Test B - List protected databases (lists none because you must enter a relevant compartment OCID) +- [ ] Test C - List usage and cost for a range of dates +- [ ] Test D - List recovery service subnets (lists none because you must enter a relevant compartment OCID) + + +**Test Configuration**: +* Firmware version: +* Hardware: +* Toolchain: +* SDK: + +# Checklist: + +- [ ] My code follows the style guidelines of this project +- [ ] I have performed a self-review of my own code +- [ ] I have commented my code, particularly in hard-to-understand areas +- [ ] I have made corresponding changes to the documentation +- [ ] My changes generate no new warnings +- [ ] I have added tests that prove my fix is effective or that my feature works +- [ ] New and existing unit tests pass locally with my changes +- [ ] Any dependent changes have been merged and published in downstream modules diff --git a/src/oci-recovery-service-mcp-server/tests/test_recovery_service_tools.py b/src/oci-recovery-service-mcp-server/tests/test_recovery_service_tools.py new file mode 100644 index 00000000..b027ed47 --- /dev/null +++ b/src/oci-recovery-service-mcp-server/tests/test_recovery_service_tools.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +r""" +test_recovery_service_tools.py - Local test harness for the OCI MCP server tools. + +Usage (from C:\mcp-main\mcp-main\src\oci-recovery-service-mcp-server\oracle\oci_recovery_service_mcp_server\tests): + C:\Python314\python.exe test_recovery_service_tools.py +""" + +from __future__ import annotations + +import os +import sys +import traceback +from pathlib import Path +from pprint import pprint + +# --- ensure we can import server.py no matter where we run this from --- + +HERE = Path(__file__).resolve() +PACKAGE_ROOT = HERE.parents[1] # .../oci_recovery_service_mcp_server +sys.path.insert(0, str(PACKAGE_ROOT)) + + + +# Import the tool functions and helper from your server module +from server import ( # type: ignore[import] + list_compartments, + list_databases_using_recovery_service, + get_tenancy_cost_summary, + list_recovery_service_subnets, + _default_compartment, +) + + +def test_list_compartments() -> None: + print("\n=== 1) Testing list_compartments() ===") + try: + comps = list_compartments() + print(f"Retrieved {len(comps)} compartments.") + for c in comps[:10]: # show at most 10 for brevity + print( + f"- {c.name} " + f"(ocid={c.compartment_ocid}) " + f"state={c.lifecycle_state} " + f"accessible={c.is_accessible}" + ) + if len(comps) > 10: + print(f"... ({len(comps) - 10} more not shown)") + except Exception: + print("Error while calling list_compartments():") + traceback.print_exc() + + +def test_list_databases_using_recovery_service() -> None: + print("\n=== 2) Testing list_databases_using_recovery_service() ===") + try: + # Prefer an explicit test compartment if set + comp = os.getenv("TEST_COMPARTMENT_OCID") or _default_compartment() + if not comp: + print( + "No compartment OCID available.\n" + "Set TEST_COMPARTMENT_OCID or DEFAULT_COMPARTMENT_OCID or ensure\n" + "your tenancy OCID is present in the OCI config used by server.py." + ) + return + + print(f"Using compartment_ocid = {comp}") + dbs = list_databases_using_recovery_service(compartment_ocid=comp) + + print(f"Retrieved {len(dbs)} protected databases.") + for d in dbs[:10]: + print( + f"- {d.display_name or d.db_unique_name or d.protected_database_id} " + f"(protected_database_id={d.protected_database_id}, " + f"database_id={d.database_id}, " + f"health={d.health}, " + f"lifecycle_state={d.lifecycle_state})" + ) + if len(dbs) > 10: + print(f"... ({len(dbs) - 10} more not shown)") + except Exception: + print("Error while calling list_databases_using_recovery_service():") + traceback.print_exc() + + +def test_list_recovery_service_subnets() -> None: + print("\n=== 2) Testing list_recovery_service_subnets() ===") + try: + # Prefer an explicit test compartment if set + comp = os.getenv("TEST_COMPARTMENT_OCID") or _default_compartment() + if not comp: + print( + "No compartment OCID available.\n" + "Set TEST_COMPARTMENT_OCID or DEFAULT_COMPARTMENT_OCID or ensure\n" + "your tenancy OCID is present in the OCI config used by server.py." + ) + return + + print(f"Using compartment_ocid = {comp}") + dbs = list_list_recovery_service_subnets(compartment_ocid=comp) + + print(f"Retrieved {len(rsss)} recovery service subnets.") + for s in rsss[:10]: + print( + f"- {s.display_name } " + f"(recovery_service_id={s.recovery_service_id}, " + f"health={d.health}, " + f"lifecycle_state={d.lifecycle_state})" + ) + if len(rsss) > 10: + print(f"... ({len(rsss) - 10} more not shown)") + except Exception: + print("Error while calling list_list_recovery_service_subnets():") + traceback.print_exc() + + +def test_get_tenancy_cost_summary() -> None: + print("\n=== 3) Testing get_tenancy_cost_summary() ===") + try: + # Let the function use its default of last 7 days, DAILY + summary = get_tenancy_cost_summary() + + print("Cost summary window:") + print(f" start : {summary.start}") + print(f" end : {summary.end}") + print(f" granularity: {summary.granularity}") + print(f" total cost : {summary.total_computed_amount}") + print(f" total usage: {summary.total_computed_usage}") + print(f" items : {len(summary.items)} summarized rows") + + # Show first few raw rows for inspection + for row in summary.items[:5]: + print(" - row:") + pprint(row) + if len(summary.items) > 5: + print(f"... ({len(summary.items) - 5} more rows not shown)") + except Exception: + print("Error while calling get_tenancy_cost_summary():") + traceback.print_exc() + + +def main() -> None: + print("=== OCI MCP Server Local Test (test_recovery_service_tools.py) ===") + print("Using environment / OCI config from server.py (dotenv + OCI config).") + + test_list_compartments() + test_list_databases_using_recovery_service() + test_get_tenancy_cost_summary() + + print("\n=== Done ===") + + +if __name__ == "__main__": + main() \ No newline at end of file