import itertools
import numpy as np
from abc import ABCMeta, abstractmethod
from typing import List, Dict, Any, Generator, Optional, Tuple
from mlmc.quantity.quantity_spec import QuantitySpec, ChunkSpec
class SampleStorage(metaclass=ABCMeta):
    """
    Provides methods to store and retrieve sample data.
    Abstract base class for all storage backends.
    """

    @abstractmethod
    def save_samples(self, successful_samples, failed_samples):
        """
        Write simulation results to storage.
        :param successful_samples: Dict[level_id, List[Tuple[sample_id, (fine, coarse)]]]
        :param failed_samples: Dict[level_id, List[Tuple[sample_id, error_message]]]
        """

    @abstractmethod
    def save_result_format(self, res_spec: List[QuantitySpec]):
        """
        Save result format.
        :param res_spec: List of quantity specifications describing result structure.
        """

    @abstractmethod
    def load_result_format(self) -> List[QuantitySpec]:
        """
        Load stored result format.
        :return: List[QuantitySpec]
        """

    @abstractmethod
    def save_global_data(self, result_format: List[QuantitySpec], level_parameters=None):
        """
        Save global metadata such as result format and level parameters.
        :param result_format: List[QuantitySpec]
        :param level_parameters: Optional metadata per level
        """

    @abstractmethod
    def save_scheduled_samples(self, level_id, samples):
        """
        Save scheduled sample identifiers.
        :param level_id: int
        :param samples: List[str]
        """

    @abstractmethod
    def load_scheduled_samples(self) -> Dict[int, List[str]]:
        """
        Load scheduled sample IDs.
        :return: Dict[level_id, List[sample_id]]
        """

    @abstractmethod
    def sample_pairs(self):
        """
        Retrieve all stored fine-coarse result pairs.
        :return: List[np.ndarray[M, N, 2]]
        """

    def chunks(self, level_id: Optional[int] = None, n_samples: Optional[int] = None) -> Generator[ChunkSpec, None, None]:
        """
        Create an iterator yielding chunk specifications for collected data.
        :param level_id: int, if provided, return chunks only for the given level;
                         otherwise chunks of all levels from get_level_ids() are chained.
        :param n_samples: int, maximum number of samples to retrieve (None = all).
        :return: iterator of ChunkSpec objects.
        """
        assert isinstance(n_samples, (type(None), int)), "n_samples must be int or None"
        level_ids = [level_id] if level_id is not None else self.get_level_ids()
        # The per-level chunk iterables are created eagerly (list), as before;
        # from_iterable avoids unpacking the list into call arguments.
        return itertools.chain.from_iterable([self._level_chunks(lid, n_samples) for lid in level_ids])

    @abstractmethod
    def _level_chunks(self, level_id, n_samples=None):
        """
        Get chunk information for data collected at a given level.
        :param level_id: int
        :param n_samples: int
        :return: generator of ChunkSpec objects
        """

    @abstractmethod
    def n_finished(self):
        """
        Get number of finished samples on each level.
        :return: List[int]
        """

    @abstractmethod
    def save_n_ops(self, n_ops: Dict[int, Tuple[float, int]]):
        """
        Save number of operations (time).
        :param n_ops: Dict[level_id, Tuple[total_time, n_valid_samples]]
        """

    @abstractmethod
    def get_n_ops(self):
        """
        Get number of operations per sample for each level.
        :return: List[float]
        """

    @abstractmethod
    def unfinished_ids(self):
        """
        Get IDs of unfinished samples.
        :return: List[str]
        """

    @abstractmethod
    def get_level_ids(self):
        """
        Get list of available level IDs.
        :return: List[int]
        """

    @abstractmethod
    def get_n_levels(self):
        """
        Get total number of levels.
        :return: int
        """

    @abstractmethod
    def get_level_parameters(self):
        """
        Get stored level parameters.
        :return: List[Any]
        """

    @abstractmethod
    def get_n_collected(self):
        """
        Get number of collected results at each level.
        :return: List[int]
        """
class Memory(SampleStorage):
    """
    Sample storage backend that keeps all sample data in the main memory
    (plain dicts held by the instance).
    """

    def __init__(self):
        # level_id -> list of (sample_id, error_message)
        self._failed = {}
        # level_id -> float array (n_samples, 2, ...); axis 1 is (fine, coarse)
        self._results = {}
        # level_id -> successful sample ids, in the same order as self._results[level_id]
        self._successful_sample_ids = {}
        # level_id -> list of scheduled sample ids
        self._scheduled = {}
        self._result_specification = []
        # level_id -> sum over save_n_ops calls of (time / n_valid_samples)
        self._n_ops = {}
        # level_id -> number of finished (successful + failed) samples
        self._n_finished = {}
        self._level_parameters = []
        super().__init__()

    @staticmethod
    def _n_slots(by_level):
        """
        Length of a per-level list able to hold every level id key of `by_level`.
        :param by_level: dict keyed by level id
        :return: int, max level id + 1, or 0 for an empty dict
        """
        return max(int(level_id) for level_id in by_level) + 1 if by_level else 0

    def save_samples(self, successful_samples, failed_samples):
        """
        Store results of finished samples.
        :param successful_samples: Dict[level_id, List[Tuple[sample_id, (fine, coarse)]]]
        :param failed_samples: Dict[level_id, List[Tuple[sample_id, error_message]]]
        :return: None
        """
        self._save_successful(successful_samples)
        self._save_failed(failed_samples)

    def save_global_data(self, result_format, level_parameters=None):
        """
        Save result format and level parameters.
        :param result_format: List[QuantitySpec]
        :param level_parameters: per-level metadata, stored as given
        :return: None
        """
        self.save_result_format(result_format)
        self._level_parameters = level_parameters

    def _save_successful(self, samples):
        """
        Append successful samples to the in-memory result arrays.
        :param samples: Dict[level_id, List[Tuple[sample_id: str, Tuple[fine, coarse]]]]
        :return: None
        """
        for level_id, level_samples in samples.items():
            # Nothing to store; also avoids indexing the first pair of an empty list
            if len(level_samples) == 0:
                continue
            sample_ids = [sample[0] for sample in level_samples]
            fine_coarse_pairs = [sample[1] for sample in level_samples]
            # Shape of one (fine, coarse) pair: (2,) for scalar results, (2, ...) otherwise
            pair_shape = np.array(fine_coarse_pairs[0], dtype=object).shape
            results = np.empty((len(fine_coarse_pairs),) + pair_shape, dtype=float)
            for idx, pair in enumerate(fine_coarse_pairs):
                results[idx, 0] = pair[0]
                results[idx, 1] = pair[1]

            self._successful_sample_ids.setdefault(level_id, []).extend(sample_ids)
            self._n_finished[level_id] = self._n_finished.get(level_id, 0) + results.shape[0]
            if level_id in self._results:
                self._results[level_id] = np.concatenate((self._results[level_id], results), axis=0)
            else:
                self._results[level_id] = results

    def _save_failed(self, samples):
        """
        Save failed sample ids and error messages.
        :param samples: Dict[level_id, List[Tuple[sample_id: str, error_message: str]]]
        :return: None
        """
        for level_id, level_failures in samples.items():
            self._failed.setdefault(level_id, []).extend(level_failures)
            # Count the failures even when this is the first batch seen for the level
            self._n_finished[level_id] = self._n_finished.get(level_id, 0) + len(level_failures)

    def save_result_format(self, res_spec: List[QuantitySpec]):
        """
        Save sample result format.
        :param res_spec: List[QuantitySpec]
        :return: None
        """
        self._result_specification = res_spec

    def n_finished(self):
        """
        Number of finished (successful and failed) samples on each level.
        :return: np.ndarray of floats indexed by level id; levels without records are 0
        """
        n_finished = np.zeros(self._n_slots(self._n_finished))
        for level_id, n_fin in self._n_finished.items():
            n_finished[int(level_id)] = n_fin
        return n_finished

    def load_result_format(self) -> List[QuantitySpec]:
        """
        Load result format.
        :return: List[QuantitySpec]
        """
        return self._result_specification

    def save_scheduled_samples(self, level_id, samples):
        """
        Save scheduled sample ids.
        :param level_id: int
        :param samples: List[str]
        :return: None
        """
        self._scheduled.setdefault(level_id, []).extend(samples)

    def load_scheduled_samples(self):
        """
        :return: Dict[level_id, List[sample_id: str]] (the internal dict, not a copy)
        """
        return self._scheduled

    def sample_pairs(self):
        """
        Sample results of all levels split into numpy arrays.
        :return: List[Array[M, N, 2]] indexed by level id; None for level ids with no results
        """
        levels_results = [None] * self._n_slots(self._results)
        for level_id in self.get_level_ids():
            levels_results[int(level_id)] = self.sample_pairs_level(ChunkSpec(level_id=level_id))
        return levels_results

    def _level_chunks(self, level_id, n_samples=None):
        """
        Memory storage keeps a level as one chunk covering at most n_samples results.
        :param level_id: int
        :param n_samples: int or None (all results)
        :return: generator yielding a single ChunkSpec
        """
        yield ChunkSpec(chunk_id=0, chunk_slice=slice(0, len(self._results[level_id][:n_samples]), 1), level_id=level_id)

    def sample_pairs_level(self, chunk_spec):
        """
        Get samples for given level; chunks do not make sense in Memory storage,
        so the chunk is taken directly from the stored array.
        :param chunk_spec: object with level_id and chunk_slice (slice object or None)
        :return: np.ndarray of shape [M, chunk size, 2]
        """
        results = self._results[int(chunk_spec.level_id)]
        chunk = results if chunk_spec.chunk_slice is None else results[chunk_spec.chunk_slice]

        # Collapse all result dimensions into a single axis M -> (chunk size, 2, M).
        # Scalar results (chunk of shape (chunk size, 2)) get M = 1.
        # TODO: think it over again
        if chunk.ndim != 3:
            n_values = int(np.prod(chunk.shape[2:]))  # empty product == 1
            chunk = chunk.reshape(chunk.shape[0], chunk.shape[1], n_values)

        # Remove auxiliary zeros (coarse column) from level zero sample pairs
        if chunk_spec.level_id == 0:
            chunk = chunk[:, :1, :]
        return chunk.transpose((2, 0, 1))  # [M, chunk size, 2]

    def save_n_ops(self, n_ops):
        """
        Save number of operations (time per valid sample).
        :param n_ops: Dict[level_id, Tuple[time, number of valid samples]];
                      an iterable of (level_id, (time, n_samples)) pairs is also accepted
        :return: None
        """
        items = n_ops.items() if hasattr(n_ops, "items") else n_ops
        for level, (time, n_samples) in items:
            self._n_ops.setdefault(level, 0)
            if n_samples != 0:
                self._n_ops[level] += time / n_samples

    def get_n_ops(self):
        """
        Get number of operations on each level.
        :return: List[float] indexed by level id; 0.0 for levels without records
        """
        n_ops = [0.0] * self._n_slots(self._n_ops)
        for level, time in self._n_ops.items():
            n_ops[int(level)] = time
        return n_ops

    def unfinished_ids(self):
        """
        All samples in memory are finished.
        :return: empty list
        """
        return []

    def get_level_ids(self):
        """
        :return: List of level ids that have stored results
        """
        return list(self._results.keys())

    def get_n_collected(self):
        """
        Number of collected samples at each level.
        :return: List[int] indexed by level id; 0 for levels without results
        """
        n_collected = [0] * self._n_slots(self._results)
        for level_id, level_results in self._results.items():
            n_collected[int(level_id)] = len(level_results)
        return n_collected

    def get_n_levels(self):
        """
        Get number of levels.
        :return: int
        """
        return len(self._results)

    def get_level_parameters(self):
        """
        :return: level parameters saved by save_global_data
        """
        return self._level_parameters