Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gitops/common/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def image(self) -> str:
"""[610829907584.dkr.ecr.ap-southeast-2.amazonaws.com/uptick:yoink-9f03ac80f3]"""
image = self.values.get("image", "")
if isinstance(image, dict):
return f"{image['repository']}:{image.get('tag','latest')}"
return f"{image['repository']}:{image.get('tag', 'latest')}"
else:
return image

Expand Down
2 changes: 1 addition & 1 deletion gitops/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def bump( # noqa: C901
print(f"Redeploying {colourise(app_name, Fore.LIGHTGREEN_EX)}")
update_app(app_name, **{"bump": str(uuid.uuid4())})
else:
print(f"Skipping {colourise(app_name, Fore.LIGHTGREEN_EX)}: already on" f" {colour_image(new_image_tag)}")
print(f"Skipping {colourise(app_name, Fore.LIGHTGREEN_EX)}: already on {colour_image(new_image_tag)}")
if redeploy:
commit_message = f"Redeploying {filter}"
else:
Expand Down
2 changes: 1 addition & 1 deletion gitops/utils/async_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async def run_tasks_async_with_progress(tasks, max_concurrency=10):
async def print_async_complete(task, x, y, status_y_offset, win_info, sem):
cor, name = task
addstr(win_info, x, y, name)
output = f'{"-"*20}\n{progress(name)}\n{"-"*20}\n'
output = f"{'-' * 20}\n{progress(name)}\n{'-' * 20}\n"
try:
await sem.acquire()
output += await cor
Expand Down
2 changes: 1 addition & 1 deletion gitops/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def confirm(message: str = "") -> bool:
def confirm_dangerous_command() -> None:
message = (
"You are about to execute a dangerous command against a"
f" {colourise('production' , Fore.RED)} environment. Please ensure you are pairing with"
f" {colourise('production', Fore.RED)} environment. Please ensure you are pairing with"
" someone else."
)
# TODO. Include an actual multi person MFA to proceed.
Expand Down
2 changes: 1 addition & 1 deletion gitops/utils/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ async def _find_pod() -> str:

async def wait_for_pod(context: str, namespace: str, pod: str) -> str | None:
while True:
cmd = "kubectl get pod" f" --context {context}" f" -n {namespace}" ' -o jsonpath="{.status.phase}"' f" {pod}"
cmd = f'kubectl get pod --context {context} -n {namespace} -o jsonpath="{{.status.phase}}" {pod}'
stdout, _, _ = await async_run(cmd)
output = stdout.decode().lower()
if output != "pending":
Expand Down
147 changes: 79 additions & 68 deletions gitops_server/workers/deployer/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from gitops_server.types import AppDefinitions, RunOutput, UpdateAppResult
from gitops_server.utils import get_repo_name_from_url, github, run, slack
from gitops_server.utils.git import temp_repo
from gitops_server.workers.deployer.semaphore_manager import AppSemaphoreManager

from .hooks import handle_failed_deploy, handle_successful_deploy

Expand All @@ -24,9 +25,13 @@
logger = logging.getLogger("gitops")
MAX_HELM_HISTORY = 3

# Max parallel helm installs at a time
# Kube api may rate limit otherwise
helm_parallel_semaphore = asyncio.Semaphore(int(settings.GITOPS_MAX_PARALLEL_DEPLOYS))


@tracer.start_as_current_span("post_result_summary")
async def post_result_summary(source: str, results: list[UpdateAppResult]):
async def post_result_summary(source: str, results: list[UpdateAppResult]) -> None:
n_success = sum([r["exit_code"] == 0 for r in results])
n_failed = sum([r["exit_code"] != 0 for r in results])
await slack.post(
Expand All @@ -52,6 +57,7 @@ def __init__(
commit_message: str,
current_app_definitions: AppDefinitions,
previous_app_definitions: AppDefinitions,
semaphore_manager: AppSemaphoreManager,
skip_migrations: bool = False,
):
self.author_name = author_name
Expand All @@ -61,19 +67,17 @@ def __init__(
self.previous_app_definitions = previous_app_definitions
self.deploy_id = str(uuid.uuid4())
self.skip_migrations = skip_migrations
# Max parallel helm installs at a time
# Kube api may rate limit otherwise
self.semaphore = asyncio.Semaphore(int(settings.GITOPS_MAX_PARALLEL_DEPLOYS))

# Track which ones we've deployed and which ones failed
self.successful_apps: set[str] = set()
self.failed_apps: set[str] = set()

self.post_ts: str | None = None
self.skip_deploy = "--skip-deploy" in commit_message
self.semaphore_manager = semaphore_manager

@classmethod
async def from_push_event(cls, push_event):
async def from_push_event(cls, push_event: dict[str, Any], semaphore_manager: AppSemaphoreManager) -> "Deployer":
url = push_event["repository"]["clone_url"]
author_name = push_event.get("head_commit", {}).get("author", {}).get("name")
author_email = push_event.get("head_commit", {}).get("author", {}).get("email")
Expand All @@ -91,10 +95,11 @@ async def from_push_event(cls, push_event):
commit_message,
current_app_definitions,
previous_app_definitions,
semaphore_manager,
skip_migrations,
)

async def deploy(self):
async def deploy(self) -> None:
if self.skip_deploy:
logger.info("Skipping deploy due to `--skip-deploy` flag.")
return
Expand Down Expand Up @@ -122,20 +127,22 @@ async def deploy(self):
uninstall_results = await asyncio.gather(
*[self.uninstall_app(self.previous_app_definitions.apps[app_name]) for app_name in self.removed_apps]
)
await post_result_summary(self.current_app_definitions.name, update_results + uninstall_results)
without_nones = [r for r in update_results + uninstall_results if r is not None]
await post_result_summary(self.current_app_definitions.name, without_nones)

async def uninstall_app(self, app: App) -> UpdateAppResult:
with tracer.start_as_current_span("uninstall_app", attributes={"app": app.name}):
async with self.semaphore:
logger.info(f"Uninstalling app {app.name!r}.")
result = await run(f"helm uninstall {app.name} -n {app.namespace}", suppress_errors=True)
if result:
update_result = UpdateAppResult(app_name=app.name, slack_message="", **result)
await self.post_result(
app=app,
result=update_result,
deployer=self,
)
async with self.semaphore_manager.app_semaphore(app.name):
async with helm_parallel_semaphore:
logger.info(f"Uninstalling app {app.name!r}.")
result = await run(f"helm uninstall {app.name} -n {app.namespace}", suppress_errors=True)
if result:
update_result = UpdateAppResult(app_name=app.name, slack_message="", **result)
await self.post_result(
app=app,
result=update_result,
deployer=self,
)
return update_result

async def rollback_deployment(self, app: App) -> None:
Expand All @@ -150,61 +157,33 @@ async def rollback_deployment(self, app: App) -> None:
)

async def update_app_deployment(self, app: App) -> UpdateAppResult | None:
async with self.semaphore_manager.app_semaphore(app.name):
async with helm_parallel_semaphore:
return await self._update_app_deployment(app)

async def _update_app_deployment(self, app: App) -> UpdateAppResult | None:
app.set_value("deployment.labels.gitops/deploy_id", self.deploy_id)
app.set_value("deployment.labels.gitops/status", github.STATUSES.in_progress)
if github_deployment_url := app.values.get("github/deployment_url"):
app.set_value("deployment.annotations.github/deployment_url", github_deployment_url)
async with self.semaphore:
with tracer.start_as_current_span("update_app_deployment", attributes={"app": app.name}) as span:
logger.info(f"Deploying app {app.name!r}.")
from_timestamp = time.time()
if app.chart.type == "git":
span.set_attribute("gitops.chart.type", "git")
assert app.chart.git_repo_url
async with temp_repo(app.chart.git_repo_url, ref=app.chart.git_sha) as chart_folder_path:
with tracer.start_as_current_span("helm_dependency_build"):
await run(f"cd {chart_folder_path}; helm dependency build")

with tempfile.NamedTemporaryFile(suffix=".yml") as cfg:
cfg.write(json.dumps(app.values).encode())
cfg.flush()
os.fsync(cfg.fileno())

with tracer.start_as_current_span("helm_upgrade"):

async def upgrade_helm_git() -> RunOutput:
result = await run(
"helm secrets upgrade --create-namespace"
f" --history-max {MAX_HELM_HISTORY}"
" --install"
" --timeout=600s"
f"{' --set skip_migrations=true' if self.skip_migrations else ''}"
f" -f {cfg.name}"
f" --namespace={app.namespace}"
f" {app.name}"
f" {chart_folder_path}",
suppress_errors=True,
)
return result
with tracer.start_as_current_span("update_app_deployment", attributes={"app": app.name}) as span:
logger.info(f"Deploying app {app.name!r}.")
from_timestamp = time.time()
if app.chart.type == "git":
span.set_attribute("gitops.chart.type", "git")
assert app.chart.git_repo_url
async with temp_repo(app.chart.git_repo_url, ref=app.chart.git_sha) as chart_folder_path:
with tracer.start_as_current_span("helm_dependency_build"):
await run(f"cd {chart_folder_path}; helm dependency build")

result = await upgrade_helm_git()
if result["exit_code"] != 0 and "is in progress" in result["output"]:
await self.rollback_deployment(app)
result = await upgrade_helm_git()

elif app.chart.type == "helm":
span.set_attribute("gitops.chart.type", "helm")
with tempfile.NamedTemporaryFile(suffix=".yml") as cfg:
cfg.write(json.dumps(app.values).encode())
cfg.flush()
os.fsync(cfg.fileno())
chart_version_arguments = f" --version={app.chart.version}" if app.chart.version else ""
with tracer.start_as_current_span("helm_repo_add"):
await run(f"helm repo add {app.chart.helm_repo} {app.chart.helm_repo_url}")

with tracer.start_as_current_span("helm_upgrade"):

async def upgrade_helm_chart() -> RunOutput:
async def upgrade_helm_git() -> RunOutput:
result = await run(
"helm secrets upgrade --create-namespace"
f" --history-max {MAX_HELM_HISTORY}"
Expand All @@ -214,22 +193,54 @@ async def upgrade_helm_chart() -> RunOutput:
f" -f {cfg.name}"
f" --namespace={app.namespace}"
f" {app.name}"
f" {app.chart.helm_chart} {chart_version_arguments}",
f" {chart_folder_path}",
suppress_errors=True,
)
return result

result = await upgrade_helm_chart()
result = await upgrade_helm_git()
if result["exit_code"] != 0 and "is in progress" in result["output"]:
await self.rollback_deployment(app)
result = await upgrade_helm_chart()
else:
logger.warning("Local is not implemented yet")
return None
result = await upgrade_helm_git()

elif app.chart.type == "helm":
span.set_attribute("gitops.chart.type", "helm")
with tempfile.NamedTemporaryFile(suffix=".yml") as cfg:
cfg.write(json.dumps(app.values).encode())
cfg.flush()
os.fsync(cfg.fileno())
chart_version_arguments = f" --version={app.chart.version}" if app.chart.version else ""
with tracer.start_as_current_span("helm_repo_add"):
await run(f"helm repo add {app.chart.helm_repo} {app.chart.helm_repo_url}")

with tracer.start_as_current_span("helm_upgrade"):

async def upgrade_helm_chart() -> RunOutput:
result = await run(
"helm secrets upgrade --create-namespace"
f" --history-max {MAX_HELM_HISTORY}"
" --install"
" --timeout=600s"
f"{' --set skip_migrations=true' if self.skip_migrations else ''}"
f" -f {cfg.name}"
f" --namespace={app.namespace}"
f" {app.name}"
f" {app.chart.helm_chart} {chart_version_arguments}",
suppress_errors=True,
)
return result

result = await upgrade_helm_chart()
if result["exit_code"] != 0 and "is in progress" in result["output"]:
await self.rollback_deployment(app)
result = await upgrade_helm_chart()
else:
logger.warning("Local is not implemented yet")
return None

update_result = UpdateAppResult(app_name=app.name, slack_message="", **result)
update_result = UpdateAppResult(app_name=app.name, slack_message="", **result)

await self.post_result(app=app, result=update_result, deployer=self, from_timestamp=from_timestamp)
await self.post_result(app=app, result=update_result, deployer=self, from_timestamp=from_timestamp)
return update_result

def calculate_app_deltas(self) -> tuple[set[str], set[str], set[str]]:
Expand Down
2 changes: 1 addition & 1 deletion gitops_server/workers/deployer/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async def handle_failed_deploy(app: App, result: UpdateAppResult, deployer, **kw
await find_commiter_slack_user(name=deployer.author_name, email=deployer.author_email) or DEFAULT_USER_GROUP
)
slack_user_msg = f" {slack_user} " if slack_user else ""
log_msg = f"<{get_dashboard_url(workspace_name=app.name, from_timestamp=kwargs.get("from_timestamp"), to_timestamp=time.time())}|(Deployment Logs)>"
log_msg = f"<{get_dashboard_url(workspace_name=app.name, from_timestamp=kwargs.get('from_timestamp'), to_timestamp=time.time())}|(Deployment Logs)>"
result["slack_message"] = (
f"Failed to deploy app `{result['app_name']}` for cluster"
f" `{settings.CLUSTER_NAME}` :rotating_light:"
Expand Down
52 changes: 52 additions & 0 deletions gitops_server/workers/deployer/semaphore_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import asyncio
import logging
from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator

logger = logging.getLogger("gitops.semaphore")


class AppSemaphoreManager:
"""Manages per-app semaphores to ensure sequential updates per app.

This ensures that while multiple deployments can run concurrently,
any given app is only updated by one deployment at a time.
"""

_instance: "AppSemaphoreManager | None" = None

def __init__(self) -> None:
self._app_semaphores: dict[str, asyncio.Semaphore] = {}
self._semaphore_lock = asyncio.Lock()
self._initialized: bool = True

async def get_app_semaphore(self, app_name: str) -> asyncio.Semaphore:
"""Get the semaphore for an app.

If the semaphore does not exist, create it.
"""
async with self._semaphore_lock:
if app_name not in self._app_semaphores:
logger.debug(f"Creating new semaphore for app: {app_name}")
self._app_semaphores[app_name] = asyncio.Semaphore(1)
semaphore = self._app_semaphores[app_name]
return semaphore

@asynccontextmanager
async def app_semaphore(self, app_name: str) -> AsyncGenerator[None, None]:
"""Async context manager to acquire and release a semaphore for the given app.

Each app gets exactly one semaphore with a limit of 1,
ensuring sequential updates per app.
"""
semaphore = await self.get_app_semaphore(app_name)
async with semaphore:
yield

def get_locked_apps(self) -> set[str]:
"""Return a set of apps that currently have active locks."""
locked_apps = set()
for app_name, semaphore in self._app_semaphores.items():
if semaphore.locked():
locked_apps.add(app_name)
return locked_apps
Loading
Loading