diff --git a/bloptools/__init__.py b/bloptools/__init__.py index 8ba74e4..80edaf0 100644 --- a/bloptools/__init__.py +++ b/bloptools/__init__.py @@ -1,4 +1,3 @@ -from . import bayesian, devices, tasks # noqa F401 from ._version import get_versions __version__ = get_versions()["version"] diff --git a/bloptools/bayesian/__init__.py b/bloptools/bayesian/__init__.py index 280e509..822f7be 100644 --- a/bloptools/bayesian/__init__.py +++ b/bloptools/bayesian/__init__.py @@ -1,1065 +1,3 @@ -import logging -import os -import time as ttime -import uuid -import warnings -from collections import OrderedDict -from collections.abc import Mapping - -import bluesky.plan_stubs as bps -import bluesky.plans as bp # noqa F401 -import botorch -import gpytorch -import h5py -import IPython as ip -import matplotlib as mpl -import numpy as np -import pandas as pd -import scipy as sp -import torch -from botorch.models.deterministic import GenericDeterministicModel -from botorch.models.model_list_gp_regression import ModelListGP -from matplotlib import pyplot as plt -from matplotlib.patches import Patch - -from .. import utils -from . import acquisition, models -from .acquisition import default_acquisition_plan -from .digestion import default_digestion_function - -os.environ["KMP_DUPLICATE_LIB_OK"] = "True" - -warnings.filterwarnings("ignore", category=botorch.exceptions.warnings.InputDataWarning) - -mpl.rc("image", cmap="coolwarm") - - -MAX_TEST_INPUTS = 2**11 - -TASK_CONFIG = {} - -TASK_TRANSFORMS = {"log": lambda x: np.log(x)} - -DEFAULT_COLOR_LIST = ["dodgerblue", "tomato", "mediumseagreen", "goldenrod"] -DEFAULT_COLORMAP = "viridis" -DEFAULT_SCATTER_SIZE = 16 - -DEFAULT_MINIMUM_SNR = 1e1 - - -def _validate_and_prepare_dofs(dofs): - for i_dof, dof in enumerate(dofs): - if not isinstance(dof, Mapping): - raise ValueError("Supplied dofs must be an iterable of mappings (e.g. a dict)!") - if "device" not in dof.keys(): - raise ValueError("Each DOF must have a device!") - - dof["device"].kind = "hinted" - dof["name"] = dof["device"].name if hasattr(dof["device"], "name") else f"x{i_dof+1}" - - if "limits" not in dof.keys(): - dof["limits"] = (-np.inf, np.inf) - dof["limits"] = tuple(np.array(dof["limits"], dtype=float)) - - if "tags" not in dof.keys(): - dof["tags"] = [] - - # dofs are passive by default - dof["kind"] = dof.get("kind", "passive") - if dof["kind"] not in ["active", "passive"]: - raise ValueError('DOF kinds must be one of "active" or "passive"') - - # active dofs are on by default, passive dofs are off by default - dof["mode"] = dof.get("mode", "on" if dof["kind"] == "active" else "off") - if dof["mode"] not in ["on", "off"]: - raise ValueError('DOF modes must be one of "on" or "off"') - - dof_names = [dof["device"].name for dof in dofs] - - # check that dof names are unique - unique_dof_names, counts = np.unique(dof_names, return_counts=True) - duplicate_dof_names = unique_dof_names[counts > 1] - if len(duplicate_dof_names) > 0: - raise ValueError(f'Duplicate name(s) in supplied dofs: "{duplicate_dof_names}"') - - return list(dofs) - - -def _validate_and_prepare_tasks(tasks): - for task in tasks: - if not isinstance(task, Mapping): - raise ValueError("Supplied tasks must be an iterable of mappings (e.g. 
a dict)!") - if task["kind"] not in ["minimize", "maximize"]: - raise ValueError('"mode" must be specified as either "minimize" or "maximize"') - if "name" not in task.keys(): - task["name"] = task["key"] - if "weight" not in task.keys(): - task["weight"] = 1 - if "limits" not in task.keys(): - task["limits"] = (-np.inf, np.inf) - if "min_snr" not in task.keys(): - task["min_snr"] = DEFAULT_MINIMUM_SNR - - task_keys = [task["key"] for task in tasks] - unique_task_keys, counts = np.unique(task_keys, return_counts=True) - duplicate_task_keys = unique_task_keys[counts > 1] - if len(duplicate_task_keys) > 0: - raise ValueError(f'Duplicate key(s) in supplied tasks: "{duplicate_task_keys}"') - - return list(tasks) - - -class Agent: - def __init__( - self, - dofs, - tasks, - db, - **kwargs, - ): - """ - A Bayesian optimization self. - - Parameters - ---------- - dofs : iterable of ophyd objects - The degrees of freedom that the agent can control, which determine the output of the model. - bounds : iterable of lower and upper bounds - The bounds on each degree of freedom. This should be an array of shape (n_dofs, 2). - tasks : iterable of tasks - The tasks which the agent will try to optimize. - acquisition : Bluesky plan generator that takes arguments (dofs, inputs, dets) - A plan that samples the beamline for some given inputs. - digestion : function that takes arguments (db, uid) - A function to digest the output of the acquisition. - db : A databroker instance. - """ - - # DOFs are parametrized by kind ("active" or "passive") and mode ("on" or "off") - # - # below are the behaviors of DOFs of each kind and mode: - # - # "read": the agent will read the input on every acquisition (all dofs are always read) - # "move": the agent will try to set and optimize over these (there must be at least one of these) - # "input" means that the agent will use the value to make its posterior - # - # - # active passive - # +---------------------+---------------+ - # on | read, input, move | read, input | - # +---------------------+---------------+ - # off | read | read | - # +---------------------+---------------+ - # - # - - self.dofs = _validate_and_prepare_dofs(np.atleast_1d(dofs)) - self.tasks = _validate_and_prepare_tasks(np.atleast_1d(tasks)) - self.db = db - - self.verbose = kwargs.get("verbose", False) - self.allow_acquisition_errors = kwargs.get("allow_acquisition_errors", True) - self.initialization = kwargs.get("initialization", None) - self.acquisition_plan = kwargs.get("acquisition_plan", default_acquisition_plan) - self.digestion = kwargs.get("digestion", default_digestion_function) - self.dets = list(np.atleast_1d(kwargs.get("dets", []))) - - self.trigger_delay = kwargs.get("trigger_delay", 0) - - self.acq_func_config = kwargs.get("acq_func_config", acquisition.config) - - self.sample_center_on_init = kwargs.get("sample_center_on_init", False) - - self.table = pd.DataFrame() - - self.initialized = False - self._train_models = True - self.a_priori_hypers = None - - self.plots = {"tasks": {}} - - def tell(self, new_table=None, append=True, train=True, **kwargs): - """ - Inform the agent about new inputs and targets for the model. 
- """ - - new_table = pd.DataFrame() if new_table is None else new_table - self.table = pd.concat([self.table, new_table]) if append else new_table - self.table.index = np.arange(len(self.table)) - - skew_dims = self.latent_dim_tuples - - if self.initialized: - cached_hypers = self.hypers - - inputs = self.model_inputs.values - - for i, task in enumerate(self.tasks): - self.table.loc[:, f"{task['key']}_fitness"] = targets = self._get_task_fitness(i) - train_index = ~np.isnan(targets) - - if not train_index.sum() >= 2: - raise ValueError("There must be at least two valid data points per task!") - - train_inputs = torch.tensor(inputs[train_index]).double() - train_targets = torch.tensor(targets[train_index]).double().unsqueeze(-1) # .unsqueeze(0) - - # for constructing the log normal noise prior - # target_snr = 2e2 - # scale = 2e0 - # loc = np.log(1 / target_snr**2) + scale**2 - - likelihood = gpytorch.likelihoods.GaussianLikelihood( - noise_constraint=gpytorch.constraints.Interval( - torch.tensor(1e-4).square(), - torch.tensor(1 / task["min_snr"]).square(), - ), - # noise_prior=gpytorch.priors.torch_priors.LogNormalPrior(loc=loc, scale=scale), - ).double() - - outcome_transform = botorch.models.transforms.outcome.Standardize(m=1) # , batch_shape=torch.Size((1,))) - - task["model"] = models.LatentGP( - train_inputs=train_inputs, - train_targets=train_targets, - likelihood=likelihood, - skew_dims=skew_dims, - input_transform=self._subset_input_transform(mode="on"), - outcome_transform=outcome_transform, - ).double() - - dirichlet_likelihood = gpytorch.likelihoods.DirichletClassificationLikelihood( - self.all_tasks_valid.long(), learn_additional_noise=True - ).double() - - self.classifier = models.LatentDirichletClassifier( - train_inputs=torch.tensor(inputs).double(), - train_targets=dirichlet_likelihood.transformed_targets.transpose(-1, -2).double(), - skew_dims=skew_dims, - likelihood=dirichlet_likelihood, - input_transform=self._subset_input_transform(mode="on"), - ).double() - - if self.a_priori_hypers is not None: - self._set_hypers(self.a_priori_hypers) - elif not train: - self._set_hypers(cached_hypers) - else: - try: - self.train_models() - except botorch.exceptions.errors.ModelFittingError: - if self.initialized: - self._set_hypers(cached_hypers) - else: - raise RuntimeError("Could not fit model on initialization!") - - self.constraint = GenericDeterministicModel(f=lambda x: self.classifier.probabilities(x)[..., -1]) - - def ask(self, acq_func_identifier="qei", n=1, route=True, sequential=True, **acq_func_kwargs): - """ - Ask the agent for the best point to sample, given an acquisition function. 
- - acq_func_identifier: which acquisition function to use - n: how many points to get - """ - - acq_func_name = acquisition.parse_acq_func(acq_func_identifier) - acq_func_type = acquisition.config[acq_func_name]["type"] - - start_time = ttime.monotonic() - - if acq_func_type in ["analytic", "monte_carlo"]: - if not self.initialized: - raise RuntimeError( - f'Can\'t construct non-trivial acquisition function "{acq_func_identifier}"' - f" (the agent is not initialized!)" - ) - - if acq_func_type == "analytic" and n > 1: - raise ValueError("Can't generate multiple design points for analytic acquisition functions.") - - acq_func, acq_func_meta = acquisition.get_acquisition_function(self, acq_func_identifier=acq_func_identifier) - - NUM_RESTARTS = 8 - RAW_SAMPLES = 1024 - - candidates, _ = botorch.optim.optimize_acqf( - acq_function=acq_func, - bounds=self.acq_func_bounds, - q=n, - sequential=sequential, - num_restarts=NUM_RESTARTS, - raw_samples=RAW_SAMPLES, # used for intialization heuristic - ) - - x = candidates.numpy().astype(float) - - active_X = x[..., [dof["kind"] == "active" for dof in self._subset_dofs(mode="on")]] - passive_X = x[..., [dof["kind"] != "active" for dof in self._subset_dofs(mode="on")]] - acq_func_meta["passive_values"] = passive_X - - else: - if acq_func_name == "quasi-random": - active_X = self._subset_inputs_sampler(n=n, kind="active", mode="on").squeeze(1).numpy() - acq_func_meta = {"name": "quasi-random", "args": {}} - - elif acq_func_name == "grid": - n_active_dims = self._len_subset_dofs(kind="active", mode="on") - active_X = self.test_inputs_grid(max_inputs=n).reshape(-1, n_active_dims).numpy() - acq_func_meta = {"name": "grid", "args": {}} - - else: - raise ValueError() - - acq_func_meta["duration"] = duration = ttime.monotonic() - start_time - - if self.verbose: - print(f"found points {active_X} in {duration:.01f} seconds") - - if route and n > 1: - active_X = active_X[utils.route(self._read_subset_devices(kind="active", mode="on"), active_X)] - - return active_X, acq_func_meta - - def acquire(self, active_inputs): - """ - Acquire and digest according to the self's acquisition and digestion plans. - - This should yield a table of sampled tasks with the same length as the sampled inputs. - """ - try: - active_devices = self._subset_devices(kind="active", mode="on") - passive_devices = [*self._subset_devices(kind="passive"), *self._subset_devices(kind="active", mode="off")] - - uid = yield from self.acquisition_plan( - active_devices, - active_inputs.astype(float), - [*self.dets, *passive_devices], - delay=self.trigger_delay, - ) - - products = self.digestion(self.db, uid) - - # compute the fitness for each task - # for index, entry in products.iterrows(): - # for task in self.tasks: - # products.loc[index, task["key"]] = getattr(entry, task["key"]) - - except Exception as error: - if not self.allow_acquisition_errors: - raise error - logging.warning(f"Error in acquisition/digestion: {repr(error)}") - products = pd.DataFrame(active_inputs, columns=self._subset_dof_names(kind="active", mode="on")) - for task in self.tasks: - products.loc[:, task["key"]] = np.nan - - if not len(active_inputs) == len(products): - raise ValueError("The table returned by the digestion must be the same length as the sampled inputs!") - - return products - - def learn( - self, - acq_func=None, - n=1, - iterations=1, - upsample=1, - train=True, - data=None, - **kwargs, - ): - """ - This iterates the learning algorithm, looping over ask -> acquire -> tell. 
- It should be passed to a Bluesky RunEngine. - """ - - if data is not None: - if type(data) == str: - self.tell(new_table=pd.read_hdf(data, key="table")) - else: - self.tell(new_table=data) - - if self.sample_center_on_init and not self.initialized: - new_table = yield from self.acquire(self.acq_func_bounds.mean(axis=0)) - new_table.loc[:, "acq_func"] = "sample_center_on_init" - self.tell(new_table=new_table, train=False) - - if acq_func is not None: - for i in range(iterations): - x, acq_func_meta = self.ask(n=n, acq_func_identifier=acq_func, **kwargs) - - new_table = yield from self.acquire(x) - new_table.loc[:, "acq_func"] = acq_func_meta["name"] - self.tell(new_table=new_table, train=train) - - self.initialized = True - - def reset(self): - """ - Reset the agent. - """ - self.table = pd.DataFrame() - self.initialized = False - - def benchmark( - self, output_dir="./", runs=16, n_init=64, learning_kwargs_list=[{"acq_func": "qei", "n": 4, "iterations": 16}] - ): - cache_limits = {dof["name"]: dof["limits"] for dof in self.dofs} - - for run in range(runs): - for dof in self.dofs: - dof["limits"] = cache_limits[dof["name"]] + 0.25 * np.ptp(dof["limits"]) * np.random.uniform(low=-1, high=1) - - self.reset() - - yield from self.learn("qr", n=n_init) - - for kwargs in learning_kwargs_list: - yield from self.learn(**kwargs) - - self.save_data(output_dir + f"benchmark-{int(ttime.time())}.h5") - - ip.display.clear_output(wait=True) - - @property - def model(self): - """ - A model encompassing all the tasks. A single GP in the single-task case, or a model list. - """ - return ModelListGP(*[task["model"] for task in self.tasks]) if self.num_tasks > 1 else self.tasks[0]["model"] - - def _get_task_fitness(self, task_index): - """ - Returns the fitness for a task given the task index. 
- """ - task = self.tasks[task_index] - - targets = self.table.loc[:, task["key"]].values.copy() - - # check that task values are inside acceptable values - valid = (targets > task["limits"][0]) & (targets < task["limits"][1]) - targets = np.where(valid, targets, np.nan) - - # transform if needed - if "transform" in task.keys(): - if task["transform"] == "log": - targets = np.where(targets > 0, np.log(targets), np.nan) - - if task["kind"] == "minimize": - targets *= -1 - - return targets - - @property - def fitnesses(self): - """ - Returns a (num_tasks x n_obs) array of fitnesses - """ - return torch.cat([torch.tensor(self._get_task_fitness(i))[..., None] for i in range(self.num_tasks)], dim=1) - - @property - def scalarized_fitness(self): - return (self.fitnesses * self.task_weights).sum(axis=-1) - - @property - def best_scalarized_fitness(self): - f = self.scalarized_fitness - return np.where(np.isnan(f), -np.inf, f).max() - - @property - def all_tasks_valid(self): - return ~torch.isnan(self.scalarized_fitness) - - @property - def target_names(self): - return [f'{task["key"]}_fitness' for task in self.tasks] - - def test_inputs_grid(self, max_inputs=MAX_TEST_INPUTS): - n_side = int(np.power(max_inputs, self._len_subset_dofs(kind="active", mode="on") ** -1)) - return torch.tensor( - np.r_[ - np.meshgrid( - *[ - np.linspace(*dof["limits"], n_side) - if dof["kind"] == "active" - else dof["device"].read()[dof["device"].name]["value"] - for dof in self._subset_dofs(mode="on") - ] - ) - ] - ).swapaxes(0, -1) - - @property - def acq_func_bounds(self): - return torch.tensor( - [ - dof["limits"] if dof["kind"] == "active" else tuple(2 * [dof["device"].read()[dof["device"].name]["value"]]) - for dof in self.dofs - if dof["mode"] == "on" - ] - ).T - - @property - def num_tasks(self): - return len(self.tasks) - - @property - def det_names(self): - return [det.name for det in self.dets] - - @property - def task_keys(self): - return [task["key"] for task in self.tasks] - - @property - def task_models(self): - return [task["model"] for task in self.tasks] - - @property - def task_weights(self): - return torch.tensor([task["weight"] for task in self.tasks], dtype=torch.float64) - - @property - def task_signs(self): - return torch.tensor([(1 if task["kind"] == "maximize" else -1) for task in self.tasks], dtype=torch.long) - - def _dof_kind_mask(self, kind=None): - return [dof["kind"] == kind if kind is not None else True for dof in self.dofs] - - def _dof_mode_mask(self, mode=None): - return [dof["mode"] == mode if mode is not None else True for dof in self.dofs] - - def _dof_tags_mask(self, tags=[]): - return [np.isin(dof["tags"], tags).any() if tags else True for dof in self.dofs] - - def _dof_mask(self, kind=None, mode=None, tags=[]): - return [ - (k and m and t) - for k, m, t in zip(self._dof_kind_mask(kind), self._dof_mode_mask(mode), self._dof_tags_mask(tags)) - ] - - def activate_dofs(self, kind=None, mode=None, tags=[]): - for dof in self._subset_dofs(kind, mode, tags): - dof["mode"] = "on" - - def deactivate_dofs(self, kind=None, mode=None, tags=[]): - for dof in self._subset_dofs(kind, mode, tags): - dof["mode"] = "off" - - def _subset_dofs(self, kind=None, mode=None, tags=[]): - return [dof for dof, m in zip(self.dofs, self._dof_mask(kind, mode, tags)) if m] - - def _len_subset_dofs(self, kind=None, mode=None, tags=[]): - return len(self._subset_dofs(kind, mode, tags)) - - def _subset_devices(self, kind=None, mode=None, tags=[]): - return [dof["device"] for dof in self._subset_dofs(kind, mode, 
tags)] - - def _read_subset_devices(self, kind=None, mode=None, tags=[]): - return [device.read()[device.name]["value"] for device in self._subset_devices(kind, mode, tags)] - - def _subset_dof_names(self, kind=None, mode=None, tags=[]): - return [device.name for device in self._subset_devices(kind, mode, tags)] - - def _subset_dof_limits(self, kind=None, mode=None, tags=[]): - dofs_subset = self._subset_dofs(kind, mode, tags) - if len(dofs_subset) > 0: - return torch.tensor([dof["limits"] for dof in dofs_subset], dtype=torch.float64).T - return torch.empty((2, 0)) - - @property - def latent_dim_tuples(self): - """ - Returns a list of tuples, where each tuple represent a group of dimension to find a latent representation of. - """ - - latent_dim_labels = [dof.get("latent_group", str(uuid.uuid4())) for dof in self._subset_dofs(mode="on")] - u, uinv = np.unique(latent_dim_labels, return_inverse=True) - - return [tuple(np.where(uinv == i)[0]) for i in range(len(u))] - - def test_inputs(self, n=MAX_TEST_INPUTS): - return utils.sobol_sampler(self.acq_func_bounds, n=n) - - def _subset_input_transform(self, kind=None, mode=None, tags=[]): - limits = self._subset_dof_limits(kind, mode, tags) - offset = limits.min(dim=0).values - coefficient = limits.max(dim=0).values - offset - return botorch.models.transforms.input.AffineInputTransform( - d=limits.shape[-1], coefficient=coefficient, offset=offset - ) - - def _subset_inputs_sampler(self, kind=None, mode=None, tags=[], n=MAX_TEST_INPUTS): - """ - Returns $n$ quasi-randomly sampled inputs in the bounded parameter space - """ - transform = self._subset_input_transform(kind, mode, tags) - return transform.untransform(utils.normalized_sobol_sampler(n, d=self._len_subset_dofs(kind, mode, tags))) - - def save_data(self, filepath="./self_data.h5"): - """ - Save the sampled inputs and targets of the agent to a file, which can be used - to initialize a future self. - """ - - self.table.to_hdf(filepath, key="table") - - def forget(self, index): - self.tell(new_table=self.table.drop(index=index), append=False, train=False) - - def sampler(self, n): - """ - Returns $n$ quasi-randomly sampled points on the [0,1] ^ n_active_dof hypercube using Sobol sampling. 
- """ - min_power_of_two = 2 ** int(np.ceil(np.log(n) / np.log(2))) - subset = np.random.choice(min_power_of_two, size=n, replace=False) - return sp.stats.qmc.Sobol(d=self._len_subset_dofs(kind="active", mode="on"), scramble=True).random( - n=min_power_of_two - )[subset] - - def _set_hypers(self, hypers): - for task in self.tasks: - task["model"].load_state_dict(hypers[task["key"]]) - self.classifier.load_state_dict(hypers["classifier"]) - - @property - def hypers(self): - hypers = {"classifier": {}} - for key, value in self.classifier.state_dict().items(): - hypers["classifier"][key] = value - for task in self.tasks: - hypers[task["key"]] = {} - for key, value in task["model"].state_dict().items(): - hypers[task["key"]][key] = value - - return hypers - - def save_hypers(self, filepath): - hypers = self.hypers - with h5py.File(filepath, "w") as f: - for model_key in hypers.keys(): - f.create_group(model_key) - for param_key, param_value in hypers[model_key].items(): - f[model_key].create_dataset(param_key, data=param_value) - - @staticmethod - def load_hypers(filepath): - hypers = {} - with h5py.File(filepath, "r") as f: - for model_key in f.keys(): - hypers[model_key] = OrderedDict() - for param_key, param_value in f[model_key].items(): - hypers[model_key][param_key] = torch.tensor(np.atleast_1d(param_value[()])) - return hypers - - def train_models(self, **kwargs): - t0 = ttime.monotonic() - for task in self.tasks: - model = task["model"] - botorch.fit.fit_gpytorch_mll(gpytorch.mlls.ExactMarginalLogLikelihood(model.likelihood, model), **kwargs) - botorch.fit.fit_gpytorch_mll( - gpytorch.mlls.ExactMarginalLogLikelihood(self.classifier.likelihood, self.classifier), **kwargs - ) - if self.verbose: - print(f"trained models in {ttime.monotonic() - t0:.01f} seconds") - - @property - def acq_func_info(self): - entries = [] - for k, d in self.acq_func_config.items(): - ret = "" - ret += f'{d["pretty_name"].upper()} (identifiers: {d["identifiers"]})\n' - ret += f'-> {d["description"]}' - entries.append(ret) - - print("\n\n".join(entries)) - - @property - def inputs(self): - return self.table.loc[:, self._subset_dof_names()].astype(float) - - @property - def model_inputs(self): - return self.table.loc[:, self._subset_dof_names(mode="on")].astype(float) - - @property - def active_inputs(self): - return self.table.loc[:, self._subset_dof_names(kind="active", mode="on")].astype(float) - - @property - def best_inputs(self): - return self.active_inputs.values[np.nanargmax(self.scalarized_fitness)] - - def go_to(self, active_inputs): - args = [] - for dof, value in zip(self._subset_dofs(kind="active"), np.atleast_1d(active_inputs).T): - args.append(dof["device"]) - args.append(value) - yield from bps.mv(*args) - - def go_to_best(self): - yield from self.go_to(self.best_inputs) - - def plot_tasks(self, **kwargs): - if self._len_subset_dofs(kind="active", mode="on") == 1: - self._plot_tasks_one_dof(**kwargs) - else: - self._plot_tasks_many_dofs(**kwargs) - - def _plot_tasks_one_dof(self, size=16, lw=1e0): - self.task_fig, self.task_axes = plt.subplots( - self.num_tasks, - 1, - figsize=(6, 4 * self.num_tasks), - sharex=True, - constrained_layout=True, - ) - - self.task_axes = np.atleast_1d(self.task_axes) - - for task_index, task in enumerate(self.tasks): - task_plots = self.plots["tasks"][task["name"]] = {} - - task_fitness = self._get_task_fitness(task_index=task_index) - - color = DEFAULT_COLOR_LIST[task_index] - - self.task_axes[task_index].set_ylabel(task["key"]) - - x = self.test_inputs_grid() - 
task_posterior = task["model"].posterior(x) - task_mean = task_posterior.mean.detach().numpy() - task_sigma = task_posterior.variance.sqrt().detach().numpy() - - sampled_inputs = self.active_inputs.values - task_plots["sampled"] = self.task_axes[task_index].scatter(sampled_inputs, task_fitness, s=size, color=color) - - on_dofs_are_active_mask = [dof["kind"] == "active" for dof in self._subset_dofs(mode="on")] - - for z in [0, 1, 2]: - self.task_axes[task_index].fill_between( - x[..., on_dofs_are_active_mask].squeeze(), - (task_mean - z * task_sigma).squeeze(), - (task_mean + z * task_sigma).squeeze(), - lw=lw, - color=color, - alpha=0.5**z, - ) - - self.task_axes[task_index].set_xlim(self._subset_dofs(kind="active", mode="on")[0]["limits"]) - - def _plot_tasks_many_dofs(self, axes=[0, 1], shading="nearest", cmap=DEFAULT_COLORMAP, gridded=None, size=16): - if gridded is None: - gridded = self._len_subset_dofs(kind="active", mode="on") == 2 - - self.task_fig, self.task_axes = plt.subplots( - len(self.tasks), - 3, - figsize=(10, 4 * len(self.tasks)), - sharex=True, - sharey=True, - constrained_layout=True, - ) - - self.task_axes = np.atleast_2d(self.task_axes) - # self.task_fig.suptitle(f"(x,y)=({self.dofs[axes[0]].name},{self.dofs[axes[1]].name})") - - for task_index, task in enumerate(self.tasks): - task_fitness = self._get_task_fitness(task_index=task_index) - - task_vmin, task_vmax = np.nanpercentile(task_fitness, q=[1, 99]) - task_norm = mpl.colors.Normalize(task_vmin, task_vmax) - - # if task["transform"] == "log": - # task_norm = mpl.colors.LogNorm(task_vmin, task_vmax) - # else: - - self.task_axes[task_index, 0].set_ylabel(f'{task["key"]}_fitness') - - self.task_axes[task_index, 0].set_title("samples") - self.task_axes[task_index, 1].set_title("posterior mean") - self.task_axes[task_index, 2].set_title("posterior std. 
dev.") - - data_ax = self.task_axes[task_index, 0].scatter( - *self.active_inputs.values.T[axes], s=size, c=task_fitness, norm=task_norm, cmap=cmap - ) - - x = self.test_inputs_grid().squeeze() if gridded else self.test_inputs(n=MAX_TEST_INPUTS) - - task_posterior = task["model"].posterior(x) - task_mean = task_posterior.mean - task_sigma = task_posterior.variance.sqrt() - - if gridded: - if not x.ndim == 3: - raise ValueError() - self.task_axes[task_index, 1].pcolormesh( - x[..., 0].detach().numpy(), - x[..., 1].detach().numpy(), - task_mean[..., 0].detach().numpy(), - shading=shading, - cmap=cmap, - norm=task_norm, - ) - sigma_ax = self.task_axes[task_index, 2].pcolormesh( - x[..., 0].detach().numpy(), - x[..., 1].detach().numpy(), - task_sigma[..., 0].detach().numpy(), - shading=shading, - cmap=cmap, - ) - - else: - self.task_axes[task_index, 1].scatter( - x.detach().numpy()[..., axes[0]], - x.detach().numpy()[..., axes[1]], - c=task_mean[..., 0].detach().numpy(), - s=size, - norm=task_norm, - cmap=cmap, - ) - sigma_ax = self.task_axes[task_index, 2].scatter( - x.detach().numpy()[..., axes[0]], - x.detach().numpy()[..., axes[1]], - c=task_sigma[..., 0].detach().numpy(), - s=size, - cmap=cmap, - ) - - self.task_fig.colorbar(data_ax, ax=self.task_axes[task_index, :2], location="bottom", aspect=32, shrink=0.8) - self.task_fig.colorbar(sigma_ax, ax=self.task_axes[task_index, 2], location="bottom", aspect=32, shrink=0.8) - - for ax in self.task_axes.ravel(): - ax.set_xlim(*self._subset_dofs(kind="active", mode="on")[axes[0]]["limits"]) - ax.set_ylim(*self._subset_dofs(kind="active", mode="on")[axes[1]]["limits"]) - - def plot_acquisition(self, acq_funcs=["ei"], **kwargs): - if self._len_subset_dofs(kind="active", mode="on") == 1: - self._plot_acq_one_dof(acq_funcs=acq_funcs, **kwargs) - - else: - self._plot_acq_many_dofs(acq_funcs=acq_funcs, **kwargs) - - def _plot_acq_one_dof(self, acq_funcs, lw=1e0, **kwargs): - self.acq_fig, self.acq_axes = plt.subplots( - 1, - len(acq_funcs), - figsize=(4 * len(acq_funcs), 4), - sharex=True, - constrained_layout=True, - ) - - self.acq_axes = np.atleast_1d(self.acq_axes) - - for iacq_func, acq_func_identifier in enumerate(acq_funcs): - color = DEFAULT_COLOR_LIST[iacq_func] - - acq_func, acq_func_meta = acquisition.get_acquisition_function(self, acq_func_identifier) - - x = self.test_inputs_grid() - *input_shape, input_dim = x.shape - obj = acq_func.forward(x.reshape(-1, 1, input_dim)).reshape(input_shape) - - if acq_func_identifier in ["ei", "pi"]: - obj = obj.exp() - - self.acq_axes[iacq_func].set_title(acq_func_meta["name"]) - - on_dofs_are_active_mask = [dof["kind"] == "active" for dof in self._subset_dofs(mode="on")] - self.acq_axes[iacq_func].plot( - x[..., on_dofs_are_active_mask].squeeze(), obj.detach().numpy(), lw=lw, color=color - ) - - self.acq_axes[iacq_func].set_xlim(self._subset_dofs(kind="active", mode="on")[0]["limits"]) - - def _plot_acq_many_dofs( - self, acq_funcs, axes=[0, 1], shading="nearest", cmap=DEFAULT_COLORMAP, gridded=None, size=16, **kwargs - ): - self.acq_fig, self.acq_axes = plt.subplots( - 1, - len(acq_funcs), - figsize=(4 * len(acq_funcs), 4), - sharex=True, - sharey=True, - constrained_layout=True, - ) - - if gridded is None: - gridded = self._len_subset_dofs(kind="active", mode="on") == 2 - - self.acq_axes = np.atleast_1d(self.acq_axes) - # self.acq_fig.suptitle(f"(x,y)=({self.dofs[axes[0]].name},{self.dofs[axes[1]].name})") - - x = self.test_inputs_grid().squeeze() if gridded else self.test_inputs(n=MAX_TEST_INPUTS) - 
*input_shape, input_dim = x.shape - - for iacq_func, acq_func_identifier in enumerate(acq_funcs): - acq_func, acq_func_meta = acquisition.get_acquisition_function(self, acq_func_identifier) - - obj = acq_func.forward(x.reshape(-1, 1, input_dim)).reshape(input_shape) - if acq_func_identifier in ["ei", "pi"]: - obj = obj.exp() - - if gridded: - self.acq_axes[iacq_func].set_title(acq_func_meta["name"]) - obj_ax = self.acq_axes[iacq_func].pcolormesh( - x[..., 0].detach().numpy(), - x[..., 1].detach().numpy(), - obj.detach().numpy(), - shading=shading, - cmap=cmap, - ) - - self.acq_fig.colorbar(obj_ax, ax=self.acq_axes[iacq_func], location="bottom", aspect=32, shrink=0.8) - - else: - self.acq_axes[iacq_func].set_title(acq_func_meta["name"]) - obj_ax = self.acq_axes[iacq_func].scatter( - x.detach().numpy()[..., axes[0]], - x.detach().numpy()[..., axes[1]], - c=obj.detach().numpy(), - ) - - self.acq_fig.colorbar(obj_ax, ax=self.acq_axes[iacq_func], location="bottom", aspect=32, shrink=0.8) - - for ax in self.acq_axes.ravel(): - ax.set_xlim(*self._subset_dofs(kind="active", mode="on")[axes[0]]["limits"]) - ax.set_ylim(*self._subset_dofs(kind="active", mode="on")[axes[1]]["limits"]) - - def plot_validity(self, **kwargs): - if self._len_subset_dofs(kind="active", mode="on") == 1: - self._plot_valid_one_dof(**kwargs) - - else: - self._plot_valid_many_dofs(**kwargs) - - def _plot_valid_one_dof(self, size=16, lw=1e0): - self.valid_fig, self.valid_ax = plt.subplots(1, 1, figsize=(4, 4), sharex=True, constrained_layout=True) - - x = self.test_inputs_grid() - *input_shape, input_dim = x.shape - constraint = self.classifier.probabilities(x.reshape(-1, 1, input_dim))[..., -1].reshape(input_shape) - - self.valid_ax.scatter(self.active_inputs.values, self.all_tasks_valid, s=size) - - on_dofs_are_active_mask = [dof["kind"] == "active" for dof in self._subset_dofs(mode="on")] - - self.valid_ax.plot(x[..., on_dofs_are_active_mask].squeeze(), constraint.detach().numpy(), lw=lw) - - self.valid_ax.set_xlim(*self._subset_dofs(kind="active", mode="on")[0]["limits"]) - - def _plot_valid_many_dofs(self, axes=[0, 1], shading="nearest", cmap=DEFAULT_COLORMAP, size=16, gridded=None): - self.valid_fig, self.valid_axes = plt.subplots( - 1, 2, figsize=(8, 4), sharex=True, sharey=True, constrained_layout=True - ) - - if gridded is None: - gridded = self._len_subset_dofs(kind="active", mode="on") == 2 - - data_ax = self.valid_axes[0].scatter( - *self.active_inputs.values.T[:2], - c=self.all_tasks_valid, - s=size, - vmin=0, - vmax=1, - cmap=cmap, - ) - - x = self.test_inputs_grid().squeeze() if gridded else self.test_inputs(n=MAX_TEST_INPUTS) - *input_shape, input_dim = x.shape - constraint = self.classifier.probabilities(x.reshape(-1, 1, input_dim))[..., -1].reshape(input_shape) - - if gridded: - self.valid_axes[1].pcolormesh( - x[..., 0].detach().numpy(), - x[..., 1].detach().numpy(), - constraint.detach().numpy(), - shading=shading, - cmap=cmap, - vmin=0, - vmax=1, - ) - - # self.acq_fig.colorbar(obj_ax, ax=self.valid_axes[iacq_func], location="bottom", aspect=32, shrink=0.8) - - else: - # self.valid_axes.set_title(acq_func_meta["name"]) - self.valid_axes[1].scatter( - x.detach().numpy()[..., axes[0]], - x.detach().numpy()[..., axes[1]], - c=constraint.detach().numpy(), - ) - - self.valid_fig.colorbar(data_ax, ax=self.valid_axes[:2], location="bottom", aspect=32, shrink=0.8) - - for ax in self.valid_axes.ravel(): - ax.set_xlim(*self._subset_dofs(kind="active", mode="on")[axes[0]]["limits"]) - 
ax.set_ylim(*self._subset_dofs(kind="active", mode="on")[axes[1]]["limits"]) - - def inspect_beam(self, index, border=None): - im = self.images[index] - - x_min, x_max, y_min, y_max, width_x, width_y = self.table.loc[ - index, ["x_min", "x_max", "y_min", "y_max", "width_x", "width_y"] - ] - - bbx = np.array([x_min, x_max])[[0, 0, 1, 1, 0]] - bby = np.array([y_min, y_max])[[0, 1, 1, 0, 0]] - - plt.figure() - plt.imshow(im, cmap="gray_r") - plt.plot(bbx, bby, lw=4e0, c="r") - - if border is not None: - plt.xlim(x_min - border * width_x, x_min + border * width_x) - plt.ylim(y_min - border * width_y, y_min + border * width_y) - - def plot_history(self, x_key="index", show_all_tasks=False): - x = getattr(self.table, x_key).values - - num_task_plots = 1 - if show_all_tasks: - num_task_plots = self.num_tasks + 1 - - self.num_tasks + 1 if self.num_tasks > 1 else 1 - - hist_fig, hist_axes = plt.subplots( - num_task_plots, 1, figsize=(6, 4 * num_task_plots), sharex=True, constrained_layout=True, dpi=200 - ) - hist_axes = np.atleast_1d(hist_axes) - - unique_strategies, acq_func_index, acq_func_inverse = np.unique( - self.table.acq_func, return_index=True, return_inverse=True - ) - - sample_colors = np.array(DEFAULT_COLOR_LIST)[acq_func_inverse] - - if show_all_tasks: - for task_index, task in enumerate(self.tasks): - y = self.table.loc[:, f'{task["key"]}_fitness'].values - hist_axes[task_index].scatter(x, y, c=sample_colors) - hist_axes[task_index].plot(x, y, lw=5e-1, c="k") - hist_axes[task_index].set_ylabel(task["key"]) - - y = self.scalarized_fitness - - cummax_y = np.array([np.nanmax(y[: i + 1]) for i in range(len(y))]) - - hist_axes[-1].scatter(x, y, c=sample_colors) - hist_axes[-1].plot(x, y, lw=5e-1, c="k") - - hist_axes[-1].plot(x, cummax_y, lw=5e-1, c="k", ls=":") - - hist_axes[-1].set_ylabel("total_fitness") - hist_axes[-1].set_xlabel(x_key) - - handles = [] - for i_acq_func, acq_func in enumerate(unique_strategies): - # i_acq_func = np.argsort(acq_func_index)[i_handle] - handles.append(Patch(color=DEFAULT_COLOR_LIST[i_acq_func], label=acq_func)) - legend = hist_axes[0].legend(handles=handles, fontsize=8) - legend.set_title("acquisition function") - - # plot_history(self, x_key="time") - - # plt.savefig("bo-history.pdf", dpi=256) +from .agent import * # noqa F401 +from .devices import * # noqa F401 +from .objective import * # noqa F401 diff --git a/bloptools/bayesian/acquisition/__init__.py b/bloptools/bayesian/acquisition/__init__.py index f1dc7bb..5a1bb72 100644 --- a/bloptools/bayesian/acquisition/__init__.py +++ b/bloptools/bayesian/acquisition/__init__.py @@ -30,7 +30,7 @@ def parse_acq_func(acq_func_identifier): return acq_func_name -def get_acquisition_function(agent, acq_func_identifier="qei", **acq_func_kwargs): +def get_acquisition_function(agent, acq_func_identifier="qei", return_metadata=True, **acq_func_kwargs): """ Generates an acquisition function from a supplied identifier. 
""" @@ -46,8 +46,8 @@ def get_acquisition_function(agent, acq_func_identifier="qei", **acq_func_kwargs acq_func = analytic.ConstrainedLogExpectedImprovement( constraint=agent.constraint, model=agent.model, - best_f=agent.best_scalarized_fitness, - posterior_transform=ScalarizedPosteriorTransform(weights=agent.task_weights, offset=0), + best_f=agent.best_scalarized_objective, + posterior_transform=ScalarizedPosteriorTransform(weights=agent.objective_weights_torch, offset=0), ) acq_func_meta = {"name": acq_func_name, "args": {}} @@ -55,8 +55,8 @@ def get_acquisition_function(agent, acq_func_identifier="qei", **acq_func_kwargs acq_func = monte_carlo.qConstrainedExpectedImprovement( constraint=agent.constraint, model=agent.model, - best_f=agent.best_scalarized_fitness, - posterior_transform=ScalarizedPosteriorTransform(weights=agent.task_weights, offset=0), + best_f=agent.best_scalarized_objective, + posterior_transform=ScalarizedPosteriorTransform(weights=agent.objective_weights_torch, offset=0), ) acq_func_meta = {"name": acq_func_name, "args": {}} @@ -64,8 +64,8 @@ def get_acquisition_function(agent, acq_func_identifier="qei", **acq_func_kwargs acq_func = analytic.ConstrainedLogProbabilityOfImprovement( constraint=agent.constraint, model=agent.model, - best_f=agent.best_scalarized_fitness, - posterior_transform=ScalarizedPosteriorTransform(weights=agent.task_weights, offset=0), + best_f=agent.best_scalarized_objective, + posterior_transform=ScalarizedPosteriorTransform(weights=agent.objective_weights_torch, offset=0), ) acq_func_meta = {"name": acq_func_name, "args": {}} @@ -73,8 +73,8 @@ def get_acquisition_function(agent, acq_func_identifier="qei", **acq_func_kwargs acq_func = monte_carlo.qConstrainedProbabilityOfImprovement( constraint=agent.constraint, model=agent.model, - best_f=agent.best_scalarized_fitness, - posterior_transform=ScalarizedPosteriorTransform(weights=agent.task_weights, offset=0), + best_f=agent.best_scalarized_objective, + posterior_transform=ScalarizedPosteriorTransform(weights=agent.objective_weights_torch, offset=0), ) acq_func_meta = {"name": acq_func_name, "args": {}} @@ -103,7 +103,7 @@ def get_acquisition_function(agent, acq_func_identifier="qei", **acq_func_kwargs constraint=agent.constraint, model=agent.model, beta=beta, - posterior_transform=ScalarizedPosteriorTransform(weights=agent.task_weights, offset=0), + posterior_transform=ScalarizedPosteriorTransform(weights=agent.objective_weights_torch, offset=0), ) acq_func_meta = {"name": acq_func_name, "args": {"beta": beta}} @@ -114,7 +114,7 @@ def get_acquisition_function(agent, acq_func_identifier="qei", **acq_func_kwargs constraint=agent.constraint, model=agent.model, beta=beta, - posterior_transform=ScalarizedPosteriorTransform(weights=agent.task_weights, offset=0), + posterior_transform=ScalarizedPosteriorTransform(weights=agent.objective_weights_torch, offset=0), ) acq_func_meta = {"name": acq_func_name, "args": {"beta": beta}} @@ -126,4 +126,4 @@ def get_acquisition_function(agent, acq_func_identifier="qei", **acq_func_kwargs acq_func, _ = get_acquisition_function(agent, acq_func_identifier="qucb", beta=0, return_metadata=False) acq_func_meta = {"name": acq_func_name, "args": {}} - return acq_func, acq_func_meta + return (acq_func, acq_func_meta) if return_metadata else acq_func diff --git a/bloptools/bayesian/acquisition/analytic.py b/bloptools/bayesian/acquisition/analytic.py index 87bf2cd..8fa6f03 100644 --- a/bloptools/bayesian/acquisition/analytic.py +++ b/bloptools/bayesian/acquisition/analytic.py 
diff --git a/bloptools/bayesian/acquisition/analytic.py b/bloptools/bayesian/acquisition/analytic.py
index 87bf2cd..8fa6f03 100644
--- a/bloptools/bayesian/acquisition/analytic.py
+++ b/bloptools/bayesian/acquisition/analytic.py
@@ -78,7 +78,7 @@ def __init__(self, constraint, *args, **kwargs):
         self.constraint = constraint
 
     def forward(self, x):
-        return super().forward(x) + self.constraint(x).log().squeeze(-1)
+        return (super().forward(x) + self.constraint(x).log().squeeze(-1)).exp()
 
 
 class ConstrainedLogProbabilityOfImprovement(LogProbabilityOfImprovement):
@@ -87,4 +87,4 @@ def __init__(self, constraint, *args, **kwargs):
         self.constraint = constraint
 
     def forward(self, x):
-        return super().forward(x) + self.constraint(x).log().squeeze(-1)
+        return (super().forward(x) + self.constraint(x).log().squeeze(-1)).exp()
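Exponentiating the sum of logs means these constrained "log" acquisition functions now return values on the original scale: exp(log EI(x) + log p(x)) = EI(x) · p(x), i.e. the base acquisition value weighted by the classifier's probability that the point is valid. A toy check of that identity (plain torch, not bloptools code):

```python
import torch

log_ei = torch.tensor([-2.0, -0.5])  # stand-in values for LogExpectedImprovement
p_valid = torch.tensor([0.9, 0.25])  # stand-in constraint probabilities

# what the new forward() computes, versus the product it is equivalent to
assert torch.allclose((log_ei + p_valid.log()).exp(), log_ei.exp() * p_valid)
```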
diff --git a/bloptools/bayesian/agent.py b/bloptools/bayesian/agent.py
new file mode 100644
index 0000000..0191694
--- /dev/null
+++ b/bloptools/bayesian/agent.py
@@ -0,0 +1,691 @@
+import logging
+import os
+import time as ttime
+import warnings
+from collections import OrderedDict
+
+import bluesky.plan_stubs as bps  # noqa F401
+import bluesky.plans as bp  # noqa F401
+import botorch
+import gpytorch
+import h5py
+import IPython as ip
+import matplotlib as mpl
+import numpy as np
+import pandas as pd
+import scipy as sp
+import torch
+from botorch.models.deterministic import GenericDeterministicModel
+from botorch.models.model_list_gp_regression import ModelListGP
+
+from .. import utils
+from . import acquisition, models, plotting
+from .acquisition import default_acquisition_plan
+from .devices import DOFList
+from .digestion import default_digestion_function
+from .objective import ObjectiveList
+
+os.environ["KMP_DUPLICATE_LIB_OK"] = "True"
+
+warnings.filterwarnings("ignore", category=botorch.exceptions.warnings.InputDataWarning)
+
+mpl.rc("image", cmap="coolwarm")
+
+MAX_TEST_INPUTS = 2**11
+
+os.environ["KMP_DUPLICATE_LIB_OK"] = "True"
+
+
+class Agent:
+    def __init__(
+        self,
+        dofs,
+        objectives,
+        db,
+        **kwargs,
+    ):
+        """
+        A Bayesian optimization agent.
+
+        Parameters
+        ----------
+        dofs : iterable of DOF objects
+            The degrees of freedom that the agent can control, which determine the output of the model.
+            Each DOF carries its own limits, so the bounds of the search space are given per degree
+            of freedom rather than as a separate argument.
+        objectives : iterable of objectives
+            The objectives which the agent will try to optimize.
+        acquisition_plan : Bluesky plan generator that takes arguments (dofs, inputs, dets)
+            A plan that samples the beamline for some given inputs.
+        digestion : function that takes arguments (db, uid)
+            A function to digest the output of the acquisition.
+        db : A databroker instance.
+        """
+
+        # DOFs are parametrized by whether they are active and whether they are read-only
+        #
+        # below are the behaviors of DOFs in each case:
+        #
+        # "read": the agent will read the input on every acquisition (all dofs are always read)
+        # "move": the agent will try to set and optimize over these (there must be at least one of these)
+        # "input" means that the agent will use the value to make its posterior
+        #
+        #
+        #               not read-only        read-only
+        #          +---------------------+---------------+
+        #   active | read, input, move   | read, input   |
+        #          +---------------------+---------------+
+        # inactive | read                | read          |
+        #          +---------------------+---------------+
+        #
+        #
+
+        self.dofs = DOFList(list(np.atleast_1d(dofs)))
+        self.objectives = ObjectiveList(list(np.atleast_1d(objectives)))
+        self.db = db
+
+        self.verbose = kwargs.get("verbose", False)
+        self.allow_acquisition_errors = kwargs.get("allow_acquisition_errors", True)
+        self.initialization = kwargs.get("initialization", None)
+        self.acquisition_plan = kwargs.get("acquisition_plan", default_acquisition_plan)
+        self.digestion = kwargs.get("digestion", default_digestion_function)
+        self.dets = list(np.atleast_1d(kwargs.get("dets", [])))
+
+        self.trigger_delay = kwargs.get("trigger_delay", 0)
+        self.acq_func_config = kwargs.get("acq_func_config", acquisition.config)
+        self.sample_center_on_init = kwargs.get("sample_center_on_init", False)
+
+        self.table = pd.DataFrame()
+
+        self.initialized = False
+        self._train_models = True
+        self.a_priori_hypers = None
+
+        self.plots = {"objectives": {}}
+
+    def tell(self, new_table=None, append=True, train=True, **kwargs):
+        """
+        Inform the agent about new inputs and targets for the model.
+        """
+
+        new_table = pd.DataFrame() if new_table is None else new_table
+        self.table = pd.concat([self.table, new_table]) if append else new_table
+        self.table.index = np.arange(len(self.table))
+
+        skew_dims = self.latent_dim_tuples
+
+        if self.initialized:
+            cached_hypers = self.hypers
+
+        inputs = self.table.loc[:, self.dofs.subset(active=True).names].values.astype(float)
+
+        for i, obj in enumerate(self.objectives):
+            self.table.loc[:, f"{obj.key}_fitness"] = targets = self._get_objective_targets(i)
+            train_index = ~np.isnan(targets)
+
+            if not train_index.sum() >= 2:
+                raise ValueError("There must be at least two valid data points per objective!")
+
+            train_inputs = torch.tensor(inputs[train_index]).double()
+            train_targets = torch.tensor(targets[train_index]).double().unsqueeze(-1)  # .unsqueeze(0)
+
+            # for constructing the log normal noise prior
+            # target_snr = 2e2
+            # scale = 2e0
+            # loc = np.log(1 / target_snr**2) + scale**2
+
+            likelihood = gpytorch.likelihoods.GaussianLikelihood(
+                noise_constraint=gpytorch.constraints.Interval(
+                    torch.tensor(1e-4).square(),
+                    torch.tensor(1 / obj.min_snr).square(),
+                ),
+                # noise_prior=gpytorch.priors.torch_priors.LogNormalPrior(loc=loc, scale=scale),
+            ).double()
+
+            outcome_transform = botorch.models.transforms.outcome.Standardize(m=1)  # , batch_shape=torch.Size((1,)))
+
+            obj.model = models.LatentGP(
+                train_inputs=train_inputs,
+                train_targets=train_targets,
+                likelihood=likelihood,
+                skew_dims=skew_dims,
+                input_transform=self._subset_input_transform(active=True),
+                outcome_transform=outcome_transform,
+            ).double()
+
+        dirichlet_likelihood = gpytorch.likelihoods.DirichletClassificationLikelihood(
+            self.all_objectives_valid.long(), learn_additional_noise=True
+        ).double()
+
+        self.classifier = models.LatentDirichletClassifier(
+            train_inputs=torch.tensor(inputs).double(),
+            train_targets=dirichlet_likelihood.transformed_targets.transpose(-1, -2).double(),
+            skew_dims=skew_dims,
+            likelihood=dirichlet_likelihood,
+            input_transform=self._subset_input_transform(active=True),
+        ).double()
+
+        if self.a_priori_hypers is not None:
+            self._set_hypers(self.a_priori_hypers)
+        elif not train:
+            self._set_hypers(cached_hypers)
+        else:
+            try:
+                self.train_models()
+            except botorch.exceptions.errors.ModelFittingError:
+                if self.initialized:
+                    self._set_hypers(cached_hypers)
+                else:
+                    raise RuntimeError("Could not fit model on initialization!")
+
+        self.constraint = GenericDeterministicModel(f=lambda x: self.classifier.probabilities(x)[..., -1])
+
+    def ask(self, acq_func_identifier="qei", n=1, route=True, sequential=True, **acq_func_kwargs):
+        """
+        Ask the agent for the best point to sample, given an acquisition function.
+
+        acq_func_identifier: which acquisition function to use
+        n: how many points to get
+        """
+
+        acq_func_name = acquisition.parse_acq_func(acq_func_identifier)
+        acq_func_type = acquisition.config[acq_func_name]["type"]
+
+        start_time = ttime.monotonic()
+
+        if acq_func_type in ["analytic", "monte_carlo"]:
+            if not self.initialized:
+                raise RuntimeError(
+                    f'Can\'t construct non-trivial acquisition function "{acq_func_identifier}"'
+                    f" (the agent is not initialized!)"
+                )
+
+            if acq_func_type == "analytic" and n > 1:
+                raise ValueError("Can't generate multiple design points for analytic acquisition functions.")
+
+            acq_func, acq_func_meta = self.get_acquisition_function(
+                acq_func_identifier=acq_func_identifier, return_metadata=True
+            )
+
+            NUM_RESTARTS = 8
+            RAW_SAMPLES = 1024
+
+            candidates, _ = botorch.optim.optimize_acqf(
+                acq_function=acq_func,
+                bounds=self._active_bounds_torch,
+                q=n,
+                sequential=sequential,
+                num_restarts=NUM_RESTARTS,
+                raw_samples=RAW_SAMPLES,  # used for initialization heuristic
+            )
+
+            x = candidates.numpy().astype(float)
+
+            active_dofs_are_read_only = np.array([dof.read_only for dof in self.dofs.subset(active=True)])
+
+            acquisition_X = x[..., ~active_dofs_are_read_only]
+            read_only_X = x[..., active_dofs_are_read_only]
+            acq_func_meta["read_only_values"] = read_only_X
+
+        else:
+            if acq_func_name == "quasi-random":
+                acquisition_X = self._subset_inputs_sampler(n=n, active=True, read_only=False).squeeze(1).numpy()
+                acq_func_meta = {"name": "quasi-random", "args": {}}
+
+            elif acq_func_name == "grid":
+                n_active_dims = len(self.dofs.subset(active=True, read_only=False))
+                acquisition_X = self.test_inputs_grid(max_inputs=n).reshape(-1, n_active_dims).numpy()
+                acq_func_meta = {"name": "grid", "args": {}}
+
+            else:
+                raise ValueError(f'Unrecognized acquisition function identifier "{acq_func_identifier}".')
+
+        acq_func_meta["duration"] = duration = ttime.monotonic() - start_time
+
+        if self.verbose:
+            print(f"found points {acquisition_X} in {duration:.01f} seconds")
+
+        if route and n > 1:
+            routing_index = utils.route(self.dofs.subset(active=True, read_only=False).readback, acquisition_X)
+            acquisition_X = acquisition_X[routing_index]
+
+        return acquisition_X, acq_func_meta
+
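For reference, `ask` returns a numpy array of shape `(n, n_active_writable_dofs)` along with a metadata dict, and with `route=True` a batch is reordered to minimize travel from the current readbacks. A usage sketch, assuming a configured `agent`:

```python
X, meta = agent.ask(acq_func_identifier="qei", n=4)
print(meta["name"], f"{meta['duration']:.2f} s")  # which acq_func ran, and for how long
```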
+    def acquire(self, acquisition_inputs):
+        """
+        Acquire and digest according to the agent's acquisition and digestion plans.
+
+        This should yield a table of sampled objectives with the same length as the sampled inputs.
+        """
+        try:
+            acquisition_devices = self.dofs.subset(active=True, read_only=False).devices
+            read_only_devices = self.dofs.subset(active=True, read_only=True).devices
+
+            # the acquisition plan always takes as arguments:
+            # (things to move, where to move them, things to trigger once you get there)
+            # with some other kwargs
+
+            uid = yield from self.acquisition_plan(
+                acquisition_devices,
+                acquisition_inputs.astype(float),
+                [*self.dets, *read_only_devices],
+                delay=self.trigger_delay,
+            )
+
+            products = self.digestion(self.db, uid)
+
+            # compute the fitness for each objective
+            # for index, entry in products.iterrows():
+            #     for obj in self.objectives:
+            #         products.loc[index, objective["key"]] = getattr(entry, objective["key"])
+
+        except Exception as error:
+            if not self.allow_acquisition_errors:
+                raise error
+            logging.warning(f"Error in acquisition/digestion: {repr(error)}")
+            products = pd.DataFrame(acquisition_inputs, columns=self.dofs.subset(active=True, read_only=False).names)
+            for obj in self.objectives:
+                products.loc[:, obj.key] = np.nan
+
+        if not len(acquisition_inputs) == len(products):
+            raise ValueError("The table returned by the digestion function must be the same length as the sampled inputs!")
+
+        return products
+
+    def learn(
+        self,
+        acq_func=None,
+        n=1,
+        iterations=1,
+        upsample=1,
+        train=True,
+        data=None,
+        **kwargs,
+    ):
+        """
+        This iterates the learning algorithm, looping over ask -> acquire -> tell.
+        It should be passed to a Bluesky RunEngine.
+        """
+
+        if data is not None:
+            if isinstance(data, str):
+                self.tell(new_table=pd.read_hdf(data, key="table"))
+            else:
+                self.tell(new_table=data)
+
+        if self.sample_center_on_init and not self.initialized:
+            new_table = yield from self.acquire(self.dofs.subset(active=True, read_only=False).limits.mean(axis=1))
+            new_table.loc[:, "acq_func"] = "sample_center_on_init"
+            self.tell(new_table=new_table, train=False)
+
+        if acq_func is not None:
+            for _ in range(iterations):
+                x, acq_func_meta = self.ask(n=n, acq_func_identifier=acq_func, **kwargs)
+
+                new_table = yield from self.acquire(x)
+                new_table.loc[:, "acq_func"] = acq_func_meta["name"]
+                self.tell(new_table=new_table, train=train)
+
+        self.initialized = True
+
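Because `acquire` and `learn` are Bluesky plan generators (`yield from ...`), they are executed by a `RunEngine` rather than called directly. A minimal usage sketch, assuming `RE` and `agent` are already configured:

```python
RE(agent.learn("qr", n=16))                # seed the models with quasi-random samples
RE(agent.learn("qei", n=1, iterations=8))  # then loop ask -> acquire -> tell
```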
+    def get_acquisition_function(self, acq_func_identifier, return_metadata=False):
+        return acquisition.get_acquisition_function(
+            self, acq_func_identifier=acq_func_identifier, return_metadata=return_metadata
+        )
+
+    def reset(self):
+        """
+        Reset the agent.
+        """
+        self.table = pd.DataFrame()
+        self.initialized = False
+
+    def benchmark(
+        self, output_dir="./", runs=16, n_init=64, learning_kwargs_list=[{"acq_func": "qei", "n": 4, "iterations": 16}]
+    ):
+        cache_limits = {dof.name: dof.limits for dof in self.dofs}
+
+        for run in range(runs):
+            for dof in self.dofs:
+                dof.limits = cache_limits[dof.name] + 0.25 * np.ptp(dof.limits) * np.random.uniform(low=-1, high=1)
+
+            self.reset()
+
+            yield from self.learn("qr", n=n_init)
+
+            for kwargs in learning_kwargs_list:
+                yield from self.learn(**kwargs)
+
+            self.save_data(output_dir + f"benchmark-{int(ttime.time())}.h5")
+
+            ip.display.clear_output(wait=True)
+
+    @property
+    def model(self):
+        """
+        A model encompassing all the objectives. A single GP in the single-objective case, or a model list.
+        """
+        return ModelListGP(*[obj.model for obj in self.objectives]) if len(self.objectives) > 1 else self.objectives[0].model
+
+    @property
+    def objective_weights_torch(self):
+        return torch.tensor(self.objectives.weights, dtype=torch.float)
+
+    def _get_objective_targets(self, i):
+        """
+        Returns the targets (what we fit to) for an objective, given the objective index.
+        """
+        obj = self.objectives[i]
+
+        targets = self.table.loc[:, obj.key].values.copy()
+
+        # check that the target values are inside the acceptable range
+        valid = (targets > obj.limits[0]) & (targets < obj.limits[1])
+        targets = np.where(valid, targets, np.nan)
+
+        # transform if needed
+        if obj.log:
+            targets = np.where(valid, np.log(targets), np.nan)
+
+        if obj.minimize:
+            targets *= -1
+
+        return targets
+
+    # @property
+    # def acquisition_dofs(self):
+    #     """
+    #     Returns the acquisition DOFs, which are the DOFs to optimize over (that is, active and not read-only).
+    #     """
+    #     return self.dofs.subset(active=True, read_only=False)
+
+    # @property
+    # def acquisition_dof_limits(self):
+    #     """
+    #     Returns the acquisition limits, which are the ranges to optimize over (that is, active and not read-only).
+    #     This has shape (n_acq_dofs, 2).
+    #     """
+    #     acq_dofs = self.dofs.subset(active=True, read_only=False)
+    #     return np.c_[acq_dofs.summary.lower_limit.values, acq_dofs.summary.upper_limit.values]
+
+    @property
+    def n_objs(self):
+        """
+        Returns the number of objectives.
+        """
+        return len(self.objectives)
+
+    @property
+    def objectives_targets(self):
+        """
+        Returns an (n_obs x num_objectives) array of targets
+        """
+        return torch.cat([torch.tensor(self._get_objective_targets(i))[..., None] for i in range(self.n_objs)], dim=1)
+
+    @property
+    def scalarized_objectives(self):
+        return (self.objectives_targets * self.objectives.weights).sum(axis=-1)
+
+    @property
+    def best_scalarized_objective(self):
+        f = self.scalarized_objectives
+        return np.where(np.isnan(f), -np.inf, f).max()
+
+    @property
+    def all_objectives_valid(self):
+        return ~torch.isnan(self.scalarized_objectives)
+
+    @property
+    def target_names(self):
+        return [f"{obj.key}_fitness" for obj in self.objectives]
+
+    def test_inputs_grid(self, max_inputs=MAX_TEST_INPUTS):
+        """
+        Returns a (n_side, ..., n_side, 1, n_active_dof) grid of test_inputs
+        """
+        n_acq_dofs = len(self.dofs.subset(active=True, read_only=False))
+        n_side = int(np.power(max_inputs, n_acq_dofs**-1))
+        return torch.cat(
+            [
+                tensor.unsqueeze(-1)
+                for tensor in torch.meshgrid(
+                    *[
+                        torch.linspace(dof.lower_limit, dof.upper_limit, n_side) if not dof.read_only else dof.readback
+                        for dof in self.dofs.subset(active=True)
+                    ],
+                    indexing="ij",
+                )
+            ],
+            dim=-1,
+        ).unsqueeze(-2)
+
+    def test_inputs(self, n=MAX_TEST_INPUTS):
+        """
+        Returns a (n, 1, n_active_dof) grid of test_inputs
+        """
+        return utils.sobol_sampler(self._active_bounds_torch, n=n)
+
+    @property
+    def _active_bounds_torch(self):
+        return torch.tensor(
+            [dof.limits if not dof.read_only else tuple(2 * [dof.readback]) for dof in self.dofs.subset(active=True)]
+        ).T
+
+    # @property
+    # def num_objectives(self):
+    #     return len(self.objectives)
+
+    # @property
+    # def det_names(self):
+    #     return [det.name for det in self.dets]
+
+    # @property
+    # def objective_keys(self):
+    #     return [obj.key for obj in self.objectives]
+
+    # @property
+    # def objective_models(self):
+    #     return [obj.model for obj in self.objectives]
+
+    # @property
+    # def objective_weights(self):
+    #     return torch.tensor([obj.weight for obj in self.objectives], dtype=torch.float64)
+
+    # @property
+    # def objective_signs(self):
+    #     return torch.tensor([(-1 if obj.minimize else +1) for obj in self.objectives], dtype=torch.long)
+
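The `DOFList.subset` calls above encode the table from `__init__`: only active, non-read-only DOFs are optimized over, while active read-only DOFs are still read and fed to the posterior. A sketch of the intended call pattern, using only accessors that appear in this diff:

```python
movable = agent.dofs.subset(active=True, read_only=False)
print(movable.names, movable.limits)  # what the optimizer may move, and within what bounds

watched = agent.dofs.subset(active=True, read_only=True)
print(watched.devices)                # monitored (e.g. a ring current) but never set
```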
+    @property
+    def latent_dim_tuples(self):
+        """
+        Returns a list of tuples, where each tuple represents a group of dimensions to find a latent representation of.
+        """
+
+        latent_dim_labels = [dof.latent_group for dof in self.dofs.subset(active=True)]
+        u, uinv = np.unique(latent_dim_labels, return_inverse=True)
+
+        return [tuple(np.where(uinv == i)[0]) for i in range(len(u))]
+
+    def _subset_input_transform(self, active=None, read_only=None, tags=[]):
+        # torch likes limits to be (2, n_dof) and not (n_dof, 2)
+        torch_limits = torch.tensor(self.dofs.subset(active, read_only, tags).limits.T, dtype=torch.double)
+        offset = torch_limits.min(dim=0).values
+        coefficient = torch_limits.max(dim=0).values - offset
+        return botorch.models.transforms.input.AffineInputTransform(
+            d=torch_limits.shape[-1], coefficient=coefficient, offset=offset
+        )
+
+    def _subset_inputs_sampler(self, active=None, read_only=None, tags=[], n=MAX_TEST_INPUTS):
+        """
+        Returns $n$ quasi-randomly sampled inputs in the bounded parameter space
+        """
+        transform = self._subset_input_transform(active, read_only, tags)
+        return transform.untransform(utils.normalized_sobol_sampler(n, d=len(self.dofs.subset(active, read_only, tags))))
+
+    def save_data(self, filepath="./agent_data.h5"):
+        """
+        Save the sampled inputs and targets of the agent to a file, which can be used
+        to initialize a future agent.
+        """
+
+        self.table.to_hdf(filepath, key="table")
+
+    def forget(self, index):
+        self.tell(new_table=self.table.drop(index=index), append=False, train=False)
+
+ """ + min_power_of_two = 2 ** int(np.ceil(np.log(n) / np.log(2))) + subset = np.random.choice(min_power_of_two, size=n, replace=False) + return sp.stats.qmc.Sobol(d=d, scramble=True).random(n=min_power_of_two)[subset] + + def _set_hypers(self, hypers): + for obj in self.objectives: + obj.model.load_state_dict(hypers[obj.key]) + self.classifier.load_state_dict(hypers["classifier"]) + + @property + def hypers(self): + hypers = {"classifier": {}} + for key, value in self.classifier.state_dict().items(): + hypers["classifier"][key] = value + for obj in self.objectives: + hypers[obj.key] = {} + for key, value in obj.model.state_dict().items(): + hypers[obj.key][key] = value + + return hypers + + def save_hypers(self, filepath): + hypers = self.hypers + with h5py.File(filepath, "w") as f: + for model_key in hypers.keys(): + f.create_group(model_key) + for param_key, param_value in hypers[model_key].items(): + f[model_key].create_dataset(param_key, data=param_value) + + @staticmethod + def load_hypers(filepath): + hypers = {} + with h5py.File(filepath, "r") as f: + for model_key in f.keys(): + hypers[model_key] = OrderedDict() + for param_key, param_value in f[model_key].items(): + hypers[model_key][param_key] = torch.tensor(np.atleast_1d(param_value[()])) + return hypers + + def train_models(self, **kwargs): + t0 = ttime.monotonic() + for obj in self.objectives: + model = obj.model + botorch.fit.fit_gpytorch_mll(gpytorch.mlls.ExactMarginalLogLikelihood(model.likelihood, model), **kwargs) + botorch.fit.fit_gpytorch_mll( + gpytorch.mlls.ExactMarginalLogLikelihood(self.classifier.likelihood, self.classifier), **kwargs + ) + if self.verbose: + print(f"trained models in {ttime.monotonic() - t0:.01f} seconds") + + @property + def acq_func_info(self): + entries = [] + for k, d in self.acq_func_config.items(): + ret = "" + ret += f'{d["pretty_name"].upper()} (identifiers: {d["identifiers"]})\n' + ret += f'-> {d["description"]}' + entries.append(ret) + + print("\n\n".join(entries)) + + @property + def inputs(self): + return self.table.loc[:, self.dofs.device_names].astype(float) + + @property + def active_inputs(self): + return self.table.loc[:, self.dofs.subset(active=True).device_names].astype(float) + + # @property + # def acquisition_inputs(self): + # return self.table.loc[:, self.dofs.subset(active=True, read_only=False).names].astype(float) + + # @property + # def best_inputs(self): + # return self.acquisition_inputs.values[np.nanargmax(self.scalarized_objectives)] + + # def go_to(self, acquisition_inputs): + # args = [] + # for dof, value in zip(self.dofs.subset(read_only=False), np.atleast_1d(acquisition_inputs).T): + # args.append(dof.device) + # args.append(value) + # yield from bps.mv(*args) + + def go_to_best(self): + yield from self.go_to(self.best_inputs) + + def plot_objectives(self, **kwargs): + if len(self.dofs.subset(active=True, read_only=False)) == 1: + plotting._plot_objs_one_dof(self, **kwargs) + else: + plotting._plot_objs_many_dofs(self, **kwargs) + + def plot_acquisition(self, acq_funcs=["ei"], **kwargs): + if len(self.dofs.subset(active=True, read_only=False)) == 1: + plotting._plot_acq_one_dof(self, acq_funcs=acq_funcs, **kwargs) + + else: + plotting._plot_acq_many_dofs(self, acq_funcs=acq_funcs, **kwargs) + + def plot_validity(self, **kwargs): + if len(self.dofs.subset(active=True, read_only=False)) == 1: + plotting._plot_valid_one_dof(self, **kwargs) + + else: + plotting._plot_valid_many_dofs(self, **kwargs) + + # def plot_history(self, x_key="index", show_all_objs=False): 
+ # x = getattr(self.table, x_key).values + + # num_obj_plots = 1 + # if show_all_objs: + # num_obj_plots = self.n_objs + 1 + + # self.n_objs + 1 if self.n_objs > 1 else 1 + + # hist_fig, hist_axes = plt.subplots( + # num_obj_plots, 1, figsize=(6, 4 * num_obj_plots), sharex=True, constrained_layout=True, dpi=200 + # ) + # hist_axes = np.atleast_1d(hist_axes) + + # unique_strategies, acq_func_index, acq_func_inverse = np.unique( + # self.table.acq_func, return_index=True, return_inverse=True + # ) + + # sample_colors = np.array(DEFAULT_COLOR_LIST)[acq_func_inverse] + + # if show_all_objs: + # for obj_index, obj in enumerate(self.objectives): + # y = self.table.loc[:, f"{obj.key}_fitness"].values + # hist_axes[obj_index].scatter(x, y, c=sample_colors) + # hist_axes[obj_index].plot(x, y, lw=5e-1, c="k") + # hist_axes[obj_index].set_ylabel(obj.key) + + # y = self.scalarized_objectives + + # cummax_y = np.array([np.nanmax(y[: i + 1]) for i in range(len(y))]) + + # hist_axes[-1].scatter(x, y, c=sample_colors) + # hist_axes[-1].plot(x, y, lw=5e-1, c="k") + + # hist_axes[-1].plot(x, cummax_y, lw=5e-1, c="k", ls=":") + + # hist_axes[-1].set_ylabel("total_fitness") + # hist_axes[-1].set_xlabel(x_key) + + # handles = [] + # for i_acq_func, acq_func in enumerate(unique_strategies): + # # i_acq_func = np.argsort(acq_func_index)[i_handle] + # handles.append(Patch(color=DEFAULT_COLOR_LIST[i_acq_func], label=acq_func)) + # legend = hist_axes[0].legend(handles=handles, fontsize=8) + # legend.set_title("acquisition function") diff --git a/bloptools/bayesian/devices.py b/bloptools/bayesian/devices.py new file mode 100644 index 0000000..af08bfe --- /dev/null +++ b/bloptools/bayesian/devices.py @@ -0,0 +1,239 @@ +import time as ttime +import uuid +from collections.abc import Sequence +from typing import Tuple, Union + +import numpy as np +import pandas as pd +from ophyd import Signal, SignalRO + +DEFAULT_BOUNDS = (-5.0, +5.0) +DOF_FIELDS = ["name", "readback", "lower_limit", "upper_limit", "units", "active", "read_only", "tags"] + +numeric = Union[float, int] + + +class ReadOnlyError(Exception): + ... 
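The `_validate_dofs` helper just below rejects duplicate DOF names using `np.unique(..., return_counts=True)`; a quick illustration of the idiom with made-up names:

```python
# The duplicate-detection idiom used by _validate_dofs below, shown with
# made-up names for illustration.
import numpy as np

names = ["x1", "x2", "x1"]
unique_names, counts = np.unique(names, return_counts=True)
duplicates = unique_names[counts > 1]
print(duplicates)  # ['x1'] -> would raise ValueError in _validate_dofs
```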
+
+
+def _validate_dofs(dofs):
+    dof_names = [dof.name for dof in dofs]
+
+    # check that dof names are unique
+    unique_dof_names, counts = np.unique(dof_names, return_counts=True)
+    duplicate_dof_names = unique_dof_names[counts > 1]
+    if len(duplicate_dof_names) > 0:
+        raise ValueError(f'Duplicate name(s) in supplied dofs: "{duplicate_dof_names}"')
+
+    return list(dofs)
+
+
+class DOF:
+    def __init__(
+        self,
+        device: Signal = None,
+        limits: Tuple[numeric, numeric] = (-10.0, 10.0),
+        name: str = None,
+        units: str = None,
+        read_only: bool = None,
+        active: bool = True,
+        tags: list = None,
+        latent_group=None,
+    ):
+        self.uuid = str(uuid.uuid4())
+        self.name = name if name is not None else device.name if hasattr(device, "name") else self.uuid
+        self.device = device if device is not None else Signal(name=self.name)
+        self.limits = limits
+        self.read_only = read_only if read_only is not None else True if isinstance(device, SignalRO) else False
+        self.units = units
+        self.tags = tags if tags is not None else []
+        self.active = active
+        self.latent_group = latent_group if latent_group is not None else str(uuid.uuid4())
+
+    @property
+    def lower_limit(self):
+        return self.limits[0]
+
+    @property
+    def upper_limit(self):
+        return self.limits[1]
+
+    @property
+    def readback(self):
+        return self.device.read()[self.device.name]["value"]
+
+    @property
+    def summary(self) -> pd.Series:
+        series = pd.Series(index=DOF_FIELDS)
+        for attr in series.index:
+            series[attr] = getattr(self, attr)
+        return series
+
+    @property
+    def label(self) -> str:
+        return f"{self.name}{f' [{self.units}]' if self.units is not None else ''}"
+
+    @property
+    def has_model(self):
+        return hasattr(self, "model")
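For reference, a hypothetical usage sketch of the `DOF` class above (the device names are made up); note that `read_only` is inferred when the wrapped device is a `SignalRO`:

```python
# Hypothetical usage of the DOF class defined above; device names are made up.
from ophyd import Signal, SignalRO

x1 = DOF(device=Signal(name="x1"), limits=(-5.0, 5.0), units="mm")
temp = DOF(device=SignalRO(name="temp"), units="K")

print(x1.label)        # "x1 [mm]"
print(temp.read_only)  # True, inferred from SignalRO
print(x1.summary)      # pandas Series over DOF_FIELDS
```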
+ """ + return np.c_[self.lower_limits, self.upper_limits] + + @property + def readback(self) -> np.array: + return np.array([dof.readback for dof in self.dofs]) + + def add(self, dof): + _validate_dofs([*self.dofs, dof]) + self.dofs.append(dof) + + def _dof_read_only_mask(self, read_only=None): + return [dof.read_only == read_only if read_only is not None else True for dof in self.dofs] + + def _dof_active_mask(self, active=None): + return [dof.active == active if active is not None else True for dof in self.dofs] + + def _dof_tags_mask(self, tags=[]): + return [np.isin(dof["tags"], tags).any() if tags else True for dof in self.dofs] + + def _dof_mask(self, active=None, read_only=None, tags=[]): + return [ + (k and m and t) + for k, m, t in zip(self._dof_read_only_mask(read_only), self._dof_active_mask(active), self._dof_tags_mask(tags)) + ] + + def subset(self, active=None, read_only=None, tags=[]): + return DOFList([dof for dof, m in zip(self.dofs, self._dof_mask(active, read_only, tags)) if m]) + + # def _subset_devices(self, read_only=None, active=None, tags=[]): + # return [dof["device"] for dof in self._subset_dofs(read_only, active, tags)] + + # def _read_subset_devices(self, read_only=None, active=None, tags=[]): + # return [device.read()[device.name]["value"] for device in self._subset_devices(read_only, active, tags)] + + # def _subset_dof_names(self, read_only=None, active=None, tags=[]): + # return [device.name for device in self._subset_devices(read_only, active, tags)] + + # def _subset_dof_limits(self, read_only=None, active=None, tags=[]): + # dofs_subset = self._subset_dofs(read_only, active, tags) + # if len(dofs_subset) > 0: + # return torch.tensor([dof["limits"] for dof in dofs_subset], dtype=torch.float64).T + # return torch.empty((2, 0)) + + def activate(self, read_only=None, active=None, tags=[]): + for dof in self._subset_dofs(read_only, active, tags): + dof.active = True + + def deactivate(self, read_only=None, active=None, tags=[]): + for dof in self._subset_dofs(read_only, active, tags): + dof.active = False + + +class BrownianMotion(SignalRO): + """ + Read-only degree of freedom simulating brownian motion + """ + + def __init__(self, name=None, theta=0.95, *args, **kwargs): + name = name if name is not None else str(uuid.uuid4()) + + super().__init__(name=name, *args, **kwargs) + + self.theta = theta + self.old_t = ttime.monotonic() + self.old_y = 0.0 + + def get(self): + new_t = ttime.monotonic() + alpha = self.theta ** (new_t - self.old_t) + new_y = alpha * self.old_y + np.sqrt(1 - alpha**2) * np.random.standard_normal() + + self.old_t = new_t + self.old_y = new_y + return new_y + + +class TimeReadback(SignalRO): + """ + Returns the current timestamp. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def get(self): + return ttime.time() + + +class ConstantReadback(SignalRO): + """ + Returns a constant every time you read it (more useful than you'd think). 
+ """ + + def __init__(self, constant=1, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.constant = constant + + def get(self): + return self.constant diff --git a/bloptools/bayesian/objective.py b/bloptools/bayesian/objective.py new file mode 100644 index 0000000..0636dcd --- /dev/null +++ b/bloptools/bayesian/objective.py @@ -0,0 +1,107 @@ +from collections.abc import Sequence +from typing import Tuple, Union + +import numpy as np +import pandas as pd + +numeric = Union[float, int] + +DEFAULT_MINIMUM_SNR = 1e1 +OBJ_FIELDS = ["name", "key", "limits", "weight", "minimize", "log"] + + +class DuplicateKeyError(ValueError): + ... + + +def _validate_objectives(objectives): + keys = [obj.key for obj in objectives] + unique_keys, counts = np.unique(keys, return_counts=True) + duplicate_keys = unique_keys[counts > 1] + if len(duplicate_keys) > 0: + raise DuplicateKeyError(f'Duplicate key(s) in supplied objectives: "{duplicate_keys}"') + + +class Objective: + def __init__( + self, + key: str, + name: str = None, + minimize: bool = False, + log: bool = False, + weight: numeric = 1.0, + limits: Tuple[numeric, numeric] = None, + min_snr: numeric = DEFAULT_MINIMUM_SNR, + ): + self.name = name if name is not None else key + self.key = key + self.minimize = minimize + self.log = log + self.weight = weight + self.min_snr = min_snr + + if limits is not None: + self.limits = limits + elif self.log: + self.limits = (0, np.inf) + else: + self.limits = (-np.inf, np.inf) + + @property + def label(self): + return f"{'neg ' if self.minimize else ''}{'log ' if self.log else ''}{self.name}" + + @property + def summary(self): + series = pd.Series() + for col in OBJ_FIELDS: + series[col] = getattr(self, col) + return series + + def __repr__(self): + return self.params.__repr__() + + @property + def has_model(self): + return hasattr(self, "model") + + +class ObjectiveList(Sequence): + def __init__(self, objectives: list = []): + _validate_objectives(objectives) + self.objectives = objectives + + def __getitem__(self, i): + return self.objectives[i] + + def __len__(self): + return len(self.objectives) + + @property + def summary(self): + summary = pd.DataFrame(columns=OBJ_FIELDS) + for i, obj in enumerate(self.objectives): + for col in summary.columns: + summary.loc[i, col] = getattr(obj, col) + + # convert dtypes + for attr in ["minimize", "log"]: + summary[attr] = summary[attr].astype(bool) + + def __repr__(self): + return self.summary.__repr__() + + @property + def names(self) -> list: + return [obj.name for obj in self.objectives] + + @property + def weights(self) -> np.array: + """ + Returns an array of the objective weights. + """ + return np.array([obj.weight for obj in self.objectives]) + + def add(self, objective): + _validate_objectives([*self.objectives, objective]) + self.objectives.append(objective) diff --git a/bloptools/bayesian/plotting.py b/bloptools/bayesian/plotting.py new file mode 100644 index 0000000..57e1998 --- /dev/null +++ b/bloptools/bayesian/plotting.py @@ -0,0 +1,376 @@ +import matplotlib as mpl +import matplotlib.pyplot as plt +import numpy as np + +from . 
import acquisition + +DEFAULT_COLOR_LIST = ["dodgerblue", "tomato", "mediumseagreen", "goldenrod"] +DEFAULT_COLORMAP = "magma" +DEFAULT_SCATTER_SIZE = 16 + +MAX_TEST_INPUTS = 2**11 + + +def _plot_objs_one_dof(agent, size=16, lw=1e0): + agent.obj_fig, agent.obj_axes = plt.subplots( + agent.n_objs, + 1, + figsize=(6, 4 * agent.n_objs), + sharex=True, + constrained_layout=True, + ) + + agent.obj_axes = np.atleast_1d(agent.obj_axes) + + x_dof = agent.dofs[0] + x_values = agent.table.loc[:, x_dof.device.name].values + + for obj_index, obj in enumerate(agent.objectives): + obj_fitness = agent._get_objective_targets(obj_index) + + color = DEFAULT_COLOR_LIST[obj_index] + + test_inputs = agent.test_inputs_grid() + test_x = test_inputs[..., 0].detach().numpy() + + test_posterior = obj.model.posterior(test_inputs) + test_mean = test_posterior.mean[..., 0].detach().numpy() + test_sigma = test_posterior.variance.sqrt()[..., 0].detach().numpy() + + agent.obj_axes[obj_index].scatter(x_values, obj_fitness, s=size, color=color) + + for z in [0, 1, 2]: + agent.obj_axes[obj_index].fill_between( + test_x.ravel(), + (test_mean - z * test_sigma).ravel(), + (test_mean + z * test_sigma).ravel(), + lw=lw, + color=color, + alpha=0.5**z, + ) + + agent.obj_axes[obj_index].set_xlim(*x_dof.limits) + agent.obj_axes[obj_index].set_xlabel(x_dof.label) + agent.obj_axes[obj_index].set_ylabel(obj.label) + + +def _plot_objs_many_dofs(agent, axes=[0, 1], shading="nearest", cmap=DEFAULT_COLORMAP, gridded=None, size=32, grid_zoom=1): + if gridded is None: + gridded = len(agent.dofs.subset(active=True, read_only=False)) == 2 + + agent.obj_fig, agent.obj_axes = plt.subplots( + len(agent.objectives), + 3, + figsize=(10, 4 * len(agent.objectives)), + constrained_layout=True, + dpi=256, + ) + + agent.obj_axes = np.atleast_2d(agent.obj_axes) + + x_dof, y_dof = agent.dofs[axes[0]], agent.dofs[axes[1]] + x_values = agent.table.loc[:, x_dof.device.name].values + y_values = agent.table.loc[:, y_dof.device.name].values + + # test_inputs has shape (*input_shape, 1, n_active_dofs) + test_inputs = agent.test_inputs_grid() if gridded else agent.test_inputs(n=1024) + test_x = test_inputs[..., 0, axes[0]].detach().numpy() + test_y = test_inputs[..., 0, axes[1]].detach().numpy() + + for obj_index, obj in enumerate(agent.objectives): + obj_values = agent._get_objective_targets(obj_index) + + obj_vmin, obj_vmax = np.nanpercentile(obj_values, q=[1, 99]) + obj_norm = mpl.colors.Normalize(obj_vmin, obj_vmax) + + data_ax = agent.obj_axes[obj_index, 0].scatter(x_values, y_values, c=obj_values, s=size, norm=obj_norm, cmap=cmap) + + # mean and sigma will have shape (*input_shape,) + test_posterior = obj.model.posterior(test_inputs) + test_mean = test_posterior.mean[..., 0, 0].detach().numpy() + test_sigma = test_posterior.variance.sqrt()[..., 0, 0].detach().numpy() + + if gridded: + _ = agent.obj_axes[obj_index, 1].pcolormesh( + test_x, + test_y, + test_mean, + shading=shading, + cmap=cmap, + norm=obj_norm, + ) + sigma_ax = agent.obj_axes[obj_index, 2].pcolormesh( + test_x, + test_y, + test_sigma, + shading=shading, + cmap=cmap, + norm=mpl.colors.LogNorm(), + ) + + else: + _ = agent.obj_axes[obj_index, 1].scatter( + test_x, + test_y, + c=test_mean, + s=size, + norm=obj_norm, + cmap=cmap, + ) + sigma_ax = agent.obj_axes[obj_index, 2].scatter( + test_x, + test_y, + c=test_sigma, + s=size, + cmap=cmap, + norm=mpl.colors.LogNorm(), + ) + + obj_cbar = agent.obj_fig.colorbar( + data_ax, ax=agent.obj_axes[obj_index, :2], location="bottom", aspect=32, shrink=0.8 
+ ) + err_cbar = agent.obj_fig.colorbar( + sigma_ax, ax=agent.obj_axes[obj_index, 2], location="bottom", aspect=32, shrink=0.8 + ) + obj_cbar.set_label(obj.label) + err_cbar.set_label(f"{obj.label} error") + + col_names = ["samples", "posterior mean", "posterior std. dev."] + + pad = 5 + + for column_index, ax in enumerate(agent.obj_axes[0]): + ax.annotate( + col_names[column_index], + xy=(0.5, 1), + xytext=(0, pad), + color="k", + xycoords="axes fraction", + textcoords="offset points", + size="large", + ha="center", + va="baseline", + ) + + if agent.n_objs > 1: + for row_index, ax in enumerate(agent.obj_axes[:, 0]): + ax.annotate( + agent.objectives[row_index].name, + xy=(0, 0.5), + xytext=(-ax.yaxis.labelpad - pad, 0), + color="k", + xycoords=ax.yaxis.label, + textcoords="offset points", + size="large", + ha="right", + va="center", + rotation=90, + ) + + for ax in agent.obj_axes.ravel(): + ax.set_xlabel(x_dof.label) + ax.set_ylabel(y_dof.label) + ax.set_xlim(*x_dof.limits) + ax.set_ylim(*y_dof.limits) + + +def _plot_acq_one_dof(agent, acq_funcs, lw=1e0, **kwargs): + agent.acq_fig, agent.acq_axes = plt.subplots( + 1, + len(acq_funcs), + figsize=(4 * len(acq_funcs), 4), + sharex=True, + constrained_layout=True, + ) + + agent.acq_axes = np.atleast_1d(agent.acq_axes) + x_dof = agent.dofs[0] + + test_inputs = agent.test_inputs_grid() + + for iacq_func, acq_func_identifier in enumerate(acq_funcs): + color = DEFAULT_COLOR_LIST[iacq_func] + + acq_func, acq_func_meta = acquisition.get_acquisition_function(agent, acq_func_identifier) + test_acqf = acq_func(test_inputs).detach().numpy() + + agent.acq_axes[iacq_func].plot(test_inputs.squeeze(), test_acqf, lw=lw, color=color) + + agent.acq_axes[iacq_func].set_xlim(*x_dof.limits) + agent.acq_axes[iacq_func].set_xlabel(x_dof.label) + agent.acq_axes[iacq_func].set_ylabel(acq_func_meta["name"]) + + +def _plot_acq_many_dofs( + agent, acq_funcs, axes=[0, 1], shading="nearest", cmap=DEFAULT_COLORMAP, gridded=None, size=16, **kwargs +): + agent.acq_fig, agent.acq_axes = plt.subplots( + 1, + len(acq_funcs), + figsize=(4 * len(acq_funcs), 4), + sharex=True, + sharey=True, + constrained_layout=True, + ) + + if gridded is None: + gridded = len(agent.dofs.subset(active=True, read_only=False)) == 2 + + agent.acq_axes = np.atleast_1d(agent.acq_axes) + + x_dof, y_dof = agent.dofs[axes[0]], agent.dofs[axes[1]] + + # test_inputs has shape (..., 1, n_active_dofs) + test_inputs = agent.test_inputs_grid() if gridded else agent.test_inputs(n=1024) + test_x = test_inputs[..., 0, axes[0]].detach().numpy() + test_y = test_inputs[..., 0, axes[1]].detach().numpy() + + for iacq_func, acq_func_identifier in enumerate(acq_funcs): + acq_func, acq_func_meta = acquisition.get_acquisition_function(agent, acq_func_identifier) + + test_acqf = acq_func(test_inputs).detach().numpy() + + if gridded: + agent.acq_axes[iacq_func].set_title(acq_func_meta["name"]) + obj_ax = agent.acq_axes[iacq_func].pcolormesh( + test_x, + test_y, + test_acqf, + shading=shading, + cmap=cmap, + ) + + agent.acq_fig.colorbar(obj_ax, ax=agent.acq_axes[iacq_func], location="bottom", aspect=32, shrink=0.8) + + else: + agent.acq_axes[iacq_func].set_title(acq_func_meta["name"]) + obj_ax = agent.acq_axes[iacq_func].scatter( + test_x, + test_y, + c=test_acqf, + ) + + agent.acq_fig.colorbar(obj_ax, ax=agent.acq_axes[iacq_func], location="bottom", aspect=32, shrink=0.8) + + for ax in agent.acq_axes.ravel(): + ax.set_xlabel(x_dof.label) + ax.set_ylabel(y_dof.label) + ax.set_xlim(*x_dof.limits) + ax.set_ylim(*y_dof.limits) 
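The acquisition plots above all reduce to the same pattern: look up an acquisition function, then evaluate it over an `(n, q=1, d)` batch of test points. A self-contained sketch of that pattern in plain BoTorch, using a toy model and toy data in place of the agent's fitted objective models:

```python
# Sketch of the evaluate-acq-func-on-a-grid pattern used by the plotting
# helpers above; the model and data here are toys, not the agent's models.
import torch
from botorch.acquisition import ExpectedImprovement
from botorch.models import SingleTaskGP

train_x = torch.rand(8, 2, dtype=torch.double)
train_y = (train_x - 0.5).pow(2).sum(-1, keepdim=True)
model = SingleTaskGP(train_x, train_y)

acqf = ExpectedImprovement(model, best_f=train_y.min(), maximize=False)
test_inputs = torch.rand(1024, 1, 2, dtype=torch.double)  # (n, q=1, d)
values = acqf(test_inputs)  # shape (n,), ready for scatter or pcolormesh
print(values.shape)
```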
+
+
+def _plot_valid_one_dof(agent, size=16, lw=1e0):
+    agent.valid_fig, agent.valid_ax = plt.subplots(1, 1, figsize=(6, 4 * agent.n_objs), constrained_layout=True)
+
+    x_dof = agent.dofs[0]
+    x_values = agent.table.loc[:, x_dof.device.name].values
+
+    test_inputs = agent.test_inputs_grid()
+    constraint = agent.constraint(test_inputs)[..., 0]
+
+    agent.valid_ax.scatter(x_values, agent.all_objectives_valid, s=size)
+    agent.valid_ax.plot(test_inputs.squeeze(), constraint, lw=lw)
+    agent.valid_ax.set_xlim(*x_dof.limits)
+
+
+def _plot_valid_many_dofs(agent, axes=[0, 1], shading="nearest", cmap=DEFAULT_COLORMAP, size=16, gridded=None):
+    agent.valid_fig, agent.valid_axes = plt.subplots(1, 2, figsize=(8, 4 * agent.n_objs), constrained_layout=True)
+
+    if gridded is None:
+        gridded = len(agent.dofs.subset(active=True, read_only=False)) == 2
+
+    x_dof, y_dof = agent.dofs[axes[0]], agent.dofs[axes[1]]
+
+    # test_inputs has shape (..., 1, n_active_dofs)
+    test_inputs = agent.test_inputs_grid() if gridded else agent.test_inputs(n=1024)
+    test_x = test_inputs[..., 0, axes[0]].detach().numpy()
+    test_y = test_inputs[..., 0, axes[1]].detach().numpy()
+
+    constraint = agent.constraint(test_inputs)[..., 0]
+
+    if gridded:
+        _ = agent.valid_axes[1].pcolormesh(
+            test_x,
+            test_y,
+            constraint,
+            shading=shading,
+            cmap=cmap,
+            vmin=0,
+            vmax=1,
+        )
+
+    else:
+        _ = agent.valid_axes[1].scatter(
+            test_x,
+            test_y,
+            c=constraint,
+            s=size,
+            cmap=cmap,
+            vmin=0,
+            vmax=1,
+        )
+
+    for ax in agent.valid_axes.ravel():
+        ax.set_xlabel(x_dof.label)
+        ax.set_ylabel(y_dof.label)
+        ax.set_xlim(*x_dof.limits)
+        ax.set_ylim(*y_dof.limits)
+
+    # data_ax = agent.valid_axes[0].scatter(
+    #     *agent.acquisition_inputs.values.T[:2],
+    #     c=agent.all_objectives_valid,
+    #     s=size,
+    #     vmin=0,
+    #     vmax=1,
+    #     cmap=cmap,
+    # )
+
+    # x = agent.test_inputs_grid().squeeze() if gridded else agent.test_inputs(n=MAX_TEST_INPUTS)
+    # *input_shape, input_dim = x.shape
+    # constraint = agent.classifier.probabilities(x.reshape(-1, 1, input_dim))[..., -1].reshape(input_shape)
+
+    # if gridded:
+    #     agent.valid_axes[1].pcolormesh(
+    #         x[..., 0].detach().numpy(),
+    #         x[..., 1].detach().numpy(),
+    #         constraint.detach().numpy(),
+    #         shading=shading,
+    #         cmap=cmap,
+    #         vmin=0,
+    #         vmax=1,
+    #     )
+
+    #     # agent.acq_fig.colorbar(obj_ax, ax=agent.valid_axes[iacq_func], location="bottom", aspect=32, shrink=0.8)
+
+    # else:
+    #     # agent.valid_axes.set_title(acq_func_meta["name"])
+    #     agent.valid_axes[1].scatter(
+    #         x.detach().numpy()[..., axes[0]],
+    #         x.detach().numpy()[..., axes[1]],
+    #         c=constraint.detach().numpy(),
+    #     )
+
+    # agent.valid_fig.colorbar(data_ax, ax=agent.valid_axes[:2], location="bottom", aspect=32, shrink=0.8)
+
+    # for ax in agent.valid_axes.ravel():
+    #     ax.set_xlim(*agent.dofs.subset(active=True, read_only=False)[axes[0]].limits)
+    #     ax.set_ylim(*agent.dofs.subset(active=True, read_only=False)[axes[1]].limits)
+
+
+def inspect_beam(agent, index, border=None):
+    im = agent.images[index]
+
+    x_min, x_max, y_min, y_max, width_x, width_y = agent.table.loc[
+        index, ["x_min", "x_max", "y_min", "y_max", "width_x", "width_y"]
+    ]
+
+    bbx = np.array([x_min, x_max])[[0, 0, 1, 1, 0]]
+    bby = np.array([y_min, y_max])[[0, 1, 1, 0, 0]]
+
+    plt.figure()
+    plt.imshow(im, cmap="gray_r")
+    plt.plot(bbx, bby, lw=4e0, c="r")
+
+    if border is not None:
+        plt.xlim(x_min - border * width_x, x_max + border * width_x)
+        plt.ylim(y_min - border * width_y, y_max + border * width_y)
diff --git a/bloptools/devices.py b/bloptools/devices.py
deleted file mode 
100644 index 0afc20e..0000000 --- a/bloptools/devices.py +++ /dev/null @@ -1,78 +0,0 @@ -import time as ttime - -import numpy as np -from ophyd import Signal, SignalRO - -DEFAULT_BOUNDS = (-5.0, +5.0) - - -class ReadOnlyError(Exception): - ... - - -class DOF(Signal): - """ - Degree of freedom - """ - - ... - - -class DOFRO(DOF): - """ - Read-only degree of freedom - """ - - def put(self, value, *, timestamp=None, force=False): - raise ReadOnlyError(f'Cannot put, DOF "{self.name}" is read-only!') - - def set(self, value, *, timestamp=None, force=False): - raise ReadOnlyError(f'Cannot set, DOF "{self.name}" is read-only!') - - -class BrownianMotion(DOFRO): - """ - Read-only degree of freedom simulating brownian motion - """ - - def __init__(self, theta=0.95, *args, **kwargs): - super().__init__(*args, **kwargs) - - self.theta = theta - self.old_t = ttime.monotonic() - self.old_y = 0.0 - - def get(self): - new_t = ttime.monotonic() - alpha = self.theta ** (new_t - self.old_t) - new_y = alpha * self.old_y + np.sqrt(1 - alpha**2) * np.random.standard_normal() - - self.old_t = new_t - self.old_y = new_y - return new_y - - -class TimeReadback(SignalRO): - """ - Returns the current timestamp. - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def get(self): - return ttime.time() - - -class ConstantReadback(SignalRO): - """ - Returns a constant every time you read it (more useful than you'd think). - """ - - def __init__(self, constant=1, *args, **kwargs): - super().__init__(*args, **kwargs) - - self.constant = constant - - def get(self): - return self.constant diff --git a/bloptools/tests/conftest.py b/bloptools/tests/conftest.py index 07cc958..33b3fbc 100644 --- a/bloptools/tests/conftest.py +++ b/bloptools/tests/conftest.py @@ -8,9 +8,8 @@ from databroker import Broker from ophyd.utils import make_dir_tree -from bloptools.bayesian import Agent - -from .. 
import devices, test_functions +from bloptools import test_functions +from bloptools.bayesian import DOF, Agent, Objective @pytest.fixture(scope="function") @@ -44,46 +43,6 @@ def RE(db): return RE -@pytest.fixture(scope="function") -def db_with_bluesky(): - from sirepo_bluesky.madx_handler import MADXFileHandler - from sirepo_bluesky.shadow_handler import ShadowFileHandler - from sirepo_bluesky.srw_handler import SRWFileHandler - - """Return a data broker""" - # MongoDB backend: - db = Broker.named("local") # mongodb backend - try: - databroker.assets.utils.install_sentinels(db.reg.config, version=1) - except Exception: - pass - - db.reg.register_handler("srw", SRWFileHandler, overwrite=True) - db.reg.register_handler("shadow", ShadowFileHandler, overwrite=True) - db.reg.register_handler("SIREPO_FLYER", SRWFileHandler, overwrite=True) - db.reg.register_handler("madx", MADXFileHandler, overwrite=True) - - return db - - -@pytest.fixture(scope="function") -def RE_with_bluesky(db_with_bluesky): - loop = asyncio.new_event_loop() - loop.set_debug(True) - RE = RunEngine({}, loop=loop) - RE.subscribe(db_with_bluesky.insert) - - bec = best_effort.BestEffortCallback() - RE.subscribe(bec) - - bec.disable_baseline() - bec.disable_heading() - bec.disable_table() - bec.disable_plots() - - return RE - - @pytest.fixture(scope="function") def agent(db): """ @@ -91,17 +50,15 @@ def agent(db): """ dofs = [ - {"device": devices.DOF(name="x1"), "limits": (-5, 5), "kind": "active"}, - {"device": devices.DOF(name="x2"), "limits": (-5, 5), "kind": "active"}, + DOF(name="x1", limits=(-8.0, 8.0)), + DOF(name="x2", limits=(-8.0, 8.0)), ] - tasks = [ - {"key": "himmelblau", "kind": "minimize"}, - ] + objectives = [Objective(key="himmelblau", minimize=True)] agent = Agent( dofs=dofs, - tasks=tasks, + objectives=objectives, digestion=test_functions.constrained_himmelblau_digestion, db=db, verbose=True, @@ -127,18 +84,15 @@ def digestion(db, uid): return products dofs = [ - {"device": devices.DOF(name="x1"), "limits": (-5, 5), "kind": "active"}, - {"device": devices.DOF(name="x2"), "limits": (-5, 5), "kind": "active"}, + DOF(name="x1", limits=(-5.0, 5.0)), + DOF(name="x2", limits=(-5.0, 5.0)), ] - tasks = [ - {"key": "ST1", "kind": "minimize"}, - {"key": "ST2", "kind": "minimize"}, - ] + objectives = [Objective(key="ST1", minimize=True), Objective(key="ST2", minimize=True)] agent = Agent( dofs=dofs, - tasks=tasks, + objectives=objectives, digestion=digestion, db=db, verbose=True, @@ -152,19 +106,3 @@ def digestion(db, uid): def make_dirs(): root_dir = "/tmp/sirepo-bluesky-data" _ = make_dir_tree(datetime.datetime.now().year, base_path=root_dir) - - -# @pytest.fixture(scope="function") -# def srw_tes_simulation(make_dirs): -# from sirepo_bluesky.sirepo_bluesky import SirepoBluesky -# connection = SirepoBluesky("http://localhost:8000") -# data, _ = connection.auth("srw", "00000002") -# return connection - - -# @pytest.fixture(scope="function") -# def shadow_tes_simulation(make_dirs): -# from sirepo_bluesky.sirepo_bluesky import SirepoBluesky -# connection = SirepoBluesky("http://localhost:8000") -# data, _ = connection.auth("shadow", "00000002") -# return connection diff --git a/bloptools/tests/test_acq_funcs.py b/bloptools/tests/test_acq_funcs.py index a65f48f..cd824b3 100644 --- a/bloptools/tests/test_acq_funcs.py +++ b/bloptools/tests/test_acq_funcs.py @@ -1,28 +1,9 @@ import pytest -from bloptools import devices, test_functions -from bloptools.bayesian import Agent - @pytest.mark.test_func -def 
test_acq_funcs_single_task(RE, db):
-    dofs = [
-        {"device": devices.DOF(name="x1"), "limits": (-8, 8), "kind": "active"},
-        {"device": devices.DOF(name="x2"), "limits": (-8, 8), "kind": "active"},
-    ]
-
-    tasks = [
-        {"key": "himmelblau", "kind": "minimize"},
-    ]
-
-    agent = Agent(
-        dofs=dofs,
-        tasks=tasks,
-        digestion=test_functions.constrained_himmelblau_digestion,
-        db=db,
-    )
-
-    RE(agent.learn("qr", n=64))
+def test_acq_funcs_single_task(agent, RE, db):
+    RE(agent.learn("qr", n=32))
 
     # analytic methods
     RE(agent.learn("ei", n=1))
diff --git a/bloptools/tests/test_passive_dofs.py b/bloptools/tests/test_passive_dofs.py
index cf81020..d4216dc 100644
--- a/bloptools/tests/test_passive_dofs.py
+++ b/bloptools/tests/test_passive_dofs.py
@@ -1,25 +1,25 @@
 import pytest
 
-from bloptools import devices, test_functions
-from bloptools.bayesian import Agent
+from bloptools import test_functions
+from bloptools.bayesian import DOF, Agent, BrownianMotion, Objective
 
 
 @pytest.mark.test_func
 def test_passive_dofs(RE, db):
     dofs = [
-        {"device": devices.DOF(name="x1"), "limits": (-5, 5), "kind": "active"},
-        {"device": devices.DOF(name="x2"), "limits": (-5, 5), "kind": "active"},
-        {"device": devices.BrownianMotion(name="brownian1"), "limits": (-2, 2), "kind": "passive"},
-        {"device": devices.BrownianMotion(name="brownian2"), "limits": (-2, 2), "kind": "passive"},
+        DOF(name="x1", limits=(-5.0, 5.0)),
+        DOF(name="x2", limits=(-5.0, 5.0)),
+        DOF(BrownianMotion(name="brownian1"), read_only=True),
+        DOF(BrownianMotion(name="brownian2"), read_only=True),
     ]
 
-    tasks = [
-        {"key": "himmelblau", "kind": "minimize"},
+    objectives = [
+        Objective(key="himmelblau", minimize=True),
     ]
 
     agent = Agent(
         dofs=dofs,
-        tasks=tasks,
+        objectives=objectives,
         digestion=test_functions.constrained_himmelblau_digestion,
         db=db,
         verbose=True,
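Taken together, the fixtures and tests above show the shape of the refactored API. An end-to-end sketch follows (assuming a bluesky `RE` and a databroker `db` analogous to what the fixtures provide):

```python
# End-to-end sketch of the refactored API exercised by these tests; assumes
# a RunEngine `RE` and databroker `db` analogous to the fixtures above.
from bloptools import test_functions
from bloptools.bayesian import DOF, Agent, Objective

dofs = [
    DOF(name="x1", limits=(-8.0, 8.0)),
    DOF(name="x2", limits=(-8.0, 8.0)),
]
objectives = [Objective(key="himmelblau", minimize=True)]

agent = Agent(
    dofs=dofs,
    objectives=objectives,
    digestion=test_functions.constrained_himmelblau_digestion,
    db=db,
)

RE(agent.learn("qr", n=32))  # quasi-random initialization
RE(agent.learn("ei", n=1))   # one step of expected improvement
agent.plot_objectives()
```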