diff --git a/gridmap/__init__.py b/gridmap/__init__.py index e61dbd9..5973d13 100644 --- a/gridmap/__init__.py +++ b/gridmap/__init__.py @@ -29,6 +29,8 @@ :var USE_MEM_FREE: Does your cluster support specifying how much memory a job will use via mem_free? (Default: ``False``) +:var USE_NUM_PROC: Does your cluster support specifying how many procs a job + will use via num_proc? (Default: ``False``) :var DEFAULT_QUEUE: The default job scheduling queue to use. (Default: ``all.q``) :var DEFAULT_TEMP_DIR: The default temporary directory for job output. @@ -73,7 +75,7 @@ HEARTBEAT_FREQUENCY, IDLE_THRESHOLD, MAX_IDLE_HEARTBEATS, MAX_TIME_BETWEEN_HEARTBEATS, NUM_RESUBMITS, SEND_ERROR_MAIL, SMTP_SERVER, - USE_MEM_FREE, DEFAULT_TEMP_DIR) + USE_MEM_FREE, USE_NUM_PROC, DEFAULT_TEMP_DIR) from gridmap.job import (Job, JobException, process_jobs, grid_map, DRMAANotPresentException) from gridmap.version import __version__, VERSION @@ -85,4 +87,4 @@ 'ERROR_MAIL_SENDER', 'HEARTBEAT_FREQUENCY', 'IDLE_THRESHOLD', 'MAX_IDLE_HEARTBEATS', 'MAX_TIME_BETWEEN_HEARTBEATS', 'NUM_RESUBMITS', 'SEND_ERROR_MAIL', 'SMTP_SERVER', 'USE_MEM_FREE', - 'DEFAULT_TEMP_DIR'] + 'USE_NUM_PROC', 'DEFAULT_TEMP_DIR'] diff --git a/gridmap/conf.py b/gridmap/conf.py index bfc7d5f..2beb9e7 100644 --- a/gridmap/conf.py +++ b/gridmap/conf.py @@ -29,6 +29,8 @@ :var USE_MEM_FREE: Does your cluster support specifying how much memory a job will use via mem_free? (Default: ``False``) +:var USE_NUM_PROC: Does your cluster support specifying how many procs a job + will use via num_proc? (Default: ``False``) :var DEFAULT_QUEUE: The default job scheduling queue to use. (Default: ``all.q``) :var CREATE_PLOTS: Should we plot cpu and mem usage and send via email? @@ -125,6 +127,9 @@ # Is mem_free configured properly on the cluster? USE_MEM_FREE = 'TRUE' == os.getenv('USE_MEM_FREE', 'False').upper() +# Is num_proc configured properly on the cluster? 
+USE_NUM_PROC = 'TRUE' == os.getenv('USE_NUM_PROC', 'False').upper() + # Which queue should we use by default DEFAULT_QUEUE = os.getenv('DEFAULT_QUEUE', 'all.q') diff --git a/gridmap/job.py b/gridmap/job.py index 46e8abd..3df17b2 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -63,7 +63,7 @@ IDLE_THRESHOLD, MAX_IDLE_HEARTBEATS, MAX_TIME_BETWEEN_HEARTBEATS, NUM_RESUBMITS, SEND_ERROR_MAIL, SMTP_SERVER, USE_MEM_FREE, - DEFAULT_TEMP_DIR) + USE_NUM_PROC, DEFAULT_TEMP_DIR) from gridmap.data import zdumps, zloads from gridmap.runner import _heart_beat @@ -111,14 +111,14 @@ class Job(object): """ __slots__ = ('_f', 'args', 'id', 'kwlist', 'cleanup', 'ret', 'traceback', - 'num_slots', 'mem_free', 'white_list', 'path', 'uniq_id', + 'num_slots', 'num_proc', 'mem_free', 'white_list', 'path', 'uniq_id', 'name', 'queue', 'environment', 'working_dir', 'cause_of_death', 'num_resubmits', 'home_address', 'log_stderr_fn', 'log_stdout_fn', 'timestamp', 'host_name', 'heart_beat', 'track_mem', 'track_cpu', 'interpreting_shell', 'copy_env') - def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G", + def __init__(self, f, args, kwlist=None, cleanup=True, num_proc=1, mem_free="1G", name='gridmap_job', num_slots=1, queue=DEFAULT_QUEUE, interpreting_shell=None, copy_env=True, add_env=None): """ @@ -132,6 +132,8 @@ def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G", :type kwlist: dict :param cleanup: flag that determines the cleanup of input and log file :type cleanup: boolean + :param num_proc: Estimate of how many procs this job will need (for scheduling) + :type num_proc: int :param mem_free: Estimate of how much memory this job will need (for scheduling) :type mem_free: str @@ -170,6 +172,7 @@ def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G", self.cleanup = cleanup self.ret = _JOB_NOT_FINISHED self.num_slots = num_slots + self.num_proc = num_proc self.mem_free = mem_free self.white_list = [] self.name = name.replace(' ', '_') @@ 
-260,6 +263,8 @@ def native_specification(self): ret += " -S {}".format(self.interpreting_shell) ret += " -b yes" + if self.num_proc and USE_NUM_PROC: + ret += " -l num_proc={}".format(self.num_proc) if self.mem_free and USE_MEM_FREE: ret += " -l mem_free={}".format(self.mem_free) if self.num_slots and self.num_slots > 1: @@ -925,7 +930,7 @@ def _resubmit(session_id, job, temp_dir): ##################### # MapReduce Interface ##################### -def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job', +def grid_map(f, args_list, cleanup=True, num_proc=1, mem_free="1G", name='gridmap_job', num_slots=1, temp_dir=DEFAULT_TEMP_DIR, white_list=None, queue=DEFAULT_QUEUE, quiet=True, local=False, max_processes=1, interpreting_shell=None, copy_env=True, add_env=None, @@ -946,6 +951,8 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job', each job when we're done? (They are left in place if there's an error.) :type cleanup: bool + :param num_proc: Estimate of how many procs each job will need (for scheduling) + :type num_proc: int :param mem_free: Estimate of how much memory each job will need (for scheduling). (Not currently used, because our cluster does not have that setting enabled.) @@ -990,7 +997,7 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job', # construct jobs jobs = [Job(f, [args] if not isinstance(args, list) else args, - cleanup=cleanup, mem_free=mem_free, + cleanup=cleanup, num_proc=num_proc, mem_free=mem_free, name='{}{}'.format(name, job_num), num_slots=num_slots, queue=queue, interpreting_shell=interpreting_shell, copy_env=copy_env, add_env=add_env)