Source code for HybridQRBM.samplers

"""Samplers for the RBM training."""

__copyright__ = "Dynex Developers, 2023"

import pickle
from pathlib import Path
from hashlib import md5
import numpy as np
import numpy.typing as npt
import dynex
import dimod

[docs]class Sampler: """Base class for samplers.""" def __init__(self, num_gibbs_updates: int): """ Initialize the sampler. Parameters ---------- num_gibbs_updates : int Number of Gibbs updates to perform when sampling from the model distribution. For example, for a sampler implementing contrastive divergence training, setting this value to 1 will result in a standard CD-1 training. """ self.num_gibbs_updates = num_gibbs_updates self.rbm = None @staticmethod def _sigmoid(x): return 1 / (1 + np.exp(-x))
[docs] def infer(self, visible: npt.NDArray): """ Infer the hidden layer given the visible layer. Parameters ---------- visible : numpy.ndarray The visible layer. Shape (num_samples, num_visible). Returns ------- hidden : numpy.ndarray Binary hidden layer values sampled from their probability distribution. Shape (num_samples, num_hidden). prob_hidden : numpy.ndarray The probability for each unit of the hidden layer to be 1. Shape (num_samples, num_hidden). """ prob_hidden = self._sigmoid(visible @ self.rbm.weights + self.rbm.biases_hidden) hidden = (prob_hidden > self.rbm.rnd.random(prob_hidden.shape)).astype(np.int0) return hidden, prob_hidden
[docs] def generate(self, hidden: npt.NDArray): """ Generate the visible layer given the hidden layer. Parameters ---------- hidden : numpy.ndarray The hidden layer. Shape (num_samples, num_hidden). Returns ------- visible : numpy.ndarray Binary visible layer values sampled from their probability distribution. Shape (num_samples, num_visible). prob_visible : numpy.ndarray The probability for each unit of the visible layer to be 1. Shape (num_samples, num_visible). """ prob_visible = self._sigmoid(hidden @ self.rbm.weights.T + self.rbm.biases_visible) visible = (prob_visible > self.rbm.rnd.random(prob_visible.shape)).astype(np.int0) return visible, prob_visible
[docs] def sample(self, visible: npt.NDArray, **kwargs): """ Abstract method for sampling from the model distribution. Possibly with some clamped visible layer values. """ raise NotImplementedError
[docs] def gibbs_updates(self, visible: npt.NDArray, num_gibbs_updates=None): """ Perform Gibbs sampling starting from some visible layer values. Parameters ---------- visible : numpy.ndarray The initial visible layer values. Shape (num_samples, num_visible). num_gibbs_updates : int The number of full updates to perform. If None, the default for the sampler is performed. Returns ------- visible: numpy.ndarray The visible layer after the Gibbs sampling. Shape (num_samples, num_visible). prob_visible: numpy.ndarray The probability for each unit of the visible layer to be 1. Shape (num_samples, num_visible). hidden : numpy.ndarray Binary hidden layer values sampled from their probability distribution. Shape (num_samples, num_hidden). prob_hidden : numpy.ndarray The probability for each unit of the hidden layer to be 1. Shape (num_samples, num_hidden). """ if num_gibbs_updates is None: num_gibbs_updates = self.num_gibbs_updates for _ in range(num_gibbs_updates): hidden, prob_hidden = self.infer(visible) visible, prob_visible = self.generate(hidden) return visible, prob_visible, hidden, prob_hidden
[docs] def predict(self, features: npt.NDArray, num_particles: int = 10, num_gibbs_updates: int = None, **kwargs) -> npt.NDArray: """ Predict the features and labels for the given feature values. Parameters ---------- features : numpy.ndarray[np.int0] The feature values to predict the labels for. Shape (num_samples, num_features). num_particles : int Number of particles to use for the sampling, i.e. how may times to run the label sampling process for each sample. num_gibbs_updates : int, optional Number of Gibbs updates to perform for each particle. If not provided, self.num_gibbs_updates will be used. Returns ------- labels : numpy.ndarray[np.float] The predicted labels. Shape (num_samples, num_labels). features : numpy.ndarray[np.float] The predicted features. Shape (num_samples, num_features). """ if num_gibbs_updates is None: num_gibbs_updates = self.num_gibbs_updates num_samples = features.shape[0] num_features = features.shape[1] num_labels = self.rbm.num_visible - num_features label_predictions = np.zeros((num_samples, num_labels)) features_predictions = np.zeros((num_samples, num_features)) for _ in range(num_particles): output = np.zeros(label_predictions.shape) for _ in range(num_gibbs_updates): visible = np.hstack((features, output)) hidden, _ = self.infer(visible) visible, visible_prob = self.generate(hidden) error = ((visible - visible_prob) ** 2).mean(); output_prob = visible_prob[:,-num_labels:] output = visible[:,-num_labels:] if num_labels>0: label_predictions += output_prob / num_particles features_predictions += visible_prob[:,:num_features] / num_particles return label_predictions, features_predictions
[docs]class ContrastiveDivergenceSampler(Sampler): """Implements sampling for contrastive divergence training."""
[docs] def sample(self, visible: npt.NDArray, **kwargs) -> tuple: """ Implements sampling for contrastive divergence training. The expected use is to start with feature values for the visible layer and then perform a number (defined in the constructor) of Gibbs updates. If the number of Gibbs updates is 1, this implements the standard CD-1 training. Parameters ---------- visible : numpy.ndarray The visible layer values. Shape (num_samples, num_visible). Returns ------- visible, prob_visible, hidden, prob_hidden : tuple See gibbs_updates() method for details. """ return self.gibbs_updates(visible)
[docs]class PersistentContrastiveDivergenceSampler(Sampler): """Implements sampling for persistent contrastive divergence training.""" def __init__(self, *args, **kwargs): """Persistend contrastive divergence training saves the state of the imaginary particles between updates. The batch size is not known at sampler construction, so values will be initialized later.""" super().__init__(*args, **kwargs) self.visible = None
[docs] def sample(self, visible, **kwargs) -> tuple: """ Performs a number of Gibbs updates, starting from the state of the visible layer after the previous update. Parameters ---------- visible : numpy.ndarray The visible layer values are always passed from the RBM. For this sampler it is only used to determine the shape of the sample. Returns ------- visible, prob_visible, hidden, prob_hidden : tuple See gibbs_updates() method for details. """ if self.visible is None: self.visible = (self.rbm.rnd.random(visible.shape) > 0.5).astype(np.int0) visible, prob_visible, hidden, prob_hidden = self.gibbs_updates(self.visible) self.visible = visible return visible, prob_visible, hidden, prob_hidden
[docs]class NaiveSampler(Sampler): """Implements sampling for naive training."""
[docs] def sample(self, visible, **kwargs): """ Implements sampling for naive training. The particles (Markov chains) for the negative phase are initialized to random values at the beginning of each batch update. If the Markov chains are properly burnt in, the sampled particles represent the model distribution better than those from either contrastive divergence or persistent contrastive divergence; howevere the required number of Gibbs updates to burn in the Markov chains is unknown and likely high. Parameters ---------- visible : numpy.ndarray The visible layer values are always passed from the RBM. For this sampler it is only used to determine the shape of the sample. Returns ------- visible, prob_visible, hidden, prob_hidden : tuple See gibbs_updates() method for details. """ visible = (self.rbm.rnd.random(visible.shape) > 0.5).astype(np.int0) return self.gibbs_updates(visible)
[docs]class DynexSampler(Sampler): def __init__(self, num_reads = 100, annealing_time = 300, mainnet=False, clones=1, minimum_stepsize = 0.00000006, logging=True, debugging=False, num_gibbs_updates=0, **kwargs): super().__init__(num_gibbs_updates=num_gibbs_updates, **kwargs) self.mainnet = mainnet; self.logging = logging; self.num_reads = num_reads; self.annealing_time = annealing_time; self.debugging = debugging; self.minimum_stepsize = minimum_stepsize; self.clones = clones; def _sample_dynex(self, qubo, num_particles): # BQM from QUBO: bqm = dimod.BinaryQuadraticModel.from_qubo(qubo, 0); model = dynex.BQM(bqm); dnxsampler = dynex.DynexSampler(model, mainnet=self.mainnet, logging = self.logging, description='PyTorch DNX Layer'); sampleset = dnxsampler.sample(num_reads=self.num_reads, annealing_time = self.annealing_time, clones = self.clones, debugging=self.debugging, minimum_stepsize = self.minimum_stepsize); if len(sampleset)>0: weights = np.array(list(sampleset.first.sample.values())); energy = sampleset.first.energy; num_units = self.rbm.num_visible + self.rbm.num_hidden visible = np.array([weights[:self.rbm.num_visible]]); hidden = np.array([weights[self.rbm.num_visible:num_units]]); self.rbm.logger.info(f"Dynex Platform: sampled response: energy = {energy}") else: # an error occured: visible = self.biases_visible; hidden = self.biases_hidden; self.rbm.logger.info(f"Dynex Platform: invalid sample returned") return visible, hidden
[docs] def sample(self, visible, **kwargs): """ Implements sampling for the Dynex Neuromorphic Platform. Parameters ---------- visible : numpy.ndarray The visible layer values are always passed from the RBM. When sampling from the model distribution, it is only used to determine the shape of the sample. Returns ------- visible, prob_visible, hidden, prob_hidden : tuple visible and hidden are the samples returned by the Dynex sampler. prob_visible and prob_hidden are calculated based on the Dynex sample values. """ qubo = self.rbm.to_qubo_matrix() num_reads = self.num_reads; visible, hidden = self._sample_dynex(qubo, num_reads) _, prob_hidden = self.infer(visible) _, prob_visible = self.generate(hidden) if self.num_gibbs_updates > 0: visible, prob_visible, hidden, prob_hidden = self.gibbs_updates(visible) return visible, prob_visible, hidden, prob_hidden
[docs]class DWaveSampler(Sampler): """Implements sampling for the D-Wave machine.""" def __init__(self, dwave_sampler, num_spin_reversal_transforms, temp=1.0, num_gibbs_updates=0, chain_strength=None, dwave_params=None, **kwargs): """ Initialize the sampler. Parameters ---------- dwave_sampler : dwave.system.samplers.DWaveSampler The D-Wave machine to use. For AWS this would be a braket.ocean_plugin.BraketDWaveSampler or, more likely dwave.system.FixedEmbeddingComposite. The latter allows to fix the embedding, saving a lot of work for each new sample generation, especially for larger RBMs. num_spin_reversal_transforms : int Number of spin-reversal transforms to apply. This mitigates some DWave sampling biases. temp : float, optional Temperature for the sampling. The QUBO matrix is divided by this value before passed to the DWave sampler. Higher temperature lead to more uniform sampling. num_gibbs_updates : int Number of Gibbs updates to perform on top of the samples of the visual layer from D-Wave. chain_strength : float, optional The chain strength for D-Wave sampling. If None, the default value (method) for D-Wave is used. dwave_params : dict, optional Additional parameters to pass to the D-Wave sampler. """ super().__init__(num_gibbs_updates=num_gibbs_updates, **kwargs) self.dwave_params = {} if dwave_params is None else dwave_params self.sampler = dwave_sampler self.num_spin_reversal_transforms = num_spin_reversal_transforms self.temp = temp self.chain_strength = chain_strength def _sample_dwave(self, qubo, num_particles, cached_response=None): if cached_response: with open(cached_response, "rb") as file: response = pickle.load(file) self.rbm.logger.info(f"Loaded cached response from: {cached_response}") else: response = self.sampler.sample_qubo( qubo, num_reads=num_particles, auto_scale=False, num_spin_reversal_transforms=self.num_spin_reversal_transforms, chain_strength=self.chain_strength, **self.dwave_params, ) name = response.info["taskMetadata"]["id"].rsplit("/")[-1] response_path = Path(f"responses/{name}.response") response_path.parent.mkdir(exist_ok=True) with open(response_path, "wb") as file: pickle.dump(response, file) cache_hash = md5(str((qubo.tobytes(), num_particles)).encode("utf-8")).hexdigest() with open(f"responses/{cache_hash}", "w") as file: file.write(str(response_path)) self.rbm.logger.info(f"Loaded D-Wave response: {name}") self.rbm.logger.info(f"Cached the response: {cache_hash}") df = response.to_pandas_dataframe() df = df.loc[df.index.repeat(df["num_occurrences"]),:] num_units = self.rbm.num_visible + self.rbm.num_hidden visible = df.iloc[:,:self.rbm.num_visible].values hidden = df.iloc[:,self.rbm.num_visible:num_units].values return visible, hidden
[docs] def sample(self, visible, **kwargs): """ Implements sampling for the D-Wave machine. Parameters ---------- visible : numpy.ndarray The visible layer values are always passed from the RBM. When sampling from the model distribution, it is only used to determine the shape of the sample. Returns ------- visible, prob_visible, hidden, prob_hidden : tuple visible and hidden are the samples returned by the DWave sampler. prob_visible and prob_hidden are calculated based on the DWave sample values. """ qubo = self.rbm.to_qubo_matrix() / self.temp num_reads = visible.shape[0] visible, hidden = self._sample_dwave(qubo, num_reads) _, prob_hidden = self.infer(visible) _, prob_visible = self.generate(hidden) if self.num_gibbs_updates > 0: visible, prob_visible, hidden, prob_hidden = self.gibbs_updates(visible) return visible, prob_visible, hidden, prob_hidden
[docs]class DWaveInferenceSampler(DWaveSampler): """Extends DWaveSampler with inference / prediction."""
[docs] def predict(self, features: npt.NDArray, num_particles: int = 100, num_gibbs_updates: int = None, use_cache = False, **kwargs) -> npt.NDArray: """ Predict the labels for the given feature values. Parameters ---------- features : numpy.ndarray[np.int0] The feature values to predict the labels for. Shape (num_samples, num_features). num_particles : int Number of particles to use for the sampling, i.e. how may times to run the label sampling process for each sample. num_gibbs_updates : int, optional Number of Gibbs updates to perform for each particle. If not provided, self.num_gibbs_updates will be used. use_cache : bool If set to True, and a cached response is available, it will be used. Returns ------- labels : numpy.ndarray[np.float] The predicted labels. Shape (num_samples, num_label_classes). """ num_samples = features.shape[0] num_features = features.shape[1] num_labels = self.rbm.num_visible - num_features label_predictions = np.zeros((num_samples, num_labels)) if num_gibbs_updates is None: num_gibbs_updates = self.num_gibbs_updates for sample in range(num_samples): qubo = self.rbm.to_qubo_matrix(features[sample]) / self.temp cache_hash = md5(str((qubo.tobytes(), num_particles)).encode("utf-8")).hexdigest() cache_file = Path(f"responses/{cache_hash}") cached_response = None if use_cache and cache_file.exists(): with open(cache_file, "r") as file: cached_response = file.read() visible, _ = self._sample_dwave(qubo, num_particles, cached_response) for _ in range(num_gibbs_updates): visible[:,:num_features] = features[sample] hidden, _ = self.infer(visible) visible, visible_prob = self.generate(hidden) label_predictions[sample] = visible[:,-num_labels:].mean(axis=0) return label_predictions