Source code for psij.job_launcher

"""This module contains the core classes of the launchers infrastructure."""

from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Set

from psij.descriptor import Descriptor, _VersionEntry
from psij._plugins import _register_plugin, _get_plugin_class, _print_plugin_status
from psij.job_executor_config import JobExecutorConfig
from psij.job import Job


[docs]class Launcher(ABC): """An abstract base class for all launchers.""" _launchers: Dict[str, List[_VersionEntry['Launcher']]] = {} DEFAULT_LAUNCHER_NAME = 'single' def __init__(self, config: Optional[JobExecutorConfig] = None) -> None: """ :param config: An optional configuration. If not specified, :attr:`~psij.JobExecutorConfig.DEFAULT` is used. """ if config is None: config = JobExecutorConfig.DEFAULT self.config = config
[docs] @abstractmethod def get_launch_command(self, job: Job) -> List[str]: """ Constructs a command to launch a job given a job specification. :param job: The job to launch. :return: A list of strings representing the launch command and all of its arguments. """ pass
[docs] @abstractmethod def is_launcher_failure(self, output: str) -> bool: """ Determines whether the launcher invocation output contains a launcher failure or not. Parameters ---------- output The output (combined stdout/stderr) from an invocation of the launcher command Returns ------- Returns `True` if the `output` parameter contains a string that represents a launncher failure. """ pass
[docs] @abstractmethod def get_launcher_failure_message(self, output: str) -> str: """ Extracts the launcher error message from the output of this launcher's invocation. It is understood that the value of the `output` parameter is such that :meth:`is_launcher_failure` returns `True` on it. Parameters ---------- output The output (combined stdout/stderr) from an invocation of the launcher command. Returns ------- A string representing the part of the launcher output that describes the launcher error. """ pass
[docs] @staticmethod def get_instance(name: str, version_constraint: Optional[str] = None, config: Optional[JobExecutorConfig] = None) -> 'Launcher': """ Returns an instance of a launcher optionally configured using a certain configuration. The returned instance may or may not be a singleton object. :param name: The name of the launcher to return an instance of. :param config: An optional configuration. :return: A launcher instance. """ selected = _get_plugin_class(name, version_constraint, 'launcher', Launcher._launchers) assert selected.ecls is not None assert issubclass(selected.ecls, Launcher) setattr(selected.ecls, '_NAME_', name) setattr(selected.ecls, '_VERSION_', selected.version) instance = selected.ecls(config=config) return instance
[docs] @staticmethod def register_launcher(desc: Descriptor, root: str) -> None: """ Registers a launcher class. The registered class can then be instantiated using :func:`~psij.Launcher.get_instance`. Parameters ---------- desc A :class:`~psij.descriptor.Descriptor` with information about the launcher to register. root A filesystem path under which the implementation of the launcher is to be loaded from. Launchers from other locations, even if under the correct package, will not be registered by this method. If a launcher implementation is only available under a different root path, this method will throw an exception. """ _register_plugin(desc, root, 'launcher', Launcher._launchers)
@staticmethod def _print_plugin_status() -> None: _print_plugin_status(Launcher._launchers, 'launcher')
[docs] @staticmethod def get_launcher_names() -> Set[str]: """ Returns a set of registered launcher names. Names returned by this method can be passed to :func:`~psij.Launcher.get_instance` as the `name` parameter. Returns ------- A set of launcher names corresponding to the known executors. """ return set(Launcher._launchers.keys())