From 603238ace31a0ce52b95d51fcc51624ebcdc422f Mon Sep 17 00:00:00 2001 From: Briton Barker Date: Tue, 31 May 2016 11:27:14 -0700 Subject: [PATCH] YarnJobsMonitor use new DateTime rather than getting system time --- .../engine/command/mgmt/YarnJobsMonitor.scala | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala index 430f37b9c3..0b523f9764 100644 --- a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala +++ b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/command/mgmt/YarnJobsMonitor.scala @@ -17,6 +17,7 @@ package org.trustedanalytics.atk.engine.command.mgmt import java.util.concurrent.TimeUnit +import org.joda.time.DateTime import org.trustedanalytics.atk.domain.jobcontext.JobContext import org.trustedanalytics.atk.engine.{ EngineConfig, Engine } import org.trustedanalytics.atk.engine.plugin.Invocation @@ -34,9 +35,11 @@ class YarnJobsMonitor(engine: Engine)(implicit invocation: Invocation) extends R while (true) { engine.getCommandsNotComplete().foreach { command => engine.getCommandJobContext(command) match { - case Some(context) => if (hasStaleContext(context)) { - engine.cancelCommand(command.id, Some(s" by ATK context monitor due to timeout. The job context ${context.clientId} has not provided an update for more than $timeoutMinutes minutes. This may indicate that a task is running for a very long time. Try increasing the 'trustedanalytics.atk.engine.yarn-monitor-task-timeout' config setting.")) - } + case Some(context) => + val (isStale, msg) = hasStaleContext(context) + if (isStale) { + engine.cancelCommand(command.id, Some(s" by ATK context monitor due to timeout. The job context ${context.clientId} has not provided an update for more than $timeoutMinutes minutes. This may indicate that a task is running for a very long time. Try increasing the 'trustedanalytics.atk.engine.yarn-monitor-task-timeout' config setting. Details: $msg")) + } case None => ; // there is no know YARN job to shutdown (command remains not complete, but this is not the responsibility of a YARN jobs monitor } } @@ -44,6 +47,14 @@ class YarnJobsMonitor(engine: Engine)(implicit invocation: Invocation) extends R } } - def hasStaleContext(context: JobContext): Boolean = - System.currentTimeMillis() - context.modifiedOn.getMillis > timeoutMinutes * 60 * 1000 + def hasStaleContext(context: JobContext): (Boolean, String) = { + val modMillis = context.modifiedOn.getMillis + val nowMillis = new DateTime().getMillis + val timeoutMillis = timeoutMinutes * 60 * 1000 + val answer = nowMillis - modMillis > timeoutMillis + val msg = s"YarnJobsMonitor hasStaleContext: $nowMillis - $modMillis > $timeoutMillis is $answer" + info(msg) + (answer, msg) + //System.currentTimeMillis() - context.modifiedOn.getMillis > timeoutMinutes * 60 * 1000 + } }