diff --git a/.github/scripts/publish_script.py b/.github/scripts/publish_script.py index ca176d9..24e1484 100644 --- a/.github/scripts/publish_script.py +++ b/.github/scripts/publish_script.py @@ -12,6 +12,7 @@ SECRET_ACCESS_KEY = os.environ["R2_SECRET_ACCESS_KEY"] PROD_BUCKET = os.environ["R2_PRODUCTION_BUCKET"] STAGING_BUCKET = os.environ["R2_STAGING_BUCKET"] +INTERNAL_BUCKET = os.environ["R2_INTERNAL_BUCKET"] ENDPOINT_URL = f"https://{ACCOUNT_ID}.r2.cloudflarestorage.com" MANIFEST_FILE = "manifest.json" DATASETS_DOC_PATH = "docs/source/datasets.md" @@ -134,23 +135,32 @@ def handle_deletions(manifest_data: list[dict[str, Any]]) -> bool: processed_deletion = False for dataset in manifest_data: + # Get the bucket type for this dataset + bucket_type = dataset.get("bucket", "production") + target_bucket = INTERNAL_BUCKET if bucket_type == "internal" else PROD_BUCKET + bucket_name = "internal" if bucket_type == "internal" else "production" + if dataset.get("status") == "pending-deletion": processed_deletion = True - print(f"Found dataset marked for full deletion: {dataset['fileName']}") + print( + f"Found dataset marked for full deletion: {dataset['fileName']} from {bucket_name} bucket" + ) for entry in dataset.get("history", []): if "r2_object_key" in entry: - objects_to_delete_from_r2.append({"Key": entry["r2_object_key"]}) + objects_to_delete_from_r2.append( + {"Key": entry["r2_object_key"], "Bucket": target_bucket} + ) else: versions_to_keep = [] for entry in dataset.get("history", []): if entry.get("status") == "pending-deletion": processed_deletion = True print( - f"Found version marked for deletion: {dataset['fileName']} v{entry['version']}" + f"Found version marked for deletion: {dataset['fileName']} v{entry['version']} from {bucket_name} bucket" ) if "r2_object_key" in entry: objects_to_delete_from_r2.append( - {"Key": entry["r2_object_key"]} + {"Key": entry["r2_object_key"], "Bucket": target_bucket} ) else: versions_to_keep.append(entry) @@ -162,18 +172,44 @@ def handle_deletions(manifest_data: list[dict[str, Any]]) -> bool: return False if objects_to_delete_from_r2: - print( - f"\nDeleting {len(objects_to_delete_from_r2)} objects from production R2 bucket..." - ) - for i in range(0, len(objects_to_delete_from_r2), 1000): - chunk: Any = objects_to_delete_from_r2[i : i + 1000] - response = client.delete_objects( - Bucket=PROD_BUCKET, Delete={"Objects": chunk, "Quiet": True} + # Group objects by bucket for deletion + prod_objects = [ + obj for obj in objects_to_delete_from_r2 if obj["Bucket"] == PROD_BUCKET + ] + internal_objects = [ + obj for obj in objects_to_delete_from_r2 if obj["Bucket"] == INTERNAL_BUCKET + ] + + if prod_objects: + print( + f"\nDeleting {len(prod_objects)} objects from production R2 bucket..." + ) + for i in range(0, len(prod_objects), 1000): + chunk: Any = prod_objects[i : i + 1000] + objects_only = [{"Key": obj["Key"]} for obj in chunk] + response = client.delete_objects( + Bucket=PROD_BUCKET, Delete={"Objects": objects_only, "Quiet": True} + ) + if response.get("Errors"): + print(" ❌ ERROR during batch deletion:", response["Errors"]) + exit(1) + print("✅ Successfully deleted objects from production R2 bucket.") + + if internal_objects: + print( + f"\nDeleting {len(internal_objects)} objects from internal R2 bucket..." 
) - if response.get("Errors"): - print(" ❌ ERROR during batch deletion:", response["Errors"]) - exit(1) - print("✅ Successfully deleted objects from R2.") + for i in range(0, len(internal_objects), 1000): + chunk: Any = internal_objects[i : i + 1000] + objects_only = [{"Key": obj["Key"]} for obj in chunk] + response = client.delete_objects( + Bucket=INTERNAL_BUCKET, + Delete={"Objects": objects_only, "Quiet": True}, + ) + if response.get("Errors"): + print(" ❌ ERROR during batch deletion:", response["Errors"]) + exit(1) + print("✅ Successfully deleted objects from internal R2 bucket.") finalize_manifest(datasets_to_keep, "ci: Finalize manifest after data deletion") return True @@ -186,6 +222,11 @@ def handle_publications(manifest_data: list[dict[str, Any]]) -> bool: """ print("\n--- Phase 2: Checking for pending publications ---") for dataset in manifest_data: + # Get the bucket type for this dataset + bucket_type = dataset.get("bucket", "production") + target_bucket = INTERNAL_BUCKET if bucket_type == "internal" else PROD_BUCKET + bucket_name = "internal" if bucket_type == "internal" else "production" + for i, entry in enumerate(dataset["history"]): if entry.get("commit") == "pending-merge": commit_details = get_commit_details() @@ -197,7 +238,9 @@ def handle_publications(manifest_data: list[dict[str, Any]]) -> bool: if "staging_key" in entry and entry["staging_key"]: staging_key = entry.pop("staging_key") final_key = entry["r2_object_key"] - print(f"Publishing: {dataset['fileName']} v{entry['version']}") + print( + f"Publishing: {dataset['fileName']} v{entry['version']} to {bucket_name} bucket" + ) print(f" Description: {entry['description']}") try: copy_source: Any = { @@ -205,9 +248,11 @@ def handle_publications(manifest_data: list[dict[str, Any]]) -> bool: "Key": staging_key, } client.copy_object( - CopySource=copy_source, Bucket=PROD_BUCKET, Key=final_key + CopySource=copy_source, Bucket=target_bucket, Key=final_key + ) + print( + f" ✅ Server-side copy to {bucket_name} bucket successful." 
) - print(" ✅ Server-side copy successful.") client.delete_object(Bucket=STAGING_BUCKET, Key=staging_key) print(" ✅ Staging object deleted.") except ClientError as e: @@ -215,14 +260,14 @@ def handle_publications(manifest_data: list[dict[str, Any]]) -> bool: exit(1) else: print( - f"Finalizing rollback: {dataset['fileName']} v{entry['version']}" + f"Finalizing rollback: {dataset['fileName']} v{entry['version']} in {bucket_name} bucket" ) print(f" Description: {entry['description']}") dataset["history"][i] = entry finalize_manifest( manifest_data, - f"ci: Publish {dataset['fileName']} {entry['version']}", + f"ci: Publish {dataset['fileName']} {entry['version']} to {bucket_name}", ) return True # Process only one publication per run diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 3ef4d9b..4d72f85 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -41,3 +41,4 @@ jobs: R2_SECRET_ACCESS_KEY: ${{ secrets.R2_SECRET_ACCESS_KEY }} R2_PRODUCTION_BUCKET: ${{ vars.R2_PRODUCTION_BUCKET }} # Use repo variable R2_STAGING_BUCKET: ${{ vars.R2_STAGING_BUCKET }} # Use repo variable + R2_INTERNAL_BUCKET: ${{ vars.R2_INTERNAL_BUCKET }} # Use repo variable diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5dce116..a04cac0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/astral-sh/uv-pre-commit - rev: 0.8.19 + rev: 0.8.22 hooks: # Dependency management - id: uv-lock @@ -21,6 +21,11 @@ repos: args: ["--maxkb=1024"] - id: debug-statements + - repo: https://github.com/asottile/pyupgrade + rev: v3.20.0 + hooks: + - id: pyupgrade + # Python Linting & Formatting with Ruff - repo: https://github.com/astral-sh/ruff-pre-commit rev: v0.13.1 diff --git a/README.md b/README.md index e152965..e51f727 100644 --- a/README.md +++ b/README.md @@ -59,19 +59,41 @@ flowchart TD ## Features +- **Multi-Bucket Support:** Choose between production (public) and internal (team-only) buckets for different data access levels. - **CI/CD-Driven Publishing:** Data publication is transactional and automated via GitHub Actions after a pull request is merged, preventing inconsistent states. - **Enhanced Security:** Production credentials are never stored on developer machines; they are only used by the trusted GitHub Actions runner. - **Interactive TUI:** Run `datamanager` with no arguments for a user-friendly, menu-driven interface. - **Data Lifecycle Management:** A full suite of commands for rollback, deletion, and pruning, all gated by the same secure PR workflow. - **Integrity Verification:** All downloaded files are automatically checked against their SHA256 hash from the manifest. -- **Credential Verification:** A detailed verify command reports read/write/delete permissions for both production and staging buckets. +- **Credential Verification:** A detailed verify command reports read/write/delete permissions for production, staging, and internal buckets. 
+ +## Bucket Types + +The system supports two types of data storage buckets: + +### Production Bucket (Public) + +- **Access Level:** Publicly accessible +- **Use Case:** Data intended for public consumption +- **Default:** Used by default for all operations +- **Permissions:** Requires appropriate public access settings in Cloudflare R2 + +### Internal Bucket (Team-Only) + +- **Access Level:** Team members only +- **Use Case:** Sensitive or internal data not for public consumption +- **Security:** Private access, team credentials required +- **Usage:** Specify `--bucket internal` when using commands ## Prerequisites - Python 3.12+ - Git - `sqlite3` command-line tool -- An active Cloudflare account with **two** R2 buckets (one for production, one for staging). +- An active Cloudflare account with **three** R2 buckets: + - Production bucket (publicly accessible) + - Staging bucket (for temporary uploads) + - Internal bucket (team-only access) - For the data in this repo, contact the OEO team for access to the R2 buckets. ## ⚙️ Setup and Installation @@ -113,6 +135,7 @@ flowchart TD R2_SECRET_ACCESS_KEY="your_r2_secret_key" R2_PRODUCTION_BUCKET="your-production-bucket-name" R2_STAGING_BUCKET="your-staging-bucket-name" + R2_INTERNAL_BUCKET="your-internal-bucket-name" ``` 4. **Verify Configuration:** @@ -143,8 +166,11 @@ git checkout -b feat/update-energy-data Use the `datamanager` tool to stage your changes. The `prepare` command handles both creating new datasets and updating existing ones. ```bash -# This uploads the file to the staging bucket and updates manifest.json locally +# Prepare for production bucket (default) uv run datamanager prepare energy-data.sqlite ./local-files/new-energy.sqlite + +# Prepare for internal bucket +uv run datamanager prepare energy-data.sqlite ./local-files/new-energy.sqlite --bucket internal ``` The tool will guide you through the process. For other maintenance tasks like `rollback` or `delete`, use the corresponding command. @@ -191,7 +217,7 @@ This will launch a menu where you can choose your desired action, including the ### Command-Line Interface (CLI) -You can also use the command-line interface directly for specific tasks or for scripting purposes. +You can also use the command-line interface directly for specific tasks or for scripting purposes. Use the `--bucket` option to specify whether to work with production or internal data. ![CLI](assets/cli.png) @@ -231,11 +257,17 @@ uv run datamanager list-datasets Downloads a dataset from the **production** R2 bucket and verifies its integrity. ```bash -# Pull the latest version +# Pull the latest version from production (default) uv run datamanager pull user-profiles.sqlite -# Pull a specific version +# Pull from internal bucket +uv run datamanager pull user-profiles.sqlite --bucket internal + +# Pull a specific version from production uv run datamanager pull user-profiles.sqlite --version v2 + +# Pull a specific version from internal bucket +uv run datamanager pull user-profiles.sqlite --version v2 --bucket internal ``` ![pull](assets/pull.png) @@ -268,7 +300,7 @@ uv run datamanager prune-versions --keep 5 #### `verify` -Checks R2 credentials and reports granular read/write/delete permissions for both production and staging buckets. +Checks R2 credentials and reports granular read/write/delete permissions for production, staging, and internal buckets. 
```bash uv run datamanager verify diff --git a/docs/source/setup.md b/docs/source/setup.md index 58a8339..27d8248 100644 --- a/docs/source/setup.md +++ b/docs/source/setup.md @@ -1,5 +1,11 @@ # Setup and Installation +This tool requires three Cloudflare R2 buckets: + +- **Production bucket**: For publicly accessible datasets +- **Staging bucket**: For temporary uploads during the review process +- **Internal bucket**: For team-only datasets with restricted access + 1. **Clone the Repository:** ```bash @@ -37,6 +43,7 @@ R2_SECRET_ACCESS_KEY="your_r2_secret_key" R2_PRODUCTION_BUCKET="your-production-bucket-name" R2_STAGING_BUCKET="your-staging-bucket-name" + R2_INTERNAL_BUCKET="your-internal-bucket-name" ``` 4. **Verify Configuration:** diff --git a/docs/source/usage.md b/docs/source/usage.md index cbedbfd..1096df1 100644 --- a/docs/source/usage.md +++ b/docs/source/usage.md @@ -54,7 +54,7 @@ uv run datamanager list-datasets ### `pull` -Downloads a dataset from the **production** R2 bucket and verifies its integrity. +Downloads a dataset from the appropriate R2 bucket (production or internal) and verifies its integrity. The bucket is determined by the dataset's configuration in the manifest. ```bash # Pull the latest version @@ -94,7 +94,7 @@ uv run datamanager prune-versions --keep 5 ### `verify` -Checks R2 credentials and reports granular read/write/delete permissions for both production and staging buckets. +Checks R2 credentials and reports granular read/write/delete permissions for all three buckets (production, staging, and internal). ```bash uv run datamanager verify diff --git a/docs/source/workflow.md b/docs/source/workflow.md index ffc7c6a..0a23049 100644 --- a/docs/source/workflow.md +++ b/docs/source/workflow.md @@ -41,8 +41,8 @@ Go to GitHub and open a pull request from your feature branch to `main`. The dif Once the PR is reviewed, approved, and all status checks pass, merge it. The CI/CD pipeline takes over automatically: -- It copies the data from the staging bucket to the production bucket. +- It copies the data from the staging bucket to the appropriate target bucket (production or internal). - It finalizes the `manifest.json` with the new commit hash and description. - It pushes a final commit back to `main`. -The new data version is now live and available to all users via `datamanager pull`. +The new data version is now live and available via `datamanager pull`. **Note:** Internal datasets are only accessible to team members with appropriate R2 bucket permissions. 
diff --git a/env.example b/env.example index 718cbc9..e8bf862 100644 --- a/env.example +++ b/env.example @@ -3,3 +3,4 @@ R2_ACCESS_KEY_ID="your_r2_access_key" R2_SECRET_ACCESS_KEY="your_r2_secret_key" R2_PRODUCTION_BUCKET="your-production-bucket-name" R2_STAGING_BUCKET="your-staging-bucket-name" +R2_INTERNAL_BUCKET="your-internal-bucket-name" diff --git a/manifest.json b/manifest.json index 47d689c..75e2eef 100644 --- a/manifest.json +++ b/manifest.json @@ -1,6 +1,7 @@ [ { "fileName": "test_database.sqlite", + "bucket": "production", "latestVersion": "v1", "history": [ { @@ -31,5 +32,22 @@ "description": "updating test_database to get multiple versions" } ] + }, + { + "fileName": "internal_test_database.sqlite", + "bucket": "internal", + "latestVersion": "v1", + "history": [ + { + "version": "v1", + "timestamp": "2025-09-23T21:36:33.540270Z", + "sha256": "6d60f0035a80de92c3f3df433212699e0584a09a7d4943693ae0889d98640641", + "r2_object_key": "internal/v1-6d60f0035a80de92c3f3df433212699e0584a09a7d4943693ae0889d98640641.sqlite", + "staging_key": "staging-uploads/6d60f0035a80de92c3f3df433212699e0584a09a7d4943693ae0889d98640641.sqlite", + "diffFromPrevious": null, + "commit": "pending-merge", + "description": "pending-merge" + } + ] } ] diff --git a/src/datamanager/__main__.py b/src/datamanager/__main__.py index 5f75948..01ffe7d 100644 --- a/src/datamanager/__main__.py +++ b/src/datamanager/__main__.py @@ -67,6 +67,10 @@ def verify(ctx: typer.Context) -> None: title="R2 Bucket Permissions Report", ) + # Update table headers to be more descriptive + table.columns[0].header = "Bucket" + table.columns[1].header = "Status" + overall_success = True for res in results: status_icon = ( @@ -97,11 +101,14 @@ def verify(ctx: typer.Context) -> None: def list_datasets(ctx: typer.Context) -> None: """Lists all datasets tracked in the manifest.""" data = manifest.read_manifest() - table = Table("Dataset Name", "Latest Version", "Last Updated", "SHA256") + table = Table("Dataset Name", "Bucket", "Latest Version", "Last Updated", "SHA256") for item in data: latest = item["history"][0] + bucket_type = item.get("bucket", "production") + bucket_display = "🔒 Internal" if bucket_type == "internal" else "🌐 Production" table.add_row( item["fileName"], + bucket_display, latest["version"], # latest["timestamp"], f"{_rel(latest['timestamp'])} ({latest['timestamp']})", @@ -110,9 +117,13 @@ def list_datasets(ctx: typer.Context) -> None: console.print(table) -def _run_pull_logic(name: str, version: str, output: Optional[Path]) -> None: +def _run_pull_logic( + name: str, version: str, output: Optional[Path], bucket: str = "production" +) -> None: """The core logic for pulling and verifying a dataset.""" - console.print(f"🔎 Locating version '{version}' for dataset '{name}'...") + console.print( + f"🔎 Locating version '{version}' for dataset '{name}' from {bucket} bucket..." + ) version_entry = manifest.get_version_entry(name, version) if not version_entry: @@ -133,7 +144,10 @@ def _run_pull_logic(name: str, version: str, output: Optional[Path]) -> None: ) success = core.pull_and_verify( - version_entry["r2_object_key"], version_entry["sha256"], final_path + version_entry["r2_object_key"], + version_entry["sha256"], + final_path, + bucket, ) if success: @@ -162,9 +176,15 @@ def pull( "-o", help="Output path for the file. Defaults to the dataset name in the current directory.", ), + bucket: str = typer.Option( + "production", + "--bucket", + "-b", + help="Bucket to pull from: 'production' or 'internal'. 
Defaults to production.", + ), ) -> None: """Pulls a specific version of a dataset from R2 and verifies its integrity.""" - _run_pull_logic(name, version, output) + _run_pull_logic(name, version, output, bucket) def _pull_interactive(ctx: typer.Context) -> None: @@ -176,20 +196,33 @@ def _pull_interactive(ctx: typer.Context) -> None: console.print("[yellow]No datasets found in the manifest to pull.[/]") return - dataset_names = [ds["fileName"] for ds in all_datasets] - selected_name = questionary.select( - "Which dataset would you like to pull?", choices=dataset_names + # Show datasets with bucket information + dataset_choices = [] + for ds in all_datasets: + bucket_type = ds.get("bucket", "production") + bucket_icon = "🔒" if bucket_type == "internal" else "🌐" + bucket_name = "Internal" if bucket_type == "internal" else "Production" + dataset_choices.append(f"{ds['fileName']} ({bucket_icon} {bucket_name})") + + selected_choice = questionary.select( + "Which dataset would you like to pull?", choices=dataset_choices ).ask() - if selected_name is None: + if selected_choice is None: console.print("Pull cancelled.") return + # Extract dataset name from choice + selected_name = selected_choice.split(" (")[0] + dataset = manifest.get_dataset(selected_name) if not dataset or not dataset["history"]: console.print(f"[red]Error: No version history found for {selected_name}.[/]") return + # Determine which bucket this dataset belongs to + dataset_bucket = dataset.get("bucket", "production") + version_choices = [ f"{entry['version']} (commit: {entry['commit']}, {_rel(entry['timestamp'])})" for entry in dataset["history"] @@ -218,14 +251,17 @@ def _pull_interactive(ctx: typer.Context) -> None: name=selected_name, version=version_to_pull, output=Path(output_path_str), + bucket=dataset_bucket, # Use the dataset's actual bucket ) except typer.Exit: pass -def _run_prepare_logic(ctx: typer.Context, name: str, file: Path) -> None: +def _run_prepare_logic( + _ctx: typer.Context, name: str, file: Path, bucket: str = "production" +) -> None: """The core logic for preparing a dataset for release.""" - console.print(f"🚀 Preparing update for [cyan]{name}[/]...") + console.print(f"🚀 Preparing update for [cyan]{name}[/] to {bucket} bucket...") new_hash = core.hash_file(file) dataset = manifest.get_dataset(name) @@ -257,8 +293,10 @@ def _run_prepare_logic(ctx: typer.Context, name: str, file: Path) -> None: diff_git_path: Optional[Path] = None with tempfile.TemporaryDirectory() as tempdir: old_path = Path(tempdir) / "prev.sqlite" - # Download from the PRODUCTION bucket - core.download_from_r2(client, latest_version["r2_object_key"], old_path) + # Download from the appropriate bucket using core helper + core.download_from_r2( + client, latest_version["r2_object_key"], old_path, bucket + ) full_diff, summary = core.generate_sql_diff(old_path, file) @@ -293,8 +331,11 @@ def _run_prepare_logic(ctx: typer.Context, name: str, file: Path) -> None: else: # --- This is for CREATE --- + # Use dataset-based prefix regardless of bucket; bucket is resolved separately + bucket_prefix = Path(Path(name).stem).as_posix() new_dataset_obj = { "fileName": name, + "bucket": bucket, # Track which bucket this dataset belongs to "latestVersion": "v1", "history": [ { @@ -303,7 +344,7 @@ def _run_prepare_logic(ctx: typer.Context, name: str, file: Path) -> None: .isoformat() .replace("+00:00", "Z"), "sha256": new_hash, - "r2_object_key": f"{Path(Path(name).stem)}/v1-{new_hash}.sqlite", + "r2_object_key": f"{bucket_prefix}/v1-{new_hash}.sqlite", 
"staging_key": staging_key, "diffFromPrevious": None, # Explicitly None for new datasets "commit": "pending-merge", @@ -331,18 +372,62 @@ def prepare( name: str = typer.Argument(..., help="The logical name of the dataset."), file: Path = typer.Argument(..., help="Path to the .sqlite file.", exists=True), no_prompt: bool = COMMON_OPTIONS["no_prompt"], + bucket: str = typer.Option( + "production", + "--bucket", + "-b", + help="Target bucket: 'production' or 'internal'. Defaults to production.", + ), ) -> None: """ Prepares a dataset for release: uploads to staging and updates the manifest. This is the first step in the CI/CD-driven workflow. """ ctx.obj["no_prompt"] = no_prompt or ctx.obj.get("no_prompt") - _run_prepare_logic(ctx, name, file) + _run_prepare_logic(ctx, name, file, bucket) def _prepare_interactive(ctx: typer.Context) -> None: - """Guides the user through preparing a dataset for release.""" - console.print("\n[bold]Interactive Dataset Preparation[/]") + """Guides the user through preparing a dataset for production release.""" + console.print("\n[bold]Interactive Dataset Preparation (Production)[/]") + + selected_file_str = questionary.path( + "Enter the path to the .sqlite file:", + validate=lambda path: Path(path).is_file(), + ).ask() + if selected_file_str is None: + console.print("Preparation cancelled.") + return + + default_name = Path(selected_file_str).name + selected_name = questionary.text( + "Enter the logical name for this dataset:", + default=default_name, + validate=lambda text: len(text) > 0 or "Name cannot be empty.", + ).ask() + if selected_name is None: + console.print("Preparation cancelled.") + return + + console.print( + f"\nYou are about to prepare dataset [cyan]{selected_name}[/] for [bold green]PRODUCTION[/] release from file [green]{selected_file_str}[/]." + ) + proceed = _ask_confirm(ctx, "Do you want to continue?", default=False) + if not proceed: + console.print("Preparation cancelled.") + return + + try: + _run_prepare_logic( + ctx, name=selected_name, file=Path(selected_file_str), bucket="production" + ) + except typer.Exit: + pass + + +def _prepare_internal_interactive(ctx: typer.Context) -> None: + """Guides the user through preparing a dataset for internal release.""" + console.print("\n[bold]Interactive Dataset Preparation (Internal)[/]") selected_file_str = questionary.path( "Enter the path to the .sqlite file:", @@ -363,7 +448,10 @@ def _prepare_interactive(ctx: typer.Context) -> None: return console.print( - f"\nYou are about to prepare dataset [cyan]{selected_name}[/] from file [green]{selected_file_str}[/]." + f"\nYou are about to prepare dataset [cyan]{selected_name}[/] for [bold blue]INTERNAL[/] release from file [green]{selected_file_str}[/]." + ) + console.print( + "[bold yellow]Note:[/] Internal datasets are only accessible to team members." 
) proceed = _ask_confirm(ctx, "Do you want to continue?", default=False) if not proceed: @@ -371,7 +459,9 @@ def _prepare_interactive(ctx: typer.Context) -> None: return try: - _run_prepare_logic(ctx, name=selected_name, file=Path(selected_file_str)) + _run_prepare_logic( + ctx, name=selected_name, file=Path(selected_file_str), bucket="internal" + ) except typer.Exit: pass @@ -684,7 +774,8 @@ def main(ctx: typer.Context, no_prompt: bool = COMMON_OPTIONS["no_prompt"]) -> N actions: dict[str, Callable[[typer.Context], None] | str] = { "List all datasets": list_datasets, - "Prepare a dataset for release": _prepare_interactive, + "Prepare a dataset for production release": _prepare_interactive, + "Prepare a dataset for internal release": _prepare_internal_interactive, "Pull a dataset version": _pull_interactive, "Rollback a dataset to a previous version": _rollback_interactive, "Delete a dataset": _delete_interactive, diff --git a/src/datamanager/config.py b/src/datamanager/config.py index da1a1e3..44123d0 100644 --- a/src/datamanager/config.py +++ b/src/datamanager/config.py @@ -28,6 +28,7 @@ class Settings: secret_key: str = _need("R2_SECRET_ACCESS_KEY") bucket: str = _need("R2_PRODUCTION_BUCKET") staging_bucket: str = _need("R2_STAGING_BUCKET") + internal_bucket: str = _need("R2_INTERNAL_BUCKET") manifest_file: str = "manifest.json" max_diff_lines: int = 500 diff --git a/src/datamanager/core.py b/src/datamanager/core.py index 37defb3..e2f8715 100644 --- a/src/datamanager/core.py +++ b/src/datamanager/core.py @@ -49,6 +49,38 @@ def get_r2_client() -> S3Client: ) +def resolve_bucket_alias(bucket_alias: str) -> str: + """ + Resolves a bucket alias to the actual bucket name. + + Args: + bucket_alias: The bucket alias ("production" or "internal") + + Returns: + The actual bucket name from settings + """ + return _resolve_bucket(bucket_alias) + + +def _resolve_bucket(bucket: str | None) -> str: + """ + Resolves bucket parameter to actual bucket name. 
+ + Args: + bucket: Bucket name, alias, or None + + Returns: + The actual bucket name from settings + """ + if bucket is None or bucket == "production": + return settings.bucket + elif bucket == "internal": + return settings.internal_bucket + else: + # Return raw bucket name if it's not an alias + return bucket + + def hash_file(file_path: Path) -> str: """Calculates and returns the SHA-256 hash of a file.""" h = hashlib.sha256() @@ -58,8 +90,11 @@ def hash_file(file_path: Path) -> str: return h.hexdigest() -def upload_to_r2(client: S3Client, file_path: Path, object_key: str) -> None: +def upload_to_r2( + client: S3Client, file_path: Path, object_key: str, bucket: str | None = None +) -> None: """Uploads a file to R2 with a progress bar.""" + target_bucket = _resolve_bucket(bucket) file_size = file_path.stat().st_size with Progress() as progress: task = progress.add_task( @@ -67,7 +102,7 @@ def upload_to_r2(client: S3Client, file_path: Path, object_key: str) -> None: ) client.upload_file( str(file_path), - settings.bucket, + target_bucket, object_key, Callback=lambda bytes_transferred: progress.update( task, advance=bytes_transferred @@ -75,10 +110,13 @@ def upload_to_r2(client: S3Client, file_path: Path, object_key: str) -> None: ) -def download_from_r2(client: S3Client, object_key: str, download_path: Path) -> None: +def download_from_r2( + client: S3Client, object_key: str, download_path: Path, bucket: str | None = None +) -> None: """Downloads a file from R2 with a progress bar.""" + target_bucket = _resolve_bucket(bucket) try: - file_size = client.head_object(Bucket=settings.bucket, Key=object_key)[ + file_size = client.head_object(Bucket=target_bucket, Key=object_key)[ "ContentLength" ] with Progress() as progress: @@ -86,7 +124,7 @@ def download_from_r2(client: S3Client, object_key: str, download_path: Path) -> f"[cyan]Downloading {download_path.name}...", total=file_size ) client.download_file( - settings.bucket, + target_bucket, object_key, str(download_path), Callback=lambda bytes_transferred: progress.update( @@ -99,7 +137,9 @@ def download_from_r2(client: S3Client, object_key: str, download_path: Path) -> raise -def pull_and_verify(object_key: str, expected_hash: str, output_path: Path) -> bool: +def pull_and_verify( + object_key: str, expected_hash: str, output_path: Path, bucket: str | None = None +) -> bool: """ Downloads a file from R2, verifies its hash, and cleans up on failure. 
@@ -108,9 +148,10 @@ def pull_and_verify(object_key: str, expected_hash: str, output_path: Path) -> b """ client = get_r2_client() try: - download_from_r2(client, object_key, output_path) - except Exception: - return False # Error message is printed inside download_from_r2 + download_from_r2(client, object_key, output_path, bucket) + except (ClientError, FileNotFoundError, OSError, PermissionError) as e: + console.print(f"[bold red]Error downloading file: {e}[/]") + return False console.print("Verifying file integrity...") downloaded_hash = hash_file(output_path) @@ -192,11 +233,14 @@ def _dump(db: Path) -> str: return full_diff, summary -def delete_from_r2(client: S3Client, object_key: str) -> None: +def delete_from_r2( + client: S3Client, object_key: str, bucket: str | None = None +) -> None: """Deletes an object from the R2 bucket.""" + target_bucket = _resolve_bucket(bucket) console.print(f"Attempting to delete [yellow]{object_key}[/] from R2...") try: - client.delete_object(Bucket=settings.bucket, Key=object_key) + client.delete_object(Bucket=target_bucket, Key=object_key) console.print("✅ Rollback deletion successful.") except Exception as e: # This is a best-effort cleanup. We notify the user if it fails. @@ -270,7 +314,7 @@ def _check_bucket_permissions(client: Any, bucket_name: str) -> VerificationResu def verify_r2_access() -> list[VerificationResult]: """ - Verifies granular permissions for both production and staging buckets. + Verifies granular permissions for production, staging, and internal buckets. Returns: A list of result dictionaries, one for each bucket check. @@ -282,6 +326,8 @@ def verify_r2_access() -> list[VerificationResult]: results.append(_check_bucket_permissions(client, settings.bucket)) # Check Staging Bucket results.append(_check_bucket_permissions(client, settings.staging_bucket)) + # Check Internal Bucket + results.append(_check_bucket_permissions(client, settings.internal_bucket)) except Exception as e: # Catches errors during client creation (e.g., bad endpoint) connection_error: VerificationResult = { @@ -309,3 +355,8 @@ def upload_to_staging(client: S3Client, file_path: Path, object_key: str) -> Non task, advance=bytes_transferred ), ) + + +def upload_to_internal(client: S3Client, file_path: Path, object_key: str) -> None: + """Uploads a file to the INTERNAL R2 bucket with a progress bar.""" + upload_to_r2(client, file_path, object_key, bucket="internal") diff --git a/src/datamanager/manifest.py b/src/datamanager/manifest.py index eb1813a..0936dd7 100644 --- a/src/datamanager/manifest.py +++ b/src/datamanager/manifest.py @@ -21,6 +21,8 @@ "get_version_entry", "add_new_dataset", "update_dataset", + "get_dataset_bucket", + "update_dataset_bucket", ] # Initialize console for any feedback @@ -199,6 +201,39 @@ def add_new_dataset(dataset_object: dict[str, Any]) -> None: write_manifest(data) +def get_dataset_bucket(name: str) -> str: + """ + Gets the bucket type for a dataset. + + Args: + name: The 'fileName' of the dataset. + + Returns: + The bucket type ('production' or 'internal'), defaults to 'production'. + """ + dataset = get_dataset(name) + if not dataset: + return "production" # Default for existing datasets + bucket = dataset.get("bucket", "production") + return bucket if bucket is not None else "production" + + +def update_dataset_bucket(name: str, bucket: str) -> None: + """ + Updates the bucket type for a dataset. + + Args: + name: The 'fileName' of the dataset. + bucket: The bucket type ('production' or 'internal'). 
+ """ + data = read_manifest() + for item in data: + if item.get("fileName") == name: + item["bucket"] = bucket + break + write_manifest(data) + + def update_dataset(name: str, updated_dataset: dict[str, Any]) -> None: """ Finds a dataset by name and replaces the entire object. diff --git a/tests/conftest.py b/tests/conftest.py index 39c2137..a990a31 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -43,6 +43,7 @@ def test_repo(tmp_path: Path) -> Generator[Path, Any, None]: manifest_data = [ { "fileName": "core-dataset.sqlite", + "bucket": "production", "latestVersion": "v2", "history": [ { diff --git a/tests/test_core.py b/tests/test_core.py index 5186fdc..85f0b1d 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,17 +1,41 @@ # tests/test_core.py import sqlite3 from pathlib import Path -from typing import Any from unittest.mock import MagicMock from pytest_mock import MockerFixture import pytest from botocore.exceptions import ClientError +from typing import Dict, Any + from datamanager import core from datamanager.config import settings +from typing import TypedDict, cast + + +class _ErrorBody(TypedDict): + Code: str + Message: str + + +class _ClientErrResp(TypedDict): + Error: _ErrorBody + + +def make_client_error(code: str, message: str, operation_name: str) -> ClientError: + """ + Build a ClientError with a response shape that satisfies mypy + without relying on private botocore types. + """ + resp: _ClientErrResp = {"Error": {"Code": code, "Message": message}} + # ClientError expects a dict with "Error" key but the stubs require + # a specific TypedDict. We cast once here to keep call sites clean. + return ClientError(cast(Any, resp), operation_name) + + def test_hash_file(tmp_path: Path) -> None: """Test SHA256 hash calculation.""" test_file = tmp_path / "test.txt" @@ -88,12 +112,20 @@ def test_verify_r2_access_full_permissions(mocker: MockerFixture) -> None: results = core.verify_r2_access() - # Should return results for both production and staging buckets - assert len(results) == 2 + # Should return results for production, staging, and internal buckets + assert len(results) == 3 prod_result = results[0] + staging_result = results[1] + internal_result = results[2] assert prod_result["exists"] is True + assert staging_result["exists"] is True + assert internal_result["exists"] is True assert all(prod_result["permissions"].values()) + assert all(staging_result["permissions"].values()) + assert all(internal_result["permissions"].values()) assert "Full access" in prod_result["message"] + assert "Full access" in staging_result["message"] + assert "Full access" in internal_result["message"] def test_verify_r2_access_read_only(mocker: MockerFixture) -> None: @@ -102,8 +134,10 @@ def test_verify_r2_access_read_only(mocker: MockerFixture) -> None: mock_client.head_bucket.return_value = True mock_client.list_objects_v2.return_value = True # Simulate write/delete failing with a generic ClientError - error_response: Any = {"Error": {"Code": "403", "Message": "Access Denied"}} - mock_client.put_object.side_effect = ClientError(error_response, "PutObject") + + mock_client.put_object.side_effect = make_client_error( + "403", "Access Denied", "PutObject" + ) results = core.verify_r2_access() prod_result = results[0] @@ -118,8 +152,9 @@ def test_verify_r2_access_read_only(mocker: MockerFixture) -> None: def test_verify_r2_access_bucket_not_found(mocker: MockerFixture) -> None: """Test verification when a bucket does not exist.""" mock_client = 
mocker.patch("datamanager.core.get_r2_client").return_value - error_response: Any = {"Error": {"Code": "404", "Message": "Not Found"}} - mock_client.head_bucket.side_effect = ClientError(error_response, "HeadBucket") + mock_client.head_bucket.side_effect = make_client_error( + "404", "Not Found", "HeadBucket" + ) results = core.verify_r2_access() prod_result = results[0] @@ -159,8 +194,9 @@ def test_download_from_r2_failure(mocker: MockerFixture, tmp_path: Path) -> None mocker.patch("datamanager.core.get_r2_client", return_value=mock_client) # Simulate boto3 raising an error during download - error_response: Any = {"Error": {"Code": "404", "Message": "Not Found"}} - mock_client.head_object.side_effect = ClientError(error_response, "HeadObject") + mock_client.head_object.side_effect = make_client_error( + "404", "Not Found", "HeadObject" + ) output_file = tmp_path / "test.sqlite" @@ -168,3 +204,243 @@ def test_download_from_r2_failure(mocker: MockerFixture, tmp_path: Path) -> None # which our CLI logic is designed to catch. with pytest.raises(ClientError): core.download_from_r2(mock_client, "non-existent-key", output_file) + + +def test_resolve_bucket_alias_production(mocker: MockerFixture) -> None: + """Test that resolve_bucket_alias correctly resolves 'production' alias.""" + # Mock the _resolve_bucket function to avoid actual settings dependency + mocker.patch( + "datamanager.core._resolve_bucket", return_value="test-production-bucket" + ) + + result = core.resolve_bucket_alias("production") + assert result == "test-production-bucket" + + +def test_resolve_bucket_alias_internal(mocker: MockerFixture) -> None: + """Test that resolve_bucket_alias correctly resolves 'internal' alias.""" + mocker.patch( + "datamanager.core._resolve_bucket", return_value="test-internal-bucket" + ) + + result = core.resolve_bucket_alias("internal") + assert result == "test-internal-bucket" + + +def test_resolve_bucket_none(mocker: MockerFixture) -> None: + """Test _resolve_bucket with None bucket parameter.""" + # Mock the entire settings object since it's a frozen dataclass + mock_settings = mocker.MagicMock() + mock_settings.bucket = "test-production-bucket" + mock_settings.internal_bucket = "test-internal-bucket" + mocker.patch("datamanager.core.settings", mock_settings) + + result = core._resolve_bucket(None) + assert result == "test-production-bucket" + + +def test_resolve_bucket_production(mocker: MockerFixture) -> None: + """Test _resolve_bucket with 'production' bucket parameter.""" + mock_settings = mocker.MagicMock() + mock_settings.bucket = "test-production-bucket" + mock_settings.internal_bucket = "test-internal-bucket" + mocker.patch("datamanager.core.settings", mock_settings) + + result = core._resolve_bucket("production") + assert result == "test-production-bucket" + + +def test_resolve_bucket_internal(mocker: MockerFixture) -> None: + """Test _resolve_bucket with 'internal' bucket parameter.""" + mock_settings = mocker.MagicMock() + mock_settings.bucket = "test-production-bucket" + mock_settings.internal_bucket = "test-internal-bucket" + mocker.patch("datamanager.core.settings", mock_settings) + + result = core._resolve_bucket("internal") + assert result == "test-internal-bucket" + + +def test_resolve_bucket_custom(mocker: MockerFixture) -> None: + """Test _resolve_bucket with custom bucket name.""" + result = core._resolve_bucket("custom-bucket-name") + assert result == "custom-bucket-name" + + +def test_upload_to_staging(mocker: MockerFixture, tmp_path: Path) -> None: + """Test 
upload_to_staging function.""" + mock_client = mocker.MagicMock() + test_file = tmp_path / "test.sqlite" + test_file.touch() + + # Mock the entire settings object since it's a frozen dataclass + mock_settings = mocker.MagicMock() + mock_settings.staging_bucket = "test-staging-bucket" + mocker.patch("datamanager.core.settings", mock_settings) + + core.upload_to_staging(mock_client, test_file, "test-key") + + # Verify upload_file was called with correct staging bucket + mock_client.upload_file.assert_called_once_with( + str(test_file), + "test-staging-bucket", + "test-key", + Callback=mocker.ANY, # We don't need to test the exact callback + ) + + +def test_upload_to_internal(mocker: MockerFixture, tmp_path: Path) -> None: + """Test upload_to_internal function.""" + mock_client = mocker.MagicMock() + test_file = tmp_path / "test.sqlite" + test_file.touch() + + # Mock the upload_to_r2 function + mock_upload_r2 = mocker.patch("datamanager.core.upload_to_r2") + + core.upload_to_internal(mock_client, test_file, "test-key") + + # Verify upload_to_r2 was called with internal bucket + mock_upload_r2.assert_called_once_with( + mock_client, test_file, "test-key", bucket="internal" + ) + + +def test_pull_and_verify_download_error(mocker: MockerFixture, tmp_path: Path) -> None: + """Test pull_and_verify when download fails.""" + mock_client = mocker.MagicMock() + mocker.patch("datamanager.core.get_r2_client", return_value=mock_client) + mock_download = mocker.patch("datamanager.core.download_from_r2") + mock_remove = mocker.patch("os.remove") + + # Simulate download failure - note: os.remove should NOT be called + # when download fails, only when download succeeds but hash verification fails + error_response: Any = {"Error": {"Code": "404", "Message": "Not Found"}} + mock_download.side_effect = ClientError( + error_response, + "Download", + ) + + output_file = tmp_path / "test.sqlite" + output_file.touch() + + result = core.pull_and_verify("test-key", "expected-hash", output_file) + + assert result is False + mock_download.assert_called_once() + # os.remove should NOT be called when download fails + mock_remove.assert_not_called() + + +def test_pull_and_verify_success(mocker: MockerFixture, tmp_path: Path) -> None: + """Test pull_and_verify when everything succeeds.""" + mock_client = mocker.MagicMock() + mocker.patch("datamanager.core.get_r2_client", return_value=mock_client) + mock_download = mocker.patch("datamanager.core.download_from_r2") + mock_remove = mocker.patch("os.remove") + + output_file = tmp_path / "test.sqlite" + output_file.touch() + + # Mock hash_file to return correct hash + mocker.patch("datamanager.core.hash_file", return_value="expected-hash") + + result = core.pull_and_verify("test-key", "expected-hash", output_file) + + assert result is True + mock_download.assert_called_once() + mock_remove.assert_not_called() + + +def test_delete_from_r2_exception(mocker: MockerFixture) -> None: + """Test delete_from_r2 when delete operation fails.""" + mock_client = mocker.MagicMock() + mock_client.delete_object.side_effect = Exception("Delete failed") + + # Should not raise exception, just print warning + core.delete_from_r2(mock_client, "test-key") + + +def test_verify_r2_access_connection_error(mocker: MockerFixture) -> None: + """Test verify_r2_access when client creation fails.""" + mocker.patch( + "datamanager.core.get_r2_client", side_effect=Exception("Connection failed") + ) + + results = core.verify_r2_access() + + assert len(results) == 1 + assert results[0]["bucket_name"] == 
"Connection" + assert results[0]["exists"] is False + assert "Failed to create R2 client" in results[0]["message"] + + +def test_check_bucket_permissions_access_denied(mocker: MockerFixture) -> None: + """Test _check_bucket_permissions when bucket access is denied.""" + mock_client = mocker.MagicMock() + error_response: Dict[str, Any] = { + "Error": {"Code": "403", "Message": "Access Denied"} + } + mock_client.head_bucket.side_effect = ClientError(error_response, "HeadBucket") # type: ignore + + result = core._check_bucket_permissions(mock_client, "test-bucket") + + assert result["bucket_name"] == "test-bucket" + assert result["exists"] is False + assert result["permissions"] == {"read": False, "write": False, "delete": False} + assert "Access Denied" in result["message"] + + +def test_check_bucket_permissions_connection_error(mocker: MockerFixture) -> None: + """Test _check_bucket_permissions when there's a connection error.""" + mock_client = mocker.MagicMock() + error_response: Dict[str, Any] = { + "Error": {"Code": "500", "Message": "Internal Server Error"} + } + mock_client.head_bucket.side_effect = ClientError(error_response, "HeadBucket") # type: ignore + + result = core._check_bucket_permissions(mock_client, "test-bucket") + + assert result["bucket_name"] == "test-bucket" + assert result["exists"] is False + assert "Connection error" in result["message"] + + +def test_check_bucket_permissions_read_only(mocker: MockerFixture) -> None: + """Test _check_bucket_permissions when only read permissions are available.""" + mock_client = mocker.MagicMock() + mock_client.head_bucket.return_value = True + mock_client.list_objects_v2.return_value = True + # Simulate write/delete failing + error_response: Dict[str, Any] = { + "Error": {"Code": "403", "Message": "Access Denied"} + } + mock_client.put_object.side_effect = ClientError(error_response, "PutObject") # type: ignore + + result = core._check_bucket_permissions(mock_client, "test-bucket") + + assert result["bucket_name"] == "test-bucket" + assert result["exists"] is True + assert result["permissions"]["read"] is True + assert result["permissions"]["write"] is False + assert result["permissions"]["delete"] is False + assert "Partial access: [read]" in result["message"] + + +def test_check_bucket_permissions_no_permissions(mocker: MockerFixture) -> None: + """Test _check_bucket_permissions when no permissions are available.""" + mock_client = mocker.MagicMock() + mock_client.head_bucket.return_value = True + # Simulate all operations failing + error_response: Any = {"Error": {"Code": "403", "Message": "Access Denied"}} + mock_client.list_objects_v2.side_effect = ClientError( + error_response, "ListObjectsV2" + ) + mock_client.put_object.side_effect = ClientError(error_response, "PutObject") + + result = core._check_bucket_permissions(mock_client, "test-bucket") + + assert result["bucket_name"] == "test-bucket" + assert result["exists"] is True + assert result["permissions"] == {"read": False, "write": False, "delete": False} + assert "No object permissions" in result["message"] diff --git a/tests/test_main.py b/tests/test_main.py index 6e45a51..b5f4591 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -30,6 +30,7 @@ def test_prepare_for_create_success(test_repo: Path, mocker: MockerFixture) -> N # Verify the manifest entry was created correctly with no diff dataset = manifest.get_dataset("new-dataset.sqlite") assert dataset is not None + assert dataset["bucket"] == "production" # Should default to production assert 
dataset["history"][0]["diffFromPrevious"] is None assert dataset["history"][0]["description"] == "pending-merge" @@ -229,7 +230,7 @@ def test_prepare_interactive_success(test_repo: Path, mocker: MockerFixture) -> main_app._prepare_interactive(mock_ctx) mock_run_logic.assert_called_once_with( - mock_ctx, name="new-dataset.sqlite", file=new_file + mock_ctx, name="new-dataset.sqlite", file=new_file, bucket="production" ) @@ -295,6 +296,7 @@ def test_pull_interactive_success(test_repo: Path, mocker: MockerFixture) -> Non name="core-dataset.sqlite", version="v1", output=Path("./pulled-file.sqlite"), + bucket="production", ) @@ -469,3 +471,133 @@ def test_prune_versions_no_op(test_repo: Path, mocker: MockerFixture) -> None: assert result.exit_code == 0, result.stdout assert "No action needed" in result.stdout + + +def test_prepare_internal_bucket(test_repo: Path, mocker: MockerFixture) -> None: + """Test the 'prepare' command for internal bucket.""" + os.chdir(test_repo) + new_file = test_repo / "internal_dataset.sqlite" + new_file.touch() + + mock_r2_client = mocker.patch("datamanager.core.get_r2_client").return_value + mock_r2_client.head_object.return_value = {"ContentLength": 1024} + mock_upload = mocker.patch("datamanager.core.upload_to_staging") + + result = runner.invoke( + app, + ["prepare", "internal-dataset.sqlite", str(new_file), "--bucket", "internal"], + ) + + assert result.exit_code == 0, result.stdout + assert "Preparation complete!" in result.stdout + mock_upload.assert_called_once() + + # Verify the manifest entry was created correctly for internal bucket + dataset = manifest.get_dataset("internal-dataset.sqlite") + assert dataset is not None + assert dataset["bucket"] == "internal" + assert dataset["history"][0]["diffFromPrevious"] is None + assert dataset["history"][0]["description"] == "pending-merge" + + +def test_pull_internal_bucket(test_repo: Path, mocker: MockerFixture) -> None: + """Test the 'pull' command for internal bucket.""" + os.chdir(test_repo) + mock_pull = mocker.patch("datamanager.core.pull_and_verify", return_value=True) + + result = runner.invoke(app, ["pull", "core-dataset.sqlite", "--bucket", "internal"]) + + assert result.exit_code == 0 + assert "✅ Success!" 
in result.stdout + mock_pull.assert_called_once() + call_args = mock_pull.call_args[0] + # Should use internal bucket + assert call_args[3] == "internal" # bucket parameter + + +def test_bucket_prefix_uses_dataset_stem_consistently( + test_repo: Path, mocker: MockerFixture +) -> None: + """Test that bucket_prefix logic uses dataset stem for both internal and production buckets.""" + os.chdir(test_repo) + new_file = test_repo / "test-dataset.sqlite" + new_file.touch() + + mock_r2_client = mocker.patch("datamanager.core.get_r2_client").return_value + mock_r2_client.head_object.return_value = {"ContentLength": 1024} + mock_upload = mocker.patch("datamanager.core.upload_to_staging") + mocker.patch("datamanager.core.download_from_r2") + + # Test production bucket (default) by calling the core logic directly + mock_ctx = mocker.MagicMock() + mock_ctx.obj = {} + main_app._run_prepare_logic(mock_ctx, "test-dataset.sqlite", new_file, "production") + + dataset_prod = manifest.get_dataset("test-dataset.sqlite") + assert dataset_prod is not None + prod_r2_key = dataset_prod["history"][0]["r2_object_key"] + # Should use dataset stem as prefix, not "production" + assert prod_r2_key.startswith("test-dataset/"), ( + f"Expected dataset stem prefix, got: {prod_r2_key}" + ) + + # Reset mocks for internal bucket test + mock_upload.reset_mock() + + # Test internal bucket by calling the core logic directly + main_app._run_prepare_logic(mock_ctx, "test-dataset.sqlite", new_file, "internal") + + dataset_internal = manifest.get_dataset("test-dataset.sqlite") + assert dataset_internal is not None + internal_r2_key = dataset_internal["history"][0]["r2_object_key"] + # Should use same dataset stem as prefix, not "internal" + assert internal_r2_key.startswith("test-dataset/"), ( + f"Expected dataset stem prefix, got: {internal_r2_key}" + ) + + # Both should use the same prefix structure (dataset stem) + assert prod_r2_key.split("/")[0] == internal_r2_key.split("/")[0] == "test-dataset" + + +def test_update_logic_works_with_dataset_stem_prefix( + test_repo: Path, mocker: MockerFixture +) -> None: + """Test that update logic correctly derives r2_dir from existing r2_object_key.""" + os.chdir(test_repo) + + # Mock R2 client and operations + mock_r2_client = mocker.patch("datamanager.core.get_r2_client").return_value + mock_r2_client.head_object.return_value = {"ContentLength": 1024} + mocker.patch("datamanager.core.upload_to_staging") + mocker.patch("datamanager.core.download_from_r2") + + # Mock diff generation + fake_diff = "--- old\n+++ new\n-old line\n+new line\n" + fake_summary = "# summary: 1 add, 1 del\n" + mocker.patch( + "datamanager.core.generate_sql_diff", return_value=(fake_diff, fake_summary) + ) + + # Create a new version file + v2_file = test_repo / "update-test.sqlite" + v2_file.write_text("updated content") + + # Prepare an update (this will create v3) + mock_ctx = mocker.MagicMock() + mock_ctx.obj = {} + main_app._run_prepare_logic(mock_ctx, "core-dataset.sqlite", v2_file, "production") + + # Verify the update was created correctly + dataset = manifest.get_dataset("core-dataset.sqlite") + assert dataset is not None + + # Should have 3 versions now (v1, v2, v3) + assert len(dataset["history"]) == 3 + latest_entry = dataset["history"][0] + + # The new version should derive its directory from the previous r2_object_key + # Previous key was "core-dataset/v2-.sqlite", so new should be "core-dataset/v3-.sqlite" + assert latest_entry["r2_object_key"].startswith("core-dataset/v3-"), ( + f"Update should derive 
from existing key: {latest_entry['r2_object_key']}" + ) + assert latest_entry["version"] == "v3" diff --git a/tests/test_manifest.py b/tests/test_manifest.py index bdcb03a..9f41d5a 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -33,6 +33,7 @@ def test_get_dataset(test_repo: Path) -> None: dataset = manifest.get_dataset("core-dataset.sqlite") assert dataset is not None assert dataset["latestVersion"] == "v2" + assert dataset["bucket"] == "production" # Should have bucket field non_existent = manifest.get_dataset("non-existent.sqlite") assert non_existent is None @@ -48,3 +49,83 @@ def test_update_latest_history_entry(test_repo: Path) -> None: assert data[0]["history"][0]["version"] == "v2" assert data[0]["history"][0]["commit"] == "abcdef" assert data[0]["latestVersion"] == "v2" + assert data[0]["bucket"] == "production" # Should preserve bucket field + + +def test_get_dataset_bucket(test_repo: Path) -> None: + """Test getting the bucket type for a dataset.""" + os.chdir(test_repo) + bucket = manifest.get_dataset_bucket("core-dataset.sqlite") + assert bucket == "production" + + # Test non-existent dataset + bucket = manifest.get_dataset_bucket("non-existent.sqlite") + assert bucket == "production" # Should default to production + + +def test_update_dataset_bucket(test_repo: Path) -> None: + """Test updating the bucket type for a dataset.""" + os.chdir(test_repo) + manifest.update_dataset_bucket("core-dataset.sqlite", "internal") + + data = manifest.read_manifest() + assert data[0]["bucket"] == "internal" + + # Test non-existent dataset (should not crash) + manifest.update_dataset_bucket("non-existent.sqlite", "production") + + +def test_get_dataset_bucket_default(test_repo: Path) -> None: + """Test getting bucket type for a dataset that doesn't have bucket field.""" + os.chdir(test_repo) + + # Create a dataset without bucket field (simulate old manifest) + data = manifest.read_manifest() + if data: + # Remove bucket field to simulate old dataset + del data[0]["bucket"] + manifest.write_manifest(data) + + bucket = manifest.get_dataset_bucket("core-dataset.sqlite") + assert bucket == "production" # Should default to production + + +def test_get_dataset_bucket_internal(test_repo: Path) -> None: + """Test getting bucket type for internal dataset.""" + os.chdir(test_repo) + + # Set dataset to internal bucket + manifest.update_dataset_bucket("core-dataset.sqlite", "internal") + + bucket = manifest.get_dataset_bucket("core-dataset.sqlite") + assert bucket == "internal" + + +def test_get_dataset_bucket_production(test_repo: Path) -> None: + """Test getting bucket type for production dataset.""" + os.chdir(test_repo) + + # Set dataset to production bucket + manifest.update_dataset_bucket("core-dataset.sqlite", "production") + + bucket = manifest.get_dataset_bucket("core-dataset.sqlite") + assert bucket == "production" + + +def test_get_dataset_bucket_nonexistent() -> None: + """Test getting bucket type for non-existent dataset.""" + bucket = manifest.get_dataset_bucket("non-existent.sqlite") + assert bucket == "production" # Should default to production + + +def test_update_dataset_bucket_invalid_bucket(test_repo: Path) -> None: + """Test updating dataset bucket with invalid bucket type.""" + os.chdir(test_repo) + + # Should not crash with invalid bucket type + manifest.update_dataset_bucket("core-dataset.sqlite", "invalid-bucket") + + data = manifest.read_manifest() + assert ( + data[0]["bucket"] == "invalid-bucket" + ) # Should still update even with invalid value
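
Below is a minimal usage sketch (not part of the patch) showing how the bucket-aware helpers introduced above could be exercised from Python. It assumes the `datamanager` package layout and the `R2_*` environment variables (including `R2_INTERNAL_BUCKET`) shown in this diff; the dataset name and output path are examples only.

```python
# Hypothetical usage sketch for the bucket-aware helpers added in this patch.
# Assumes the datamanager package from this repo is importable and the R2_*
# environment variables (including R2_INTERNAL_BUCKET) are set as in env.example.
# The dataset name and output path below are examples, not fixed project values.
from pathlib import Path

from datamanager import core, manifest

name = "internal_test_database.sqlite"  # example entry from manifest.json above

# The manifest records which bucket a dataset lives in; a missing "bucket"
# field falls back to "production".
bucket_alias = manifest.get_dataset_bucket(name)         # "internal" or "production"
actual_bucket = core.resolve_bucket_alias(bucket_alias)  # resolved R2 bucket name
print(f"{name} -> {bucket_alias} bucket ({actual_bucket})")

# Pull the newest recorded version from that bucket and verify its SHA-256.
dataset = manifest.get_dataset(name)
if dataset and dataset["history"]:
    latest = dataset["history"][0]  # history[0] is the latest version
    ok = core.pull_and_verify(
        latest["r2_object_key"],
        latest["sha256"],
        Path("./pulled.sqlite"),  # hypothetical output path
        bucket_alias,             # new optional bucket parameter
    )
    print("verified" if ok else "download or hash check failed")
```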