Skip to content
115 changes: 113 additions & 2 deletions src/Agent.Listener/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,10 @@ await processChannel.SendAsync(
detailInfo = string.Join(Environment.NewLine, workerOutput);
Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result.");
await LogWorkerProcessUnhandledException(message, detailInfo, agentCertManager.SkipServerCertificateValidation);

// Publish worker crash telemetry for Kusto analysis
var telemetryPublisher = HostContext.GetService<IWorkerCrashTelemetryPublisher>();
await telemetryPublisher.PublishWorkerCrashTelemetryAsync(HostContext, message.JobId, message.JobId, returnCode);
}

TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode);
Expand All @@ -641,8 +645,40 @@ await processChannel.SendAsync(
await renewJobRequest;

Trace.Info($"Job request completion initiated - Completing job request for job: {message.JobId}");
// complete job request
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);

// Check if enhanced crash handling is enabled via agent knob
bool enhancedworkercrashhandlingenabled = AgentKnobs.EnhancedWorkerCrashHandling.GetValue(UtilKnobValueContext.Instance()).AsBoolean();
Trace.Info($"Enhanced worker crash handling enabled: {enhancedworkercrashhandlingenabled}");

if (enhancedworkercrashhandlingenabled)
{
bool isPlanV8Plus = PlanUtil.GetFeatures(message.Plan).HasFlag(PlanFeatures.JobCompletedPlanEvent);
bool isWorkerCrash = !TaskResultUtil.IsValidReturnCode(returnCode);

Trace.Info($"Enhanced crash handling enabled - Normal completion crash analysis [JobId:{message.JobId}, PlanVersion:{message.Plan.Version}, IsPlanV8Plus:{isPlanV8Plus}, IsWorkerCrash:{isWorkerCrash}, ExitCode:{returnCode}, NeedsForcedCompletion:{isPlanV8Plus && isWorkerCrash}]");

if (isPlanV8Plus && isWorkerCrash)
{
// Direct plan event reporting for Plan v8+ worker crashes
Trace.Warning($"Plan event reporting for Plan v8+ worker crash [JobId:{message.JobId}, PlanVersion:{message.Plan.Version}, ExitCode:{returnCode}, Result:{result}]");
await ReportJobCompletionEventAsync(message, result);
Trace.Info("Plan event reporting executed successfully for worker crash");
}
else
{
// Standard completion for Plan v7 or normal Plan v8+ scenarios
Trace.Info($"Standard completion for normal scenario [JobId:{message.JobId}, PlanVersion:{message.Plan.Version}, ExitCode:{returnCode}, Result:{result}]");
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);
Trace.Info("Standard completion executed successfully");
}
}
else
{
// Original simple completion logic
Trace.Info($"Using previous completion logic [JobId:{message.JobId}, EnhancedHandling:Disabled]");
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);
}

Trace.Info("Job request completion completed");

// print out unhandled exception happened in worker after we complete job request.
Expand Down Expand Up @@ -971,6 +1007,81 @@ private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequest
throw new AggregateException(exceptions);
}

// Reports job completion to server via plan event (similar to how worker reports)
// Used for Plan v8+ scenarios where listener needs to notify server of job completion
private async Task ReportJobCompletionEventAsync(Pipelines.AgentJobRequestMessage message, TaskResult result)
{
Trace.Info($"Plan event reporting initiated - Sending job completion event to server [JobId:{message.JobId}, Result:{result}]");

try
{
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection));
ArgUtil.NotNull(systemConnection, nameof(systemConnection));

var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
Uri jobServerUrl = systemConnection.Url;

// Make sure SystemConnection Url match Config Url base for OnPremises server
if (!message.Variables.ContainsKey(Constants.Variables.System.ServerType) ||
string.Equals(message.Variables[Constants.Variables.System.ServerType]?.Value, "OnPremises", StringComparison.OrdinalIgnoreCase))
{
try
{
Uri urlResult = null;
Uri configUri = new Uri(_agentSetting.ServerUrl);
if (Uri.TryCreate(new Uri(configUri.GetComponents(UriComponents.SchemeAndServer, UriFormat.Unescaped)), jobServerUrl.PathAndQuery, out urlResult))
{
//replace the schema and host portion of messageUri with the host from the
//server URI (which was set at config time)
Trace.Info($"URL replacement for OnPremises server - Original: {jobServerUrl}, New: {urlResult}");
jobServerUrl = urlResult;
}
}
catch (InvalidOperationException ex)
{
Trace.Error(ex);
}
catch (UriFormatException ex)
{
Trace.Error(ex);
}
}

using (var jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, trace: Trace, skipServerCertificateValidation: false))
{
await jobServer.ConnectAsync(jobConnection);
// Create job completed event (similar to worker)
var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, result, false);
try
{
await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None);
Trace.Info($"Plan event reporting completed successfully [JobId:{message.JobId}, Result:{result}]");
}
catch (TaskOrchestrationPlanNotFoundException ex)
{
Trace.Error($"TaskOrchestrationPlanNotFoundException during plan event reporting for job {message.JobId}");
Trace.Error(ex);
}
catch (TaskOrchestrationPlanSecurityException ex)
{
Trace.Error($"TaskOrchestrationPlanSecurityException during plan event reporting for job {message.JobId}");
Trace.Error(ex);
}
catch (Exception ex)
{
Trace.Error($"Exception during plan event reporting for job {message.JobId}: {ex.Message}");
Trace.Error(ex);
}
}
}
catch (Exception ex)
{
Trace.Error($"Critical error during plan event reporting setup for job {message.JobId}: {ex.Message}");
Trace.Error(ex);
}
}

// log an error issue to job level timeline record
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Maintainability", "CA2000:Dispose objects before losing scope", MessageId = "jobServer")]
private async Task LogWorkerProcessUnhandledException(Pipelines.AgentJobRequestMessage message, string errorMessage, bool skipServerCertificateValidation = false)
Expand Down
50 changes: 50 additions & 0 deletions src/Agent.Listener/Telemetry/WorkerCrashTelemetryPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.VisualStudio.Services.Agent.Util;
using Microsoft.TeamFoundation.DistributedTask.WebApi;
using Newtonsoft.Json;

namespace Microsoft.VisualStudio.Services.Agent.Listener.Telemetry
{
[ServiceLocator(Default = typeof(WorkerCrashTelemetryPublisher))]
public interface IWorkerCrashTelemetryPublisher : IAgentService
{
Task PublishWorkerCrashTelemetryAsync(IHostContext hostContext, Guid jobId, Guid? taskInstanceId, int exitCode);
}

public sealed class WorkerCrashTelemetryPublisher : AgentService, IWorkerCrashTelemetryPublisher
{
public async Task PublishWorkerCrashTelemetryAsync(IHostContext hostContext, Guid jobId, Guid? taskInstanceId, int exitCode)
{
try
{
var telemetryPublisher = hostContext.GetService<IAgenetListenerTelemetryPublisher>();

var telemetryData = new Dictionary<string, object>
{
["JobId"] = jobId.ToString(),
["TaskInstanceId"] = taskInstanceId?.ToString() ?? "N/A",
["ExitCode"] = exitCode.ToString()
};

var command = new Command("telemetry", "publish")
{
Data = JsonConvert.SerializeObject(telemetryData)
};
command.Properties.Add("area", "AzurePipelinesAgent");
command.Properties.Add("feature", "WorkerCrash");

await telemetryPublisher.PublishEvent(hostContext, command);
Trace.Info($"Published worker crash telemetry for job {jobId} with exit code {exitCode}");
}
catch (Exception ex)
{
Trace.Warning($"Failed to publish worker crash telemetry: {ex.Message}");
}
}
}
}
6 changes: 6 additions & 0 deletions src/Agent.Sdk/Knob/AgentKnobs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,12 @@ public class AgentKnobs
new EnvironmentKnobSource("FAIL_JOB_WHEN_AGENT_DIES"),
new BuiltInDefaultKnobSource("false"));

public static readonly Knob EnhancedWorkerCrashHandling = new Knob(
nameof(EnhancedWorkerCrashHandling),
"If true, enables enhanced worker crash handling with forced completion for Plan v8+ scenarios where worker crashes cannot send completion events",
new EnvironmentKnobSource("ENHANCED_WORKER_CRASH_HANDLING"),
new BuiltInDefaultKnobSource("false"));

public static readonly Knob AllowWorkDirectoryRepositories = new Knob(
nameof(AllowWorkDirectoryRepositories),
"Allows repositories to be checked out below work directory level on self hosted agents.",
Expand Down
Loading