# Source code for mlmc.sampler

import time
import numpy as np
from typing import List
from mlmc.sample_storage import SampleStorage
from mlmc.sampling_pool import SamplingPool
from mlmc.sim.simulation import Simulation


class Sampler:
    """
    Manages sample scheduling, result collection, and persistent storage.

    Coordinates the sampling pool, simulation factory, and sample storage:
    - schedules new samples according to target counts,
    - collects finished samples and writes them to storage,
    - handles failed samples and runtime (n_ops) bookkeeping.

    Level 0 is the coarsest level (its level simulation is created without a
    real coarse partner); every higher level uses the previous level's
    parameters as its coarse parameters, so the last level is the finest.
    """

    # Default timeout used by process_adding_samples() when polling the pool.
    ADDING_SAMPLES_TIMEOUT = 1e-15

    def __init__(self, sample_storage: "SampleStorage", sampling_pool: "SamplingPool",
                 sim_factory: "Simulation", level_parameters: List[List[float]], seed=1234):
        """
        Initialize sampler and prepare per-level simulation objects.

        :param sample_storage: stores scheduled samples, results and result structure
        :param sampling_pool: sampling pool responsible for executing simulations
        :param sim_factory: factory that creates level Simulation instances and provides result_format()
        :param level_parameters: list of per-level parameters (e.g. simulation steps), level 0 first
        :param seed: seed passed to np.random.seed (sets NumPy's global RNG state)
        """
        np.random.seed(seed)
        self.sample_storage = sample_storage
        self._sampling_pool = sampling_pool

        # Target number of samples per level (may be updated later)
        self._n_target_samples = np.zeros(len(level_parameters))

        # Create LevelSimulation objects for each level using the provided factory
        self._level_sim_objects = []
        self._create_level_sim_objects(level_parameters, sim_factory)

        # Persist global data (level parameters and result format) into storage
        sample_storage.save_global_data(level_parameters=level_parameters,
                                        result_format=sim_factory.result_format())

        # Load already scheduled samples (if any) from storage.
        # NOTE(review): assumes storage yields one entry per level, ordered by level id.
        self._n_scheduled_samples = [len(level_scheduled) for level_id, level_scheduled
                                     in sample_storage.load_scheduled_samples().items()]

        # If there are no scheduled samples yet, initialize to zeros
        if not self._n_scheduled_samples:
            self._n_scheduled_samples = np.zeros(len(level_parameters))

        # Check for unfinished samples and inform the sampling pool
        self._check_failed_samples()

    # @TODO: If sampler is restarted, collect any samples finished while offline:
    #  - add permanent samples into pool queues,
    #  - before scheduling new samples, call get_finished to know how many are already done.

    @property
    def n_levels(self):
        """Return number of MLMC levels managed by this sampler."""
        return len(self._level_sim_objects)

    @property
    def n_finished_samples(self):
        """
        Retrieve numbers of finished samples for all levels.

        :return: whatever sample_storage.n_finished() returns (per-level finished counts)
        """
        return self.sample_storage.n_finished()

    def _create_level_sim_objects(self, level_parameters, sim_factory):
        """
        Create a LevelSimulation object for each level via the simulation factory.

        Level 0 receives ``[0]`` as its coarse parameters; level i > 0 receives
        the parameters of level i - 1.

        :param level_parameters: list of per-level parameters
        :param sim_factory: factory providing level_instance, calculate and result_format
        :return: None
        """
        for level_id, fine_params in enumerate(level_parameters):
            coarse_params = [0] if level_id == 0 else level_parameters[level_id - 1]
            level_sim = sim_factory.level_instance(fine_params, coarse_params)
            # Attach factory methods and metadata to the LevelSimulation
            level_sim._calculate = sim_factory.calculate
            level_sim._result_format = sim_factory.result_format
            level_sim._level_id = level_id
            self._level_sim_objects.append(level_sim)

    def sample_range(self, n0, nL):
        """
        Interpolate sample counts geometrically (linearly in log2) across levels.

        :param n0: number of samples at level 0 (the coarsest level); must be > 0
        :param nL: number of samples at the last (finest) level; must be > 0
        :return: np.ndarray of length self.n_levels with rounded integer sample counts
        """
        return np.round(np.exp2(np.linspace(np.log2(n0), np.log2(nL), self.n_levels))).astype(int)

    def set_initial_n_samples(self, n_samples=None):
        """
        Set the initial target number of samples for each level.

        Accepts:
        - None (defaults to [100, 10]),
        - a single integer (interpreted as n0, with default nL=10),
        - a two-element list [n0, nL] (geometric interpolation across levels),
        - any other array-like, used as the per-level targets unchanged.

        :param n_samples: scalar, length-2 list, or array specifying target counts
        :return: None
        """
        if n_samples is None:
            n_samples = [100, 10]
        n_samples = np.atleast_1d(n_samples)

        # Single value -> treat as n0 with default nL
        if len(n_samples) == 1:
            n_samples = np.array([n_samples[0], 10])

        # Two values -> create geometric progression across levels
        if len(n_samples) == 2:
            n0, nL = n_samples
            n_samples = self.sample_range(n0, nL)

        self._n_target_samples = n_samples

    def _get_sample_tag(self, level_id):
        """
        Create a unique sample tag for a given level.

        :param level_id: identifier of current level
        :return: str tag built from the level and its current scheduled count (e.g. 'L00_S0000123')
        """
        return "L{:02d}_S{:07d}".format(level_id, int(self._n_scheduled_samples[level_id]))

    def _schedule_level_samples(self, level_id, n_samples):
        """
        Schedule ``int(n_samples)`` new samples on one level and record them in storage.

        Non-positive counts schedule nothing (but storage is still notified with an empty list).

        :param level_id: level index
        :param n_samples: number of samples to schedule (truncated with int())
        :return: None
        """
        level_sim = self._level_sim_objects[level_id]
        samples = []
        for _ in range(int(n_samples)):
            # The tag depends on the current count, so increment after tagging
            sample_id = self._get_sample_tag(level_id)
            self._sampling_pool.schedule_sample(sample_id, level_sim)
            self._n_scheduled_samples[level_id] += 1
            samples.append(sample_id)
        self.sample_storage.save_scheduled_samples(level_id, samples)

    def schedule_samples(self, timeout=None, level_id=None, n_samples=None):
        """
        Schedule new simulation samples in the sampling pool and record them in storage.

        For each scheduled sample:
        1) generate a unique sample id shared by fine and coarse tasks,
        2) obtain the LevelSimulation instance for the level,
        3) schedule the sample with SamplingPool,
        4) store scheduled sample ids in SampleStorage.

        :param timeout: float or None, passed to ask_sampling_pool_for_samples() before scheduling
        :param level_id: int or None. If provided, schedule only for this level. If None, all
            levels are scheduled (from the last/finest level down to level 0), or, when
            n_samples is given, the last (finest) level is used.
        :param n_samples: int or None, if provided schedule exactly this many samples for the level;
            otherwise schedule the difference between target and scheduled counts
        :return: None
        """
        # First, collect any finished samples
        self.ask_sampling_pool_for_samples(timeout=timeout)

        # Computed once, before any counts are incremented
        plan_samples = self._n_target_samples - self._n_scheduled_samples

        if n_samples is not None:
            # Explicit count for a single level (defaults to the last, finest level)
            if level_id is None:
                level_id = len(plan_samples) - 1
            self._schedule_level_samples(level_id, n_samples)
        elif level_id is not None:
            # Only the requested level, using that level's own outstanding plan
            self._schedule_level_samples(level_id, plan_samples[level_id])
        else:
            # All levels, from the last (finest) down to level 0 (coarsest)
            for lvl in reversed(range(len(plan_samples))):
                self._schedule_level_samples(lvl, plan_samples[lvl])

    def _check_failed_samples(self):
        """
        Query storage for unfinished sample IDs and pass them to the sampling pool.

        The pool's have_permanent_samples() receives the IDs; what it does with them
        is defined by the pool implementation.

        :return: None
        """
        unfinished_sample_ids = self.sample_storage.unfinished_ids()
        self._sampling_pool.have_permanent_samples(unfinished_sample_ids)

    def ask_sampling_pool_for_samples(self, sleep=0, timeout=None):
        """
        Poll the sampling pool for finished simulations and store their results.

        - timeout None: poll until the pool reports no running simulations,
        - timeout <= 0: return 1 immediately without polling,
        - timeout > 0: poll until nothing runs or the elapsed time exceeds timeout
          (checked after each poll, so at least one poll always happens).

        :param sleep: float, time to sleep after each poll (seconds)
        :param timeout: float or None, see above
        :return: int, number of running simulations reported by the last poll (1 if not polled)
        """
        if timeout is None:
            timeout = 0
        elif timeout <= 0:
            return 1

        n_running = 1
        t0 = time.perf_counter()
        while n_running > 0:
            successful_samples, failed_samples, n_running, n_ops = self._sampling_pool.get_finished()
            # Persist finished samples and operation counts
            self._store_samples(successful_samples, failed_samples, n_ops)
            time.sleep(sleep)
            if 0 < timeout < (time.perf_counter() - t0):
                break

        return n_running

    def _store_samples(self, successful_samples, failed_samples, n_ops):
        """
        Persist finished samples and operation time estimates to storage.

        :param successful_samples: Dict[level_id, List[Tuple[sample_id:str, (fine, coarse)]]]
        :param failed_samples: Dict[level_id, List[Tuple[sample_id:str, error_message:str]]]
        :param n_ops: Dict[level_id, Tuple[total_time:float, n_success_samples:int]]
        :return: None
        """
        self.sample_storage.save_samples(successful_samples, failed_samples)
        self.sample_storage.save_n_ops(n_ops)

    def process_adding_samples(self, n_estimated, sleep=0, add_coeff=0.1, timeout=ADDING_SAMPLES_TIMEOUT):
        """
        Move scheduled counts towards newly estimated targets by a fraction per call.

        Each level gets ``scheduled + add_coeff * (estimated - scheduled)`` (rounded up);
        if that step is smaller than ``add_coeff * estimated``, the level jumps straight to
        its estimate. Levels whose estimate is below the current scheduled count are kept.
        Note: n_estimated may be unreliable if per-level n_ops are similar across levels.

        :param n_estimated: array-like, estimated target samples per level
        :param sleep: float, time to sleep while waiting for results
        :param add_coeff: float in (0, 1], fraction of the difference to schedule (default 0.1)
        :param timeout: float, timeout passed to ask_sampling_pool_for_samples()
        :return: bool, True if no level's estimate exceeds its new scheduled count
        """
        # Accept plain lists as well as arrays (list * float would raise TypeError)
        n_estimated = np.asarray(n_estimated)

        # Ensure storage reflects any finished work
        self.ask_sampling_pool_for_samples(timeout=timeout)

        # Currently scheduled samples per level
        n_scheduled = np.asarray(self.l_scheduled_samples())

        # Compute new scheduled values (add_coeff fraction of the remaining difference)
        new_scheduled = np.where((n_estimated * add_coeff) > (n_estimated - n_scheduled),
                                 n_estimated,
                                 n_scheduled + (n_estimated - n_scheduled) * add_coeff)
        n_scheduled = np.ceil(np.where(n_estimated < n_scheduled, n_scheduled, new_scheduled))

        # Levels where estimated > scheduled
        greater_items = np.where(np.greater(n_estimated, n_scheduled))[0]

        # Schedule and wait until at least a fraction of newly scheduled samples finish
        self.set_scheduled_and_wait(n_scheduled, greater_items, sleep, timeout=timeout)

        # Equivalent to the former np.all(n_estimated[gi] == n_scheduled[gi]): greater_items
        # holds only strictly-greater levels, so that test holds exactly when it is empty.
        return greater_items.size == 0

    def set_scheduled_and_wait(self, n_scheduled, greater_items, sleep, fin_sample_coef=0.5, timeout=1e-7):
        """
        Raise target counts, schedule samples and wait until enough of them finish.

        :param n_scheduled: array-like, target number of scheduled samples per level
        :param greater_items: indices of the levels to wait for
        :param sleep: float, time to sleep between polls
        :param fin_sample_coef: float in (0, 1], fraction of scheduled samples that must finish
        :param timeout: float, timeout passed to ask_sampling_pool_for_samples()
        :return: None
        """
        # Update internal targets and schedule required samples
        self.set_level_target_n_samples(n_scheduled)
        self.schedule_samples(timeout=timeout)

        n_scheduled = np.asarray(n_scheduled)
        threshold = fin_sample_coef * n_scheduled[greater_items]

        # NOTE(review): there is no overall deadline; if finished counts never reach the
        # threshold (presumably e.g. when samples keep failing), this loops forever.
        n_finished = np.asarray(self.n_finished_samples)
        while np.any(n_finished[greater_items] < threshold):
            time.sleep(sleep)
            self.ask_sampling_pool_for_samples(timeout=timeout)
            n_finished = np.asarray(self.n_finished_samples)

    def set_level_target_n_samples(self, n_samples):
        """
        Raise the per-level target sample counts to at least the provided values.

        :param n_samples: iterable of new target samples per level
        :return: None
        """
        for level, n in enumerate(n_samples):
            self._n_target_samples[level] = max(self._n_target_samples[level], n)

    def l_scheduled_samples(self):
        """
        Return the currently scheduled sample counts per level.

        :return: the internal list/array of scheduled counts (not a copy)
        """
        return self._n_scheduled_samples

    def renew_failed_samples(self):
        """
        Reschedule previously failed samples.

        Retrieves failed sample IDs from storage, re-schedules them in the sampling pool
        under their original IDs, and clears the failed records from storage. Rescheduled
        IDs are not saved again via save_scheduled_samples and do not change the
        scheduled counts.

        :return: None
        """
        failed_samples = self.sample_storage.failed_samples()
        for level_id, sample_ids in failed_samples.items():
            # Keys are converted with int(); presumably storage may return them as strings.
            level_sim = self._level_sim_objects[int(level_id)]
            for sample_id in sample_ids:
                self._sampling_pool.schedule_sample(sample_id, level_sim)

        # Clear failed sample records after rescheduling
        self.sample_storage.clear_failed()