# Source code for mlmc.sample_storage

import itertools
import numpy as np
from abc import ABCMeta, abstractmethod
from typing import List, Dict, Any, Generator, Optional, Tuple
from mlmc.quantity.quantity_spec import QuantitySpec, ChunkSpec


class SampleStorage(metaclass=ABCMeta):
    """
    Abstract interface for storing and retrieving MLMC sample data.

    Concrete backends (e.g. in-memory storage) implement every abstract
    method below; ``chunks`` is shared and built on top of ``_level_chunks``.
    """

    @abstractmethod
    def save_samples(self, successful_samples, failed_samples):
        """
        Persist the outcome of finished simulations.

        :param successful_samples: Dict[level_id, List[Tuple[sample_id, (fine, coarse)]]]
        :param failed_samples: Dict[level_id, List[Tuple[sample_id, error_message]]]
        """

    @abstractmethod
    def save_result_format(self, res_spec: List[QuantitySpec]):
        """
        Store the description of the simulation result structure.

        :param res_spec: list of quantity specifications
        """

    @abstractmethod
    def load_result_format(self) -> List[QuantitySpec]:
        """
        Return the previously stored result format.

        :return: List[QuantitySpec]
        """

    @abstractmethod
    def save_global_data(self, result_format: List[QuantitySpec], level_parameters=None):
        """
        Store metadata shared by the whole sampling run.

        :param result_format: List[QuantitySpec]
        :param level_parameters: optional per-level metadata
        """

    @abstractmethod
    def save_scheduled_samples(self, level_id, samples):
        """
        Record identifiers of samples scheduled on a level.

        :param level_id: int
        :param samples: List[str]
        """

    @abstractmethod
    def load_scheduled_samples(self) -> Dict[int, List[str]]:
        """
        Return identifiers of scheduled samples grouped by level.

        :return: Dict[level_id, List[sample_id]]
        """

    @abstractmethod
    def sample_pairs(self):
        """
        Return every stored fine/coarse result pair.

        :return: List[np.ndarray[M, N, 2]]
        """

    def chunks(self, level_id: Optional[int] = None, n_samples: Optional[int] = None) -> Generator[ChunkSpec, None, None]:
        """
        Iterate over chunk specifications of the collected data.

        :param level_id: int; when given, only chunks of this level are produced,
                         otherwise chunks of all levels from ``get_level_ids()``.
        :param n_samples: int or None; upper bound on samples retrieved per level.
        :return: iterator of ChunkSpec objects (levels concatenated in order).
        """
        assert isinstance(n_samples, (type(None), int)), "n_samples must be int or None"
        if level_id is None:
            selected_levels = self.get_level_ids()
        else:
            selected_levels = [level_id]
        # Per-level chunk iterators are created up front, then chained lazily.
        per_level = [self._level_chunks(lid, n_samples) for lid in selected_levels]
        return itertools.chain.from_iterable(per_level)

    @abstractmethod
    def _level_chunks(self, level_id, n_samples=None):
        """
        Produce chunk specifications for a single level.

        :param level_id: int
        :param n_samples: int
        :return: generator of ChunkSpec objects
        """

    @abstractmethod
    def n_finished(self):
        """
        Return the count of finished samples for each level.

        :return: List[int]
        """

    @abstractmethod
    def save_n_ops(self, n_ops: Dict[int, Tuple[float, int]]):
        """
        Store measured cost (time) of sample evaluation.

        :param n_ops: Dict[level_id, Tuple[total_time, n_valid_samples]]
        """

    @abstractmethod
    def get_n_ops(self):
        """
        Return the per-sample cost for each level.

        :return: List[float]
        """

    @abstractmethod
    def unfinished_ids(self):
        """
        Return identifiers of samples that have not finished.

        :return: List[str]
        """

    @abstractmethod
    def get_level_ids(self):
        """
        Return identifiers of levels present in the storage.

        :return: List[int]
        """

    @abstractmethod
    def get_n_levels(self):
        """
        Return the number of levels.

        :return: int
        """

    @abstractmethod
    def get_level_parameters(self):
        """
        Return the stored level parameters.

        :return: List[Any]
        """

    @abstractmethod
    def get_n_collected(self):
        """
        Return the number of collected results for each level.

        :return: List[int]
        """
class Memory(SampleStorage):
    """
    Sample's data are stored in the main memory.

    Nothing is persisted; all data live only in the attributes of this object.
    """

    def __init__(self):
        # level_id -> list of (sample_id, error_message) for failed samples
        self._failed = {}
        # level_id -> float array indexed as [sample, fine(0)/coarse(1), ...]
        self._results = {}
        # level_id -> list of ids of successfully stored samples
        self._successful_sample_ids = {}
        # level_id -> list of scheduled sample ids
        self._scheduled = {}
        self._result_specification = []
        # level_id -> accumulated time per sample
        self._n_ops = {}
        # level_id -> number of finished (successful + failed) samples
        self._n_finished = {}
        self._level_parameters = []
        super().__init__()

    def save_samples(self, successful_samples, failed_samples):
        """
        Save successful samples (result pairs) and failed samples (ids + error messages).

        :param successful_samples: Dict[level_id, List[Tuple[sample_id, (fine, coarse)]]]
        :param failed_samples: Dict[level_id, List[Tuple[sample_id, error_message]]]
        :return: None
        """
        self._save_successful(successful_samples)
        self._save_failed(failed_samples)

    def save_global_data(self, result_format, level_parameters=None):
        """
        Save result format and level parameters.

        :param result_format: List[QuantitySpec]
        :param level_parameters: per-level metadata, stored as given (may be None)
        :return: None
        """
        self.save_result_format(result_format)
        self._level_parameters = level_parameters

    def _save_successful(self, samples):
        """
        Save successful samples.

        :param samples: Dict[level_id, List[Tuple[sample_id: str, Tuple[fine, coarse]]]]
        :return: None
        """
        for level_id, res in samples.items():
            if len(res) == 0:
                # Nothing to store; indexing an empty object array with [:, 1] would fail.
                continue
            res = np.array(res, dtype=object)
            fine_coarse_res = res[:, 1]
            # Sub-array dtype: each sample becomes a float block shaped like (fine, coarse).
            result_type = np.dtype((float, np.array(fine_coarse_res[0], dtype=object).shape))
            results = np.empty(shape=(len(res),), dtype=result_type)
            for idx, val in enumerate(fine_coarse_res):
                results[idx, 0] = val[0]
                results[idx, 1] = val[1]

            self._successful_sample_ids.setdefault(level_id, []).extend(res[:, 0])
            self._n_finished[level_id] = self._n_finished.get(level_id, 0) + results.shape[0]

            if level_id in self._results:
                self._results[level_id] = np.concatenate((self._results[level_id], results), axis=0)
            else:
                self._results[level_id] = results

    def _save_failed(self, samples):
        """
        Save failed sample ids and error messages.

        Every failed sample counts as finished, including the first batch on a level.

        :param samples: Dict[level_id, List[Tuple[sample_id: str, error_message: str]]]
        :return: None
        """
        for level_id, res in samples.items():
            self._failed.setdefault(level_id, []).extend(res)
            self._n_finished[level_id] = self._n_finished.get(level_id, 0) + len(res)

    def save_result_format(self, res_spec: List[QuantitySpec]):
        """
        Save sample result format.

        :param res_spec: List[QuantitySpec]
        :return: None
        """
        self._result_specification = res_spec

    def n_finished(self):
        """
        Number of finished samples on each level.

        :return: np.ndarray of floats indexed by level id; levels with no record are 0
        """
        if not self._n_finished:
            return np.zeros(0)
        n_finished = np.zeros(max(self._n_finished) + 1)
        for level_id, n_fin in self._n_finished.items():
            n_finished[level_id] = n_fin
        return n_finished

    def load_result_format(self) -> List[QuantitySpec]:
        """
        Load result format.

        :return: List[QuantitySpec]
        """
        return self._result_specification

    def save_scheduled_samples(self, level_id, samples):
        """
        Save scheduled sample ids.

        :param level_id: int
        :param samples: List[str]
        :return: None
        """
        self._scheduled.setdefault(level_id, []).extend(samples)

    def load_scheduled_samples(self):
        """
        :return: Dict[level_id, List[sample_id: str]] (the internal dict, not a copy)
        """
        return self._scheduled

    def sample_pairs(self):
        """
        Sample results split to numpy arrays, one entry per level.

        :return: List[Array[M, N, 2]]
        """
        levels_results = [None] * len(self._results)
        for level_id in self.get_level_ids():
            levels_results[level_id] = self.sample_pairs_level(ChunkSpec(level_id=level_id))
        return levels_results

    def _level_chunks(self, level_id, n_samples=None):
        """
        Memory storage holds a level in one piece, so a single chunk is produced.

        :param level_id: int
        :param n_samples: int or None; caps the chunk length
        :return: generator of ChunkSpec
        """
        yield ChunkSpec(chunk_id=0,
                        chunk_slice=slice(0, len(self._results[level_id][:n_samples]), 1),
                        level_id=level_id)

    def sample_pairs_level(self, chunk_spec):
        """
        Get samples for a given level. Chunks do not make sense in Memory storage,
        so the whole level (or the requested slice of it) is returned at once.

        :param chunk_spec: object containing chunk identifier, level identifier and
                           chunk_slice - slice() object
        :return: np.ndarray [M, chunk size, 2] (only the fine column on level 0)
        """
        level_id = int(chunk_spec.level_id)
        results = self._results[level_id]
        chunk = results if chunk_spec.chunk_slice is None else results[chunk_spec.chunk_slice]

        # Collapse all per-sample result dimensions into one axis M, so the chunk is
        # always (chunk size, 2, M); scalar results get M == 1.
        if chunk.ndim != 3:
            n_quantities = int(np.prod(chunk.shape[2:]))
            chunk = chunk.reshape(chunk.shape[0], chunk.shape[1], n_quantities)

        # Remove auxiliary zeros from level zero sample pairs
        if level_id == 0:
            chunk = chunk[:, :1, :]
        return chunk.transpose((2, 0, 1))  # [M, chunk size, 2]

    def save_n_ops(self, n_ops):
        """
        Save number of operations (time per sample).

        :param n_ops: Dict[level_id, Tuple[time, number of valid samples]], or an
                      iterable of (level_id, (time, number of valid samples)) pairs
        :return: None
        """
        pairs = n_ops.items() if isinstance(n_ops, dict) else n_ops
        for level, (time, n_samples) in pairs:
            self._n_ops.setdefault(level, 0)
            # Skip division when no valid samples were evaluated.
            if n_samples != 0:
                self._n_ops[level] += time / n_samples

    def get_n_ops(self):
        """
        Get number of operations on each level.

        :return: List[float] indexed by level id; levels with no record are 0.0
        """
        if not self._n_ops:
            return []
        n_ops = [0.0] * (max(self._n_ops) + 1)
        for level, time in self._n_ops.items():
            n_ops[level] = time
        return n_ops

    def unfinished_ids(self):
        """
        All samples kept in memory are finished.

        :return: empty list
        """
        return []

    def get_level_ids(self):
        """
        :return: List of level ids that have stored results
        """
        return list(self._results.keys())

    def get_n_collected(self):
        """
        Number of collected samples at each level.

        :return: List
        """
        n_collected = list(np.zeros(len(self._results)))
        for level_id in self.get_level_ids():
            n_collected[int(level_id)] = len(self._results[int(level_id)])
        return n_collected

    def get_n_levels(self):
        """
        Get number of levels.

        :return: int
        """
        return len(self._results)

    def get_level_parameters(self):
        """
        :return: level parameters as passed to save_global_data
        """
        return self._level_parameters