# Source code for psij.executors.local

"""This module contains the local :class:`~psij.JobExecutor`."""
import logging
import os
import shlex
import signal
import subprocess
import threading
import time
from abc import ABC, abstractmethod
from tempfile import mkstemp
from types import FrameType
from typing import Optional, Dict, List, Tuple, Type, cast

import psutil

from psij import InvalidJobException, SubmitException, Launcher, ResourceSpecV1
from psij import Job, JobSpec, JobExecutorConfig, JobState, JobStatus
from psij import JobExecutor
from psij.utils import SingletonThread

logger = logging.getLogger(__name__)


def _format_shell_cmd(args: List[str]) -> str:
    """Formats an argument list in a way that allows it to be pasted in a shell."""
    cmd = ''
    for arg in args:
        cmd += shlex.quote(arg)
        cmd += ' '
    return cmd


def _handle_sigchld(signum: int, frame: Optional[FrameType]) -> None:
    """SIGCHLD handler: wakes the process reaper so a completed child is noticed promptly."""
    _ProcessReaper.get_instance()._handle_sigchld()


# signal.signal() may only be called from the main thread; when psij is imported from any
# other thread, we skip handler installation and the reaper falls back to pure polling
# (completed jobs are then noticed up to _REAPER_SLEEP_TIME seconds later).
if threading.current_thread() != threading.main_thread():
    logger.warning('The psij module is being imported from a non-main thread. This prevents the'
                   'use of signals in the local executor, which will slow things down a bit.')
else:
    signal.signal(signal.SIGCHLD, _handle_sigchld)


# Interval, in seconds, between successive polls of the registered processes by the reaper.
_REAPER_SLEEP_TIME = 0.1


class _ProcessEntry(ABC):
    """Bookkeeping record tying a :class:`~psij.Job` to its underlying OS process."""

    def __init__(self, job: Job, executor: 'LocalJobExecutor', launcher: Optional[Launcher]):
        self.job = job
        self.executor = executor
        # Filled in by the reaper once the process terminates.
        self.exit_code: Optional[int] = None
        self.done_time: Optional[float] = None
        self.out: Optional[str] = None
        # Set by cancel(); instructs the reaper to kill the process on its next pass.
        self.kill_flag = False
        self.process: Optional[subprocess.Popen[bytes]] = None
        self.launcher = launcher

    @abstractmethod
    def kill(self) -> None:
        # Kill all descendants first so none of them are orphaned, then the root process.
        assert self.process is not None
        root = psutil.Process(self.process.pid)
        for child in root.children(recursive=True):
            child.kill()
        self.process.kill()

    @abstractmethod
    def poll(self) -> Tuple[Optional[int], Optional[str]]:
        pass

    def __repr__(self) -> str:
        pid = str(self.process.pid) if self.process else '-'
        return '{}[jobid: {}, pid: {}]'.format(self.__class__.__name__, self.job.id, pid)


class _ChildProcessEntry(_ProcessEntry):
    """Process entry for a child process spawned by this executor through Popen."""

    def __init__(self, job: Job, executor: 'LocalJobExecutor',
                 launcher: Optional[Launcher]) -> None:
        super().__init__(job, executor, launcher)
        # Path of the generated PSIJ_NODEFILE, if one was created for this job.
        self.nodefile: Optional[str] = None

    def kill(self) -> None:
        super().kill()

    def poll(self) -> Tuple[Optional[int], Optional[str]]:
        assert self.process is not None
        code = self.process.poll()
        if code is None:
            # Still running.
            return None, None
        # The process has finished; the temporary nodefile is no longer needed.
        if self.nodefile:
            os.unlink(self.nodefile)
        stdout = self.process.stdout
        if stdout:
            return code, stdout.read().decode('utf-8')
        return code, None


class _AttachedProcessEntry(_ProcessEntry):
    """Process entry for a pre-existing process attached to via its PID."""

    def __init__(self, job: Job, process: psutil.Process, executor: 'LocalJobExecutor'):
        super().__init__(job, executor, None)
        self.process = process

    def kill(self) -> None:
        super().kill()

    def poll(self) -> Tuple[Optional[int], Optional[str]]:
        assert self.process
        try:
            code: Optional[int] = self.process.wait(timeout=0)
        except psutil.TimeoutExpired:
            # Still running.
            return None, None
        # psutil returns None as the exit status for processes that are not our
        # children, so attached jobs report a zero exit code in that case.
        return (0, None) if code is None else (code, None)


def _get_env(spec: JobSpec, nodefile: Optional[str]) -> Optional[Dict[str, str]]:
    env: Optional[Dict[str, str]] = None
    if spec.inherit_environment:
        if spec.environment is None and nodefile is None:
            # if env is none in Popen, it inherits env from parent
            return None
        else:
            # merge current env with spec env
            env = os.environ.copy()
            if spec.environment:
                env.update(spec.environment)
            if nodefile is not None:
                env['PSIJ_NODEFILE'] = nodefile
            return env
    else:
        # only spec env
        if nodefile is None:
            env = spec.environment
        else:
            env = {'PSIJ_NODEFILE': nodefile}
            if spec.environment:
                env.update(spec.environment)

        return env


class _ProcessReaper(SingletonThread):
    """Background daemon thread that polls registered processes for completion.

    A single instance is shared by all local executors. Each loop iteration polls the
    registered processes, then sleeps until either ``_REAPER_SLEEP_TIME`` elapses or a
    SIGCHLD wakes it early (when the signal handler could be installed).
    """

    @classmethod
    def get_instance(cls: Type['_ProcessReaper']) -> '_ProcessReaper':
        return cast('_ProcessReaper', super().get_instance())

    def __init__(self) -> None:
        super().__init__(name='Local Executor Process Reaper', daemon=True)
        # Maps each job to its process entry; guarded by _lock.
        self._jobs: Dict[Job, _ProcessEntry] = {}
        self._lock = threading.RLock()
        # Used purely as a wakeable sleep; notified from the SIGCHLD handler.
        self._cvar = threading.Condition()

    def register(self, entry: _ProcessEntry) -> None:
        """Adds a process entry to the set of processes watched by this reaper."""
        logger.debug('Registering process %s', entry)
        with self._lock:
            self._jobs[entry.job] = entry

    def run(self) -> None:
        logger.debug('Started {}'.format(self))
        done: List[_ProcessEntry] = []
        while True:
            with self._lock:
                # Drop entries that completed in the previous iteration, then take a
                # snapshot so polling happens without holding the lock.
                for entry in done:
                    del self._jobs[entry.job]
                jobs = dict(self._jobs)
            try:
                done = self._check_processes(jobs)
            except Exception:
                # BUGFIX: the previous code called logger.error(msg, ex), which treats
                # the exception as a %-style argument for a format string with no
                # placeholder, corrupting the log record and dropping the traceback.
                # logger.exception() logs the message with the full traceback.
                logger.exception('Error polling for process status')
            with self._cvar:
                # Sleep until the next poll interval or an early SIGCHLD wake-up.
                self._cvar.wait(_REAPER_SLEEP_TIME)

    def _handle_sigchld(self) -> None:
        """Called from the signal handler to wake the reaper loop early."""
        with self._cvar:
            try:
                self._cvar.notify_all()
            except RuntimeError:
                # In what looks like rare cases, notify_all(), seemingly when combined with
                # signal handling, raises `RuntimeError: release unlocked lock`.
                # There appears to be an unresolved Python bug about this:
                #    https://bugs.python.org/issue34486
                # We catch the exception here and log it. It is hard to tell if that will not lead
                # to further issues. It would seem like it shouldn't: after all, all we're doing is
                # making sure we don't sleep too much, but, even if we do, the consequence is a
                # small delay in processing a completed job. However, since this exception seems
                # to be a logical impossibility when looking at the code in threading.Condition,
                # there is really no telling what else could go wrong.
                logger.debug('Exception in Condition.notify_all()')

    def _check_processes(self, jobs: Dict[Job, _ProcessEntry]) -> List[_ProcessEntry]:
        """Polls each entry once; returns the entries whose processes have terminated."""
        done: List[_ProcessEntry] = []

        for entry in jobs.values():
            # Kill requests are honored here, on the reaper thread, rather than on the
            # caller's thread; poll() below then picks up the resulting termination.
            if entry.kill_flag:
                entry.kill()

            exit_code, out = entry.poll()
            if exit_code is not None:
                entry.exit_code = exit_code
                entry.done_time = time.time()
                entry.out = out
                done.append(entry)

        # Notify executors outside the per-entry loop so one slow callback does not
        # delay detection of the remaining completions.
        for entry in done:
            entry.executor._process_done(entry)

        return done

    def cancel(self, job: Job) -> None:
        """Requests asynchronous termination of a job's process on the next reaper pass."""
        with self._lock:
            p = self._jobs[job]
            p.kill_flag = True


class LocalJobExecutor(JobExecutor):
    """
    A job executor that runs jobs locally using :class:`subprocess.Popen`.

    This job executor is intended to be used either to run jobs directly on the same machine
    as the PSI/J library or for testing purposes.

    .. note::
        In Linux, attached jobs always appear to complete with a zero exit code regardless
        of the actual exit code.

    .. warning::
        Instantiation of a local executor from both parent process and a `fork()`-ed process
        is not guaranteed to work. In general, using `fork()` and multi-threading in Linux
        is unsafe, as suggested by the `fork()` man page. While PSI/J attempts to minimize
        problems that can arise when `fork()` is combined with threads (which are used by
        PSI/J), no guarantees can be made and the chances of unexpected behavior are high.
        Please do not use PSI/J with `fork()`. If you do, please be mindful that support
        for using PSI/J with `fork()` will be limited.
    """

    def __init__(self, url: Optional[str] = None,
                 config: Optional[JobExecutorConfig] = None) -> None:
        """
        :param url: Not used, but required by the spec for automatic initialization.
        :param config: The `LocalJobExecutor` does not have any configuration options.
        :type config: psij.JobExecutorConfig
        """
        super().__init__(url=url, config=config if config else JobExecutorConfig())
        self._reaper = _ProcessReaper.get_instance()

    def _generate_nodefile(self, job: Job, p: _ChildProcessEntry) -> Optional[str]:
        # Writes a temporary nodefile ('localhost' repeated once per process) and records
        # its path on the entry so poll() can remove it when the job completes.
        assert job.spec is not None
        if job.spec.resources is None:
            return None
        if job.spec.resources.version == 1:
            assert isinstance(job.spec.resources, ResourceSpecV1)
            n = job.spec.resources.computed_process_count
            if n == 1:
                # as a bit of an optimization, we don't generate a nodefile when doing
                # "single node" jobs on local.
                return None
            (file, p.nodefile) = mkstemp(suffix='.nodelist')
            for _ in range(n):
                os.write(file, 'localhost\n'.encode())
            os.close(file)
            return p.nodefile
        else:
            raise SubmitException('Cannot handle resource specification with version %s'
                                  % job.spec.resources.version)

    def submit(self, job: Job) -> None:
        """
        Submits the specified :class:`~psij.Job` to be run locally.

        Successful return of this method indicates that the job has been started locally
        and all changes in the job status, including failures, are reported using
        notifications. If the job specification is invalid, an
        :class:`~psij.InvalidJobException` is thrown. If the actual submission fails for
        reasons outside the validity of the job, a :class:`~psij.SubmitException` is
        thrown.

        :param job: The job to be submitted.
        """
        spec = self._check_job(job)

        p = _ChildProcessEntry(job, self, self._get_launcher(self._get_launcher_name(spec)))
        assert p.launcher
        args = p.launcher.get_launch_command(job)

        try:
            with job._status_cv:
                if job.status.state == JobState.CANCELED:
                    raise SubmitException('Job canceled')
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug('Running %s', _format_shell_cmd(args))
            nodefile = self._generate_nodefile(job, p)
            env = _get_env(spec, nodefile)
            # stderr is folded into stdout so the launcher-failure check in
            # _process_done() sees all output.
            p.process = subprocess.Popen(args, stdout=subprocess.PIPE,
                                         stderr=subprocess.STDOUT, close_fds=True,
                                         cwd=spec.directory, env=env)
            self._reaper.register(p)
            job._native_id = p.process.pid
            self._set_job_status(job, JobStatus(JobState.QUEUED, time=time.time(),
                                                metadata={'nativeId': job._native_id}))
            self._set_job_status(job, JobStatus(JobState.ACTIVE, time=time.time()))
        except Exception as ex:
            raise SubmitException('Failed to submit job', exception=ex)

    def cancel(self, job: Job) -> None:
        """
        Cancels a job.

        :param job: The job to cancel.
        """
        self._set_job_status(job, JobStatus(JobState.CANCELED))
        self._reaper.cancel(job)

    def _process_done(self, p: _ProcessEntry) -> None:
        # Reaper callback: translates a terminated process into a final job status.
        assert p.exit_code is not None
        message = None
        if p.exit_code == 0:
            state = JobState.COMPLETED
        elif p.exit_code < 0 and p.kill_flag:
            state = JobState.CANCELED
        else:
            # We want to capture errors in the launcher scripts. Since, under normal
            # circumstances, the exit code of the launcher is the exit code of the job, we
            # must use a different mechanism to distinguish between job errors and launcher
            # errors. So we delegate to the launcher implementation to figure out if the
            # error belongs to the job or not
            if p.launcher and p.out and p.launcher.is_launcher_failure(p.out):
                message = p.launcher.get_launcher_failure_message(p.out)
            state = JobState.FAILED
        self._set_job_status(p.job, JobStatus(state, time=p.done_time,
                                              exit_code=p.exit_code, message=message))

    def list(self) -> List[str]:
        """
        Return a list of ids representing jobs that are running on the underlying
        implementation.

        Specifically for the `LocalJobExecutor`, this returns a list of `~psij.NativeId`
        objects corresponding to the processes running under the current user on the local
        machine. These processes need not correspond to jobs started by calling the
        `submit()` method of an instance of a `LocalJobExecutor`.

        :return: The list of `~psij.NativeId` objects corresponding to the current user's
            processes running locally.
        """
        my_username = psutil.Process().username()
        return [str(p.pid) for p in psutil.process_iter(['pid', 'username'])
                if p.info['username'] == my_username]

    def attach(self, job: Job, native_id: str) -> None:
        """
        Attaches a job to a process.

        The job must be in the :attr:`~psij.JobState.NEW` state. The exit code of the
        attached job will not be available upon completion and a zero exit code will
        always be returned for jobs attached by the `LocalJobExecutor`.

        :param job: The job to attach.
        :param native_id: The native ID of the process to attached to, as obtained
            through :func:`~psij.executors.LocalJobExecutor.list` method.
        """
        if job.status.state != JobState.NEW:
            raise InvalidJobException('Job must be in the NEW state')
        job.executor = self
        pid = int(native_id)
        self._reaper.register(_AttachedProcessEntry(job, psutil.Process(pid), self))
        # We assume that the native_id above is a PID that was obtained at some point using
        # list(). If so, the process is either still running or has completed. Either way,
        # we must bring it up to ACTIVE state
        self._set_job_status(job, JobStatus(JobState.QUEUED, time=time.time()))
        self._set_job_status(job, JobStatus(JobState.ACTIVE, time=time.time()))

    def _get_launcher_name(self, spec: JobSpec) -> str:
        # Falls back to the 'single' launcher when the spec does not name one.
        if spec.launcher is None:
            return 'single'
        else:
            return spec.launcher