"""This module contains the RP :class:`~psij.JobExecutor`."""
import time
import logging
from typing import Any, Optional, List, Tuple, Dict
from psij import InvalidJobException, SubmitException
from psij import Job, JobExecutorConfig, JobState, JobStatus, JobSpec
from psij import JobExecutor
logger = logging.getLogger(__name__)
[docs]class RPJobExecutor(JobExecutor):
"""
A job executor that runs jobs via the `RADICAL Pilot system
<https://radical-cybertools.github.io/radical-pilot/>`_.
"""
import radical.pilot as _rp
_state_map = {_rp.NEW: JobState.NEW,
_rp.TMGR_STAGING_INPUT_PENDING: JobState.QUEUED,
_rp.AGENT_EXECUTING: JobState.ACTIVE,
_rp.DONE: JobState.COMPLETED,
_rp.FAILED: JobState.FAILED,
_rp.CANCELED: JobState.CANCELED}
def __init__(self, url: Optional[str] = None,
config: Optional[JobExecutorConfig] = None) -> None:
"""
:param url: Not used, but required by the spec for automatic initialization.
:param config: The `RPJobExecutor` does not have any configuration options.
"""
# TODO: url is not passed
# if not url.startswith('rp://'):
# raise ValueError('expected `rp://` url')
super().__init__(url=url, config=config)
self._session = self._rp.Session()
self._pmgr = self._rp.PilotManager(session=self._session)
self._tmgr = self._rp.TaskManager(session=self._session)
self._pmgr.register_callback(self._pilot_state_cb)
self._tmgr.register_callback(self._task_state_cb)
pd = self._rp.PilotDescription({'resource': 'local.localhost',
'cores': 32,
'runtime': 15})
self._pilot = self._pmgr.submit_pilots(pd)
self._tmgr.add_pilots(self._pilot)
self._tasks: Dict[str, Tuple[Any, Any]] = dict()
def _pilot_state_cb(self, pilot: _rp.Pilot, rp_state: str) -> None:
logger.info('pilot %s: %s', pilot.uid, pilot.state)
def _task_state_cb(self, task: _rp.Task, rp_state: str) -> None:
jpsi_uid = task.name
jpsi_job = self._tasks[jpsi_uid][0]
ec = None
if task.state in self._rp.FINAL:
ec = task.exit_code
old_state = jpsi_job.status.state
new_state = self._state_map.get(task.state)
logger.debug('%s --> %s - %s', jpsi_uid, task.state, new_state)
if new_state is None:
# not an interesting state transition, ignore
return
if old_state == new_state:
return
metadata = {'nativeId': task.uid}
if ec:
metadata['exit_code'] = ec
if task.state in self._rp.FINAL:
metadata['final'] = True
status = JobStatus(new_state, time=time.time(),
metadata=metadata)
self._set_job_status(jpsi_job, status)
def submit(self, job: Job) -> None:
"""
Submits the specified :class:`~psij.Job` to the pilot.
Successful return of this method indicates that the job has been
submitted to RP and all changes in the job status, including failures,
are reported using notifications. If the job specification is invalid,
an :class:`~psij.InvalidJobException` is thrown. If the actual
submission fails for reasons outside the validity of the job,
a :class:`~psij.SubmitException` is thrown.
:param job: The job to be submitted.
"""
self._check_job(job)
try:
td = self._job_2_descr(job)
task = self._tmgr.submit_tasks(td)
self._tasks[job.id] = (job, task)
except Exception as ex:
raise SubmitException('Failed to submit job') from ex
def _job_2_descr(self, job: Job) -> Dict[str, Any]:
# TODO: use resource spec
# TODO: use meta data for jpsi uid
spec: Optional[JobSpec] = job.spec
if not spec:
raise InvalidJobException('Missing specification')
from_dict: Dict[str, Any] = {'name': job.id,
'executable': spec.executable,
'arguments': spec.arguments or [],
'environment': spec.environment or {},
'stdout': spec.stdout_path or '',
'stderr': spec.stderr_path or '',
'sandbox': spec.directory or ''}
return self._rp.TaskDescription(from_dict=from_dict)
def cancel(self, job: Job) -> None:
"""
Cancels a job.
:param job: The job to cancel.
"""
with job._status_cv:
if job.status.state == JobState.NEW:
self._set_job_status(job, JobStatus(JobState.CANCELED))
return
if job.id not in self._tasks:
raise ValueError('job not known')
_, task = self._tasks[job.id]
self._tmgr.cancel_tasks(uids=task.uid)
def list(self) -> List[str]:
"""See :func:`~psij.job_executor.JobExecutor.list`.
Return a list of ids representing jobs that are running on the
underlying implementation - in this case RP task IDs.
:return: The list of known tasks.
"""
return [str(uid) for uid in self._tmgr.list_tasks()]
def attach(self, job: Job, native_id: str) -> None:
"""
Attaches a job to a process.
The job must be in the :attr:`~psij.job_state.JobState.NEW` state.
:param job: The job to attach.
:param native_id: The native ID of the process to attached to, as
obtained through :func:`~list` method.
"""
if job.status.state != JobState.NEW:
raise InvalidJobException('Job must be in the NEW state')
job.executor = self
task = self._tmgr.get_tasks(uids=[native_id])[0]
self._tasks[job.id] = (job, task)
state = self._state_map[task.state]
self._set_job_status(job, JobStatus(state, time=time.time()))