import os
import sys
import shutil
import numpy as np
from mlmc.moments import Legendre
[docs]
class ProcessBase:
"""
Parent class for particular simulation processes.
Subclasses should implement `setup_config`.
"""
def __init__(self):
"""
Parse CLI arguments and run the requested command.
The constructor reads command-line arguments (sys.argv[1:]) using get_arguments,
sets default attributes and then either runs or re-runs the workflow based on
provided arguments.
:return: None
"""
args = ProcessBase.get_arguments(sys.argv[1:])
self.step_range = (1, 0.01)
self.work_dir = args.work_dir
self.append = False
self.clean = args.clean
self.debug = args.debug
if args.command == 'run':
self.run()
else:
self.append = True
self.clean = False
self.run(renew=True) if args.command == 'renew' else self.run()
[docs]
@staticmethod
def get_arguments(arguments):
"""
Parse command-line arguments.
:param arguments: list of arguments (typically sys.argv[1:])
:return: argparse.Namespace with parsed arguments:
- command: one of ['run', 'collect', 'renew', 'process']
- work_dir: str path
- clean: bool
- debug: bool
"""
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('command', choices=['run', 'collect', 'renew', 'process'],
help='run - create new execution,'
'collect - keep collected, append existing HDF file'
'renew - renew failed samples, run new samples with failed sample ids (which determine random seed)')
parser.add_argument('work_dir', help='Work directory')
parser.add_argument("-c", "--clean", default=False, action='store_true',
help="Clean before run, used only with 'run' command")
parser.add_argument("-d", "--debug", default=False, action='store_true',
help="Keep sample directories")
args = parser.parse_args(arguments)
return args
[docs]
def run(self, renew=True):
"""
High-level entry point to run the MLMC workflow.
Creates the working directory, sets up MLMC configurations for a set of level
counts (currently hard-coded to [1]) and schedules/generates jobs. After job creation
it triggers collection of results via all_collect.
:param renew: bool, if True indicates renewing failed samples (passed down to setup_config in some subclasses)
:return: None
"""
os.makedirs(self.work_dir, mode=0o775, exist_ok=True)
mlmc_list = []
for nl in [1]: # , 2, 3, 4,5, 7, 9]:
mlmc = self.setup_config(nl, clean=self.clean)
self.generate_jobs(mlmc, n_samples=[8], sample_sleep=self.sample_sleep, sample_timeout=self.sample_timeout)
mlmc_list.append(mlmc)
self.all_collect(mlmc_list)
[docs]
def set_environment_variables(self):
"""
Determine environment-dependent configuration values (PBS config, executables, timeouts).
The method inspects the work_dir path to decide whether it runs on a cluster or locally and
sets attributes used later (pbs_config, sample_sleep, init_sample_timeout, sample_timeout, flow123d, gmsh).
:return: None
"""
root_dir = os.path.abspath(self.work_dir)
while root_dir != '/':
root_dir, tail = os.path.split(root_dir)
self.pbs_config = dict(
job_weight=250000, # max number of elements per job
n_cores=1,
n_nodes=1,
select_flags=['cgroups=cpuacct'],
mem='4gb',
queue='charon',
home_dir='/storage/liberec3-tul/home/martin_spetlik/')
if tail == 'storage':
# Cluster settings
self.sample_sleep = 30
self.init_sample_timeout = 600
self.sample_timeout = 0
self.pbs_config['qsub'] = '/usr/bin/qsub'
self.flow123d = 'flow123d'
self.gmsh = "/storage/liberec3-tul/home/martin_spetlik/astra/gmsh/bin/gmsh"
else:
# Local settings
self.sample_sleep = 1
self.init_sample_timeout = 60
self.sample_timeout = 60
self.pbs_config['qsub'] = None
self.flow123d = "/home/jb/workspace/flow123d/bin/fterm flow123d dbg"
self.gmsh = "/home/jb/local/gmsh-3.0.5-git-Linux/bin/gmsh"
[docs]
def setup_config(self, n_levels, clean):
"""
Set simulation configuration depending on particular task.
Subclasses **must** override this method and return a configured mlmc.MLMC object.
:param n_levels: int, number of MLMC levels
:param clean: bool, whether to clean/create new files or use existing ones
:return: mlmc.MLMC instance (implementation dependent)
:raises NotImplementedError: always in base class
"""
raise NotImplementedError("Simulation configuration is not set")
[docs]
def rm_files(self, output_dir):
"""
Remove (recursively) output_dir and create an empty directory in its place.
:param output_dir: str path to remove and recreate
:return: None
"""
if os.path.isdir(output_dir):
shutil.rmtree(output_dir, ignore_errors=True)
os.makedirs(output_dir, mode=0o775, exist_ok=True)
[docs]
def create_pbs_object(self, output_dir, clean):
"""
Initialize PBS helper object for submitting/executing jobs.
This creates self.pbs_obj and configures it with common PBS settings.
:param output_dir: str, directory where PBS scripts and job state will be created
:param clean: bool, if True remove existing scripts before creating new ones
:return: None
"""
pbs_work_dir = os.path.join(output_dir, "scripts")
num_jobs = 0
if os.path.isdir(pbs_work_dir):
num_jobs = len([_ for _ in os.listdir(pbs_work_dir)])
# pbs module is expected to be imported where available
self.pbs_obj = pbs.Pbs(pbs_work_dir,
job_count=num_jobs,
qsub=self.pbs_config['qsub'],
clean=clean)
self.pbs_obj.pbs_common_setting(flow_3=True, **self.pbs_config)
[docs]
def generate_jobs(self, mlmc, n_samples=None):
"""
Prepare and kick off sampling jobs for the provided MLMC object.
The method optionally sets the initial n_samples (if provided), refills the sampler
queues and triggers the PBS object execution. It then waits for simulations to finish.
:param mlmc: mlmc.MLMC instance
:param n_samples: None or list specifying number of samples to request for each level
:return: None
"""
if n_samples is not None:
mlmc.set_initial_n_samples(n_samples)
mlmc.refill_samples()
if self.pbs_obj is not None:
self.pbs_obj.execute()
mlmc.wait_for_simulations(sleep=self.sample_sleep, timeout=self.sample_timeout)
[docs]
def set_moments(self, n_moments, log=False):
"""
Create and store a moments function instance (Legendre polynomial family).
:param n_moments: int, number of moments
:param log: bool, whether to apply log-transform to quantity prior to moment evaluation
:return: Legendre moments instance
"""
self.moments_fn = Legendre(n_moments, self.domain, safe_eval=True, log=log)
return self.moments_fn
[docs]
def n_sample_estimate(self, mlmc, target_variance=0.001):
"""
Heuristic routine to estimate a good number of initial samples for MLMC using target variance.
It triggers an initial sampling run, estimates the domain, constructs moments, and requests
additional samples using mlmc.target_var_adding_samples.
:param mlmc: mlmc.MLMC instance
:param target_variance: float target variance for moment estimates
:return: None
"""
mlmc.set_initial_n_samples()
mlmc.refill_samples()
self.pbs_obj.execute()
mlmc.wait_for_simulations(sleep=self.sample_sleep, timeout=self.init_sample_timeout)
self.domain = mlmc.estimator.estimate_domain()
self.set_moments(self.n_moments, log=True)
mlmc.target_var_adding_samples(target_variance, self.moments_fn, pbs=self.pbs_obj)
[docs]
def all_collect(self, sampler_list):
"""
Poll samplers to collect running samples until none are left.
Repeatedly asks each sampler for the number of running jobs and keeps polling until all complete.
:param sampler_list: list of sampler-like objects providing ask_sampling_pool_for_samples(sleep, timeout)
:return: None
"""
running = 1
while running > 0:
running = 0
for sampler in sampler_list:
running += sampler.ask_sampling_pool_for_samples(sleep=self.sample_sleep, timeout=0.1)
print("N running: ", running)
[docs]
def process_analysis(self, cl):
"""
Top-level analysis entry point. Calls specific analysis routines (many commented out).
:param cl: CompareLevels instance (or equivalent) holding estimation/collected data
:return: None
"""
cl.collected_report()
mlmc_level = 1
#self.analyze_pdf_approx(cl)
# analyze_regression_of_variance(cl, mlmc_level)
self.analyze_error_of_variance(cl, mlmc_level)
# analyze_error_of_regression_variance(cl, mlmc_level)
# analyze_error_of_level_variances(cl, mlmc_level)
# analyze_error_of_regression_level_variances(cl, mlmc_level)
# analyze_error_of_log_variance(cl, mlmc_level)
[docs]
def analyze_pdf_approx(self, cl):
"""
Perform PDF approximation experiments and plotting.
:param cl: CompareLevels instance
:return: None
"""
np.random.seed(15)
cl.set_common_domain(0)
print("cl domain:", cl.domain)
cl.reinit(n_moments=35)
il = 1
cl.construct_densities(tol=0.01, reg_param=1)
cl.plot_densities(i_sample_mlmc=0)
[docs]
def analyze_regression_of_variance(self, cl, mlmc_level):
"""
Analyze regression of variance for a selected level.
:param cl: CompareLevels instance
:param mlmc_level: int index of method/level to analyze
:return: None
"""
mc = cl[mlmc_level]
mc.ref_estimates_bootstrap(10)
sample_vec = [5000, 5000, 1700, 600, 210, 72, 25, 9, 3]
mc.mlmc.subsample(sample_vec[mc.n_levels])
mc.plot_var_regression([1, 2, 4, 8, 16, 20])
[docs]
def analyze_error_of_variance(self, cl, mlmc_level):
"""
Analyze error of variance estimators and plot related diagnostics.
:param cl: CompareLevels instance
:param mlmc_level: int index of method/level to analyze
:return: None
"""
np.random.seed(20)
cl.plot_variances()
cl.plot_level_variances()
mc = cl[mlmc_level]
mc.plot_bs_var_error_contributions()
[docs]
def analyze_error_of_regression_variance(self, cl, mlmc_level):
"""
Bootstrap-based analysis of regression variance errors.
:param cl: CompareLevels instance
:param mlmc_level: int index of method/level to analyze
:return: None
"""
sample_vec = [5000, 5000, 1700, 600, 210, 72, 25, 9, 3]
mc = cl[mlmc_level]
mc.ref_estimates_bootstrap(300, sample_vector=sample_vec[mc.n_levels], regression=True)
mc.mlmc.update_moments(cl.moments)
mc.mlmc.subsample()
mc.plot_bs_var_error_contributions()
[docs]
def analyze_error_of_level_variances(self, cl, mlmc_level):
"""
Analyze errors in per-level variance estimates and plot results.
:param cl: CompareLevels instance
:param mlmc_level: int index of method/level to analyze
:return: None
"""
mc = cl[mlmc_level]
sample_vec = [5000, 5000, 1700, 600, 210, 72, 25, 9, 3]
mc.ref_estimates_bootstrap(300, sample_vector=sample_vec[:mc.n_levels])
mc.mlmc.update_moments(cl.moments)
mc.mlmc.subsample()
mc.plot_bs_level_variances_error()
[docs]
def analyze_error_of_regression_level_variances(self, cl, mlmc_level):
"""
Analyze combined regression and level variance errors with bootstrap.
:param cl: CompareLevels instance
:param mlmc_level: int index of method/level to analyze
:return: None
"""
mc = cl[mlmc_level]
sample_vec = [5000, 5000, 1700, 600, 210, 72, 25, 9, 3]
mc.ref_estimates_bootstrap(10, sample_vector=sample_vec[:mc.n_levels], regression=True)
mc.mlmc.update_moments(cl.moments)
mc.mlmc.subsample()
mc.plot_bs_level_variances_error()
[docs]
def analyze_error_of_log_variance(self, cl, mlmc_level):
"""
Analyze bootstrap error of log-variance estimates.
:param cl: CompareLevels instance
:param mlmc_level: int index of method/level to analyze
:return: None
"""
sample_vec = [5000, 5000, 1700, 600, 210, 72, 25, 9, 3]
mc = cl[mlmc_level]
mc.ref_estimates_bootstrap(300, sample_vector=sample_vec[:mc.n_levels], log=True)
mc.mlmc.update_moments(cl.moments)
mc.mlmc.subsample()
mc.plot_bs_var_log_var()