Source code for jarvis.tasks.queue_jobs

"""Modules for job submission."""

import os
import subprocess
from collections import OrderedDict


[docs]class Queue(object): """Construct HPC Job class.""" def __init__( self, q_type="head_node", q_parameters={}, job_sub_cmd=None, job_check_cmd="", job_id="", ): """Initialize the class.""" self.q_type = q_type self.q_parameters = q_parameters self.job_sub_cmd = job_sub_cmd self.job_check_cmd = job_check_cmd self.job_id = job_id
[docs] def head_node(self, submit_cmd=None): """Select if run on the head node, not recommended.""" os.system(submit_cmd)
[docs] def to_dict(self): """Convert class to a dictionary.""" d = OrderedDict() d["q_type"] = self.q_type d["q_parameters"] = self.q_parameters d["job_sub_cmd"] = self.job_sub_cmd d["job_check_cmd"] = self.job_check_cmd d["job_id"] = self.job_id return d
[docs] @classmethod def from_dict(self, d={}): """Load from a dictionary.""" return Queue( q_type=d["q_type"], q_parameters=d["q_parameters"], job_sub_cmd=d["job_sub_cmd"], job_check_cmd=d["job_check_cmd"], job_id=d["job_id"], )
[docs] @classmethod def pbs( self, filename="submit_job", shell="#!/bin/bash", nnodes=1, cores=16, walltime=None, queue=None, account=None, group_name=None, jobname="myJob", jobout="job.out", joberr="job.err", memory=None, email=None, pre_job_lines=None, directory=None, env=None, job_line="echo I am here", post_job_lines=None, submit_cmd=None, ): """Select if run using PBS script.""" f = open(filename, "w") f.write("%s\n" % shell) f.write("#PBS -l nodes=%d:ppn=%d\n" % (nnodes, cores)) f.write("#PBS -N %s\n" % jobname) f.write("#PBS -o %s\n" % jobout) f.write("#PBS -e %s\n" % joberr) if walltime is not None: if isinstance(walltime, str): f.write("#PBS -l walltime=%s\n" % walltime) else: ValueError("Provide walltime in a string format", walltime) if queue is not None: f.write("#PBS -q %s\n" % queue) if account is not None: f.write("#PBS -A %s\n" % account) if group_name is not None: f.write("#PBS -G %s\n" % group_name) if env is not None: f.write("#PBS -V %s\n" % env) if email is not None: f.write("#PBS -M %s\n" % email) if memory is not None: f.write("#PBS -l %s\n" % memory) if pre_job_lines is not None: f.write("%s\n" % pre_job_lines) if directory is not None: f.write("cd %s\n" % directory) if job_line is not None: f.write("%s\n" % job_line) if post_job_lines is not None: f.write("%s\n" % post_job_lines) f.close() if submit_cmd is not None: with open("jobid", "w") as f: p = subprocess.Popen( submit_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout, stderr = p.communicate() print("stdout,stderr", stdout, stderr) # job_id = str(stdout.split('Your job')[1].split(' ')[1]) f.write(str(stdout)) print("x")
[docs] @classmethod def slurm( self, filename="submit_job", shell="#!/bin/bash", nnodes=1, cores=16, walltime=None, queue=None, account=None, group_name=None, jobname="myJob", jobout="job.out", joberr="job.err", memory=None, email=None, pre_job_lines=None, directory=None, env=None, job_line="echo I am here", post_job_lines=None, submit_cmd=None, ): """Select if run using SLURM script.""" f = open(filename, "w") f.write("%s\n" % shell) if nnodes is not None: f.write("#SBATCH --nodes=%d\n" % (nnodes)) if cores is not None: f.write("#SBATCH --ntasks-per-node=%d\n" % (cores)) if walltime is not None: if isinstance(walltime, str): f.write("#SBATCH --time=%s\n" % walltime) else: ValueError("Provide walltime in a string format", walltime) if queue is not None: f.write("#SBATCH --partition=%s\n" % (queue)) if account is not None: f.write("#SBATCH --account=%s\n" % (account)) if memory is not None: f.write("#SBATCH --mem=%s\n" % (memory)) f.write("#SBATCH --error=%s\n" % joberr) f.write("#SBATCH --output=%s\n" % jobout) if pre_job_lines is not None: f.write("%s\n" % pre_job_lines) if directory is not None: f.write("cd %s\n" % directory) if job_line is not None: f.write("%s\n" % job_line) if post_job_lines is not None: f.write("%s\n" % post_job_lines) f.close() if submit_cmd is not None: with open("jobid", "w") as f: p = subprocess.Popen( submit_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout, stderr = p.communicate() print("stdout,stderr", stdout, stderr) # job_id = str(stdout.split('Your job')[1].split(' ')[1]) f.write(str(stdout))
""" if __name__ == "__main__": Queue().pbs() Queue().slurm(filename="ll") Queue().head_node("echo ABC") """