import logging
import threading
from abc import ABC, abstractmethod
from datetime import timedelta, datetime
from typing import Optional, Sequence, Union, Callable, Set
from uuid import uuid4
import psij
from psij.exceptions import SubmitException
from psij.job_spec import JobSpec
from psij.job_state import JobState, JobStateOrder
from psij.job_status import JobStatus
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Stand-in used by Job.wait() when no timeout is given. timedelta.max cannot be
# used for this: adding it to the current time overflows, and it also overflows
# the timeout accepted when waiting on a threading.Condition.
LARGE_TIMEOUT = timedelta(days=3650)
def _generate_id() -> str:
return str(uuid4())
class Job(object):
    """
    This class represents a PSI/J job.

    It encapsulates all of the information needed to run a job as well as the job's state.
    """

    def __init__(self, spec: Optional[JobSpec] = None) -> None:
        """
        When constructed, a job is in the :attr:`~psij.JobState.NEW` state.

        :param spec: an optional :class:`~psij.JobSpec` that describes the details of the job.
        """
        self.spec = spec
        """The job specification of this job."""

        self._id = _generate_id()
        self._status = JobStatus(JobState.NEW)
        # need indirect ref to avoid a circular reference
        self.executor: Optional['psij.JobExecutor'] = None
        # allow the native ID to be anything and do the string conversion in the getter; there's
        # no point in storing integers as strings.
        self._native_id: Optional[object] = None
        self._cb: Optional[JobStatusCallback] = None
        # guards self._status and is notified on every status change (see wait())
        self._status_cv = threading.Condition()
        # lazy %-formatting: str(self) is only computed if DEBUG logging is enabled
        logger.debug('New Job: %s', self)

    @property
    def id(self) -> str:
        """
        A read-only property containing the PSI/J job ID.

        The ID is assigned automatically by the implementation when this `Job` object is
        constructed. The ID is guaranteed to be unique on the machine on which the `Job` object
        was instantiated. The ID does not have to match the ID of the underlying LRM job, but is
        used to identify `Job` instances as seen by a client application.
        """
        return self._id

    @property
    def native_id(self) -> Optional[str]:
        """
        A read-only property containing the native ID of the job.

        The native ID is the ID assigned to the job by the underlying implementation. The native ID
        may not be available until after the job is submitted to a :class:`~psij.JobExecutor`, in
        which case the value of this property is ``None``.
        """
        if self._native_id is None:
            return None
        # the native ID is stored as-is; the string conversion happens here
        return str(self._native_id)

    @property
    def status(self) -> JobStatus:
        """
        Contains the current status of the job.

        It is guaranteed that the status returned by this method is monotonic in time with respect
        to the partial ordering of :class:`~psij.JobStatus` types. That is, if
        `job_status_1.state` and `job_status_2.state` are comparable and
        `job_status_1.state < job_status_2.state`, then it is impossible for `job_status_2` to be
        returned by a call placed prior to a call that returns `job_status_1` if both calls are
        placed from the same thread or if a proper memory barrier is placed between the calls.
        Furthermore the job is guaranteed to go through all intermediate states in the
        `state model <https://exaworks.org/job-api-spec/specification#state-model>`_ before
        reaching a particular state.

        :return: the current status of this job
        """
        return self._status

    @status.setter
    def status(self, status: JobStatus) -> None:
        with self._status_cv:
            crt = self._status.state
            nxt = status.state
            # drop updates that repeat the current state or that would move the job backwards
            if crt == nxt or crt.is_greater_than(nxt):
                return
            prev = JobStateOrder.prev(nxt)
        if prev is not None and prev != crt:
            # The new state does not directly follow the current one: first go (recursively)
            # through its predecessor, so that every intermediate state is set and announced.
            self.status = JobStatus(prev)

        logger.debug('Job status change %s: %s -> %s', self, self._status.state, status.state)
        with self._status_cv:
            self._status = status
            # wake up all threads blocked in wait()
            self._status_cv.notify_all()
        # callbacks are invoked after the condition has been released
        if self._cb:
            try:
                self._cb.job_status_changed(self, status)
            except Exception as ex:
                # a failing client callback is logged and otherwise ignored
                logger.warning('Job status callback for %s threw an exception: %s', self.id, ex)
        if self.executor:
            self.executor._notify_callback(self, status)

    def set_job_status_callback(self,
                                cb: Optional[Union['JobStatusCallback',
                                                   Callable[['Job', 'psij.JobStatus'], None]]]) \
            -> None:
        """
        Registers a status callback with this job.

        The callback can either be a subclass of :class:`~psij.job.JobStatusCallback` or a
        procedure accepting two arguments: a :class:`~psij.Job` and a :class:`~psij.JobStatus`.
        The callback is invoked whenever a status change occurs for this job, independent of
        any callback registered on the job's :class:`~psij.JobExecutor`. A previously registered
        callback can be removed by passing ``None``.

        :param cb: An instance of :class:`~psij.job.JobStatusCallback`, a callable with two
            parameters, ``job`` of type :class:`~psij.Job`, ``job_status`` of type
            :class:`~psij.JobStatus`, and returning nothing, or ``None`` to remove the current
            callback.
        """
        if cb is None or isinstance(cb, JobStatusCallback):
            # None must be stored as-is: wrapping it would leave a callback object behind
            # that fails with a TypeError on every status change.
            self._cb = cb
        else:
            self._cb = FunctionJobStatusCallback(cb)

    def cancel(self) -> None:
        """
        Cancels this job.

        The job is canceled by calling :func:`~psij.JobExecutor.cancel` on the job
        executor that was used to submit this job. If the job is already in a final state,
        this method does nothing.

        :raises SubmitException: if the job has not yet been submitted.
        """
        if self.status.final:
            return
        if not self.executor:
            raise SubmitException('Cannot cancel job: not bound to an executor.')
        self.executor.cancel(self)

    def _all_greater(self, states: Optional[Union[JobState, Sequence[JobState]]]) \
            -> Optional[Set[JobState]]:
        # Returns the given state(s) together with every state that is_greater_than() at least
        # one of them, or None if no states were given.
        if states is None:
            return None
        if isinstance(states, JobState):
            states = [states]
        ts: Set[JobState] = set()
        for state1 in states:
            ts.add(state1)
            ts.update(state2 for state2 in JobState if state2.is_greater_than(state1))
        return ts

    def wait(self, timeout: Optional[timedelta] = None,
             target_states: Optional[Union[JobState, Sequence[JobState]]] = None) \
            -> Optional[JobStatus]:
        """
        Waits for the job to reach certain states.

        This method returns either when the job reaches one of the `target_states`, a state
        following one of the `target_states`, a final state, or when an amount of time indicated by
        the `timeout` parameter, if specified, passes. Returns the :class:`~psij.JobStatus` object
        that has one of the desired states or `None` if the timeout is reached. For example,
        `wait(target_states = [JobState.QUEUED])` waits until the job is in any of the `QUEUED`,
        `ACTIVE`, `COMPLETED`, `FAILED`, or `CANCELED` states.

        :param timeout: An optional timeout after which this method returns even if none of the
            `target_states` was reached. If not specified, wait indefinitely. A zero timeout
            checks the current status once and returns without blocking.
        :param target_states: A set of states to wait for. If not specified, wait for any
            of the :attr:`~psij.JobState.final` states.
        :return: returns the :class:`~psij.JobStatus` object that caused this call to
            complete or `None` if the timeout is specified and reached.
        """
        start = datetime.now()
        if timeout is None:
            # only a missing timeout means "wait indefinitely"; timedelta(0) is falsy but is a
            # legitimate request for a non-blocking check
            timeout = LARGE_TIMEOUT
        else:
            # clamp very large values (e.g. timedelta.max), which would otherwise overflow
            # when added to the current time
            timeout = min(timeout, LARGE_TIMEOUT)
        end = start + timeout

        ts = self._all_greater(target_states)

        with self._status_cv:
            while True:
                status = self._status
                state = status.state
                # a final state always ends the wait, since no further state can follow it
                if state.final or (ts is not None and state in ts):
                    return status

                left_seconds = (end - datetime.now()).total_seconds()
                if left_seconds <= 0:
                    return None
                # releases the condition while blocked; woken up by the status setter
                self._status_cv.wait(left_seconds)

    def __hash__(self) -> int:
        """Returns a hash for this job."""
        return hash(self._id)

    def __str__(self) -> str:
        """Returns a string representation of this job."""
        return 'Job[id={}, native_id={}, executor={}, status={}]'.format(self._id, self._native_id,
                                                                         self.executor, self.status)
class JobStatusCallback(ABC):
    """Abstract listener interface for job status change events."""

    @abstractmethod
    def job_status_changed(self, job: Job, job_status: JobStatus) -> None:
        """
        Called when the status of a job changes.

        Client code that wants to receive status notifications implements this method. Note
        that reading :attr:`psij.Job.status` from inside this method may yield something other
        than the `job_status` passed in: the status of the job can be updated again while this
        method is running and, in particular, before :attr:`psij.Job.status` is dereferenced.

        Implementations must return quickly and must not be used for lengthy processing.
        Implementations should also not throw exceptions.

        :param job: The job whose status has changed.
        :param job_status: The new status of the job.
        """
class FunctionJobStatusCallback(JobStatusCallback):
    """Adapter that lets a plain callable act as a :class:`JobStatusCallback`."""

    def __init__(self, fn: Callable[[Job, 'psij.JobStatus'], None]):
        """
        Initializes a `FunctionJobStatusCallback`.

        :param fn: The callable to forward status changes to. It is called with the job and
            the new job status.
        """
        self.fn = fn

    def job_status_changed(self, job: Job, job_status: 'psij.JobStatus') -> None:
        """See :func:`~psij.JobStatusCallback.job_status_changed`."""
        wrapped = self.fn
        wrapped(job, job_status)