"""
PBS clusters (:mod:`fluiddyn.clusters.pbs`)
===========================================
Provides:
.. autoclass:: ClusterPBS
:members:
"""
import os
from . import subprocess
from .local import ClusterLocal
class ClusterPBS(ClusterLocal):
    """Base class for clusters with PBS job scheduler.

    Subclasses describe a concrete machine by overriding the class
    attributes below (cluster name, cores per node, default project, ...).
    """

    _doc_commands = """
Useful commands
---------------
qsub
qstat -u $USER
qdel
qhold
qrls"""

    # Substring expected in the host name (see ``check_name_cluster``).
    name_cluster = ""
    # Cores available on one node of the cluster.
    nb_cores_per_node = 32
    # Allocation used when ``submit_command`` is called with ``project=None``.
    default_project = None
    # Command prefixed to the run command when more than one MPI process is used.
    cmd_run = "mpirun"
    # Command used to submit the generated jobscript.
    cmd_launch = "qsub"
    # Presumably the upper bound enforced by ``_check_walltime`` (defined in a
    # base class, not visible here) -- TODO confirm.
    max_walltime = "23:59:59"

    def __init__(self):
        self.check_pbs()
        # PBS starts jobs in the home directory; go back to the submission dir.
        self.commands_setting_env = ["cd $PBS_O_WORKDIR"]
        self.commands_unsetting_env = []

    def check_pbs(self):
        """Check that this script runs on a login node with PBS installed.

        Raises
        ------
        ValueError
            If the ``qsub`` executable cannot be started.
        """
        try:
            subprocess.check_call(["qsub", "--version"], stdout=subprocess.PIPE)
        except OSError as err:
            raise ValueError(
                "This script should be run on a cluster with PBS installed."
            ) from err

    def check_name_cluster(self, env="HOSTNAME"):
        """Check if ``self.name_cluster`` matches an environment variable.

        Parameters
        ----------
        env : string
            Name of the environment variable holding the host name.

        Raises
        ------
        ValueError
            If ``self.name_cluster`` is not a substring of the variable's
            value. An unset variable is treated as an empty string.
        """
        # os.getenv returns None for unset variables, which would make the
        # membership test raise TypeError instead of a meaningful error.
        if self.name_cluster not in os.getenv(env, ""):
            raise ValueError(
                "Cluster name mismatch detected; expected " + self.name_cluster
            )

    def submit_command(
        self,
        command,
        name_run="fluiddyn",
        nb_nodes=1,
        nb_cores_per_node=None,
        walltime=None,
        project=None,
        queue=None,
        nb_mpi_processes=None,
        omp_num_threads=1,
        nb_runs=1,
        path_launching_script=None,
        path_resume=None,
        retain_script=True,
        ask=True,
        bash=True,
        email=None,
        interactive=False,
        **kwargs,
    ):
        """Submit a command.

        Parameters
        ----------
        command : string
            Command which executes the run
        name_run : string
            Name of the run to be displayed in PBS queue
        nb_nodes : integer
            Number of nodes
        nb_cores_per_node : integer
            Number of cores per node. Defaults to the maximum fixed for the
            cluster, ``self.nb_cores_per_node``. Set as 1 for a serial job.
            Set as 0 to spread jobs across nodes (starts job faster, maybe
            slower).
        walltime : string
            Wall-clock time limit requested for the job (``HH:MM:SS``),
            written as ``#PBS -l walltime``. Omitted when None.
        project : string
            Sets the allocation to run the job under. Defaults to
            ``self.default_project``.
        queue : string
            Sets the cluster queue to run the job on
        nb_mpi_processes : integer
            Number of MPI processes. Defaults to None (no MPI).
            If ``"auto"``, computed as `nb_cores_per_node * nb_nodes`.
        omp_num_threads : integer
            Number of OpenMP threads
        nb_runs : integer
            Number of times to submit jobs (launch once using `command` and
            resume thereafter with `path_resume` script / command).
        path_launching_script : string
            Path of the PBS jobscript. Defaults to an automatically generated
            name. A path containing ``"resumer"`` marks a resume job: the
            jobid dependencies are then read interactively.
        path_resume : string
            Path of the script to resume a job, which takes one
            argument - the `path_run` parsed from the output. Required when
            ``nb_runs > 1``.
        retain_script : boolean
            Retain or delete script after launching job
        ask : boolean
            Ask for user input to submit the jobscript or not
        bash : boolean
            Submit jobscript via `call_bash` function
        email : string
            In case of failure notify to the specified email address
        interactive : boolean
            Use `cmd_run_interactive` (falling back to `cmd_run` when the
            cluster defines none) instead of `cmd_run` inside the jobscript

        Raises
        ------
        ValueError
            If ``nb_runs > 1`` without ``path_resume``, or if no jobid
            dependency is entered for a resume job.
        """
        # Validate before anything is submitted: the resume jobs need a script.
        if int(nb_runs) > 1 and path_resume is None:
            raise ValueError("path_resume is required when nb_runs > 1.")

        nb_cores_per_node, nb_mpi_processes = self._parse_cores_procs(
            nb_nodes, nb_cores_per_node, nb_mpi_processes
        )

        # Honour an explicit path: the resume loop below passes a "resumer"
        # path so that the follow-up jobs are recognised as resume jobs.
        if path_launching_script is None:
            path_launching_script = self._make_path_launching_script()

        self._check_walltime(walltime)
        is_resume_script = "resumer" in path_launching_script

        if project is None:
            project = self.default_project

        launching_command = self.cmd_launch
        if is_resume_script:
            dependencies = input("Enter jobid dependencies :").split()
            if not dependencies:
                # Would otherwise produce "-W depend=afternotok:" and fail
                # later when reading the first jobid.
                raise ValueError(
                    "A resume job needs at least one jobid dependency."
                )
            launching_command += " -W depend=afternotok:" + ":".join(dependencies)
        else:
            dependencies = None

        # Every local variable is forwarded so that overriding implementations
        # of _create_txt_launching_script can read any option they need.
        create_txt_kwargs = dict(locals())
        del create_txt_kwargs["self"]
        txt = self._create_txt_launching_script(**create_txt_kwargs)
        self._write_txt_launching_script(txt, path_launching_script)

        if os.path.isabs(path_launching_script):
            launching_command += " " + path_launching_script
        else:
            launching_command += " ./" + path_launching_script
        self._launch(launching_command, command, bash, ask)

        if not retain_script:
            os.remove(path_launching_script)

        nb_times_resume = int(nb_runs) - 1
        for _ in range(nb_times_resume):
            # Each resume job is submitted once, running ``path_resume``.
            nb_runs = 1
            path = path_resume
            path_launching_script = self._make_path_launching_script("resumer")
            submit_script_kwargs = dict(locals())
            del submit_script_kwargs["self"]
            del submit_script_kwargs["command"]
            self.submit_script(**submit_script_kwargs)

    def _create_txt_launching_script(self, **kwargs):
        """Return the text of the PBS jobscript.

        ``kwargs`` must contain the keys ``path_launching_script``,
        ``command``, ``name_run``, ``project``, ``queue``, ``nb_nodes``,
        ``nb_cores_per_node``, ``walltime``, ``nb_mpi_processes``,
        ``omp_num_threads``, ``dependencies``, ``email`` and
        ``is_resume_script``. ``interactive`` is optional.

        Examples
        --------
        #!/bin/bash -l
        # The -l above is required to get the full environment with modules
        # The name of the script is myjob
        #PBS -N myjob
        # Set the allocation to be charged for this job
        # not required if you have set a default allocation
        #PBS -A 201X-X-XX
        # Set the queue to launch this job on
        #PBS -q queuename
        # Only 1 hour wall-clock time will be given to this job
        #PBS -l walltime=1:00:00
        # Number of nodes and processes per node
        #PBS -l nodes=4:ppn=32
        #PBS -e PBS.myjob.$PBS_JOBID.stderr
        #PBS -o PBS.myjob.$PBS_JOBID.stdout
        # Run the executable named myexe
        mpirun -n 128 ./myexe
        """
        path_launching_script = kwargs["path_launching_script"]
        command = kwargs["command"]
        name_run = kwargs["name_run"]
        project = kwargs["project"]
        queue = kwargs["queue"]
        nb_nodes = kwargs["nb_nodes"]
        nb_cores_per_node = kwargs["nb_cores_per_node"]
        walltime = kwargs["walltime"]
        nb_mpi_processes = kwargs["nb_mpi_processes"]
        omp_num_threads = kwargs["omp_num_threads"]
        dependencies = kwargs["dependencies"]
        email = kwargs["email"]
        is_resume_script = kwargs["is_resume_script"]
        interactive = kwargs.get("interactive", False)

        logfile = f"PBS.{name_run}"
        logfile_stdout = logfile + ".${PBS_JOBID}.stdout"

        txt = "#!/bin/bash -l\n\n"
        txt += f"#PBS -N {name_run}\n\n"
        if project is not None:
            txt += f"#PBS -A {project}\n\n"
        if queue is not None:
            txt += f"#PBS -q {queue}\n\n"
        if walltime is not None:
            txt += f"#PBS -l walltime={walltime}\n"
        txt += f"#PBS -l nodes={nb_nodes}:ppn={nb_cores_per_node}\n"
        if email is not None:
            # Mail only when the job is aborted.
            txt += "#PBS -m a\n"
            txt += f"#PBS -M {email}\n"
        # "%J" is a SLURM/LSF file-name token, not a PBS one. Use the job id
        # variable so the names agree with logfile_stdout and with the
        # resume lookup below. NOTE(review): expansion of $PBS_JOBID in
        # -e/-o directives depends on the PBS flavour (e.g. Torque) -- confirm
        # on the target cluster.
        txt += f"#PBS -e {logfile}.$PBS_JOBID.stderr\n"
        txt += f"#PBS -o {logfile}.$PBS_JOBID.stdout\n\n"

        txt += 'echo "hostname: "$HOSTNAME\n\n'

        nb_cores = nb_nodes * nb_cores_per_node
        txt += self._log_job(
            nb_cores,
            path_launching_script,
            logfile_stdout,
            command,
            "PBS_JOB.md",
        )

        txt += "\n".join(self.get_commands_setting_env()) + "\n\n"

        if omp_num_threads is not None:
            txt += f"export OMP_NUM_THREADS={omp_num_threads}\n\n"

        if is_resume_script:
            # Read the run path from the stdout log of the job we depend on:
            # sed prints the line following the first "path_run" match.
            jobid = dependencies[0]
            main_logfile = f"{logfile}.{jobid}.stdout"
            txt += (
                "PATH_RUN=$(sed -n '/path_run/{n;p;q}' \""
                + main_logfile
                + '")\n'
            )

        if interactive:
            cmd = getattr(self, "cmd_run_interactive", self.cmd_run)
        else:
            cmd = self.cmd_run
        if nb_mpi_processes is not None and nb_mpi_processes > 1:
            txt += f"{cmd} -n {nb_mpi_processes} "

        if is_resume_script:
            txt += f'{command} "$PATH_RUN"'
        else:
            txt += command

        txt += "\n" + "\n".join(self.commands_unsetting_env)
        return txt