"""
OAR clusters (:mod:`fluiddyn.clusters.oar`)
===========================================
Provides:
.. autoclass:: ClusterOAR
:members:
.. autofunction:: get_job_id
.. autofunction:: get_job_info
.. autofunction:: count_number_jobs
"""
import datetime
import os
import stat
import time
from subprocess import getoutput
from sys import version_info as version
from ..io.query import call_bash, run_asking_agreement
from . import Cluster, subprocess
[docs]
def get_job_id(name_job):
    """Return the id of the first job listed by ``oarstat -u`` matching a name.

    Typical output of ``oarstat -u``

    ::

        Job id    S User     Duration   System message
        --------- - -------- ---------- ------------------------------------
        644829    F augier3p    0:05:08 R=4,W=0:10:0,J=B,N=fluidsim-restart_nx160_Rb20_N20,T=idempotent (Karma=0.038,quota_ok)

    Parameters
    ----------
    name_job : str
        Text looked for in the listing (plain substring test, so it may
        match any column of a line).

    Returns
    -------
    str or None
        The first whitespace-delimited field of the selected line, or
        ``None`` when ``name_job`` does not occur in the listing at all.
    """
    listing = getoutput("oarstat -u")
    if name_job not in listing:
        return None

    rows = listing.split("\n")
    # When the text is found in the listing but in no single row (it would
    # have to span a line break), the last row is used.
    selected = next((row for row in rows if name_job in row), rows[-1])
    return selected.split(maxsplit=1)[0].strip()
[docs]
def get_job_info(name_or_id):
    """Get some information about a job listed by ``oarstat -u``.

    The first line of the listing containing ``name_or_id`` (plain substring
    test) is split on whitespace and its fields are mapped, in order, to the
    keys ``"id"``, ``"status"``, ``"user"``, ``"duration"``, ``"message"``
    and ``"karma"``.

    Parameters
    ----------
    name_or_id : str or int
        Name or id of the job. Non-string values are converted with ``str``.

    Returns
    -------
    dict or None
        ``None`` when no line of the listing contains ``name_or_id``.
        Otherwise a dictionary holding the fields described above (only as
        many as the line provides), where ``"status"`` is translated to
        ``"failing"``, ``"waiting"`` or ``"running"`` (``"?"`` for anything
        else), plus ``"nb_procs"``, ``"walltime"``, ``"name"`` and
        ``"type"`` when the corresponding ``R=``, ``W=``, ``N=`` and ``T=``
        items are present in the message field.
    """
    output = getoutput("oarstat -u")

    if not isinstance(name_or_id, str):
        name_or_id = str(name_or_id)

    # Select the first matching line; do not fall through to an unrelated
    # line when the text is not contained in any single line.
    line = next(
        (row for row in output.split("\n") if name_or_id in row), None
    )
    if line is None:
        return None

    keys = ["id", "status", "user", "duration", "message", "karma"]
    info = dict(zip(keys, line.split()))

    possible_status = {"F": "failing", "W": "waiting", "R": "running"}
    # ``.get`` on ``info`` so that a line with too few fields gives "?"
    # instead of raising KeyError.
    info["status"] = possible_status.get(info.get("status"), "?")

    keys_message = {"R": "nb_procs", "W": "walltime", "N": "name", "T": "type"}
    for param in info.get("message", "").split(","):
        # ``partition`` splits on the first "=" only: items without "=" are
        # skipped and values containing "=" are kept whole (``split("=")``
        # raised ValueError in both cases).
        key, separator, value = param.partition("=")
        if separator and key in keys_message:
            info[keys_message[key]] = value

    return info
[docs]
def count_number_jobs(name_job):
    """Count the lines returned by ``oarstat -u`` that contain ``name_job``.

    Parameters
    ----------
    name_job : str
        Text looked for in each line of the listing (plain substring test).

    Returns
    -------
    int
        Number of lines of the listing containing ``name_job``.
    """
    rows = getoutput("oarstat -u").split("\n")
    matching_rows = [row for row in rows if name_job in row]
    return len(matching_rows)
[docs]
class ClusterOAR(Cluster):
    """Cluster on which jobs are submitted with the OAR command ``oarsub``.

    A bash launcher containing ``#OAR`` directives is written in the current
    directory and then handed to ``oarsub -S``.
    """

    # Value used for the ``cluster='...'`` resource condition (only when
    # ``has_to_add_name_cluster`` is true and no network address is given).
    name_cluster = ""
    nb_cores_per_node = 12
    # If true, the launcher restricts the resources to ``name_cluster``.
    has_to_add_name_cluster = False

    _doc_commands = """
Useful commands
---------------
oarsub -S script.sh
oarstat -u
oardel $JOB_ID
oarsub -C $JOB_ID"""

    def check_oar(self):
        """Check if this script is run on a frontal with oar installed.

        Raises
        ------
        OSError
            If ``oarsub --version`` cannot be executed.
        """
        try:
            subprocess.check_call(["oarsub", "--version"], stdout=subprocess.PIPE)
        except OSError as error:
            raise OSError("oar does not seem to be installed.") from error

    def submit_script(
        self,
        path,
        name_run="fluiddyn",
        nb_nodes=1,
        nb_cores_per_node=None,
        walltime="24:00:00",
        project=None,
        nb_mpi_processes=None,
        omp_num_threads=None,
        idempotent=False,
        anterior=None,
        delay_signal_walltime=300,
        network_address=None,
        ask=True,
        submit=True,
        run_with_exec=True,
        resource_conditions=None,
        use_oar_envsh=None,
    ):
        """Submit a Python script, run as ``python <path>``.

        Parameters
        ----------
        path : str
            Path of the script, optionally followed by space-separated
            arguments. A leading ``~`` is expanded. The string may already
            start with ``"python "``, in which case it is used as the
            command unchanged.

        Notes
        -----
        All other arguments are forwarded unchanged to
        :meth:`submit_command`, where they are described.

        Raises
        ------
        ValueError
            If the script file does not exist.
        """
        path = os.path.expanduser(path)

        prefix = "python "
        if path.startswith(prefix):
            # The command is already complete; the script is the first word
            # after the interpreter.
            command = path
            path_script = path[len(prefix) :].lstrip().split(" ")[0]
        else:
            command = prefix + path
            path_script = path.split(" ")[0]

        if not os.path.exists(path_script):
            raise ValueError("The script does not exists! path:\n" + path)

        self.submit_command(
            command,
            name_run=name_run,
            nb_nodes=nb_nodes,
            nb_cores_per_node=nb_cores_per_node,
            walltime=walltime,
            project=project,
            nb_mpi_processes=nb_mpi_processes,
            omp_num_threads=omp_num_threads,
            idempotent=idempotent,
            anterior=anterior,
            delay_signal_walltime=delay_signal_walltime,
            network_address=network_address,
            ask=ask,
            submit=submit,
            run_with_exec=run_with_exec,
            resource_conditions=resource_conditions,
            use_oar_envsh=use_oar_envsh,
        )

    def submit_command(
        self,
        command,
        name_run="fluiddyn",
        nb_nodes=1,
        nb_cores_per_node=None,
        walltime="24:00:00",
        project=None,
        nb_mpi_processes=None,
        omp_num_threads=None,
        idempotent=False,
        anterior=None,
        delay_signal_walltime=300,
        network_address=None,
        ask=True,
        submit=True,
        run_with_exec=True,
        resource_conditions=None,
        use_oar_envsh=None,
    ):
        """Write a launcher for ``command`` and submit it with ``oarsub``.

        The launcher is created in the current directory as
        ``oar_launcher_<date>_<time>`` (with a numeric suffix if that name is
        already taken) and is made readable, writable and executable by its
        owner only.

        Parameters
        ----------
        command : str
            Command written at the end of the launcher.
        name_run : str
            Name of the job (``#OAR -n``).
        nb_nodes : int
            Number of nodes requested.
        nb_cores_per_node : int, optional
            Number of cores per node requested. Passed, together with
            ``nb_nodes`` and ``nb_mpi_processes``, through
            ``self._parse_cores_procs`` (defined outside this class) before
            being used.
        walltime : str
            Walltime requested.
        project : str, optional
            If given, added as ``--project``.
        nb_mpi_processes : int, optional
            If not None after ``self._parse_cores_procs``, the command is
            run through ``mpirun -np``.
        omp_num_threads : int, optional
            If given, exported as ``OMP_NUM_THREADS`` in the launcher.
        idempotent : bool
            If true, ``-t idempotent`` is added.
        anterior : optional
            If given, added as ``--anterior``.
        delay_signal_walltime : int, optional
            If not None, added as ``--checkpoint``.
        network_address : str, optional
            If given, the resources are restricted with a
            ``network_address='...'`` condition.
        ask : bool
            If true, the submission goes through ``run_asking_agreement``;
            otherwise the command is printed and run with ``call_bash``.
        submit : bool
            If false, the launcher is only written, not submitted.
        run_with_exec : bool
            If true, the command line is prefixed with ``exec``.
        resource_conditions : str, optional
            Extra resource condition, joined to the other one (if any) with
            ``" and "``.
        use_oar_envsh : bool, optional
            If true, ``OMPI_MCA_plm_rsh_agent=oar-envsh`` is exported. By
            default, true when ``nb_mpi_processes`` is not None and
            ``nb_nodes > 1``.
        """
        self.check_oar()

        nb_cores_per_node, nb_mpi_processes = self._parse_cores_procs(
            nb_nodes, nb_cores_per_node, nb_mpi_processes
        )

        str_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        path_launching_script = "oar_launcher_" + str_time
        if os.path.exists(path_launching_script):
            # Another launcher was created during the same second: append
            # the first free integer suffix.
            i = 1
            while os.path.exists(path_launching_script + "_" + str(i)):
                i += 1
            path_launching_script += "_" + str(i)

        txt = self._create_txt_launching_script(
            command,
            name_run,
            nb_nodes,
            nb_cores_per_node,
            walltime,
            nb_mpi_processes=nb_mpi_processes,
            omp_num_threads=omp_num_threads,
            network_address=network_address,
            run_with_exec=run_with_exec,
            resource_conditions=resource_conditions,
            use_oar_envsh=use_oar_envsh,
        )

        with open(path_launching_script, "w", encoding="utf-8") as file:
            file.write(txt)

        # ``oarsub -S`` is given the launcher as a script, so it is made
        # executable.
        os.chmod(
            path_launching_script, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
        )

        launching_command = "oarsub"
        if project is not None:
            launching_command += " --project " + project
        if delay_signal_walltime is not None:
            launching_command += " --checkpoint " + str(delay_signal_walltime)
        if idempotent:
            launching_command += " -t idempotent"
        if anterior is not None:
            launching_command += f" --anterior {anterior}"
        launching_command += " -S ./" + path_launching_script

        print(f'A launcher for the command "{command}" has been created.')

        if not submit:
            return

        if ask:
            run_asking_agreement(launching_command)
        else:
            print(
                "The script is submitted with the command:\n",
                launching_command,
            )
            call_bash(launching_command)

    def _create_txt_launching_script(
        self,
        command,
        name_run,
        nb_nodes,
        nb_cores_per_node,
        walltime,
        nb_mpi_processes=None,
        omp_num_threads=None,
        network_address=None,
        run_with_exec=True,
        resource_conditions=None,
        use_oar_envsh=None,
    ):
        """Return the text of the bash launcher given to ``oarsub -S``.

        The arguments have the same meaning as in :meth:`submit_command`.
        """
        txt = f"#!/bin/bash\n\n#OAR -n {name_run}\n"
        txt += "#OAR -l "

        # A network address takes precedence over the cluster name.
        if network_address is not None:
            conditions = f"network_address='{network_address}'"
        elif self.has_to_add_name_cluster:
            conditions = f"cluster='{self.name_cluster}'"
        else:
            conditions = ""

        if resource_conditions is not None:
            if conditions:
                conditions += " and "
            conditions += resource_conditions

        if conditions:
            txt += "{" + conditions + "}"

        txt += f"/nodes={nb_nodes}/core={nb_cores_per_node},walltime={walltime}\n\n"

        txt += 'echo "hostname: "$HOSTNAME\n\n'

        txt += "\n".join(self.get_commands_setting_env()) + "\n\n"

        if omp_num_threads is not None:
            txt += f"export OMP_NUM_THREADS={omp_num_threads}\n\n"

        if use_oar_envsh is None:
            use_oar_envsh = nb_mpi_processes is not None and nb_nodes > 1

        if use_oar_envsh:
            txt += (
                "# Shell with environment variables forwarded\n"
                "export OMPI_MCA_plm_rsh_agent=oar-envsh\n\n"
            )

        if run_with_exec:
            txt += "exec "

        if nb_mpi_processes is not None:
            txt += f"mpirun -np {nb_mpi_processes} "
            if nb_nodes > 1:
                txt += "-machinefile ${OAR_NODEFILE} "

        txt += command + "\n"
        return txt

    def stall(self, name_job, limit_number_jobs=1, time_check=30):
        """Wait until job(s) completion.

        Parameters
        ----------
        name_job: str
            Description of the job. Should be the same as ``name_run`` in
            submit_script.
        limit_number_jobs: int (default: 1)
            Stall when the number of job is larger or equal to
            `limit_number_jobs`.
        time_check: int
            Time to sleep in seconds.
        """
        tstart = time.time()
        while count_number_jobs(name_job) >= limit_number_jobs:
            time.sleep(time_check)
        print(f"job {name_job} finished in {time.time() - tstart} s")