diff --git a/gitops/common/app.py b/gitops/common/app.py index a19c18f..014e469 100644 --- a/gitops/common/app.py +++ b/gitops/common/app.py @@ -107,7 +107,7 @@ def image(self) -> str: """[610829907584.dkr.ecr.ap-southeast-2.amazonaws.com/uptick:yoink-9f03ac80f3]""" image = self.values.get("image", "") if isinstance(image, dict): - return f"{image['repository']}:{image.get('tag','latest')}" + return f"{image['repository']}:{image.get('tag', 'latest')}" else: return image diff --git a/gitops/core.py b/gitops/core.py index fd3a3a5..6bbcd4a 100644 --- a/gitops/core.py +++ b/gitops/core.py @@ -107,7 +107,7 @@ def bump( # noqa: C901 print(f"Redeploying {colourise(app_name, Fore.LIGHTGREEN_EX)}") update_app(app_name, **{"bump": str(uuid.uuid4())}) else: - print(f"Skipping {colourise(app_name, Fore.LIGHTGREEN_EX)}: already on" f" {colour_image(new_image_tag)}") + print(f"Skipping {colourise(app_name, Fore.LIGHTGREEN_EX)}: already on {colour_image(new_image_tag)}") if redeploy: commit_message = f"Redeploying {filter}" else: diff --git a/gitops/utils/async_runner.py b/gitops/utils/async_runner.py index d4eeb53..cf93434 100644 --- a/gitops/utils/async_runner.py +++ b/gitops/utils/async_runner.py @@ -62,7 +62,7 @@ async def run_tasks_async_with_progress(tasks, max_concurrency=10): async def print_async_complete(task, x, y, status_y_offset, win_info, sem): cor, name = task addstr(win_info, x, y, name) - output = f'{"-"*20}\n{progress(name)}\n{"-"*20}\n' + output = f"{'-' * 20}\n{progress(name)}\n{'-' * 20}\n" try: await sem.acquire() output += await cor diff --git a/gitops/utils/cli.py b/gitops/utils/cli.py index 2dfd8bc..250ae6b 100644 --- a/gitops/utils/cli.py +++ b/gitops/utils/cli.py @@ -38,7 +38,7 @@ def confirm(message: str = "") -> bool: def confirm_dangerous_command() -> None: message = ( "You are about to execute a dangerous command against a" - f" {colourise('production' , Fore.RED)} environment. Please ensure you are pairing with" + f" {colourise('production', Fore.RED)} environment. Please ensure you are pairing with" " someone else." ) # TODO. Include an actual multi person MFA to proceed. diff --git a/gitops/utils/kube.py b/gitops/utils/kube.py index d8e2811..a3d7d36 100644 --- a/gitops/utils/kube.py +++ b/gitops/utils/kube.py @@ -290,7 +290,7 @@ async def _find_pod() -> str: async def wait_for_pod(context: str, namespace: str, pod: str) -> str | None: while True: - cmd = "kubectl get pod" f" --context {context}" f" -n {namespace}" ' -o jsonpath="{.status.phase}"' f" {pod}" + cmd = f'kubectl get pod --context {context} -n {namespace} -o jsonpath="{{.status.phase}}" {pod}' stdout, _, _ = await async_run(cmd) output = stdout.decode().lower() if output != "pending": diff --git a/gitops_server/workers/deployer/deploy.py b/gitops_server/workers/deployer/deploy.py index 7221d1d..3121fe5 100644 --- a/gitops_server/workers/deployer/deploy.py +++ b/gitops_server/workers/deployer/deploy.py @@ -14,6 +14,7 @@ from gitops_server.types import AppDefinitions, RunOutput, UpdateAppResult from gitops_server.utils import get_repo_name_from_url, github, run, slack from gitops_server.utils.git import temp_repo +from gitops_server.workers.deployer.semaphore_manager import AppSemaphoreManager from .hooks import handle_failed_deploy, handle_successful_deploy @@ -24,9 +25,13 @@ logger = logging.getLogger("gitops") MAX_HELM_HISTORY = 3 +# Max parallel helm installs at a time +# Kube api may rate limit otherwise +helm_parallel_semaphore = asyncio.Semaphore(int(settings.GITOPS_MAX_PARALLEL_DEPLOYS)) + @tracer.start_as_current_span("post_result_summary") -async def post_result_summary(source: str, results: list[UpdateAppResult]): +async def post_result_summary(source: str, results: list[UpdateAppResult]) -> None: n_success = sum([r["exit_code"] == 0 for r in results]) n_failed = sum([r["exit_code"] != 0 for r in results]) await slack.post( @@ -52,6 +57,7 @@ def __init__( commit_message: str, current_app_definitions: AppDefinitions, previous_app_definitions: AppDefinitions, + semaphore_manager: AppSemaphoreManager, skip_migrations: bool = False, ): self.author_name = author_name @@ -61,9 +67,6 @@ def __init__( self.previous_app_definitions = previous_app_definitions self.deploy_id = str(uuid.uuid4()) self.skip_migrations = skip_migrations - # Max parallel helm installs at a time - # Kube api may rate limit otherwise - self.semaphore = asyncio.Semaphore(int(settings.GITOPS_MAX_PARALLEL_DEPLOYS)) # Track which ones we've deployed and which ones failed self.successful_apps: set[str] = set() @@ -71,9 +74,10 @@ def __init__( self.post_ts: str | None = None self.skip_deploy = "--skip-deploy" in commit_message + self.semaphore_manager = semaphore_manager @classmethod - async def from_push_event(cls, push_event): + async def from_push_event(cls, push_event: dict[str, Any], semaphore_manager: AppSemaphoreManager) -> "Deployer": url = push_event["repository"]["clone_url"] author_name = push_event.get("head_commit", {}).get("author", {}).get("name") author_email = push_event.get("head_commit", {}).get("author", {}).get("email") @@ -91,10 +95,11 @@ async def from_push_event(cls, push_event): commit_message, current_app_definitions, previous_app_definitions, + semaphore_manager, skip_migrations, ) - async def deploy(self): + async def deploy(self) -> None: if self.skip_deploy: logger.info("Skipping deploy due to `--skip-deploy` flag.") return @@ -122,20 +127,22 @@ async def deploy(self): uninstall_results = await asyncio.gather( *[self.uninstall_app(self.previous_app_definitions.apps[app_name]) for app_name in self.removed_apps] ) - await post_result_summary(self.current_app_definitions.name, update_results + uninstall_results) + without_nones = [r for r in update_results + uninstall_results if r is not None] + await post_result_summary(self.current_app_definitions.name, without_nones) async def uninstall_app(self, app: App) -> UpdateAppResult: with tracer.start_as_current_span("uninstall_app", attributes={"app": app.name}): - async with self.semaphore: - logger.info(f"Uninstalling app {app.name!r}.") - result = await run(f"helm uninstall {app.name} -n {app.namespace}", suppress_errors=True) - if result: - update_result = UpdateAppResult(app_name=app.name, slack_message="", **result) - await self.post_result( - app=app, - result=update_result, - deployer=self, - ) + async with self.semaphore_manager.app_semaphore(app.name): + async with helm_parallel_semaphore: + logger.info(f"Uninstalling app {app.name!r}.") + result = await run(f"helm uninstall {app.name} -n {app.namespace}", suppress_errors=True) + if result: + update_result = UpdateAppResult(app_name=app.name, slack_message="", **result) + await self.post_result( + app=app, + result=update_result, + deployer=self, + ) return update_result async def rollback_deployment(self, app: App) -> None: @@ -150,61 +157,33 @@ async def rollback_deployment(self, app: App) -> None: ) async def update_app_deployment(self, app: App) -> UpdateAppResult | None: + async with self.semaphore_manager.app_semaphore(app.name): + async with helm_parallel_semaphore: + return await self._update_app_deployment(app) + + async def _update_app_deployment(self, app: App) -> UpdateAppResult | None: app.set_value("deployment.labels.gitops/deploy_id", self.deploy_id) app.set_value("deployment.labels.gitops/status", github.STATUSES.in_progress) if github_deployment_url := app.values.get("github/deployment_url"): app.set_value("deployment.annotations.github/deployment_url", github_deployment_url) - async with self.semaphore: - with tracer.start_as_current_span("update_app_deployment", attributes={"app": app.name}) as span: - logger.info(f"Deploying app {app.name!r}.") - from_timestamp = time.time() - if app.chart.type == "git": - span.set_attribute("gitops.chart.type", "git") - assert app.chart.git_repo_url - async with temp_repo(app.chart.git_repo_url, ref=app.chart.git_sha) as chart_folder_path: - with tracer.start_as_current_span("helm_dependency_build"): - await run(f"cd {chart_folder_path}; helm dependency build") - - with tempfile.NamedTemporaryFile(suffix=".yml") as cfg: - cfg.write(json.dumps(app.values).encode()) - cfg.flush() - os.fsync(cfg.fileno()) - - with tracer.start_as_current_span("helm_upgrade"): - - async def upgrade_helm_git() -> RunOutput: - result = await run( - "helm secrets upgrade --create-namespace" - f" --history-max {MAX_HELM_HISTORY}" - " --install" - " --timeout=600s" - f"{' --set skip_migrations=true' if self.skip_migrations else ''}" - f" -f {cfg.name}" - f" --namespace={app.namespace}" - f" {app.name}" - f" {chart_folder_path}", - suppress_errors=True, - ) - return result + with tracer.start_as_current_span("update_app_deployment", attributes={"app": app.name}) as span: + logger.info(f"Deploying app {app.name!r}.") + from_timestamp = time.time() + if app.chart.type == "git": + span.set_attribute("gitops.chart.type", "git") + assert app.chart.git_repo_url + async with temp_repo(app.chart.git_repo_url, ref=app.chart.git_sha) as chart_folder_path: + with tracer.start_as_current_span("helm_dependency_build"): + await run(f"cd {chart_folder_path}; helm dependency build") - result = await upgrade_helm_git() - if result["exit_code"] != 0 and "is in progress" in result["output"]: - await self.rollback_deployment(app) - result = await upgrade_helm_git() - - elif app.chart.type == "helm": - span.set_attribute("gitops.chart.type", "helm") with tempfile.NamedTemporaryFile(suffix=".yml") as cfg: cfg.write(json.dumps(app.values).encode()) cfg.flush() os.fsync(cfg.fileno()) - chart_version_arguments = f" --version={app.chart.version}" if app.chart.version else "" - with tracer.start_as_current_span("helm_repo_add"): - await run(f"helm repo add {app.chart.helm_repo} {app.chart.helm_repo_url}") with tracer.start_as_current_span("helm_upgrade"): - async def upgrade_helm_chart() -> RunOutput: + async def upgrade_helm_git() -> RunOutput: result = await run( "helm secrets upgrade --create-namespace" f" --history-max {MAX_HELM_HISTORY}" @@ -214,22 +193,54 @@ async def upgrade_helm_chart() -> RunOutput: f" -f {cfg.name}" f" --namespace={app.namespace}" f" {app.name}" - f" {app.chart.helm_chart} {chart_version_arguments}", + f" {chart_folder_path}", suppress_errors=True, ) return result - result = await upgrade_helm_chart() + result = await upgrade_helm_git() if result["exit_code"] != 0 and "is in progress" in result["output"]: await self.rollback_deployment(app) - result = await upgrade_helm_chart() - else: - logger.warning("Local is not implemented yet") - return None + result = await upgrade_helm_git() + + elif app.chart.type == "helm": + span.set_attribute("gitops.chart.type", "helm") + with tempfile.NamedTemporaryFile(suffix=".yml") as cfg: + cfg.write(json.dumps(app.values).encode()) + cfg.flush() + os.fsync(cfg.fileno()) + chart_version_arguments = f" --version={app.chart.version}" if app.chart.version else "" + with tracer.start_as_current_span("helm_repo_add"): + await run(f"helm repo add {app.chart.helm_repo} {app.chart.helm_repo_url}") + + with tracer.start_as_current_span("helm_upgrade"): + + async def upgrade_helm_chart() -> RunOutput: + result = await run( + "helm secrets upgrade --create-namespace" + f" --history-max {MAX_HELM_HISTORY}" + " --install" + " --timeout=600s" + f"{' --set skip_migrations=true' if self.skip_migrations else ''}" + f" -f {cfg.name}" + f" --namespace={app.namespace}" + f" {app.name}" + f" {app.chart.helm_chart} {chart_version_arguments}", + suppress_errors=True, + ) + return result + + result = await upgrade_helm_chart() + if result["exit_code"] != 0 and "is in progress" in result["output"]: + await self.rollback_deployment(app) + result = await upgrade_helm_chart() + else: + logger.warning("Local is not implemented yet") + return None - update_result = UpdateAppResult(app_name=app.name, slack_message="", **result) + update_result = UpdateAppResult(app_name=app.name, slack_message="", **result) - await self.post_result(app=app, result=update_result, deployer=self, from_timestamp=from_timestamp) + await self.post_result(app=app, result=update_result, deployer=self, from_timestamp=from_timestamp) return update_result def calculate_app_deltas(self) -> tuple[set[str], set[str], set[str]]: diff --git a/gitops_server/workers/deployer/hooks.py b/gitops_server/workers/deployer/hooks.py index 5cdff9c..7f9a921 100644 --- a/gitops_server/workers/deployer/hooks.py +++ b/gitops_server/workers/deployer/hooks.py @@ -103,7 +103,7 @@ async def handle_failed_deploy(app: App, result: UpdateAppResult, deployer, **kw await find_commiter_slack_user(name=deployer.author_name, email=deployer.author_email) or DEFAULT_USER_GROUP ) slack_user_msg = f" {slack_user} " if slack_user else "" - log_msg = f"<{get_dashboard_url(workspace_name=app.name, from_timestamp=kwargs.get("from_timestamp"), to_timestamp=time.time())}|(Deployment Logs)>" + log_msg = f"<{get_dashboard_url(workspace_name=app.name, from_timestamp=kwargs.get('from_timestamp'), to_timestamp=time.time())}|(Deployment Logs)>" result["slack_message"] = ( f"Failed to deploy app `{result['app_name']}` for cluster" f" `{settings.CLUSTER_NAME}` :rotating_light:" diff --git a/gitops_server/workers/deployer/semaphore_manager.py b/gitops_server/workers/deployer/semaphore_manager.py new file mode 100644 index 0000000..bba66b2 --- /dev/null +++ b/gitops_server/workers/deployer/semaphore_manager.py @@ -0,0 +1,52 @@ +import asyncio +import logging +from contextlib import asynccontextmanager +from collections.abc import AsyncGenerator + +logger = logging.getLogger("gitops.semaphore") + + +class AppSemaphoreManager: + """Manages per-app semaphores to ensure sequential updates per app. + + This ensures that while multiple deployments can run concurrently, + any given app is only updated by one deployment at a time. + """ + + _instance: "AppSemaphoreManager | None" = None + + def __init__(self) -> None: + self._app_semaphores: dict[str, asyncio.Semaphore] = {} + self._semaphore_lock = asyncio.Lock() + self._initialized: bool = True + + async def get_app_semaphore(self, app_name: str) -> asyncio.Semaphore: + """Get the semaphore for an app. + + If the semaphore does not exist, create it. + """ + async with self._semaphore_lock: + if app_name not in self._app_semaphores: + logger.debug(f"Creating new semaphore for app: {app_name}") + self._app_semaphores[app_name] = asyncio.Semaphore(1) + semaphore = self._app_semaphores[app_name] + return semaphore + + @asynccontextmanager + async def app_semaphore(self, app_name: str) -> AsyncGenerator[None, None]: + """Async context manager to acquire and release a semaphore for the given app. + + Each app gets exactly one semaphore with a limit of 1, + ensuring sequential updates per app. + """ + semaphore = await self.get_app_semaphore(app_name) + async with semaphore: + yield + + def get_locked_apps(self) -> set[str]: + """Return a set of apps that currently have active locks.""" + locked_apps = set() + for app_name, semaphore in self._app_semaphores.items(): + if semaphore.locked(): + locked_apps.add(app_name) + return locked_apps diff --git a/gitops_server/workers/deployer/worker.py b/gitops_server/workers/deployer/worker.py index cfc03a8..e8224e7 100644 --- a/gitops_server/workers/deployer/worker.py +++ b/gitops_server/workers/deployer/worker.py @@ -1,9 +1,11 @@ import asyncio import logging +from typing import Any from opentelemetry import trace from .deploy import Deployer +from .semaphore_manager import AppSemaphoreManager logger = logging.getLogger("gitops_worker") @@ -11,25 +13,28 @@ class DeployQueueWorker: - """Simple synchronous background work queue. + """Concurrent deployment worker with per-app serialization. - Deployments need to be carried out one at a time to ensure the cluster - doesn't get confused. The worker is based entirely on asyncio and runs - alongside the server for maximum efficiency. + Multiple deployments can run simultaneously, but apps are updated + sequentially using per-app semaphores. This ensures cluster consistency + while allowing parallel deployments of different apps. """ - _worker = None + # Worker singleton + _worker: "DeployQueueWorker | None" = None @classmethod - def get_worker(cls): + def get_worker(cls) -> "DeployQueueWorker": if not cls._worker: cls._worker = cls() return cls._worker - def __init__(self): - self.queue = asyncio.Queue() + def __init__(self) -> None: + self.queue: asyncio.Queue[Any] = asyncio.Queue() + self.semaphore_manager = AppSemaphoreManager() + self.active_deployments: set[asyncio.Task[None]] = set() - async def enqueue(self, work): + async def enqueue(self, work: Any) -> None: """Enqueue an item of work for future processing. The `work` argument is the body of an incoming GitHub push webhook. @@ -37,31 +42,57 @@ async def enqueue(self, work): logger.info(f"Enqueued work, {self.queue.qsize() + 1} items in the queue.") await self.queue.put(work) - async def run(self): + async def run(self) -> None: """Run the worker. - Enters into a loop that waits for work to be queued. Each task is - awaited here to ensure synchronous operation. + Manages concurrent deployments while maintaining per-app serialization. + Each deployment runs as a separate task, allowing multiple deployments + to proceed simultaneously. # TODO: Need to gracefully handle termination. """ - logger.info("Starting up deployer worker loop") + logger.info("Starting up concurrent deployer worker loop") while True: try: - await self.process_work() + # Clean up completed tasks + self.active_deployments = {task for task in self.active_deployments if not task.done()} + + # Process new work + work = await self.queue.get() + deployment_task = asyncio.create_task(self.process_deployment(work)) + self.active_deployments.add(deployment_task) + + # Log current status + locked_apps = self.semaphore_manager.get_locked_apps() + logger.info( + f"Active deployments: {len(self.active_deployments)}, " + f"Locked apps: {sorted(locked_apps)}" + ) + except Exception as e: logger.error(str(e), exc_info=True) - async def process_work(self): - work = await self.queue.get() + async def process_deployment(self, work: Any) -> None: + """Process a single deployment in a separate task. + + This allows multiple deployments to run concurrently while + maintaining per-app serialization through semaphores. + """ ref = work.get("ref") - logger.info(f'Have a push to "{ref}".') + logger.info(f'Processing deployment for push to "{ref}".') + if ref == "refs/heads/master": with tracer.start_as_current_span("gitops_process_webhook") as current_span: - deployer = await Deployer.from_push_event(work) - current_span.set_attribute("gitops.ref", ref) - current_span.set_attribute("gitops.after", work.get("after")) - current_span.set_attribute("gitops.before", work.get("before")) - current_span.set_attribute("gitops.author_email", deployer.author_email) - current_span.set_attribute("gitops.author_name", deployer.author_name) - current_span.set_attribute("gitops.commit_message", deployer.commit_message) - await deployer.deploy() + try: + deployer = await Deployer.from_push_event(work, self.semaphore_manager) + current_span.set_attribute("gitops.ref", ref) + current_span.set_attribute("gitops.after", work.get("after")) + current_span.set_attribute("gitops.before", work.get("before")) + current_span.set_attribute("gitops.author_email", deployer.author_email) + current_span.set_attribute("gitops.author_name", deployer.author_name) + current_span.set_attribute("gitops.commit_message", deployer.commit_message) + await deployer.deploy() + + except Exception as e: + logger.error(f"Deployment failed: {e}", exc_info=True) + raise + else: + logger.info(f'Ignoring push to "{ref}" (not master branch).') diff --git a/tests/test_deploy.py b/tests/test_deploy.py index 51e51f5..6ada365 100644 --- a/tests/test_deploy.py +++ b/tests/test_deploy.py @@ -6,6 +6,7 @@ from gitops.common.app import App from gitops_server.types import AppDefinitions from gitops_server.workers.deployer import Deployer +from gitops_server.workers.deployer.semaphore_manager import AppSemaphoreManager from .sample_data import SAMPLE_GITHUB_PAYLOAD, SAMPLE_GITHUB_PAYLOAD_SKIP_DEPLOY, SAMPLE_GITHUB_PAYLOAD_SKIP_MIGRATIONS from .utils import create_test_yaml, mock_load_app_definitions @@ -17,6 +18,10 @@ @pytest.mark.asyncio class TestDeploy: + def setup_method(self): + """Reset singleton instance between tests.""" + AppSemaphoreManager._instance = None + @patch("gitops_server.workers.deployer.deploy.run") @patch("gitops_server.utils.slack.post") @patch("gitops_server.workers.deployer.deploy.load_app_definitions", mock_load_app_definitions) @@ -25,7 +30,8 @@ async def test_deployer_git(self, temp_repo_mock, post_mock, run_mock): """Fake a deploy to two servers, bumping fg from 2 to 4.""" run_mock.return_value = {"exit_code": 0, "output": ""} temp_repo_mock.return_value.__aenter__.return_value = "mock-repo" - deployer = await Deployer.from_push_event(SAMPLE_GITHUB_PAYLOAD) + semaphore_manager = AppSemaphoreManager() + deployer = await Deployer.from_push_event(SAMPLE_GITHUB_PAYLOAD, semaphore_manager) await deployer.deploy() assert run_mock.call_count == 4 assert run_mock.call_args_list[0][0][0] == "cd mock-repo; helm dependency build" @@ -63,7 +69,8 @@ async def test_deployer_update_helm_app(self, temp_repo_mock, post_mock, run_moc }, ) - deployer = await Deployer.from_push_event(SAMPLE_GITHUB_PAYLOAD) + semaphore_manager = AppSemaphoreManager() + deployer = await Deployer.from_push_event(SAMPLE_GITHUB_PAYLOAD, semaphore_manager) await deployer.update_app_deployment(helm_app) assert run_mock.call_count == 2 @@ -85,7 +92,8 @@ async def test_deployer_skip_migrations_in_commit_message_should_run_helm_withou """Fake a deploy to two servers, bumping fg from 2 to 4.""" run_mock.return_value = {"exit_code": 0, "output": ""} temp_repo_mock.return_value.__aenter__.return_value = "mock-repo" - deployer = await Deployer.from_push_event(SAMPLE_GITHUB_PAYLOAD_SKIP_MIGRATIONS) + semaphore_manager = AppSemaphoreManager() + deployer = await Deployer.from_push_event(SAMPLE_GITHUB_PAYLOAD_SKIP_MIGRATIONS, semaphore_manager) await deployer.deploy() assert run_mock.call_count == 4 assert run_mock.call_args_list[0][0][0] == "cd mock-repo; helm dependency build" @@ -110,7 +118,8 @@ async def test_skip_deploy_in_commit_message_should_not_run_deployment(self, tem """Fake a deploy to two servers, bumping fg from 2 to 4.""" run_mock.return_value = {"exit_code": 0, "output": ""} temp_repo_mock.return_value.__aenter__.return_value = "mock-repo" - deployer = await Deployer.from_push_event(SAMPLE_GITHUB_PAYLOAD_SKIP_DEPLOY) + semaphore_manager = AppSemaphoreManager() + deployer = await Deployer.from_push_event(SAMPLE_GITHUB_PAYLOAD_SKIP_DEPLOY, semaphore_manager) assert deployer.skip_deploy is True await deployer.deploy() diff --git a/tests/test_deploy_concurrent.py b/tests/test_deploy_concurrent.py new file mode 100644 index 0000000..fe480c7 --- /dev/null +++ b/tests/test_deploy_concurrent.py @@ -0,0 +1,255 @@ +import asyncio +from unittest.mock import Mock, patch + +import pytest + +from gitops_server.workers.deployer.semaphore_manager import AppSemaphoreManager +from gitops_server.workers.deployer.worker import DeployQueueWorker + + +@pytest.mark.asyncio +class TestSemaphoreManager: + """Test the AppSemaphoreManager for concurrent deployment control.""" + + def setup_method(self) -> None: + """Reset singleton instance between tests.""" + AppSemaphoreManager._instance = None + + async def test_semaphore_creation_per_app(self) -> None: + """Test that each app gets its own semaphore with limit 1.""" + manager = AppSemaphoreManager() + + # Get semaphores for different apps + semaphore_a = await manager.get_app_semaphore("app-a") + semaphore_b = await manager.get_app_semaphore("app-b") + semaphore_a2 = await manager.get_app_semaphore("app-a") + + # Same app should get same semaphore + assert semaphore_a is semaphore_a2 + # Different apps should get different semaphores + assert semaphore_a is not semaphore_b + # Each semaphore should have limit of 1 + assert semaphore_a._value == 1 + assert semaphore_b._value == 1 + + async def test_per_app_serialization(self) -> None: + """Test that same apps are deployed sequentially across deployments.""" + manager = AppSemaphoreManager() + + # Track the order of operations + operations = [] + + async def simulate_deployment(app_name: str, deployment_id: str) -> None: + operations.append(f"start-{deployment_id}-{app_name}") + async with manager.app_semaphore(app_name): + operations.append(f"acquired-{deployment_id}-{app_name}") + + # Simulate some deployment work + await asyncio.sleep(0.05) + operations.append(f"work-{deployment_id}-{app_name}") + + operations.append(f"released-{deployment_id}-{app_name}") + + # Two deployments both trying to deploy app-b + await asyncio.gather(simulate_deployment("app-b", "deploy1"), simulate_deployment("app-b", "deploy2")) + + # Verify operations happened in serialized order + assert len(operations) == 8 # 4 operations × 2 deployments + + # Find when each deployment acquired and released the lock + deploy1_acquired = next(i for i, op in enumerate(operations) if op == "acquired-deploy1-app-b") + deploy1_released = next(i for i, op in enumerate(operations) if op == "released-deploy1-app-b") + deploy2_acquired = next(i for i, op in enumerate(operations) if op == "acquired-deploy2-app-b") + deploy2_released = next(i for i, op in enumerate(operations) if op == "released-deploy2-app-b") + + # One deployment should completely finish before the other acquires the lock + assert (deploy1_released < deploy2_acquired) or (deploy2_released < deploy1_acquired) + + async def test_parallel_different_apps(self) -> None: + """Test that different apps can be deployed in parallel.""" + manager = AppSemaphoreManager() + + operations = [] + + async def simulate_deployment(app_name: str) -> None: + operations.append(f"start-{app_name}") + async with manager.app_semaphore(app_name): + operations.append(f"acquired-{app_name}") + await asyncio.sleep(0.05) + operations.append(f"work-{app_name}") + + operations.append(f"released-{app_name}") + + # Deploy different apps concurrently + await asyncio.gather(simulate_deployment("app-a"), simulate_deployment("app-b"), simulate_deployment("app-c")) + + # All apps should have been processed + assert len(operations) == 12 # 4 operations × 3 apps + + # Verify all apps got their own semaphore + assert len(manager._app_semaphores) == 3 + assert "app-a" in manager._app_semaphores + assert "app-b" in manager._app_semaphores + assert "app-c" in manager._app_semaphores + + async def test_lock_release_on_exception(self) -> None: + """Test that locks are properly released when exceptions occur.""" + manager = AppSemaphoreManager() + + async def failing_deployment() -> None: + async with manager.app_semaphore("app-x"): + # Simulate deployment failure + raise ValueError("Deployment failed") + + # Should raise the exception but not leave locks hanging + with pytest.raises(ValueError): + await failing_deployment() + + # Verify lock was released + locked_apps = manager.get_locked_apps() + assert "app-x" not in locked_apps + + async def test_get_locked_apps(self) -> None: + """Test that locked apps are correctly identified.""" + manager = AppSemaphoreManager() + + # Initially no apps are locked + assert len(manager.get_locked_apps()) == 0 + + # Acquire locks for some apps + async with manager.app_semaphore("app-a"): + async with manager.app_semaphore("app-b"): + locked_apps = manager.get_locked_apps() + assert len(locked_apps) == 2 + assert "app-a" in locked_apps + assert "app-b" in locked_apps + + # Release one lock + locked_apps = manager.get_locked_apps() + assert len(locked_apps) == 1 + assert "app-b" not in locked_apps + assert "app-a" in locked_apps + + +@pytest.mark.asyncio +class TestDeployQueueWorker: + """Test the concurrent deployment worker functionality.""" + + def setup_method(self) -> None: + """Reset singleton instance between tests.""" + DeployQueueWorker._worker = None + AppSemaphoreManager._instance = None + + async def test_worker_singleton(self) -> None: + """Test that worker maintains singleton pattern.""" + worker1 = DeployQueueWorker.get_worker() + worker2 = DeployQueueWorker.get_worker() + + assert worker1 is worker2 + assert DeployQueueWorker._worker is worker1 + + async def test_worker_initialization(self) -> None: + """Test that worker is properly initialized.""" + worker = DeployQueueWorker.get_worker() + + # Worker should have required components + assert hasattr(worker, "queue") + assert hasattr(worker, "semaphore_manager") + assert hasattr(worker, "active_deployments") + + # Components should be properly initialized + assert isinstance(worker.queue, asyncio.Queue) + assert isinstance(worker.semaphore_manager, AppSemaphoreManager) + assert isinstance(worker.active_deployments, set) + assert len(worker.active_deployments) == 0 + + async def test_enqueue_and_process_work(self) -> None: + """Test that work can be enqueued and processed.""" + worker = DeployQueueWorker.get_worker() + + # Mock webhook data + test_work = {"ref": "refs/heads/master", "test": "data"} + + # Enqueue work + await worker.enqueue(test_work) + assert worker.queue.qsize() == 1 + + # Dequeue work + work = await worker.queue.get() + assert work == test_work + assert worker.queue.qsize() == 0 + + @patch("gitops_server.workers.deployer.worker.Deployer.from_push_event") + async def test_process_deployment_task_creation(self, mock_from_push_event: Mock) -> None: + """Test that deployment processing creates proper async tasks.""" + worker = DeployQueueWorker.get_worker() + + # Mock deployer creation and deployment + mock_deployer = Mock() + mock_deployer.deploy = Mock(return_value=asyncio.sleep(0)) # Return awaitable + mock_from_push_event.return_value = mock_deployer + + test_work = { + "ref": "refs/heads/master", + "repository": {"clone_url": "https://github.com/test/repo.git"}, + "head_commit": {"message": "test", "author": {"name": "Test", "email": "test@example.com"}}, + } + + # Process deployment + await worker.process_deployment(test_work) + + # Verify deployer was created and deployed + mock_from_push_event.assert_called_once_with(test_work, worker.semaphore_manager) + + +@pytest.mark.asyncio +class TestConcurrentDeploymentIntegration: + """Integration tests combining semaphore manager and worker.""" + + def setup_method(self) -> None: + """Reset singleton instances between tests.""" + DeployQueueWorker._worker = None + AppSemaphoreManager._instance = None + + async def test_concurrent_deployment_coordination(self) -> None: + """Test end-to-end coordination of concurrent deployments.""" + worker = DeployQueueWorker.get_worker() + manager = worker.semaphore_manager + + # Track deployment coordination + coordination_log = [] + + # Mock a deployment that uses app locks + async def mock_deployment(app_names: list[str], deployment_id: str) -> None: + for app_name in app_names: + coordination_log.append(f"start-{deployment_id}-{app_name}") + async with manager.app_semaphore(app_name): + coordination_log.append(f"acquired-{deployment_id}-{app_name}") + # Simulate deployment work + await asyncio.sleep(0.01) + coordination_log.append(f"deployed-{deployment_id}-{app_name}") + + coordination_log.append(f"released-{deployment_id}-{app_name}") + + # Simulate multiple concurrent deployments with overlapping apps + await asyncio.gather( + mock_deployment(["app-a", "app-b"], "deploy-1"), + mock_deployment(["app-b", "app-c"], "deploy-2"), + mock_deployment(["app-a", "app-c"], "deploy-3"), + ) + + # Verify all deployments completed + assert len(coordination_log) > 0 + + # Count deployments per app + app_a_deployments = len([log for log in coordination_log if "deployed" in log and "app-a" in log]) + app_b_deployments = len([log for log in coordination_log if "deployed" in log and "app-b" in log]) + app_c_deployments = len([log for log in coordination_log if "deployed" in log and "app-c" in log]) + + # Each app should be deployed exactly twice (appears in 2 out of 3 deployments) + assert app_a_deployments == 2 # deploy-1, deploy-3 + assert app_b_deployments == 2 # deploy-1, deploy-2 + assert app_c_deployments == 2 # deploy-2, deploy-3 + + # No locks should remain + assert len(manager.get_locked_apps()) == 0