Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package org.trustedanalytics.atk.engine.command.mgmt

import java.util.concurrent.TimeUnit

import org.joda.time.DateTime
import org.trustedanalytics.atk.domain.jobcontext.JobContext
import org.trustedanalytics.atk.engine.{ EngineConfig, Engine }
import org.trustedanalytics.atk.engine.plugin.Invocation
Expand All @@ -34,16 +35,26 @@ class YarnJobsMonitor(engine: Engine)(implicit invocation: Invocation) extends R
while (true) {
engine.getCommandsNotComplete().foreach { command =>
engine.getCommandJobContext(command) match {
case Some(context) => if (hasStaleContext(context)) {
engine.cancelCommand(command.id, Some(s" by ATK context monitor due to timeout. The job context ${context.clientId} has not provided an update for more than $timeoutMinutes minutes. This may indicate that a task is running for a very long time. Try increasing the 'trustedanalytics.atk.engine.yarn-monitor-task-timeout' config setting."))
}
case Some(context) =>
val (isStale, msg) = hasStaleContext(context)
if (isStale) {
engine.cancelCommand(command.id, Some(s" by ATK context monitor due to timeout. The job context ${context.clientId} has not provided an update for more than $timeoutMinutes minutes. This may indicate that a task is running for a very long time. Try increasing the 'trustedanalytics.atk.engine.yarn-monitor-task-timeout' config setting. Details: $msg"))
}
case None => ; // there is no known YARN job to shut down (the command remains not complete, but that is not the responsibility of the YARN jobs monitor)
}
}
TimeUnit.MINUTES.sleep(timeoutMinutes)
}
}

/**
 * Checks whether a job context has gone without an update for longer than the configured timeout.
 *
 * @param context job context whose `modifiedOn` timestamp is compared against the current time
 * @return true if more than `timeoutMinutes` minutes have elapsed since `context.modifiedOn`
 */
def hasStaleContext(context: JobContext): Boolean =
  // TimeUnit converts in Long arithmetic, so a large timeout cannot wrap around the way
  // `timeoutMinutes * 60 * 1000` would if timeoutMinutes were an Int
  System.currentTimeMillis() - context.modifiedOn.getMillis > TimeUnit.MINUTES.toMillis(timeoutMinutes)
/**
 * Checks whether a job context has gone without an update for longer than the configured timeout,
 * and logs the values that went into the decision.
 *
 * @param context job context whose `modifiedOn` timestamp is compared against the current time
 * @return a pair: true if more than `timeoutMinutes` minutes have elapsed since `context.modifiedOn`,
 *         and a diagnostic message showing the compared values (the same message is logged at info level)
 */
def hasStaleContext(context: JobContext): (Boolean, String) = {
  val modMillis = context.modifiedOn.getMillis
  val nowMillis = new DateTime().getMillis
  // TimeUnit converts in Long arithmetic, so a large timeout cannot wrap around the way
  // `timeoutMinutes * 60 * 1000` would if timeoutMinutes were an Int
  val timeoutMillis = TimeUnit.MINUTES.toMillis(timeoutMinutes)
  val answer = nowMillis - modMillis > timeoutMillis
  val msg = s"YarnJobsMonitor hasStaleContext: $nowMillis - $modMillis > $timeoutMillis is $answer"
  info(msg)
  (answer, msg)
}
}