From 765094a532aa48ab0c92d2df86112cdb425bf7c2 Mon Sep 17 00:00:00 2001
From: Kai
Date: Sun, 5 Apr 2015 16:38:23 +0200
Subject: [PATCH 01/32] first try at PBS compatibility

---
 gridmap/job.py | 33 ++++++++++++++++++++++++++-------
 1 file changed, 26 insertions(+), 7 deletions(-)

diff --git a/gridmap/job.py b/gridmap/job.py
index e271026..c28a52e 100644
--- a/gridmap/job.py
+++ b/gridmap/job.py
@@ -108,10 +108,10 @@ class Job(object):
                  'name', 'queue', 'environment', 'working_dir',
                  'cause_of_death', 'num_resubmits', 'home_address',
                  'log_stderr_fn', 'log_stdout_fn', 'timestamp', 'host_name',
-                 'heart_beat', 'track_mem', 'track_cpu')
+                 'heart_beat', 'track_mem', 'track_cpu', 'engine')

     def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
-                 name='gridmap_job', num_slots=1, queue=DEFAULT_QUEUE):
+                 name='gridmap_job', num_slots=1, queue=DEFAULT_QUEUE, engine="SGE"):
         """
         Initializes a new Job.

@@ -133,6 +133,9 @@ def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
         :param queue: SGE queue to schedule job on.
         :type queue: str
+        :param engine: Indicates compatibility with a grid engine. Either SGE or TORQUE / PBS
+        :type engine: str
+
         """
         self.track_mem = []
         self.track_cpu = []
@@ -158,6 +161,7 @@ def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
         self.white_list = []
         self.name = name.replace(' ', '_')
         self.queue = queue
+        self.engine = engine
         # Save copy of environment variables
         self.environment = {}
         for env_var, value in os.environ.items():
@@ -230,15 +234,30 @@ def native_specification(self):
         """
         define python-style getter
         """
+        pbs = (self.engine == "TORQUE" or self.engine == "PBS")
+        sge = (self.engine == "SGE")

-        ret = "-shell yes -b yes"
+        ret = ""
+
+        if sge:
+            ret = "-shell yes -b yes"

-        if self.mem_free and USE_MEM_FREE:
-            ret += " -l mem_free={}".format(self.mem_free)
         if self.num_slots and self.num_slots > 1:
-            ret += " -pe smp {}".format(self.num_slots)
+            if sge:
+                ret += " -pe smp {}".format(self.num_slots)
+            if pbs:
+                ret += " -l nodes=1:ppn={}".format(self.num_slots)
+
+        if self.mem_free and USE_MEM_FREE:
+            if sge:
+                ret += " -l mem_free={}".format(self.mem_free)
+            if pbs:
+                ret += " -l mem={}".format(self.mem_free)
+
         if self.white_list:
-            ret += " -l h={}".format('|'.join(self.white_list))
+            if sge:
+                ret += " -l h={}".format('|'.join(self.white_list))
+
         if self.queue:
             ret += " -q {}".format(self.queue)

From 2157c05885932f626581dbddb9365f2b7dc6a499 Mon Sep 17 00:00:00 2001
From: Kai Bruegge
Date: Wed, 16 Dec 2015 15:34:00 +0100
Subject: [PATCH 02/32] fix compilation error and add attribute to slots

---
 gridmap/job.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/gridmap/job.py b/gridmap/job.py
index 10d9456..e130635 100644
--- a/gridmap/job.py
+++ b/gridmap/job.py
@@ -116,11 +116,11 @@ class Job(object):
                  'cause_of_death', 'num_resubmits', 'home_address',
                  'log_stderr_fn', 'log_stdout_fn', 'timestamp', 'host_name',
                  'heart_beat', 'track_mem', 'track_cpu', 'interpreting_shell',
-                 'copy_env')
+                 'copy_env', 'engine')

     def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
                  name='gridmap_job', num_slots=1, queue=DEFAULT_QUEUE,
-                 interpreting_shell=None, copy_env=True, add_env=None
+                 interpreting_shell=None, copy_env=True, add_env=None,
                  engine='SGE'):
         """
         Initializes a new Job.
From 6bc25d5592072d3f4ff98f28b61b8fb4706c06a4 Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Wed, 16 Dec 2015 16:13:00 +0100 Subject: [PATCH 03/32] Do not send -b flag to PBS --- gridmap/job.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gridmap/job.py b/gridmap/job.py index e130635..b828ef3 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -266,7 +266,9 @@ def native_specification(self): ret = "-shell yes" if self.interpreting_shell: ret += " -S {}".format(self.interpreting_shell) - ret += " -b yes" + + if sge: + ret += " -b yes" if self.num_slots and self.num_slots > 1: From 8f02fb1bf54016aa8ca1212ebfbead90bceba44c Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Wed, 16 Dec 2015 16:13:20 +0100 Subject: [PATCH 04/32] add commandline options to example --- examples/manual.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/examples/manual.py b/examples/manual.py index 937ad92..c0afa15 100755 --- a/examples/manual.py +++ b/examples/manual.py @@ -32,7 +32,11 @@ from datetime import datetime from gridmap import Job, process_jobs +import argparse +parser = argparse.ArgumentParser() +parser.add_argument('--engine', help='Name of the grid engine you are using. TOURQUE|PBS|SGE', default='SGE') +parser.add_argument('--queue', help='Name of the queue you want to send jobs to.', default='all.q') def sleep_walk(secs): ''' @@ -57,7 +61,7 @@ def compute_factorial(n): return ret -def make_jobs(): +def make_jobs(engine, queue): """ creates a list of Job objects, which carry all information needed @@ -77,7 +81,7 @@ def make_jobs(): for arg in inputvec: # The default queue used by the Job class is all.q. You must specify # the `queue` keyword argument if that is not the name of your queue. - job = Job(compute_factorial, arg, queue='all.q') + job = Job(compute_factorial, arg, queue=queue, engine=engine) jobs.append(job) return jobs @@ -88,19 +92,23 @@ def main(): run a set of jobs on cluster """ + args = parser.parse_args() + engine = args.engine + queue = args.queue + logging.captureWarnings(True) logging.basicConfig(format=('%(asctime)s - %(name)s - %(levelname)s - ' + '%(message)s'), level=logging.INFO) print("=====================================") print("======== Submit and Wait ========") - print("=====================================") - print("") + print("=====================================\n") + - functionJobs = make_jobs() + functionJobs = make_jobs(engine, queue) + + print("Sending function jobs to cluster engine: {}. Into queue: {} \n".format(engine, queue)) - print("sending function jobs to cluster") - print("") job_outputs = process_jobs(functionJobs, max_processes=4) @@ -111,4 +119,3 @@ def main(): if __name__ == "__main__": main() - From 3f0fe4642cb4cd0a530cdbc0ed59b8b66056ce6a Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Wed, 16 Dec 2015 18:01:01 +0100 Subject: [PATCH 05/32] more checks for PBS compatability --- gridmap/job.py | 12 ++++++++---- gridmap/runner.py | 14 +++++++++++--- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/gridmap/job.py b/gridmap/job.py index b828ef3..807752b 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -248,6 +248,7 @@ def execute(self): contain a pickled version of it. Input data is removed after execution to save space. 
""" + try: self.ret = self.function(*self.args, **self.kwlist) except Exception as exception: @@ -263,11 +264,13 @@ def native_specification(self): pbs = (self.engine == "TORQUE" or self.engine == "PBS") sge = (self.engine == "SGE") - ret = "-shell yes" - if self.interpreting_shell: - ret += " -S {}".format(self.interpreting_shell) + ret = "" if sge: + ret = "-shell yes" + if self.interpreting_shell: + ret += " -S {}".format(self.interpreting_shell) + ret += " -b yes" @@ -281,7 +284,7 @@ def native_specification(self): if sge: ret += " -l mem_free={}".format(self.mem_free) if pbs: - ret += " -l mem={}".format(self.mem_free) + ret += " -l vmem={}".format(self.mem_free) if self.white_list: if sge: @@ -290,6 +293,7 @@ def native_specification(self): if self.queue: ret += " -q {}".format(self.queue) + print("------" + ret) return ret diff --git a/gridmap/runner.py b/gridmap/runner.py index 48c8cd2..11385cb 100644 --- a/gridmap/runner.py +++ b/gridmap/runner.py @@ -189,9 +189,12 @@ def _run_job(job_id, address): # create heart beat process logger = logging.getLogger(__name__) parent_pid = os.getpid() + log_path = "" + if 'SGE_STDERR_PATH' in os.environ: + log_path = os.environ['SGE_STDERR_PATH'] heart = multiprocessing.Process(target=_heart_beat, args=(job_id, address, parent_pid, - os.environ['SGE_STDERR_PATH'], + log_path, HEARTBEAT_FREQUENCY)) logger.info("Starting heart beat") heart.start() @@ -272,12 +275,17 @@ def _main(): logger.info("Appended {0} to PYTHONPATH".format(args.module_dir)) sys.path.insert(0, args.module_dir) + current_job_id = 0 + if 'JOB_ID' in os.environ: + current_job_id = os.environ['JOB_ID'] + else: + current_job_id = os.environ['PBS_JOBID'] logger.debug("Job ID: %s\tHome address: %s\tModule dir: %s", - os.environ['JOB_ID'], + current_job_id, args.home_address, args.module_dir) # Process the database and get job started - _run_job(os.environ['JOB_ID'], args.home_address) + _run_job(current_job_id, args.home_address) if __name__ == "__main__": From af8635ae922e0b6833d2214934c2738be214383c Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Wed, 16 Dec 2015 18:05:37 +0100 Subject: [PATCH 06/32] Add more commandline arguments to the manual example. --- examples/manual.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/examples/manual.py b/examples/manual.py index c0afa15..1b86393 100755 --- a/examples/manual.py +++ b/examples/manual.py @@ -37,7 +37,7 @@ parser = argparse.ArgumentParser() parser.add_argument('--engine', help='Name of the grid engine you are using. TOURQUE|PBS|SGE', default='SGE') parser.add_argument('--queue', help='Name of the queue you want to send jobs to.', default='all.q') - +parser.add_argument('--vmem', help='Amount of memory to use on a node.', default='200m') def sleep_walk(secs): ''' Pass the time by adding numbers until the specified number of seconds has @@ -61,7 +61,7 @@ def compute_factorial(n): return ret -def make_jobs(engine, queue): +def make_jobs(engine, queue, vmem): """ creates a list of Job objects, which carry all information needed @@ -81,7 +81,8 @@ def make_jobs(engine, queue): for arg in inputvec: # The default queue used by the Job class is all.q. You must specify # the `queue` keyword argument if that is not the name of your queue. 
- job = Job(compute_factorial, arg, queue=queue, engine=engine) + job = Job(compute_factorial, arg, queue=queue, engine=engine, + mem_free=vmem) jobs.append(job) return jobs @@ -95,6 +96,7 @@ def main(): args = parser.parse_args() engine = args.engine queue = args.queue + vmem = args.vmem logging.captureWarnings(True) logging.basicConfig(format=('%(asctime)s - %(name)s - %(levelname)s - ' + @@ -105,7 +107,7 @@ def main(): print("=====================================\n") - functionJobs = make_jobs(engine, queue) + functionJobs = make_jobs(engine, queue, vmem) print("Sending function jobs to cluster engine: {}. Into queue: {} \n".format(engine, queue)) From befb9bad1fd8601bad8963b8cd96e1c59f875ef6 Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Wed, 16 Dec 2015 18:28:43 +0100 Subject: [PATCH 07/32] remove debug output --- gridmap/job.py | 1 - 1 file changed, 1 deletion(-) diff --git a/gridmap/job.py b/gridmap/job.py index 807752b..bec18fb 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -293,7 +293,6 @@ def native_specification(self): if self.queue: ret += " -q {}".format(self.queue) - print("------" + ret) return ret From e4d2e1faf1f3c19712f73bbfae62402d6e1be9f4 Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Fri, 18 Dec 2015 14:53:13 +0100 Subject: [PATCH 08/32] use pbs_jobdir as logging path --- gridmap/runner.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/gridmap/runner.py b/gridmap/runner.py index 11385cb..4273180 100644 --- a/gridmap/runner.py +++ b/gridmap/runner.py @@ -190,8 +190,13 @@ def _run_job(job_id, address): logger = logging.getLogger(__name__) parent_pid = os.getpid() log_path = "" + if 'SGE_STDERR_PATH' in os.environ: log_path = os.environ['SGE_STDERR_PATH'] + + if 'PBS_JOBDIR' in os.environ: + log_path = os.environ['PBS_JOBDIR'] + heart = multiprocessing.Process(target=_heart_beat, args=(job_id, address, parent_pid, log_path, From 2d469e66d0af2a5a6ca135e424bba3177d783624 Mon Sep 17 00:00:00 2001 From: Kai Date: Mon, 28 Dec 2015 18:18:00 +0100 Subject: [PATCH 09/32] Add parameter to specify a port for the jobmonitor to listen on --- gridmap/job.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/gridmap/job.py b/gridmap/job.py index bec18fb..9b1fe0a 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -304,7 +304,7 @@ class JobMonitor(object): """ Job monitor that communicates with other nodes via 0MQ. """ - def __init__(self, temp_dir=DEFAULT_TEMP_DIR): + def __init__(self, temp_dir=DEFAULT_TEMP_DIR, port=None): """ set up socket """ @@ -331,7 +331,11 @@ def __init__(self, temp_dir=DEFAULT_TEMP_DIR): self.interface = "tcp://%s" % (self.ip_address) # bind to random port and remember it - self.port = self.socket.bind_to_random_port(self.interface) + if port is None: + self.port = self.socket.bind_to_random_port(self.interface) + else: + self.port = port + self.home_address = "%s:%i" % (self.interface, self.port) self.logger.info("Setting up JobMonitor on %s", self.home_address) @@ -868,7 +872,7 @@ def _append_job_to_session(session, job, temp_dir=DEFAULT_TEMP_DIR, quiet=True): def process_jobs(jobs, temp_dir=DEFAULT_TEMP_DIR, white_list=None, quiet=True, - max_processes=1, local=False, require_cluster=False): + max_processes=1, local=False, require_cluster=False, port=None): """ Take a list of jobs and process them on the cluster. 
@@ -905,7 +909,7 @@ def process_jobs(jobs, temp_dir=DEFAULT_TEMP_DIR, white_list=None, quiet=True,

     if not local:
         # initialize monitor to get port number
-        with JobMonitor(temp_dir=temp_dir) as monitor:
+        with JobMonitor(temp_dir=temp_dir, port=port) as monitor:
             # get interface and port
             home_address = monitor.home_address

From 9e0cdfc0962ff7ae35319d577e5c1e1b7d8404dd Mon Sep 17 00:00:00 2001
From: Kai
Date: Mon, 28 Dec 2015 18:18:25 +0100
Subject: [PATCH 10/32] Add cmd line parameter for port and log level

---
 examples/manual.py | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/examples/manual.py b/examples/manual.py
index 1b86393..89eed66 100755
--- a/examples/manual.py
+++ b/examples/manual.py
@@ -38,6 +38,9 @@
 parser.add_argument('--engine', help='Name of the grid engine you are using. TOURQUE|PBS|SGE', default='SGE')
 parser.add_argument('--queue', help='Name of the queue you want to send jobs to.', default='all.q')
 parser.add_argument('--vmem', help='Amount of memory to use on a node.', default='200m')
+parser.add_argument('--port', help='The port through which to communicate with the JobMonitor', default=None, type=int)
+parser.add_argument('--local', help='Flag indicating whether the jobs should run locally instead of on the cluster', default=False, type=bool)
+parser.add_argument("--logging", type=str, choices=['INFO', 'DEBUG', 'WARN'], help='increase output verbosity', default='INFO')
 def sleep_walk(secs):
     '''
     Pass the time by adding numbers until the specified number of seconds has
@@ -97,10 +100,19 @@ def main():
     engine = args.engine
     queue = args.queue
     vmem = args.vmem
+    port = args.port
+    local = args.local
+    level = args.logging
+
+    if level == 'DEBUG':
+        level = logging.DEBUG
+    elif level == 'WARN':
+        level = logging.WARN
+    elif level == 'INFO':
+        level = logging.INFO

     logging.captureWarnings(True)
-    logging.basicConfig(format=('%(asctime)s - %(name)s - %(levelname)s - ' +
-                                '%(message)s'), level=logging.INFO)
+    logging.basicConfig(format=('%(asctime)s - %(name)s - %(levelname)s - ' + '%(message)s'), level=level)

     print("=====================================")
     print("======== Submit and Wait ========")
@@ -108,11 +120,13 @@
     functionJobs = make_jobs(engine, queue, vmem)

-
-    print("Sending function jobs to cluster engine: {}. Into queue: {} \n".format(engine, queue))
+    if local:
+        print('Running jobs locally')
+    else:
+        print("Sending function jobs to cluster engine: {}. Into queue: {} \n".format(engine, queue))

-    job_outputs = process_jobs(functionJobs, max_processes=4)
+    job_outputs = process_jobs(functionJobs, max_processes=4, port=port, local=local)

     print("results from each job")
     for (i, result) in enumerate(job_outputs):

From 7db85f4e823a894a089e1c46e049ce68fb556e39 Mon Sep 17 00:00:00 2001
From: Kai
Date: Mon, 28 Dec 2015 18:18:56 +0100
Subject: [PATCH 11/32] additional output in case libdrmaa cannot be found

---
 gridmap/conf.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/gridmap/conf.py b/gridmap/conf.py
index bfc7d5f..14a018e 100644
--- a/gridmap/conf.py
+++ b/gridmap/conf.py
@@ -75,10 +75,11 @@
 try:
     import drmaa
     DRMAA_PRESENT = True
-except (ImportError, OSError, RuntimeError):
+except (ImportError, OSError, RuntimeError) as e:
     logger = logging.getLogger(__name__)
     logger.warning('Could not import drmaa. 
Only local multiprocessing ' + 'supported.') + logger.warning(str(e)) DRMAA_PRESENT = False # plot cpu and mem usage and send via email From 9511c3504311eacc325196455d9f73fd87dd51b2 Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Wed, 30 Dec 2015 14:58:28 +0100 Subject: [PATCH 12/32] check return values for cokmpletion when they are not strings --- gridmap/job.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/gridmap/job.py b/gridmap/job.py index 9b1fe0a..1da375c 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -56,6 +56,11 @@ SMTPDataError) import zmq +#add this for chekcing of string instances in python 2 and 3 +try: + basestring +except NameError: + basestring = str from gridmap.conf import (CHECK_FREQUENCY, CREATE_PLOTS, DEFAULT_QUEUE, DRMAA_PRESENT, ERROR_MAIL_RECIPIENT, @@ -495,7 +500,9 @@ def check_if_alive(self): for job in self.jobs: # noting was returned yet - if job.ret == _JOB_NOT_FINISHED: + print("checking: ") + print(job.ret) + if isinstance(job.ret, basestring) and job.ret == _JOB_NOT_FINISHED: # exclude first-timers if job.timestamp is not None: @@ -561,18 +568,23 @@ def all_jobs_done(self): """ checks for all jobs if they are done """ + + def condition(retval): + return (isinstance(retval, basestring) and retval == _JOB_NOT_FINISHED) + + for j in self.jobs: + print(j.ret) + print(condition(j.ret)) + + running_jobs = [ job for job in self.jobs if condition(job.ret) ] + num_jobs = len(self.jobs) + if self.logger.getEffectiveLevel() == logging.DEBUG: - num_jobs = len(self.jobs) - num_completed = sum((job.ret != _JOB_NOT_FINISHED and - not isinstance(job.ret, Exception)) - for job in self.jobs) - self.logger.debug('%i out of %i jobs completed', num_completed, + self.logger.debug('%i out of %i jobs completed', num_jobs - len(running_jobs), num_jobs) # exceptions will be handled in check_if_alive - return all((job.ret != _JOB_NOT_FINISHED and not isinstance(job.ret, - Exception)) - for job in self.jobs) + return len(running_jobs) == 0 def _send_mail(subject, body_text, attachments=None): From 20cb975c43263ca4cf8e37eb22870845a6b9a01d Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Wed, 30 Dec 2015 15:30:34 +0100 Subject: [PATCH 13/32] remove debug output --- gridmap/job.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/gridmap/job.py b/gridmap/job.py index 1da375c..3a14e85 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -500,8 +500,6 @@ def check_if_alive(self): for job in self.jobs: # noting was returned yet - print("checking: ") - print(job.ret) if isinstance(job.ret, basestring) and job.ret == _JOB_NOT_FINISHED: # exclude first-timers From a1435d9c8b03b8d90a0087a17197a827a1f820a5 Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Wed, 30 Dec 2015 17:23:03 +0100 Subject: [PATCH 14/32] remove more stupid debug output --- gridmap/job.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/gridmap/job.py b/gridmap/job.py index 3a14e85..452babd 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -570,10 +570,6 @@ def all_jobs_done(self): def condition(retval): return (isinstance(retval, basestring) and retval == _JOB_NOT_FINISHED) - for j in self.jobs: - print(j.ret) - print(condition(j.ret)) - running_jobs = [ job for job in self.jobs if condition(job.ret) ] num_jobs = len(self.jobs) From d856f53a5aedc7030099085d51d204d98ca0eacb Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Wed, 30 Dec 2015 17:23:58 +0100 Subject: [PATCH 15/32] write log files on PBS --- gridmap/runner.py | 12 ++++++++---- 1 file changed, 8 
insertions(+), 4 deletions(-) diff --git a/gridmap/runner.py b/gridmap/runner.py index 4273180..2527290 100644 --- a/gridmap/runner.py +++ b/gridmap/runner.py @@ -188,14 +188,18 @@ def _run_job(job_id, address): """ # create heart beat process logger = logging.getLogger(__name__) - parent_pid = os.getpid() - log_path = "" + parent_pid = os.getpid() + log_path = None if 'SGE_STDERR_PATH' in os.environ: log_path = os.environ['SGE_STDERR_PATH'] - if 'PBS_JOBDIR' in os.environ: - log_path = os.environ['PBS_JOBDIR'] + if log_path is None and 'PBS_O_WORKDIR' in os.environ: + n = 'log_{}.out'.format(os.environ['PBS_JOBID'], ) + log_path = os.path.join(os.environ['PBS_O_WORKDIR'], n) + + if log_path is None: + log_path='./log.out' heart = multiprocessing.Process(target=_heart_beat, args=(job_id, address, parent_pid, From 62e02f60b21b43948092113fa8b1c02f4310409f Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Sat, 2 Jan 2016 11:15:22 +0100 Subject: [PATCH 16/32] try to get session id --- gridmap/job.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/gridmap/job.py b/gridmap/job.py index 452babd..efea08b 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -374,6 +374,7 @@ def __exit__(self, exc_type, exc_value, exc_tb): exc_type.__name__) # try to kill off all old jobs try: + self.logger.info('Sending Terminate for all jobs on Session {} with id {}'.format(session, self.session_id)) session.control(JOB_IDS_SESSION_ALL, JobControlAction.TERMINATE) except InvalidJobException: @@ -463,6 +464,12 @@ def check(self, session_id, jobs): exc_info=True) return_msg = "all good" job.timestamp = datetime.now() + usage = 0 + for j in self.jobs: + if len(j.track_cpu) > 0: + usage += j.track_cpu[-1][0] + + print("Total CPU usage: {} % for a total of {} submitted jobs.".format(usage, len(self.jobs)), end='\r') if msg["command"] == "get_job": # serve job for display @@ -904,8 +911,8 @@ def process_jobs(jobs, temp_dir=DEFAULT_TEMP_DIR, white_list=None, quiet=True, :returns: List of Job results """ + logger = logging.getLogger(__name__) if (not local and not DRMAA_PRESENT): - logger = logging.getLogger(__name__) if require_cluster: raise DRMAANotPresentException( 'Could not import drmaa, but cluster access required.' 
@@ -924,6 +931,7 @@ def process_jobs(jobs, temp_dir=DEFAULT_TEMP_DIR, white_list=None, quiet=True, white_list=white_list, quiet=quiet) # handling of inputs, outputs and heartbeats + logger.info("Started DRMAA session with ID: {}".format(sid)) monitor.check(sid, jobs) else: _process_jobs_locally(jobs, max_processes=max_processes) From 3911e0afd3f50b10db7e1c077c5984b315e89008 Mon Sep 17 00:00:00 2001 From: Kai Date: Sat, 2 Jan 2016 11:34:15 +0100 Subject: [PATCH 17/32] use one session instance instead of passing around ids --- gridmap/job.py | 111 ++++++++++++++++++++++++------------------------- 1 file changed, 55 insertions(+), 56 deletions(-) diff --git a/gridmap/job.py b/gridmap/job.py index efea08b..12bcefd 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -348,7 +348,7 @@ def __init__(self, temp_dir=DEFAULT_TEMP_DIR, port=None): # uninitialized field (set in check method) self.jobs = [] self.ids = [] - self.session_id = None + self.session = None self.id_to_job = {} def __enter__(self): @@ -366,29 +366,33 @@ def __exit__(self, exc_type, exc_value, exc_tb): self.socket.close() # Clean up if we have a valid session - if self.session_id is not None: - with Session(self.session_id) as session: + if self.session is not None: # If we encounter an exception, kill all jobs - if exc_type is not None: - self.logger.info('Encountered %s, so killing all jobs.', - exc_type.__name__) - # try to kill off all old jobs - try: - self.logger.info('Sending Terminate for all jobs on Session {} with id {}'.format(session, self.session_id)) - session.control(JOB_IDS_SESSION_ALL, - JobControlAction.TERMINATE) - except InvalidJobException: - self.logger.debug("Could not kill all jobs for " + - "session.", exc_info=True) - - # Get rid of job info to prevent memory leak + if exc_type is not None: + self.logger.info('Encountered %s, so killing all jobs.', exc_type.__name__) + # try to kill off all old jobs try: - session.synchronize([JOB_IDS_SESSION_ALL], TIMEOUT_NO_WAIT, - dispose=True) - except ExitTimeoutException: - pass + self.logger.info('Sending Terminate for all jobs on Session {} '.format(self.session)) + session.control(JOB_IDS_SESSION_ALL, JobControlAction.TERMINATE) + + except InvalidJobException: + self.logger.debug("Could not kill all jobs for session.", exc_info=True) + finally: + self.logger('Exiting drmaa session') + self.session.exit() + else: + self.logger('Exiting drmaa session') + self.session.exit() + # Get rid of job info to prevent memory leak + try: + session.synchronize([JOB_IDS_SESSION_ALL], TIMEOUT_NO_WAIT, + dispose=True) + except ExitTimeoutException: + pass + + - def check(self, session_id, jobs): + def check(self, session, jobs): """ serves input and output data """ @@ -396,8 +400,8 @@ def check(self, session_id, jobs): self.jobs = jobs self.id_to_job = {job.id: job for job in self.jobs} - # keep track of DRMAA session_id (for resubmissions) - self.session_id = session_id + # keep track of DRMAA session (for resubmissions) + self.session = session # determines in which interval to check if jobs are alive self.logger.debug('Starting local hearbeat') @@ -426,8 +430,7 @@ def check(self, session_id, jobs): if msg["command"] == "fetch_input": return_msg = self.id_to_job[job_id] job.timestamp = datetime.now() - self.logger.debug("Received input request from %s", - job_id) + self.logger.debug("Received input request from %s", job_id) if msg["command"] == "store_output": # be nice @@ -439,13 +442,11 @@ def check(self, session_id, jobs): # copy relevant fields job.ret = tmp_job.ret 
job.traceback = tmp_job.traceback - self.logger.info("Received output from %s", - job_id) + self.logger.info("Received output from %s", job_id) # Returned exception instead of job, so store that elif isinstance(msg["data"], tuple): job.ret, job.traceback = msg["data"] - self.logger.info("Received exception from %s", - job_id) + self.logger.info("Received exception from %s", job_id) else: self.logger.error(("Received message with " + "invalid data: %s"), msg) @@ -558,7 +559,7 @@ def check_if_alive(self): old_id = job.id job.track_cpu = [] job.track_mem = [] - handle_resubmit(self.session_id, job, temp_dir=self.temp_dir) + handle_resubmit(self.session, job, temp_dir=self.temp_dir) # Update job ID if successfully resubmitted self.logger.info('Resubmitted job %s; it now has ID %s', old_id, @@ -717,7 +718,7 @@ def send_error_mail(job): os.unlink(img_mem_fn) -def handle_resubmit(session_id, job, temp_dir=DEFAULT_TEMP_DIR): +def handle_resubmit(session, job, temp_dir=DEFAULT_TEMP_DIR): """ heuristic to determine if the job should be resubmitted @@ -744,7 +745,7 @@ def handle_resubmit(session_id, job, temp_dir=DEFAULT_TEMP_DIR): job.num_resubmits += 1 job.cause_of_death = "" - _resubmit(session_id, job, temp_dir) + _resubmit(session, job, temp_dir) else: raise JobException(("Job {0} ({1}) failed after {2} " + "resubmissions").format(job.name, job.id, @@ -812,19 +813,19 @@ def _submit_jobs(jobs, home_address, temp_dir=DEFAULT_TEMP_DIR, white_list=None, :returns: Session ID """ - with Session() as session: - for job in jobs: - # set job white list - job.white_list = white_list + session = Session() + session.initialize() + for job in jobs: + # set job white list + job.white_list = white_list - # remember address of submission host - job.home_address = home_address + # remember address of submission host + job.home_address = home_address - # append jobs - _append_job_to_session(session, job, temp_dir=temp_dir, quiet=quiet) + # append jobs + _append_job_to_session(session, job, temp_dir=temp_dir, quiet=quiet) - sid = session.contact - return sid + return session def _append_job_to_session(session, job, temp_dir=DEFAULT_TEMP_DIR, quiet=True): @@ -927,19 +928,19 @@ def process_jobs(jobs, temp_dir=DEFAULT_TEMP_DIR, white_list=None, quiet=True, home_address = monitor.home_address # job_id field is attached to each job object - sid = _submit_jobs(jobs, home_address, temp_dir=temp_dir, + session = _submit_jobs(jobs, home_address, temp_dir=temp_dir, white_list=white_list, quiet=quiet) # handling of inputs, outputs and heartbeats - logger.info("Started DRMAA session with ID: {}".format(sid)) - monitor.check(sid, jobs) + logger.info("Started DRMAA session with Session {}".format(session)) + monitor.check(session, jobs) else: _process_jobs_locally(jobs, max_processes=max_processes) return [job.ret for job in jobs] -def _resubmit(session_id, job, temp_dir): +def _resubmit(session, job, temp_dir): """ Resubmit a failed job. 
@@ -949,17 +950,15 @@ def process_jobs(jobs, temp_dir=DEFAULT_TEMP_DIR, white_list=None, quiet=True,
     logger.info("starting resubmission process")

     if DRMAA_PRESENT:
-        # append to session
-        with Session(session_id) as session:
             # try to kill off old job
-            try:
-                session.control(job.id, JobControlAction.TERMINATE)
-                logger.info("zombie job killed")
-            except Exception:
-                logger.error("Could not kill job with SGE id %s", job.id,
-                             exc_info=True)
-            # create new job
-            _append_job_to_session(session, job, temp_dir=temp_dir)
+        try:
+            session.control(job.id, JobControlAction.TERMINATE)
+            logger.info("zombie job killed")
+        except Exception:
+            logger.error("Could not kill job with SGE id %s", job.id,
+                         exc_info=True)
+        # create new job
+        _append_job_to_session(session, job, temp_dir=temp_dir)
     else:
         logger.error("Could not restart job because we're in local mode.")

From 07675573c03db5938759cd4ba7878707e8006191 Mon Sep 17 00:00:00 2001
From: Kai Bruegge
Date: Sat, 2 Jan 2016 16:17:27 +0100
Subject: [PATCH 18/32] fix bug when setting port number

---
 examples/manual.py |  2 +-
 gridmap/job.py     | 15 +++++++--------
 2 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/examples/manual.py b/examples/manual.py
index 89eed66..306026b 100755
--- a/examples/manual.py
+++ b/examples/manual.py
@@ -35,7 +35,7 @@
 import argparse

 parser = argparse.ArgumentParser()
-parser.add_argument('--engine', help='Name of the grid engine you are using. TOURQUE|PBS|SGE', default='SGE')
+parser.add_argument('--engine', help='Name of the grid engine you are using.', choices=['TORQUE','PBS','SGE'], default='SGE')
 parser.add_argument('--queue', help='Name of the queue you want to send jobs to.', default='all.q')
 parser.add_argument('--vmem', help='Amount of memory to use on a node.', default='200m')
diff --git a/gridmap/job.py b/gridmap/job.py
index 12bcefd..3e650e7 100644
--- a/gridmap/job.py
+++ b/gridmap/job.py
@@ -340,6 +340,7 @@ def __init__(self, temp_dir=DEFAULT_TEMP_DIR, port=None):
             self.port = self.socket.bind_to_random_port(self.interface)
         else:
             self.port = port
+            self.socket.bind("{}:{}".format(self.interface, port))

         self.home_address = "%s:%i" % (self.interface, self.port)

@@ -373,23 +374,21 @@ def __exit__(self, exc_type, exc_value, exc_tb):
                 # try to kill off all old jobs
                 try:
                     self.logger.info('Sending Terminate for all jobs on Session {} '.format(self.session))
-                    session.control(JOB_IDS_SESSION_ALL, JobControlAction.TERMINATE)
+                    self.session.control(JOB_IDS_SESSION_ALL, JobControlAction.TERMINATE)

                 except InvalidJobException:
                     self.logger.debug("Could not kill all jobs for session.", exc_info=True)
-            finally:
-                self.logger('Exiting drmaa session')
-                self.session.exit()
-        else:
-            self.logger('Exiting drmaa session')
-            self.session.exit()
             # Get rid of job info to prevent memory leak
             try:
-                session.synchronize([JOB_IDS_SESSION_ALL], TIMEOUT_NO_WAIT,
+                self.session.synchronize([JOB_IDS_SESSION_ALL], TIMEOUT_NO_WAIT,
                                     dispose=True)
             except ExitTimeoutException:
                 pass
+            finally:
+                self.logger.info('Exiting drmaa session')
+                self.session.exit()
+

     def check(self, session, jobs):

From 73f6121807cef28bcb3af0e0297c8aa8cb42239c Mon Sep 17 00:00:00 2001
From: Kai Bruegge
Date: Tue, 5 Jan 2016 15:46:39 +0100
Subject: [PATCH 19/32] add more output

---
 gridmap/job.py | 57 ++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 48 insertions(+), 9 deletions(-)

diff --git a/gridmap/job.py b/gridmap/job.py
index 3e650e7..a1dd175 100644
---
a/gridmap/job.py +++ b/gridmap/job.py @@ -82,6 +82,8 @@ class DRMAANotPresentException(ImportError): JobControlAction, JOB_IDS_SESSION_ALL, Session, TIMEOUT_NO_WAIT) + import drmaa.errors + # Python 2.x backward compatibility if sys.version_info < (3, 0): range = xrange @@ -377,7 +379,12 @@ def __exit__(self, exc_type, exc_value, exc_tb): self.session.control(JOB_IDS_SESSION_ALL, JobControlAction.TERMINATE) except InvalidJobException: - self.logger.debug("Could not kill all jobs for session.", exc_info=True) + self.logger.warn("Could not kill all jobs for session.", exc_info=True) + + except drmaa.errors.InternalException: + #cleanup in finaly clause below + self.logger.warn("Could not kill all jobs for session.") + pass # Get rid of job info to prevent memory leak try: self.session.synchronize([JOB_IDS_SESSION_ALL], TIMEOUT_NO_WAIT, @@ -386,6 +393,14 @@ def __exit__(self, exc_type, exc_value, exc_tb): pass finally: + #cleanup remaining jobs that cannot be kille by JOB_IDS_SESSION_ALL + for job in self.jobs: + try: + self.logger.info("Killing job {}".format(job.id)) + self.session.control(job.id, JobControlAction.TERMINATE) + except: + pass + self.logger.info('Exiting drmaa session') self.session.exit() @@ -464,12 +479,10 @@ def check(self, session, jobs): exc_info=True) return_msg = "all good" job.timestamp = datetime.now() - usage = 0 - for j in self.jobs: - if len(j.track_cpu) > 0: - usage += j.track_cpu[-1][0] - print("Total CPU usage: {} % for a total of {} submitted jobs.".format(usage, len(self.jobs)), end='\r') + running_jobs = [ j for j in self.jobs if (len(j.track_cpu) > 0 and (isinstance(j.ret, basestring) and j.ret == _JOB_NOT_FINISHED))] + usage = [j.track_cpu[-1][0] for j in running_jobs] + print("Total CPU usage: {} % for a total of {} running jobs.".format(sum(usage), len(running_jobs)), end='\r') if msg["command"] == "get_job": # serve job for display @@ -611,7 +624,7 @@ def _send_mail(subject, body_text, attachments=None): msg["From"] = ERROR_MAIL_SENDER msg["To"] = ERROR_MAIL_RECIPIENT - logger.info('Email body: %s', body_text) + logger.info('Email Sent with subject : {}'.format(subject)) body_msg = MIMEText(body_text) msg.attach(body_msg) @@ -933,9 +946,15 @@ def process_jobs(jobs, temp_dir=DEFAULT_TEMP_DIR, white_list=None, quiet=True, # handling of inputs, outputs and heartbeats logger.info("Started DRMAA session with Session {}".format(session)) monitor.check(session, jobs) + + if SEND_ERROR_MAIL: + send_completion_mail(name="gridmap job", jobs=jobs) + + else: _process_jobs_locally(jobs, max_processes=max_processes) + return [job.ret for job in jobs] @@ -1045,12 +1064,12 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job', # send a completion mail (if requested and configured) if completion_mail and SEND_ERROR_MAIL: - send_completion_mail(name=name) + send_completion_mail(name=name, jobs=jobs) return job_results -def send_completion_mail(name): +def send_completion_mail(name, jobs): """ send out success email """ @@ -1060,6 +1079,26 @@ def send_completion_mail(name): # compose error message body_text = "" body_text += "Job {}\n".format(name) + body_text += "Collected results from {} jobs \n \n".format(len(jobs)) + for job in jobs: + if job.heart_beat: + body_text += "Last memory usage: {}\n".format(job.heart_beat["memory"]) + body_text += "Mean cpu load: {}\n".format(sum(job.heart_beat["cpu_load"])/len(job.heart_beat["cpu_load"])) + body_text += ("Process was running at last check: " + + "{}\n\n").format(job.heart_beat["cpu_load"][1]) + + 
body_text += "Host: {}\n\n".format(job.host_name) + + if isinstance(job.ret, Exception): + body_text += "Job encountered exception: {}\n".format(job.ret) + body_text += "Stacktrace: {}\n\n".format(job.traceback) + + # attach log file + if job.heart_beat and "log_file" in job.heart_beat: + log_file_attachement = MIMEText(job.heart_beat['log_file']) + log_file_attachement.add_header('Content-Disposition', 'attachment', + filename='{}_log.txt'.format(job.id)) + attachments.append(log_file_attachement) # Send mail _send_mail(subject, body_text) From da24fe69f1a3ee961dec0e174432238bde89b7fd Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Tue, 5 Jan 2016 15:48:22 +0100 Subject: [PATCH 20/32] add gitiginreo --- .gitignore | 61 +++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 56 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index e73a52a..7d44098 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,59 @@ -*.pyc -*.pyo +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class -*.egg -gridmap.egg-info/ -dist/ +# C extensions +*.so + +# Distribution / packaging +.Python +env/ build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log + +# Sphinx documentation docs/_build/ + +# PyBuilder +target/ From d8f28c234be4558b524fd81ec92cf6b603679cc9 Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Tue, 5 Jan 2016 15:59:32 +0100 Subject: [PATCH 21/32] new readme --- README.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..932ca1a --- /dev/null +++ b/README.md @@ -0,0 +1,24 @@ +#GridMap + +This is a WIP fork of the original gridmap project. This was created to achieve compatability +with the TORQUE and PBS grid engine. + +### Requirements +Jobs can only be submitted from nodes than are allowed to do that (i.e they can run 'qsub') + +A couple of environment variables need to be set in order to work. + +ERROR_MAIL_RECIPIENT = +DRMAA_LIBRARY_PATH = +DEFAULT_TEMP_DIR = +SEND_ERROR_MAIL=TRUE +SMTP_SERVER = + + +### Python Requirements + + +- drmaa +- psutil +- pyzmq +- Python 3.4+ From 5f2e3ec39ca6a5d5500b564e7dd7cd3d0862f2f3 Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Tue, 5 Jan 2016 16:00:45 +0100 Subject: [PATCH 22/32] fix md syntax --- README.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 932ca1a..e7210c0 100644 --- a/README.md +++ b/README.md @@ -8,11 +8,15 @@ Jobs can only be submitted from nodes than are allowed to do that (i.e they can A couple of environment variables need to be set in order to work. -ERROR_MAIL_RECIPIENT = -DRMAA_LIBRARY_PATH = -DEFAULT_TEMP_DIR = +ERROR_MAIL_RECIPIENT = *your email address* + +DRMAA_LIBRARY_PATH = *like pbs_drmaa/libs/libdrmaa.so for pbs* + +DEFAULT_TEMP_DIR = *scratch space on the computing nodes* + SEND_ERROR_MAIL=TRUE -SMTP_SERVER = + +SMTP_SERVER = *your smtp server. 
like: unimail.tu-dortmund.de* ### Python Requirements From 96c5b2d22fb0ef18ffbd002629e4c5c2efd3b2fd Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Tue, 5 Jan 2016 16:01:15 +0100 Subject: [PATCH 23/32] delete old readme --- README.rst | 58 ------------------------------------------------------ 1 file changed, 58 deletions(-) delete mode 100644 README.rst diff --git a/README.rst b/README.rst deleted file mode 100644 index a0c91db..0000000 --- a/README.rst +++ /dev/null @@ -1,58 +0,0 @@ -GridMap ------------ - -.. image:: https://img.shields.io/travis/pygridtools/gridmap/stable.svg - :alt: Build status - :target: https://travis-ci.org/pygridtools/gridmap - -.. image:: https://img.shields.io/coveralls/pygridtools/gridmap/stable.svg - :target: https://coveralls.io/r/pygridtools/gridmap - -.. image:: https://img.shields.io/pypi/dm/gridmap.svg - :target: https://warehouse.python.org/project/gridmap/ - :alt: PyPI downloads - -.. image:: https://img.shields.io/pypi/v/gridmap.svg - :target: https://warehouse.python.org/project/gridmap/ - :alt: Latest version on PyPI - -.. image:: https://img.shields.io/pypi/l/gridmap.svg - :alt: License - -A package to allow you to easily create jobs on the cluster directly from -Python. You can directly map Python functions onto the cluster without needing -to write any wrapper code yourself. - -This is the ETS fork of an older project called `Python Grid `__. Unlike the older -version, it is Python 2/3 compatible. Another major difference is that you can -change the configuration via environment variables instead of having to modify -a Python file in your ``site-packages`` directory. We've also fixed some bugs. - -For some examples of how to use it, check out ``map_reduce.py`` (for a simple -example of how you can map a function onto the cluster) and ``manual.py`` (for -an example of how you can create list of jobs yourself) in the examples folder. - -For complete documentation `read the docs `__. - -*NOTE*: You cannot use GridMap on a machine that is not allowed to submit jobs -(e.g., slave nodes). - -Requirements -~~~~~~~~~~~~ - -- `drmaa `__ -- `psutil `__ -- `pyzmq `__ -- Python 2.7+ - -Acknowledgments -~~~~~~~~~~~~~~~ - -Thank you to `Max-Planck-Society `__ and -`Educational Testing Service `__ for -funding the development of GridMap. - -Changelog -~~~~~~~~~ - -See `GitHub releases `__. From cfc559d209c96db64d29aa638c9d11c86adffdaf Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Tue, 5 Jan 2016 16:39:01 +0100 Subject: [PATCH 24/32] fix setup.py for new readme --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 523a236..30b05ba 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ def readme(): - with open('README.rst') as f: + with open('README.md') as f: return f.read() From 34398ce9113fce0d264b02bf27a290aaf96b7e9f Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Tue, 5 Jan 2016 18:24:37 +0100 Subject: [PATCH 25/32] more envs in readme --- README.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index e7210c0..21dd671 100644 --- a/README.md +++ b/README.md @@ -10,14 +10,19 @@ A couple of environment variables need to be set in order to work. 
ERROR_MAIL_RECIPIENT = *your email address* -DRMAA_LIBRARY_PATH = *like pbs_drmaa/libs/libdrmaa.so for pbs* +export DRMAA_LIBRARY_PATH = *like pbs_drmaa/libs/libdrmaa.so for pbs* -DEFAULT_TEMP_DIR = *scratch space on the computing nodes* +export DEFAULT_TEMP_DIR="/local/$USER/" -SEND_ERROR_MAIL=TRUE +export USE_MEM_FREE=TRUE -SMTP_SERVER = *your smtp server. like: unimail.tu-dortmund.de* +export SMTP_SERVER="unimail.tu-dortmund.de" +export ERROR_MAIL_RECIPIENT="your.email@address.com" + +export ERROR_MAIL_SENDER="torque@hpc-main3.phido.physik.tu-dortmund.de" + +export SEND_ERROR_MAIL=TRUE ### Python Requirements From f16df0d02b14cc4b8fce779c244b4f2b6aae96e5 Mon Sep 17 00:00:00 2001 From: Kai Date: Sun, 17 Jan 2016 10:50:43 +0100 Subject: [PATCH 26/32] fix unitiliaazed variable in completion mail --- gridmap/job.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/gridmap/job.py b/gridmap/job.py index a1dd175..c85ed2e 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -44,6 +44,7 @@ import sys import traceback import functools +import tempfile from datetime import datetime from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText @@ -692,7 +693,9 @@ def send_error_mail(job): time = [HEARTBEAT_FREQUENCY * i for i in range(len(job.track_mem))] # attack mem plot - img_mem_fn = os.path.join('/tmp', "{}_mem.png".format(job.id)) + temp_dir = tempfile.mkdtemp() + + img_mem_fn = os.path.join(temp_dir, "{}_mem.png".format(job.id)) plt.figure(1) plt.plot(time, job.track_mem, "-o") plt.xlabel("time (s)") @@ -707,7 +710,7 @@ def send_error_mail(job): attachments.append(img_mem_attachement) # attach cpu plot - img_cpu_fn = os.path.join("/tmp", "{}_cpu.png".format(job.id)) + img_cpu_fn = os.path.join(temp_dir, "{}_cpu.png".format(job.id)) plt.figure(2) plt.plot(time, [cpu_load for cpu_load, _ in job.track_cpu], "-o") plt.xlabel("time (s)") @@ -1074,13 +1077,14 @@ def send_completion_mail(name, jobs): send out success email """ # create message - subject = "GridMap completed grid_map {}".format(name) + subject = "GridMap completed {}".format(name) # compose error message body_text = "" body_text += "Job {}\n".format(name) body_text += "Collected results from {} jobs \n \n".format(len(jobs)) + attachments = [] for job in jobs: if job.heart_beat: body_text += "Last memory usage: {}\n".format(job.heart_beat["memory"]) @@ -1101,4 +1105,4 @@ def send_completion_mail(name, jobs): filename='{}_log.txt'.format(job.id)) attachments.append(log_file_attachement) # Send mail - _send_mail(subject, body_text) + _send_mail(subject, body_text, attachments=attachments) From 4e9440c456ceab057a4e9127470b971b027ea313 Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Tue, 19 Jan 2016 11:14:05 +0100 Subject: [PATCH 27/32] change version number to test installatino dependencies --- gridmap/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gridmap/version.py b/gridmap/version.py index 53a3201..24ae398 100644 --- a/gridmap/version.py +++ b/gridmap/version.py @@ -28,5 +28,5 @@ :organization: ETS ''' -__version__ = '0.13.0' +__version__ = '0.13.1' VERSION = tuple(int(x) for x in __version__.split('.')) From c6afa026a9310d01bf7f1f61c38772bb4d5a2741 Mon Sep 17 00:00:00 2001 From: Kai Bruegge Date: Tue, 16 Feb 2016 15:10:37 +0100 Subject: [PATCH 28/32] add more output on job retiurn and exit --- gridmap/job.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/gridmap/job.py b/gridmap/job.py index c85ed2e..6a7cc89 100644 --- a/gridmap/job.py +++ 
b/gridmap/job.py
@@ -458,6 +458,8 @@ def check(self, session, jobs):
                         job.ret = tmp_job.ret
                         job.traceback = tmp_job.traceback
                         self.logger.info("Received output from %s", job_id)
+                        if isinstance(job.ret, basestring) and len(job.ret) < 1000:
+                            self.logger.info("Output was {}".format(job.ret))
                     # Returned exception instead of job, so store that
                     elif isinstance(msg["data"], tuple):
                         job.ret, job.traceback = msg["data"]
@@ -950,8 +952,12 @@ def process_jobs(jobs, temp_dir=DEFAULT_TEMP_DIR, white_list=None, quiet=True,
             logger.info("Started DRMAA session with Session {}".format(session))
             monitor.check(session, jobs)

+        print("JobMonitor killed.")
+
         if SEND_ERROR_MAIL:
             send_completion_mail(name="gridmap job", jobs=jobs)
+            print("returning results so far")
+            return [job.ret for job in jobs]


     else:

From 72d7812b9198f3defbde373a2f5d6c99fae5feae Mon Sep 17 00:00:00 2001
From: Kai Bruegge
Date: Tue, 16 Feb 2016 15:18:58 +0100
Subject: [PATCH 29/32] make jobs in example run different durations

---
 examples/manual.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/examples/manual.py b/examples/manual.py
index 306026b..3b5f314 100755
--- a/examples/manual.py
+++ b/examples/manual.py
@@ -53,11 +53,11 @@ def sleep_walk(secs):
         num = num + 1


-def compute_factorial(n):
+def compute_factorial(n, sleep=10):
     """
     computes factorial of n
     """
-    sleep_walk(10)
+    sleep_walk(sleep)
     ret = 1
     for i in range(n):
         ret = ret * (i + 1)
@@ -75,7 +75,7 @@ def make_jobs(engine, queue, vmem):
     """

     # set up list of arguments
-    inputvec = [[3], [5], [10], [20]]
+    inputvec = [[3, 10], [5, 20], [10, 10], [20, 20]]

     # create empty job vector
     jobs = []

From d850bc709fad0dce562e73d2429e5483bd533bcc Mon Sep 17 00:00:00 2001
From: Kai Bruegge
Date: Tue, 16 Feb 2016 16:04:18 +0100
Subject: [PATCH 30/32] use keyboard interrupt to return intermediate results.
very hacky --- gridmap/job.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/gridmap/job.py b/gridmap/job.py index 6a7cc89..df7807d 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -45,6 +45,7 @@ import traceback import functools import tempfile +import signal from datetime import datetime from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText @@ -402,7 +403,7 @@ def __exit__(self, exc_type, exc_value, exc_tb): except: pass - self.logger.info('Exiting drmaa session') + self.logger.info('Exiting drmaa session {}'.format(self.session)) self.session.exit() @@ -929,6 +930,7 @@ def process_jobs(jobs, temp_dir=DEFAULT_TEMP_DIR, white_list=None, quiet=True, :returns: List of Job results """ + logger = logging.getLogger(__name__) if (not local and not DRMAA_PRESENT): if require_cluster: @@ -940,19 +942,22 @@ def process_jobs(jobs, temp_dir=DEFAULT_TEMP_DIR, white_list=None, quiet=True, if not local: # initialize monitor to get port number - with JobMonitor(temp_dir=temp_dir, port=port) as monitor: - # get interface and port - home_address = monitor.home_address - - # job_id field is attached to each job object - session = _submit_jobs(jobs, home_address, temp_dir=temp_dir, - white_list=white_list, quiet=quiet) - - # handling of inputs, outputs and heartbeats - logger.info("Started DRMAA session with Session {}".format(session)) - monitor.check(session, jobs) - - print("JobMonitor killed.") + try: + with JobMonitor(temp_dir=temp_dir, port=port) as monitor: + # get interface and port + home_address = monitor.home_address + + # job_id field is attached to each job object + session = _submit_jobs(jobs, home_address, temp_dir=temp_dir, + white_list=white_list, quiet=quiet) + + # handling of inputs, outputs and heartbeats + logger.info("Started DRMAA session with Session {}".format(session)) + monitor.check(session, jobs) + except KeyboardInterrupt: + print("JobMonitor killed.") + print("Caught KeyBoard interrupt. Returning intermediate results.") + return [job.ret for job in jobs] if SEND_ERROR_MAIL: send_completion_mail(name="gridmap job", jobs=jobs) From f067b6839f93776139dff9acfc74b177bbf3d78a Mon Sep 17 00:00:00 2001 From: jebuss Date: Tue, 16 Feb 2016 16:32:29 +0100 Subject: [PATCH 31/32] added walltime for PBS jobs --- gridmap/job.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/gridmap/job.py b/gridmap/job.py index 6a7cc89..c12968c 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -129,7 +129,7 @@ class Job(object): def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G", name='gridmap_job', num_slots=1, queue=DEFAULT_QUEUE, interpreting_shell=None, copy_env=True, add_env=None, - engine='SGE'): + engine='SGE', walltime=None): """ Initializes a new Job. 
@@ -183,6 +183,7 @@ def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G", self.ret = _JOB_NOT_FINISHED self.num_slots = num_slots self.mem_free = mem_free + self.walltime = walltime self.white_list = [] self.name = name.replace(' ', '_') self.queue = queue @@ -298,6 +299,10 @@ def native_specification(self): if sge: ret += " -l h={}".format('|'.join(self.white_list)) + if self.walltime: + if pbs: + ret += " -l walltime={}".format(self.walltime) + if self.queue: ret += " -q {}".format(self.queue) From 81a45115b56f0a2857ec08325364c6897ffa5e49 Mon Sep 17 00:00:00 2001 From: jebuss Date: Tue, 16 Feb 2016 16:46:04 +0100 Subject: [PATCH 32/32] added walltime to list of slots --- gridmap/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gridmap/job.py b/gridmap/job.py index c12968c..ac51f94 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -124,7 +124,7 @@ class Job(object): 'cause_of_death', 'num_resubmits', 'home_address', 'log_stderr_fn', 'log_stdout_fn', 'timestamp', 'host_name', 'heart_beat', 'track_mem', 'track_cpu', 'interpreting_shell', - 'copy_env', 'engine') + 'copy_env', 'engine', 'walltime') def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G", name='gridmap_job', num_slots=1, queue=DEFAULT_QUEUE,