Source code for psij.job_attributes

import logging
import re
from datetime import timedelta
from typing import Optional, Dict

from typeguard import check_argument_types


# Module-level logger; used in this module to warn when the deprecated
# "project_name" attribute is set.
logger = logging.getLogger(__name__)


# Message template for every walltime parsing failure; `%s` receives the offending input.
_WALLTIME_FMT_ERROR = 'Unknown walltime format: %s. Accepted formats are hh:mm:ss, ' \
                      'hh:mm, mm, or n\\s*[h|m|s].'


class JobAttributes(object):
    """A class containing ancillary job information that describes how a job is to be run."""

    def __init__(self, duration: timedelta = timedelta(minutes=10),
                 queue_name: Optional[str] = None, account: Optional[str] = None,
                 reservation_id: Optional[str] = None,
                 custom_attributes: Optional[Dict[str, object]] = None,
                 project_name: Optional[str] = None) -> None:
        """
        :param duration: Specifies the duration (walltime) of the job. A job whose execution
            exceeds its walltime can be terminated forcefully.
        :param queue_name: If a backend supports multiple queues, this parameter can be used to
            instruct the backend to send this job to a particular queue.
        :param account: An account to use for billing purposes. Please note that the executor
            implementation (or batch scheduler) may use a different term for the option used for
            accounting/billing purposes, such as `project`. However, the scheduler must map this
            attribute to the accounting/billing option in the underlying execution mechanism.
        :param reservation_id: Allows specifying an advanced reservation ID. Advanced
            reservations enable the pre-allocation of a set of resources/compute nodes for a
            certain duration such that jobs can be run immediately, without waiting in the queue
            for resources to become available.
        :param custom_attributes: Specifies a dictionary of custom attributes. Implementations of
            :class:`~psij.JobExecutor` define and are responsible for interpreting custom
            attributes. The typical usage scenario for custom attributes is to pass information
            to the executor or underlying job execution mechanism that cannot otherwise be passed
            using the classes and properties provided by PSI/J.
            A specific example is that of the subclasses of
            :class:`~psij.executors.batch.batch_scheduler_executor.BatchSchedulerExecutor`, which
            look for custom attributes prefixed with their name and a dot (e.g.,
            `slurm.constraint`, `pbs.c`, `lsf.core_isolation`) and translate them into the
            corresponding batch scheduler directives (e.g., `#SBATCH --constraint=...`,
            `#PBS -c ...`, `#BSUB -core_isolation ...`).
        :param project_name: Deprecated. Please use the `account` attribute.

        All constructor parameters are accessible as properties.
        """
        assert check_argument_types()

        self.account = account
        self.duration = duration
        self.queue_name = queue_name
        # `project_name` is an alias whose setter writes to `account`; because it is applied
        # after `account`, a non-None `project_name` takes precedence.
        self.project_name = project_name
        self.reservation_id = reservation_id
        self._custom_attributes = custom_attributes

    def set_custom_attribute(self, name: str, value: object) -> None:
        """Sets a custom attribute, creating the attribute dictionary if there is none yet."""
        if self._custom_attributes is None:
            self._custom_attributes = {}
        self._custom_attributes[name] = value

    def get_custom_attribute(self, name: str) -> Optional[object]:
        """Retrieves the value of a custom attribute, or `None` if it is not set."""
        if self._custom_attributes is None:
            return None
        return self._custom_attributes.get(name)

    @property
    def custom_attributes(self) -> Optional[Dict[str, object]]:
        """Returns a dictionary with the custom attributes."""
        return self._custom_attributes

    @custom_attributes.setter
    def custom_attributes(self, attrs: Optional[Dict[str, object]]) -> None:
        """
        Sets all custom attributes from the given dictionary.

        Existing custom attributes defined on this instance of `JobAttributes` are removed.

        Parameters
        ----------
        attrs
            A dictionary with the custom attributes to set.
        """
        self._custom_attributes = attrs

    def __repr__(self) -> str:
        """Returns a string representation of this object."""
        return 'JobAttributes(duration={}, queue_name={}, account={}, reservation_id={}, ' \
               'custom_attributes={})'.format(self.duration, self.queue_name, self.account,
                                              self.reservation_id, self._custom_attributes)

    def __eq__(self, o: object) -> bool:
        """
        Tests if this JobAttributes object is equal to another object.

        The objects are equal if all their properties are equal.
        """
        if not isinstance(o, JobAttributes):
            return False

        for prop_name in ['duration', 'queue_name', 'account', 'reservation_id',
                          'custom_attributes']:
            if getattr(self, prop_name) != getattr(o, prop_name):
                return False

        return True

    @staticmethod
    def parse_walltime(walltime: str) -> timedelta:
        r"""
        Parses a walltime string into a :class:`~datetime.timedelta`.

        The accepted walltime strings formats are:

        * hh:mm:ss
        * hh:mm
        * mm
        * n\s*[h|m|s] (a number, optional whitespace, and a unit: hours, minutes or seconds)

        Leading and trailing whitespace is ignored. The whole string must match one of the
        formats; strings that merely contain a valid fragment (such as ``1.5h`` or ``10h30m``)
        are rejected rather than partially interpreted.

        Parameters
        ----------
        walltime
            A string in one of the above formats representing a time duration

        Returns
        -------
        A :class:`~datetime.timedelta` representing the same time duration as the ``walltime``
        parameter.

        Raises
        ------
        ValueError
            If ``walltime`` is not in one of the accepted formats.
        """
        text = walltime.strip()

        if ':' in text:
            fields = text.split(':')
            if len(fields) > 3:
                raise ValueError(_WALLTIME_FMT_ERROR % walltime)
            try:
                numbers = [int(field) for field in fields]
            except ValueError:
                # Report the documented format error instead of the generic int() message.
                raise ValueError(_WALLTIME_FMT_ERROR % walltime) from None
            if any(number < 0 for number in numbers):
                # A signed field such as "-1:30" would otherwise yield a negative duration.
                raise ValueError(_WALLTIME_FMT_ERROR % walltime)
            if len(numbers) == 2:
                # hh:mm form; seconds default to zero.
                numbers.append(0)
            hours, minutes, seconds = numbers
            return timedelta(hours=hours, minutes=minutes, seconds=seconds)

        # fullmatch (not search) so that trailing or leading garbage is never silently dropped.
        # A bare number has no unit and is interpreted as minutes.
        m = re.fullmatch(r'(\d+)\s*([hms])?', text)
        if m is None:
            raise ValueError(_WALLTIME_FMT_ERROR % walltime)

        val = int(m.group(1))
        unit = m.group(2)
        if unit == 'h':
            return timedelta(hours=val)
        if unit == 's':
            return timedelta(seconds=val)
        return timedelta(minutes=val)

    @property
    def project_name(self) -> Optional[str]:
        """Deprecated. Please use the `account` attribute."""
        return self.account

    @project_name.setter
    def project_name(self, project_name: Optional[str]) -> None:
        # Assigning None is a no-op so that the constructor default does not clear `account`
        # or trigger the deprecation warning.
        if project_name is not None:
            logger.warning('The "project_name" attribute is deprecated. Please use the "account" '
                           'attribute instead.')
            self.account = project_name