diff --git a/packages/service-library/src/servicelib/fastapi/tracing.py b/packages/service-library/src/servicelib/fastapi/tracing.py index 2d6419a381e5..9adef7d484cd 100644 --- a/packages/service-library/src/servicelib/fastapi/tracing.py +++ b/packages/service-library/src/servicelib/fastapi/tracing.py @@ -250,7 +250,6 @@ async def tracing_instrumentation_lifespan( class ResponseTraceIdHeaderMiddleware(BaseHTTPMiddleware): - async def dispatch(self, request: Request, call_next): response = await call_next(request) trace_id_header = get_trace_id_header() diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py b/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py index 039a9a06f1f3..12d799df07c1 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py @@ -18,6 +18,7 @@ from models_library.service_settings_labels import SimcoreServiceLabels from models_library.users import UserID from pydantic import NonNegativeFloat, NonNegativeInt +from servicelib import tracing from servicelib.fastapi.requests_decorators import cancel_on_disconnect from servicelib.logging_utils import log_decorator from servicelib.rabbitmq import RabbitMQClient @@ -141,6 +142,7 @@ async def create_dynamic_service( request_scheme=x_dynamic_sidecar_request_scheme, request_simcore_user_agent=x_simcore_user_agent, can_save=service.can_save, + tracing_context=tracing.get_context(), ) return await scheduler.get_stack_status(service.node_uuid) @@ -186,7 +188,9 @@ async def stop_dynamic_service( assert request # nosec try: - await scheduler.mark_service_for_removal(node_uuid, can_save) + await scheduler.mark_service_for_removal( + node_uuid=node_uuid, can_save=can_save, skip_observation_recreation=False + ) except DynamicSidecarNotFoundError: # legacy service? if it's not then a 404 will anyway be received # forward to director-v0 diff --git a/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py b/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py index 64a40d476e9a..3c7b781e3a72 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py @@ -42,6 +42,7 @@ TypeAdapter, field_validator, ) +from servicelib import tracing from servicelib.exception_utils import DelayedExceptionHandler from ..constants import ( @@ -460,6 +461,9 @@ def endpoint(self) -> AnyHttpUrl: default=None, description="contains harware information so we know on which hardware to run the service", ) + tracing_context: tracing.TracingContext | None = Field( + default=None, description="contains tracing context to be used" + ) @property def get_proxy_endpoint(self) -> AnyHttpUrl: @@ -531,6 +535,7 @@ def from_http_request( "wallet_info": service.wallet_info, "pricing_info": service.pricing_info, "hardware_info": service.hardware_info, + "tracing_context": service.tracing_context, } if run_id: obj_dict["run_id"] = run_id diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py index c5e47cc3c6b6..4759677fba77 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py @@ -107,7 +107,7 @@ async def mark_service_for_removal( node_uuid: NodeID, can_save: bool | None, *, - skip_observation_recreation: bool = False, + skip_observation_recreation: bool, ) -> None: """The service will be removed as soon as possible""" diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py index af2cf54cad7a..194faaee98de 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events.py @@ -88,7 +88,9 @@ async def will_trigger(cls, app: FastAPI, scheduler_data: SchedulerData) -> bool @classmethod async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: - sidecars_client = await get_sidecars_client(app, scheduler_data.node_uuid) + sidecars_client = await get_sidecars_client( + app, scheduler_data.node_uuid + ) # This guy has tracing dynamic_sidecar_endpoint = scheduler_data.endpoint dynamic_sidecars_scheduler_settings: DynamicServicesSchedulerSettings = ( app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_observer.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_observer.py index 3ebe33ced687..6ed84fc835d3 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_observer.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_observer.py @@ -7,6 +7,7 @@ from common_library.error_codes import create_error_code from common_library.logging.logging_errors import create_troubleshooting_log_kwargs from fastapi import FastAPI +from opentelemetry.trace import Tracer from .....core.dynamic_services_settings.scheduler import ( DynamicServicesSchedulerSettings, @@ -31,6 +32,7 @@ async def _apply_observation_cycle( scheduler: "DynamicSidecarsScheduler", # type: ignore # noqa: F821 scheduler_data: SchedulerData, + tracer: Tracer | None = None, ) -> None: """ fetches status for service and then processes all the registered events @@ -64,8 +66,11 @@ async def _apply_observation_cycle( if await dynamic_scheduler_event.will_trigger( app=app, scheduler_data=scheduler_data ): - # event.action will apply changes to the output_scheduler_data - await dynamic_scheduler_event.action(app, scheduler_data) + with tracer.start_as_current_span( + f"dy-scheduler.{scheduler_data.service_name}.action.{dynamic_scheduler_event.__name__}", + ): + # event.action will apply changes to the output_scheduler_data + await dynamic_scheduler_event.action(app, scheduler_data) # check if the status of the services has changed from OK if initial_status != scheduler_data.dynamic_sidecar.status: @@ -87,6 +92,7 @@ async def observing_single_service( service_name: ServiceName, scheduler_data: SchedulerData, dynamic_scheduler: DynamicServicesSchedulerSettings, + tracer: Tracer | None = None, ) -> None: app: FastAPI = scheduler.app @@ -135,7 +141,7 @@ async def observing_single_service( scheduler_data_copy: SchedulerData = deepcopy(scheduler_data) try: - await _apply_observation_cycle(scheduler, scheduler_data) + await _apply_observation_cycle(scheduler, scheduler_data, tracer) logger.debug("completed observation cycle of %s", f"{service_name=}") except Exception as exc: # pylint: disable=broad-except service_name = scheduler_data.service_name diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py index eae22154152b..0ccdf7a5fdca 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py @@ -41,7 +41,9 @@ from models_library.users import UserID from models_library.wallets import WalletID from pydantic import NonNegativeFloat +from servicelib import tracing from servicelib.background_task import create_periodic_task +from servicelib.fastapi.tracing import get_tracing_config from servicelib.long_running_tasks.models import ProgressCallback, TaskProgress from servicelib.redis import RedisClientsManager, exclusive from settings_library.redis import RedisDatabase @@ -241,7 +243,7 @@ async def add_service( request_dns=request_dns, request_scheme=request_scheme, request_simcore_user_agent=request_simcore_user_agent, - can_save=can_save, + can_save=can_save, # meepmoop ) scheduler_data.dynamic_sidecar.instrumentation.start_requested_at = ( arrow.utcnow().datetime @@ -323,7 +325,7 @@ async def mark_service_for_removal( node_uuid: NodeID, can_save: bool | None, *, - skip_observation_recreation: bool = False, + skip_observation_recreation: bool, ) -> None: """Marks service for removal, causing RemoveMarkedService to trigger""" async with self._lock: @@ -393,6 +395,7 @@ async def mark_all_services_in_wallet_for_removal( await self.mark_service_for_removal( scheduler_data.node_uuid, can_save=scheduler_data.dynamic_sidecar.service_removal_state.can_save, + skip_observation_recreation=False, ) async def is_service_awaiting_manual_intervention(self, node_uuid: NodeID) -> bool: @@ -535,21 +538,23 @@ def __create_observation_task( service_name: ServiceName, ) -> asyncio.Task: scheduler_data: SchedulerData = self._to_observe[service_name] - observation_task = asyncio.create_task( - observing_single_service( - scheduler=self, - service_name=service_name, - scheduler_data=scheduler_data, - dynamic_scheduler=dynamic_scheduler, - ), - name=f"{__name__}.observe_{service_name}", - ) - observation_task.add_done_callback( - functools.partial( - lambda s, _: self._service_observation_task.pop(s, None), - service_name, + with tracing.use_tracing_context(scheduler_data.tracing_context): + observation_task = asyncio.create_task( + observing_single_service( + scheduler=self, + service_name=service_name, + scheduler_data=scheduler_data, + dynamic_scheduler=dynamic_scheduler, + tracer=get_tracing_config().tracer_provider.get_tracer(__name__), + ), + name=f"{__name__}.observe_{service_name}", + ) + observation_task.add_done_callback( + functools.partial( + lambda s, _: self._service_observation_task.pop(s, None), + service_name, + ) ) - ) logger.debug("created %s for %s", f"{observation_task=}", f"{service_name=}") return observation_task diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task.py index 555ea16a9587..5001bafa8c77 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task.py @@ -18,6 +18,7 @@ from models_library.users import UserID from models_library.wallets import WalletID from servicelib.long_running_tasks.models import ProgressCallback, TaskProgress +from servicelib.tracing import TracingContext from ....core.dynamic_services_settings.scheduler import ( DynamicServicesSchedulerSettings, @@ -78,6 +79,7 @@ async def add_service( request_simcore_user_agent: str, *, can_save: bool, + tracing_context: TracingContext | None = None, ) -> None: return await self.scheduler.add_service( service=service, @@ -102,10 +104,12 @@ async def mark_service_for_removal( node_uuid: NodeID, can_save: bool | None, *, - skip_observation_recreation: bool = False, + skip_observation_recreation: bool, ) -> None: return await self.scheduler.mark_service_for_removal( - node_uuid, can_save, skip_observation_recreation=skip_observation_recreation + node_uuid=node_uuid, + can_save=can_save, + skip_observation_recreation=skip_observation_recreation, ) async def mark_all_services_in_wallet_for_removal( diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py index 19ba5f72bcff..466995c9fb82 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py @@ -145,7 +145,9 @@ async def test_regression_break_endless_loop_cancellation_edge_case( # NOTE: this will create the observation task as well! # Simulates user action like going back to the dashboard. await dynamic_sidecar_scheduler.mark_service_for_removal( - scheduler_data_from_http_request.node_uuid, can_save=can_save + scheduler_data_from_http_request.node_uuid, + can_save=can_save, + skip_observation_recreation=False, ) assert ( diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py index 9f1861087398..e50c04ee4029 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py @@ -113,7 +113,7 @@ async def _assert_get_dynamic_services_mocked( yield stack_status await scheduler.mark_service_for_removal( - scheduler_data.node_uuid, can_save=True + scheduler_data.node_uuid, can_save=True, skip_observation_recreation=False ) assert ( scheduler_data.service_name @@ -259,7 +259,9 @@ async def test_scheduler_add_remove( if with_observation_cycle: await manually_trigger_scheduler() - await scheduler.mark_service_for_removal(scheduler_data.node_uuid, can_save=True) + await scheduler.mark_service_for_removal( + scheduler_data.node_uuid, can_save=True, skip_observation_recreation=False + ) if with_observation_cycle: await manually_trigger_scheduler() @@ -354,7 +356,7 @@ async def test_remove_missing_no_error( ) -> None: with pytest.raises(DynamicSidecarNotFoundError) as execinfo: await scheduler.mark_service_for_removal( - scheduler_data.node_uuid, can_save=True + scheduler_data.node_uuid, can_save=True, skip_observation_recreation=False ) assert "not found" in str(execinfo.value)