# Source code for mlmc.tool.pbs_job

import os
import sys
import time
import warnings
import json
import ruamel.yaml as yaml
import pickle
from mlmc.sampling_pool import SamplingPool
from ruamel.yaml.error import ReusedAnchorWarning, UnsafeLoaderWarning
warnings.simplefilter("ignore", UnsafeLoaderWarning)
warnings.simplefilter("ignore", ReusedAnchorWarning)


class PbsJob:
    """
    File-based description of one PBS job that evaluates a batch of scheduled samples.

    The class is used both by the side that creates and schedules jobs (create_job, save_scheduled,
    write_pbs_id, save_sample_id_job_id, read_results) and by the PBS worker process started by this
    module's __main__ block (create_process, calculate_samples). All communication between the two
    sides goes through files in output_dir and jobs_dir.
    """
    # Scheduled samples: list of (level_id, sample_id, seed) tuples
    SCHEDULED = "{}_scheduled.yaml"
    # Successful samples, appended per sample: list of (level_id, sample_id, (fine result, coarse result))
    SUCCESSFUL_RESULTS = "{}_successful_results.yaml"
    # Failed samples, appended per sample: list of (level_id, sample_id, error message)
    FAILED_RESULTS = "{}_failed_results.yaml"
    # Timing, appended per sample: list of (level_id, cumulative successful-sample time, n successful samples)
    TIME = "{}_times.yaml"
    # Prefix of an empty marker file named '<job_id>_<pbs_job_id>' linking our job id to the PBS job id
    PBS_ID = "{}_"
    # Settings "passed" from the sampling pool to the PBS process, stored as "jobs_dir;level_sim_file;debug;"
    CLASS_FILE = "pbs_process_serialized.txt"
    # JSON mapping sample id -> job id, used to find which job's result files hold a given sample
    SAMPLE_ID_JOB_ID = "sample_id_job_id.json"

    def __init__(self, output_dir, jobs_dir, job_id, level_sim_file, debug):
        """
        Construct a PbsJob instance used both by SamplingPool (to create a job) and by PBS worker process.
        :param output_dir: str, directory where sample work dirs and outputs live
        :param jobs_dir: str, directory where scheduler/job control files are stored
        :param job_id: str, unique identifier of this job
        :param level_sim_file: str, format string for per-level serialized LevelSimulation files
        :param debug: bool; if True do not remove per-sample directories after successful runs
        """
        self._output_dir = output_dir
        self._jobs_dir = jobs_dir
        self._job_id = job_id
        self._level_sim_file = level_sim_file
        self._debug = debug
        self._level_simulations = {}  # level_id -> LevelSimulation, deserialized on demand

    @classmethod
    def create_job(cls, output_dir, jobs_dir, job_id, level_sim_file, debug):
        """
        Create a PbsJob and make sure its settings are serialized for a PBS worker process.
        The settings file (CLASS_FILE) is written into output_dir only if it does not exist yet.
        :param output_dir: str
        :param jobs_dir: str
        :param job_id: str
        :param level_sim_file: str, format of LevelSimulation serialization filenames
        :param debug: bool
        :return: PbsJob instance
        """
        pbs_process = cls(output_dir, jobs_dir, job_id, level_sim_file, debug)
        PbsJob._serialize_pbs_process(pbs_process)
        return pbs_process

    @classmethod
    def create_process(cls):
        """
        Create a PbsJob instance inside the PBS worker process.
        output_dir and job_id come from the command line (see command_params); jobs_dir, the
        level_sim_file format and the debug flag come from CLASS_FILE in output_dir.
        :return: PbsJob instance
        """
        job_id, output_dir = PbsJob.command_params()
        jobs_dir, level_sim_file_format, debug = PbsJob._deserialize_pbs_process(output_dir)
        return cls(output_dir, jobs_dir, job_id, level_sim_file_format, debug)

    @staticmethod
    def _serialize_pbs_process(pbs_process):
        """
        Persist jobs_dir, the level_sim_file format and the debug flag for the PBS worker.
        Writes CLASS_FILE into pbs_process._output_dir as "jobs_dir;level_sim_file;debug;".
        An existing CLASS_FILE is left untouched.
        :param pbs_process: PbsJob instance to serialize
        :return: None
        """
        class_file = os.path.join(pbs_process._output_dir, PbsJob.CLASS_FILE)
        if not os.path.exists(class_file):
            with open(class_file, "w") as writer:
                writer.write(pbs_process._jobs_dir + ";")
                writer.write(pbs_process._level_sim_file + ";")
                writer.write(str(pbs_process._debug) + ";")

    @staticmethod
    def _deserialize_pbs_process(output_dir):
        """
        Read CLASS_FILE written by _serialize_pbs_process and return the stored parameters.
        :param output_dir: str path where CLASS_FILE was written
        :return: tuple (jobs_dir: str, level_sim_file: str, debug: bool)
        """
        with open(os.path.join(output_dir, PbsJob.CLASS_FILE), "r") as reader:
            fields = reader.readline().split(';')
        return fields[0], fields[1], fields[2] == 'True'

    @staticmethod
    def command_params():
        """
        Parse PBS worker command-line parameters. Called inside the worker process.
        Expects sys.argv[1] = output_dir, sys.argv[2] = job_id.
        :return: tuple (job_id: str, output_dir: str)
        """
        output_dir = sys.argv[1]
        job_id = sys.argv[2]
        return job_id, output_dir

    def _get_level_sim(self, level_id):
        """
        Unpickle the LevelSimulation for a level and cache it in self._level_simulations.
        The cache key is the unpickled object's own _level_id.
        :param level_id: level identifier used to format self._level_sim_file
        :return: None
        """
        with open(os.path.join(self._output_dir, self._level_sim_file.format(level_id)), "rb") as reader:
            l_sim = pickle.load(reader)
        self._level_simulations[l_sim._level_id] = l_sim

    def _get_level_id_sample_id_seed(self):
        """
        Read this job's scheduled samples.
        :return: list of (level_id, sample_id, seed) tuples sorted by level_id ascending
        """
        with open(os.path.join(self._jobs_dir, PbsJob.SCHEDULED.format(self._job_id))) as file:
            level_id_sample_id_seed = yaml.load(file, yaml.Loader)
        level_id_sample_id_seed.sort(key=lambda tup: tup[0])
        return level_id_sample_id_seed

    def calculate_samples(self):
        """
        Main worker routine: evaluate every scheduled sample and record its outcome.

        For each scheduled (level_id, sample_id, seed), in ascending level order:
        - deserialize the level's LevelSimulation on first use,
        - call SamplingPool.calculate_sample,
        - on success record (level_id, sample_id, (fine, coarse)) and, unless debug is set,
          move the sample's files with SamplingPool.move_successful_rm,
        - on failure record (level_id, sample_id, error message) and move the sample's files
          with SamplingPool.move_failed_rm,
        - append the outcome and the level's running timing totals to the YAML files immediately,
          so results of finished samples are on disk even if the job stops early.
        :return: None
        """
        self._success_file = os.path.join(self._jobs_dir, PbsJob.SUCCESSFUL_RESULTS.format(self._job_id))
        self._failed_file = os.path.join(self._jobs_dir, PbsJob.FAILED_RESULTS.format(self._job_id))
        self._times_file = os.path.join(self._jobs_dir, PbsJob.TIME.format(self._job_id))

        # List of Tuple[level id, sample id, random seed], sorted by level id
        level_id_sample_id_seed = self._get_level_id_sample_id_seed()

        current_level = None  # level of the previous sample; None before the first sample
        successful_samples_time = 0  # cumulative time of successful samples at current_level
        n_times = 0  # number of successful samples at current_level

        for level_id, sample_id, seed in level_id_sample_id_seed:
            start_time = time.time()
            success = []
            failed = []
            times = []

            # Deserialize level simulation config if not loaded
            if level_id not in self._level_simulations:
                self._get_level_sim(level_id)

            if current_level != level_id:
                # Record the finished level's final totals, then reset the accumulators.
                # Nothing to record before the first sample; this avoids writing a phantom
                # (0, 0, 0) entry for a level this job never ran.
                if current_level is not None:
                    times.append((current_level, successful_samples_time, n_times))
                n_times = 0
                successful_samples_time = 0
                # Restart the clock so LevelSimulation deserialization is not counted
                start_time = time.time()
                current_level = level_id

            level_sim = self._level_simulations[current_level]
            assert level_sim._level_id == current_level

            # Calculate sample (may create sample working dir, call external tools)
            _, res, err_msg, _ = SamplingPool.calculate_sample(sample_id, level_sim, work_dir=self._output_dir,
                                                               seed=seed)

            if not err_msg:
                success.append((current_level, sample_id, (res[0], res[1])))
                # Move successful artifacts unless in debug mode
                if not self._debug:
                    SamplingPool.move_successful_rm(sample_id, level_sim, output_dir=self._output_dir,
                                                    dest_dir=SamplingPool.SEVERAL_SUCCESSFUL_DIR)
                n_times += 1
                successful_samples_time += (time.time() - start_time)
                print("sample time ", time.time() - start_time)
            else:
                failed.append((current_level, sample_id, err_msg))
                SamplingPool.move_failed_rm(sample_id, level_sim, output_dir=self._output_dir,
                                            dest_dir=SamplingPool.FAILED_DIR)

            times.append((current_level, successful_samples_time, n_times))
            self._save_to_file(success, failed, times, [sample_id])

    def _save_to_file(self, success, failed, times, current_samples):
        """
        Append success/failure/time data to the corresponding YAML result files.
        Empty lists are skipped, so files are only created once they have content.
        :param success: list of successful sample tuples
        :param failed: list of failed sample tuples
        :param times: list of (level_id, cumulative_time, n_samples) tuples
        :param current_samples: list of sample ids processed (currently unused)
        :return: None
        """
        if success:
            self._append_file(success, self._success_file)
        if failed:
            self._append_file(failed, self._failed_file)
        if times:
            self._append_file(times, self._times_file)

    def save_sample_id_job_id(self, job_id, sample_ids):
        """
        Record which job handles each sample, merging into the SAMPLE_ID_JOB_ID JSON file.
        :param job_id: str
        :param sample_ids: iterable of sample descriptors; element [1] of each is used as the sample id
        :return: None
        """
        sample_id_job_id_file = os.path.join(self._jobs_dir, PbsJob.SAMPLE_ID_JOB_ID)
        # Comprehension works for any iterable (generators included), unlike len()-based zipping
        new_ids = {sid[1]: job_id for sid in sample_ids}

        saved_ids = {}
        if os.path.exists(sample_id_job_id_file):
            with open(sample_id_job_id_file, "r") as file:
                saved_ids = json.load(file)
        saved_ids.update(new_ids)

        with open(sample_id_job_id_file, "w") as file:
            json.dump(saved_ids, file)

    @staticmethod
    def job_id_from_sample_id(sample_id, jobs_dir):
        """
        Look up the job id that processed a given sample id.
        :param sample_id: str sample identifier
        :param jobs_dir: path to jobs directory where SAMPLE_ID_JOB_ID file is stored
        :return: str job id associated with sample_id
        :raises KeyError: if sample_id was never recorded
        """
        sample_id_job_id_file = os.path.join(jobs_dir, PbsJob.SAMPLE_ID_JOB_ID)
        with open(sample_id_job_id_file, "r") as file:
            saved_ids = json.load(file)
        return saved_ids[sample_id]

    def _append_file(self, data, path):
        """
        Append `data` to a YAML file by opening it in append mode and dumping.
        Relies on successive list dumps concatenating into one longer YAML sequence.
        :param data: Python object serializable by ruamel.yaml (list, dict, etc.)
        :param path: Path to YAML file to append to
        :return: None
        """
        with open(path, "a") as f:
            yaml.dump(data, f)

    def _handle_sim_files(self, sample_id, level_sim):
        """
        If the simulation requires a workspace, switch to the per-sample directory and copy common files there.
        :param sample_id: str
        :param level_sim: LevelSimulation instance
        :return: None
        """
        if level_sim.need_sample_workspace:
            SamplingPool.change_to_sample_directory(self._output_dir, sample_id)
            if level_sim.common_files is not None:
                SamplingPool.copy_sim_files(level_sim.common_files, os.getcwd())

    @staticmethod
    def _load_yaml_list(path):
        """
        Load a YAML result file written by _append_file.
        :param path: str, path to the YAML file
        :return: loaded list, or [] if the file does not exist or is empty
        """
        if not os.path.exists(path):
            return []
        with open(path, "r") as reader:
            return yaml.load(reader, yaml.Loader) or []

    @staticmethod
    def read_results(job_id, jobs_dir):
        """
        Read and aggregate the results produced by a PBS job into dictionaries keyed by level_id.
        Reads the SUCCESSFUL_RESULTS, FAILED_RESULTS and TIME YAML files (missing files are treated as empty).
        :param job_id: str
        :param jobs_dir: path to directory containing job result YAML files
        :return: tuple (successful_dict, failed_dict, time_dict) where:
            - successful_dict[level_id] = [(sample_id, result), ...]
            - failed_dict[level_id] = [(sample_id, error_message), ...]; scheduled samples with no
              recorded outcome are added as (sample_id, "job failed")
            - time_dict[level_id] = [(cumulative_time, n_samples), ...] in the order they were written
        """
        successful = {}
        failed = {}
        times_by_level = {}

        for level_id, sample_id, result in PbsJob._load_yaml_list(
                os.path.join(jobs_dir, PbsJob.SUCCESSFUL_RESULTS.format(job_id))):
            successful.setdefault(level_id, []).append((sample_id, result))

        for level_id, sample_id, err_msg in PbsJob._load_yaml_list(
                os.path.join(jobs_dir, PbsJob.FAILED_RESULTS.format(job_id))):
            failed.setdefault(level_id, []).append((sample_id, err_msg))

        # calculate_samples writes (level_id, cumulative time, number of successful samples)
        for level_id, elapsed, n_samples in PbsJob._load_yaml_list(
                os.path.join(jobs_dir, PbsJob.TIME.format(job_id))):
            times_by_level.setdefault(level_id, []).append((elapsed, n_samples))

        # Mark any scheduled-but-not-recorded samples as failed ("job failed")
        recorded_ids = {}  # level_id -> ids of samples that already have an outcome
        for level_id, sample_id, _ in PbsJob.get_scheduled_sample_ids(job_id, jobs_dir) or []:
            if level_id not in recorded_ids:
                recorded_ids[level_id] = ({s_id for s_id, _res in successful.get(level_id, [])}
                                          | {f_id for f_id, _msg in failed.get(level_id, [])})
            if sample_id not in recorded_ids[level_id]:
                failed.setdefault(level_id, []).append((sample_id, "job failed"))
                recorded_ids[level_id].add(sample_id)

        return successful, failed, times_by_level

    @staticmethod
    def get_scheduled_sample_ids(job_id, jobs_dir):
        """
        Read the scheduled YAML file and return the list of scheduled (level_id, sample_id, seed) tuples.
        :param job_id: str
        :param jobs_dir: str
        :return: list of tuples (level_id, sample_id, seed)
        """
        with open(os.path.join(jobs_dir, PbsJob.SCHEDULED.format(job_id))) as file:
            level_id_sample_id_seed = yaml.load(file, yaml.Loader)
        return level_id_sample_id_seed

    def write_pbs_id(self, pbs_job_id):
        """
        Create an empty file named '<job_id>_<pbs_job_id>' in jobs_dir; the file name itself
        encodes the mapping from our internal job id to the external PBS job id.
        :param pbs_job_id: str (external PBS job identifier)
        :return: None
        """
        file_name = os.path.join(self._jobs_dir, PbsJob.PBS_ID.format(self._job_id)) + pbs_job_id
        with open(file_name, 'w'):
            pass

    def save_scheduled(self, scheduled):
        """
        Store the scheduled samples list in the jobs folder.
        A missing jobs folder is reported with a printed hint instead of an exception.
        :param scheduled: list of tuples (level_id, sample_id, seed)
        :return: None
        """
        try:
            with open(os.path.join(self._jobs_dir, PbsJob.SCHEDULED.format(self._job_id)), "w") as file:
                yaml.dump(scheduled, file)
        except FileNotFoundError:
            print("Make sure you call _create_files method previously")

    @staticmethod
    def get_job_n_running(job_id, jobs_dir):
        """
        Return the number of scheduled samples for a job (length of the scheduled list file).
        :param job_id: str
        :param jobs_dir: str path to jobs directory
        :return: int count of scheduled entries
        """
        with open(os.path.join(jobs_dir, PbsJob.SCHEDULED.format(job_id))) as file:
            lines = yaml.load(file, yaml.Loader)
        return len(lines)
if __name__ == "__main__":
    # PBS worker entry point: rebuild the job from the command line and the
    # serialized settings file, then evaluate all of its scheduled samples.
    PbsJob.create_process().calculate_samples()