# Source code for mlmc.sampling_pool_pbs

import os
import shutil
import subprocess
import re
import pickle
import json
import glob
import time
import numpy as np
from mlmc.level_simulation import LevelSimulation
from mlmc.sampling_pool import SamplingPool
from mlmc.tool.pbs_job import PbsJob

"""
SamplingPoolPBS description
    - this class inherits from SamplingPool, both abstract methods and other crucial ones are described

    schedule_sample(sample_id, level_sim)
        - serialize level_sim (mlmc/level_simulation.py), pickle is used
        - compute random seed from sample_id
        - add (level_sim._level_id, sample_id, seed) to job's scheduled samples 
        - add job weight, increment number of samples in job and execute if job_weight is exceeded

    execute()
        - it is called when the job weight (sum of task sizes in a single job) exceeds 1 or the job holds
          more than 1000 samples; get_finished() calls it as well
        - methods from mlmc/tool/pbs_job.py are called
            - PbsJob class is created and serialized (PbsJob static method does both)
            - scheduled samples are saved through PbsJob class static method
        - pbs script is written out and ready to run

    get_finished()
        - run execute()
        - get finished_pbs_jobs and unfinished_pbs_jobs from qstat output
        - call _get_result_files(); it returns successful samples, failed samples, the number of running samples
          and times, all of which are returned to the Sampler

    _get_result_files()
        - set n_running - number of running samples, it is given from unfinished_pbs_jobs
        - successful samples, failed samples and run times are retrieved from PbsJob class with given job_id
        - if there are _unfinished_sample_ids (the 'renew' command was used), these samples are appended to the previous ones


    This class cooperates with PbsJob (mlmc/tool/pbs_job.py), which is used as a "mediator" between the master process
    and the worker (job) process. Data which are necessary for the worker process are passed to PbsJob from SamplingPoolPBS.
    Master process serializes PbsJob instance.
    Then PbsJob is deserialized in worker process.

"""


class SamplingPoolPBS(SamplingPool):
    """
    Sampling pool PBS (Portable batch system) runtime environment

    Scheduled samples are grouped into jobs. Each job is written out as a PBS script, submitted with 'qsub'
    and polled with 'qstat'; sample results are read back through PbsJob.
    """

    OUTPUT_DIR = "output"
    JOBS_DIR = "jobs"
    LEVEL_SIM_CONFIG = "level_{}_simulation_config"  # Serialized level simulation
    JOB = "{}_job.sh"  # Pbs process
    QSUB_FAILED_MAX_N = 10  # Ignore 10 consecutive 'qsub' command failures
    QSTAT_FAILED_MAX_N = 10  # Ignore 10 consecutive 'qstat' command failures
    COMMAND_RETRY_DELAY = 30  # Seconds to wait before a failed 'qsub'/'qstat' command is repeated
    MAX_SAMPLES_IN_JOB = 1000  # A job is submitted as soon as it holds more samples than this

    def __init__(self, work_dir, debug=False):
        """
        :param work_dir: Path to working directory
        :param debug: bool, if True keep sample directories
                      it is the strongest parameter so it overshadows 'clean' param
        """
        self._work_dir = os.path.abspath(work_dir)
        # Working directory - other subdirectories are created in this one
        self._current_job_weight = 0
        # Current job weight.
        # Job is scheduled when current job weight is above 1 (this condition replaces previous job_weight param)
        self._n_samples_in_job = 0
        # Number of samples in job
        self.pbs_script = None
        self._pbs_config = None
        # Data inserted to the pbs script
        self._pbs_header_template = None
        # Lines to put at the beginning of the PBS script.
        self._scheduled = []
        # List of scheduled samples
        self._pbs_ids = []
        # List of pbs job ids which should run
        self._unfinished_sample_ids = set()
        # Sample ids which are not collected yet - collection attempts are done in get_finished()
        self._debug = debug
        # If true then keep sample directories
        super().__init__(self._work_dir, self._debug)

        self._jobs_dir = self._create_dir(directory=SamplingPoolPBS.JOBS_DIR)
        self._job_count = self._get_job_count()
        # Current number of jobs - sort of jobID

        # Number of consecutive failed executions of the commands qsub, qstat
        self._qsub_failed_n = 0
        self._qstat_failed_n = 0

    def _get_job_count(self):
        """
        Get the id for the next job, i.e. one more than the highest id among the existing '*_job.sh' scripts
        :return: int, 0 if the jobs directory holds no job script
        """
        files_pattern = os.path.join(self._jobs_dir, "*_job.sh")
        job_ids = []
        for path in glob.glob(files_pattern):
            match = re.search(r'(\d+)_job\.sh$', os.path.basename(path))
            if match:
                job_ids.append(int(match.group(1)))

        # glob returns paths in arbitrary order, so the maximum is taken instead of the last item
        return max(job_ids) + 1 if job_ids else 0

    def _save_structure(self):
        """
        Save structure of files which are needed for pbs process run
        NOTE(review): self._files_structure and self._output_dir are not assigned in this class,
        presumably the parent class provides them - confirm
        :return: None
        """
        files_structure = {"job_dir": self._jobs_dir,
                           "level_sim_config": os.path.join(self._output_dir, SamplingPoolPBS.LEVEL_SIM_CONFIG)
                           }

        with open(self._files_structure, "w") as writer:
            json.dump(files_structure, writer)

    def pbs_common_setting(self, **kwargs):
        """
        Values for common header of script
        :param kwargs: Dict[
                            env_setting: environmental setting - load modules, install packages, ...
                                         (list of script lines, required)
                            n_nodes: number of used nodes,
                            n_cores: number of cores a node,
                            mem: used memory a job,
                            queue: used queue on the server,
                            walltime: value of the '#PBS -l walltime' line,
                            pbs_name: value of the '#PBS -N' line,
                            optional params:
                                select_flags: dict of other select flags,
                                        see https://wiki.metacentrum.cz/wiki/About_scheduling_system
                                        for other possible parameters
                                scratch_dir: scratch directory passed to the job, used only if any select flag
                                             starts with 'scratch'; default: $SCRATCHDIR
                                python: python command, default: python3
                                std_out_err: value of the '#PBS -j' line, default: 'oe'
                                optional_pbs_requests: list of additional script header lines, default: []
                            ]
        :return: None
        :raises KeyError: if 'env_setting' is missing
        """
        # Script header
        select_flags_dict = kwargs.get('select_flags', {})

        # Set scratch dir
        if any(flag.startswith('scratch') for flag in select_flags_dict):
            # A missing 'scratch_dir' is treated like an explicit None
            if kwargs.get('scratch_dir') is None:
                kwargs['scratch_dir'] = "$SCRATCHDIR"
        else:
            kwargs['scratch_dir'] = ''

        if select_flags_dict:
            kwargs['select_flags'] = ":" + ':'.join('{}={}'.format(*item) for item in select_flags_dict.items())
        else:
            kwargs['select_flags'] = ""

        # Python3 by default
        kwargs.setdefault('python', "python3")
        # Standard error and standard output are merged into standard output.
        kwargs.setdefault('std_out_err', 'oe')

        # The placeholders are filled in by _create_script()
        self._pbs_header_template = ["#!/bin/bash",
                                     '#PBS -S /bin/bash',
                                     '#PBS -l select={n_nodes}:ncpus={n_cores}:mem={mem}{select_flags}',
                                     '#PBS -l walltime={walltime}',
                                     '#PBS -q {queue}',
                                     '#PBS -N {pbs_name}',
                                     # Specifies whether and how to join the job's
                                     # standard error and standard output streams.
                                     '#PBS -j {std_out_err}',
                                     '#PBS -o {pbs_output_dir}/{job_name}.OU',
                                     '#PBS -e {pbs_output_dir}/{job_name}.ER'
                                     ]

        # e.g. ['#PBS -m ae'] means mail is sent when the job aborts or terminates
        self._pbs_header_template.extend(kwargs.get('optional_pbs_requests', []))
        self._pbs_header_template.append('MLMC_WORKDIR=\"{}\"'.format(self._work_dir))
        self._pbs_header_template.extend(kwargs['env_setting'])
        self._pbs_header_template.append('{python} -m mlmc.tool.pbs_job {output_dir} {job_name} {scratch_dir} >'
                                         '{pbs_output_dir}/{job_name}_STDOUT 2>&1')

        self._pbs_config = kwargs

    def schedule_sample(self, sample_id, level_sim):
        """
        Add sample to current PBS package
        :param sample_id: unique sample id from Sampler
        :param level_sim: LevelSimulation instance
        :return: None
        """
        self.serialize_level_sim(level_sim)

        seed = self.compute_seed(sample_id)
        self._scheduled.append((level_sim._level_id, sample_id, seed))

        self._n_samples_in_job += 1
        self._current_job_weight += level_sim.task_size
        # Submit the job once it is heavy enough or holds too many samples
        if self._current_job_weight > 1 or self._n_samples_in_job > SamplingPoolPBS.MAX_SAMPLES_IN_JOB:
            self.execute()

    def serialize_level_sim(self, level_sim: LevelSimulation):
        """
        Pickle LevelSimulation instance, an already existing file of the same level is kept untouched
        :param level_sim: LevelSimulation
        :return: None
        """
        file_path = os.path.join(self._output_dir, SamplingPoolPBS.LEVEL_SIM_CONFIG.format(level_sim._level_id))

        if not os.path.exists(file_path):
            with open(file_path, "wb") as f:
                pickle.dump(level_sim, f)

    def execute(self):
        """
        Write the pbs script of the scheduled samples and submit it via 'qsub'
        A failing 'qsub' is repeated after COMMAND_RETRY_DELAY seconds until more than QSUB_FAILED_MAX_N
        consecutive failures are counted.
        :return: None
        :raises Exception: with the 'qsub' error output, if the failure limit is exceeded
        """
        if not self._scheduled:
            return

        job_id = "{:04d}".format(self._job_count)
        # Create pbs job
        pbs_process = PbsJob.create_job(self._output_dir, self._jobs_dir, job_id,
                                        SamplingPoolPBS.LEVEL_SIM_CONFIG, self._debug)

        pbs_process.save_sample_id_job_id(job_id, self._scheduled)
        # Write scheduled samples to file
        pbs_process.save_scheduled(self._scheduled)

        # Format pbs script
        self._create_script()

        if self.pbs_script is None or self._n_samples_in_job == 0:
            return

        # Write pbs script
        job_file = os.path.join(self._jobs_dir, SamplingPoolPBS.JOB.format(job_id))
        script_content = "\n".join(self.pbs_script)
        self.write_script(script_content, job_file)

        # Only a non-zero return code of qsub counts as a failure that is worth repeating,
        # an error in the bookkeeping below must not submit the same job again
        while True:
            process = subprocess.run(['qsub', job_file], stderr=subprocess.PIPE, stdout=subprocess.PIPE)
            if process.returncode == 0:
                break

            self._qsub_failed_n += 1
            if self._qsub_failed_n > SamplingPoolPBS.QSUB_FAILED_MAX_N:
                raise Exception(process.stderr.decode("ascii", errors="replace"))
            time.sleep(SamplingPoolPBS.COMMAND_RETRY_DELAY)

        self._qsub_failed_n = 0
        # Write current job count
        self._job_count += 1

        # Get pbs_id from qsub output
        pbs_id = process.stdout.decode("ascii", errors="replace").split(".")[0]
        # Store pbs id for future qstat calls
        self._pbs_ids.append(pbs_id)

        # The job is submitted, start collecting samples for a new one
        self._current_job_weight = 0
        self._n_samples_in_job = 0
        self._scheduled = []

        pbs_process.write_pbs_id(pbs_id)

    def _create_script(self):
        """
        Format pbs script, i.e. fill the header template with the values of the pbs config
        pbs_common_setting() has to be called beforehand, it creates both the template and the config.
        :return: None
        """
        # Job output with similar name to job
        self._pbs_config['job_name'] = "{:04d}".format(self._job_count)
        self._pbs_config['pbs_output_dir'] = self._jobs_dir
        self._pbs_config['output_dir'] = self._output_dir
        self._pbs_config['work_dir'] = self._work_dir

        self.pbs_script = [line.format(**self._pbs_config) for line in self._pbs_header_template]

    def write_script(self, content, job_file):
        """
        Create the pbs script file and set its permissions to 0o774
        :param content: script content
        :param job_file: job file path
        :return: None
        """
        with open(job_file, "w") as f:
            f.write(content)
        os.chmod(job_file, 0o774)

    def get_finished(self):
        """
        Submit the samples which are still waiting and get results of the finished pbs jobs
        :return: successful results, failed results, number of running samples, times;
                 see _get_result_files()
        """
        self.execute()
        finished_pbs_jobs, unfinished_pbs_jobs = self._qstat_pbs_job()
        return self._get_result_files(finished_pbs_jobs, unfinished_pbs_jobs)

    @staticmethod
    def _extend_level_results(collected, new_results):
        """
        Append results of a single job to the results collected so far
        :param collected: Dict[level_id, List], it is modified in place
        :param new_results: Dict[level_id, List]
        :return: None
        """
        for level_id, results in new_results.items():
            collected.setdefault(level_id, []).extend(results)

    def collect_data(self):
        """
        Read results of all jobs which have a '*_STDOUT' file in the jobs directory
        The current working directory of the process is changed to the jobs directory.
        Run times of the jobs are not returned.
        :return: successful_results: Dict[level_id, List]
                 failed_results: Dict[level_id, List]
                 n_running: int, always 0
        """
        successful_results = {}
        failed_results = {}
        n_running = 0

        os.chdir(self._jobs_dir)
        for file in glob.glob("*_STDOUT"):
            job_id = re.findall(r'(\d+)_STDOUT', file)[0]

            successful, failed, _ = PbsJob.read_results(job_id, self._jobs_dir)

            # Split results to levels
            self._extend_level_results(successful_results, successful)
            self._extend_level_results(failed_results, failed)

        return successful_results, failed_results, n_running

    def _qstat_pbs_job(self):
        """
        Parse qstat output and get all unfinished job ids
        A failing 'qstat' is repeated after COMMAND_RETRY_DELAY seconds until more than QSTAT_FAILED_MAX_N
        consecutive failures are counted.
        :return: finished and unfinished jobs both list of job ids (str)
        :raises Exception: with the 'qstat' error output, if the failure limit is exceeded
        """
        finished_pbs_jobs = []
        if self._pbs_ids:
            # Get PBS id's status,
            # '-x' - displays status information for finished and moved jobs in addition to queued and running jobs.
            qstat_call = ["qstat", "-xs"]
            qstat_call.extend(self._pbs_ids)

            while True:
                # qstat call
                process = subprocess.run(qstat_call, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
                err_output = process.stderr.decode("ascii", errors="replace")

                unknown_job_ids = []
                if process.returncode != 0:
                    # Presumably, Job Ids are 'unknown' for PBS after some time of their inactivity
                    unknown_job_ids = re.findall(r"Unknown Job Id (\d+)\.", err_output)

                # A non-zero return code caused by unknown job ids only is not a failure
                if process.returncode == 0 or unknown_job_ids:
                    break

                self._qstat_failed_n += 1
                if self._qstat_failed_n > SamplingPoolPBS.QSTAT_FAILED_MAX_N:
                    raise Exception(err_output)
                time.sleep(SamplingPoolPBS.COMMAND_RETRY_DELAY)

            self._qstat_failed_n = 0

            output = process.stdout.decode("ascii", errors="replace")
            # Find all finished jobs
            finished_pbs_jobs = re.findall(r"(\d+)\..*\d+ F", output)
            finished_moved_pbs_jobs = re.findall(r"(\d+)\..*\d+ M.*\n.*Job finished", output)
            finished_pbs_jobs.extend(finished_moved_pbs_jobs)
            finished_pbs_jobs.extend(unknown_job_ids)

        # Get unfinished as diff between planned and finished
        finished_ids = set(finished_pbs_jobs)
        unfinished_pbs_jobs = [pbs_id for pbs_id in self._pbs_ids if pbs_id not in finished_ids]

        # Remove finished ids from all saved pbs_ids
        # It prevents qstat exception: "Unknown Job Id",
        # that occurs because there is some kind of qstat 'forgetfulness' of terminated jobs
        # It is a very rare phenomenon, which is observed only during the long run (e.g. 8 hours for a job)
        # of many (e.g. 2500) simulations, here it happened after around a day and a half of running MLMC.
        # A new list is built, removing items from the list which is being iterated would skip some ids.
        self._pbs_ids = list(unfinished_pbs_jobs)

        return finished_pbs_jobs, unfinished_pbs_jobs

    def _get_result_files(self, finished_pbs_jobs, unfinished_pbs_jobs):
        """
        Get results from files
        The current working directory of the process is changed to the jobs directory,
        the JobID_PbsId files are searched for and deleted through relative paths.
        :param finished_pbs_jobs: List[str], finished pbs jobs,
        :param unfinished_pbs_jobs: List[str], unfinished pbs jobs,
        :return: successful_results: Dict[level_id, List[Tuple[sample_id: str, Tuple[fine_result: np.ndarray,
                                          coarse_result: np.ndarray]]]]
                 failed_results: Dict[level_id, List[Tuple[sample_id: str, err_msg: str]]]
                 n_running: int, number of running samples
                 times: List[Tuple[level_id, times]], times are summed over the finished jobs
        """
        os.chdir(self._jobs_dir)

        # Get number of running samples
        n_running = 0
        for pbs_id in unfinished_pbs_jobs:
            id_files = glob.glob("*_{}".format(pbs_id))
            if id_files:
                job_id = re.findall(r'(\d+)_\d+', id_files[0])[0]
                n_running += PbsJob.get_job_n_running(job_id, self._jobs_dir)

        successful_results = {}
        failed_results = {}
        times = {}
        for pbs_id in finished_pbs_jobs:
            id_files = glob.glob("*_{}".format(pbs_id))  # JobID_PbsId file
            if not id_files:
                continue

            # Find jobID
            id_file = id_files[0]
            job_id = re.findall(r'(\d+)_\d+', id_file)[0]

            # Get sample results
            successful, failed, job_times = PbsJob.read_results(job_id, self._jobs_dir)

            # Split results to levels
            self._extend_level_results(successful_results, successful)
            self._extend_level_results(failed_results, failed)

            # Only the last time record of a job is used, the records are summed over jobs
            for level_id, results in job_times.items():
                if level_id in times:
                    times[level_id][0] += results[-1][0]
                    times[level_id][1] += results[-1][1]
                else:
                    times[level_id] = list(results[-1])

            # Delete pbsID file - it means job is finished
            SamplingPoolPBS.delete_pbs_id_file(id_file)

        if self._unfinished_sample_ids:
            successful_results, failed_results, times = self._collect_unfinished(successful_results,
                                                                                 failed_results, times)

        return successful_results, failed_results, n_running, list(times.items())

    def _collect_unfinished(self, successful_results, failed_results, times):
        """
        Collect samples which had finished after main process crashed, append them to new collected samples
        A sample whose job can not be found is reported as failed with the error message.
        :param successful_results: dict
        :param failed_results: dict
        :param times: dict
        :return: all input dictionaries
        """
        already_collected = set()

        for sample_id in self._unfinished_sample_ids:
            if sample_id in already_collected:
                continue

            try:
                job_id = PbsJob.job_id_from_sample_id(sample_id, self._jobs_dir)
            except (FileNotFoundError, KeyError) as e:
                level_id = int(re.findall(r'L0?(\d*)', sample_id)[0])
                failed_results.setdefault(level_id, []).append((sample_id, str(e)))
                continue

            successful, failed, job_times = PbsJob.read_results(job_id, self._jobs_dir)

            # Split results to levels
            self._extend_level_results(successful_results, successful)
            self._extend_level_results(failed_results, failed)

            # NOTE(review): the time is overwritten here, while _get_result_files() sums the times - confirm
            for level_id, results in job_times.items():
                times[level_id] = results[-1]

            # All samples of the job are collected at once, do not read the job again for its other samples
            level_id_sample_id_seed = PbsJob.get_scheduled_sample_ids(job_id, self._jobs_dir)
            for _, collected_sample_id, _ in level_id_sample_id_seed:
                already_collected.add(collected_sample_id)

        self._unfinished_sample_ids = set()

        return successful_results, failed_results, times

    def have_permanent_samples(self, sample_ids):
        """
        Set the unfinished sample ids, the corresponding samples are collected in the next get_finished() call
        :param sample_ids: iterable of sample ids
        :return: None
        """
        self._unfinished_sample_ids = set(sample_ids)

    @staticmethod
    def delete_pbs_id_file(file_path):
        """
        Delete jobId_pbsId file - it indicates finished job
        A missing file is only reported, it is not an error.
        :param file_path: str
        :return: None
        """
        try:
            os.remove(file_path)
        except FileNotFoundError:
            print("Failed to remove PBS id file, file not found")