Source code for DynexQRBM.QRBM

import copy
import operator
import random
from tqdm import tqdm
import sys
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt

# for sampler:
#from pyqubo import Binary
from dimod import BinaryQuadraticModel, SimulatedAnnealingSampler, BINARY
import dynex

[docs]def sigmoid(x): result = 1 / (1 + np.exp(-x)) return result
[docs]class DYNEX_QRBM: def __init__(self, n_visible, n_hidden, num_reads = 1000, annealing_time = 300, clones = 1, mainnet=False, minimum_stepsize = 0.00000006, logging=False, debugging=False, rnd = None, seed = None, description = 'Parallel QRBM'): self.n_visible = n_visible self.n_hidden = n_hidden self.num_reads = num_reads self.annealing_time = annealing_time self.clones = clones self.mainnet = mainnet self.minimum_stepsize = minimum_stepsize self.logging = logging self.debugging = debugging if seed != None: np.random.seed(seed); print('SEED SET TO',seed) self.w = (np.random.rand(self.n_visible, self.n_hidden) * 2 - 1) * 1 self.visible_bias = (np.random.rand(self.n_visible) * 2 - 1) * 1 self.hidden_bias = (np.random.rand(self.n_hidden) * 2 - 1) * 1 self.n_epoch = 0 self.mse = []; self.num_features = 0; self.num_labels = 0; if rnd is None: rnd = np.random.RandomState(); self.rnd = rnd; self.description = description;
[docs] def sample_opposite_layer_pyqubo(self, v, layer, weights, opposite_layer): """ Generates Qubo model based on two layers and samples them on the Dynex platform Parameters ---------- v : numpy.ndarray The first layer. Shape (num_features+num_labels). """ # initialize Hamiltonian #H = 0 H_vars = [] bqm = BinaryQuadraticModel.empty(vartype=BINARY) # initialize all variables (one for each node in the opposite layer) for j in range(len(opposite_layer)): #H_vars.append(Binary('DNX'+str(j))) H_vars.append('DNX'+str(j)) for i, bias in enumerate(layer): # filter only chosen nodes in the first layer if not v[i]: continue # add reward to every connection for j, opp_bias in enumerate(opposite_layer): #H += -1 * weights[i][j] * H_vars[j] bqm.add_linear(H_vars[j], -1 * weights[i][j]) for j, opp_bias in enumerate(opposite_layer): #H += -1 * opp_bias * H_vars[j] bqm.add_linear(H_vars[j], -1 * opp_bias) #model = H.compile() #bqm = model.to_bqm() # use the dynex sampler: dnxmodel = dynex.BQM(bqm, logging=self.logging); dnxsampler = dynex.DynexSampler(dnxmodel, logging = self.logging, mainnet = self.mainnet, description = self.description); sampleset = dnxsampler.sample(num_reads = self.num_reads, annealing_time = self.annealing_time, clones = self.clones, debugging = self.debugging, minimum_stepsize = self.minimum_stepsize); solution1 = sampleset.first.sample; solution1_list = [(k[3:], v) for k, v in solution1.items()] solution1_list.sort(key=lambda tup: int(tup[0])) # sorts in place solution1_list_final = [v for (k, v) in solution1_list] return solution1_list_final
[docs] def sample_opposite_layer_pyqubo_batch(self, v_batch, layer, weights, opposite_layer): """ Generates Qubo model based on a batch of layers and samples them on the Dynex platform Parameters ---------- v : numpy.ndarray The first layer. Shape (batch_size, num_features+num_labels). """ # initialize all variables (one for each node per batch in the opposite layer) H_vars = []; for batch in range(0, len(v_batch)): H_vars_item = []; for j in range(len(opposite_layer)): binstring = 'DNX_batch_'+str(batch)+'_'; #H_vars_item.append(Binary(binstring+str(j))); H_vars_item.append(binstring+str(j)); H_vars.append(H_vars_item); # initialize Hamiltonian #H = 0 bqm = BinaryQuadraticModel.empty(vartype=BINARY) for batch in range(0, len(v_batch)): #for batch in tqdm(range(len(v_batch))): v = v_batch[batch]; for i, bias in enumerate(layer): # filter only chosen nodes in the first layer if not v[i]: continue # add reward to every connection for j, opp_bias in enumerate(opposite_layer): #H += -1 * weights[i][j] * H_vars[batch][j] bqm.add_linear(H_vars[batch][j], -1 * weights[i][j]) for j, opp_bias in enumerate(opposite_layer): #H += -1 * opp_bias * H_vars[batch][j] bqm.add_linear(H_vars[batch][j], -1 * opp_bias) #model = H.compile() #bqm = model.to_bqm() # use the dynex sampler: dnxmodel = dynex.BQM(bqm, logging=self.logging); dnxsampler = dynex.DynexSampler(dnxmodel, logging = self.logging, mainnet = self.mainnet, description = self.description); sampleset = dnxsampler.sample(num_reads = self.num_reads, annealing_time = self.annealing_time, clones = self.clones, debugging = self.debugging, minimum_stepsize = self.minimum_stepsize); solution1 = sampleset.first.sample; solutions = []; for batch in range(0, len(v_batch)): filter_string = 'DNX_batch_'+str(batch)+'_'; solution1_batch = {k:v for (k,v) in solution1.items() if filter_string in k}; # filter vars of current batch solution1_list = [(k[len(filter_string):], v) for k, v in solution1_batch.items()] # remove binstring solution1_list.sort(key=lambda tup: int(tup[0])) # sorts in place solution1_list_final = [v for (k, v) in solution1_list] solutions.append(solution1_list_final); return solutions
@staticmethod def _sigmoid(x): return 1 / (1 + np.exp(-x))
[docs] def infer(self, visible: npt.NDArray): """ Infer the hidden layer given the visible layer. Parameters ---------- visible : numpy.ndarray The visible layer. Shape (num_samples, num_visible). Returns ------- hidden : numpy.ndarray Binary hidden layer values sampled from their probability distribution. Shape (num_samples, num_hidden). prob_hidden : numpy.ndarray The probability for each unit of the hidden layer to be 1. Shape (num_samples, num_hidden). """ prob_hidden = self._sigmoid(visible @ self.w + self.hidden_bias) hidden = (prob_hidden > self.rnd.random(prob_hidden.shape)).astype(np.int0) return hidden, prob_hidden
[docs] def generate_visible_layer(self, hidden: npt.NDArray): """ Generate the visible layer given the hidden layer. Parameters ---------- hidden : numpy.ndarray The hidden layer. Shape (num_samples, num_hidden). Returns ------- visible : numpy.ndarray Binary visible layer values sampled from their probability distribution. Shape (num_samples, num_visible). prob_visible : numpy.ndarray The probability for each unit of the visible layer to be 1. Shape (num_samples, num_visible). """ prob_visible = self._sigmoid(hidden @ self.w.T + self.visible_bias) visible = (prob_visible > self.rnd.random(prob_visible.shape)).astype(np.int0) return visible, prob_visible
[docs] def predict(self, features, num_particles = 100, num_gibbs_updates = 10): """ Predict the labels for the given feature values using CD. Parameters ---------- features : numpy.ndarray[np.int0] The feature values to predict the labels for. Shape (num_samples, num_features). num_particles : int Number of particles to use for the sampling, i.e. how may times to run the label sampling process for each sample. num_gibbs_updates : int, optional Number of Gibbs updates to perform for each particle. If not provided, self.num_gibbs_updates will be used. Returns ------- labels : numpy.ndarray[np.float] The predicted labels. Shape (num_samples, num_label_classes). features: numpy.ndarray[np.float] The reconstructed features. Shape (num_samples, num_features). """ num_samples = features.shape[0]; num_features = features.shape[1]; num_labels = self.num_labels; label_predictions = np.zeros((num_samples, num_labels)) features_predictions = np.zeros((num_samples, num_features)) for _ in range(num_particles): output = np.zeros(label_predictions.shape) for _ in range(num_gibbs_updates): visible = np.hstack((features, output)) hidden, _ = self.infer(visible) visible, visible_prob = self.generate_visible_layer(hidden) output_prob = visible_prob[:,-num_labels:] output = visible[:,-num_labels:] if num_gibbs_updates > 0: label_predictions += output_prob / num_particles features_predictions += visible_prob[:,:num_features] / num_particles return label_predictions, features_predictions
[docs] def generate(self, v): """Generates data and labels based on features ONLY (no labels to be submitted, these are generated) This function invokes two sampling processes on the Dynex platform """ # add v values for labels, these are not chosen in layer: v = np.hstack((v,np.zeros(self.num_features))); sample_h = self.sample_opposite_layer_pyqubo(v, self.visible_bias, self.w, self.hidden_bias, ) sample_output = self.sample_opposite_layer_pyqubo(sample_h, self.hidden_bias, self.w.T, self.visible_bias, ) gen_data = sample_output[:self.num_features]; gen_labels = np.array([]); if self.num_labels > 0: gen_labels = sample_output[self.num_features:] return gen_data, gen_labels
[docs] def get_weights(self): return self.w, \ self.visible_bias, \ self.hidden_bias
[docs] def set_weights(self, w, visible_bias, hidden_bias): self.w = w self.visible_bias = visible_bias self.hidden_bias = hidden_bias
[docs] def train(self, training_data, training_labels = None, epochs=50, lr=0.1, lr_decay=0.1, epoch_drop = None, momentum = 0, batch_size = None, autosave = False, stop_at_mse = 0.0, plot_reconstructed_images = False, mse_every_epochs = 1, early_stop_after = 1e99 ): """ maximize the product of probabilities assigned to some training set V optimize the weight vector single-step contrastive divergence (CD-1): 1. Take a training sample v, compute the probabilities of the hidden units and sample a hidden activation vector h from this probability distribution. 2. Compute the outer product of v and h and call this the positive gradient. 3. From h, sample a reconstruction v' of the visible units, then resample the hidden activations h' from this. (Gibbs sampling step) 4. Compute the outer product of v' and h' and call this the negative gradient. 5. Let the update to the weight matrix W be the positive gradient minus the negative gradient, times some learning rate 6. Update the biases a and b analogously: a=epsilon (v-v'), b=epsilon (h-h') https://en.wikipedia.org/wiki/Restricted_Boltzmann_machine """ # labels provided? self.num_features = training_data.shape[1]; if type(training_labels) != type(None): self.num_labels = training_labels.shape[1]; if epoch_drop == None: epoch_drop = epochs / 5 # initial momentum velocity value momentum_w = np.zeros((len(self.visible_bias), len(self.hidden_bias))) momentum_v = np.zeros(len(self.visible_bias)) momentum_h = np.zeros(len(self.hidden_bias)) best_mse = 1e10; mse = None; last_mse = None; early_stop_cnt = 0; pbar = tqdm(range(epochs)); for epoch in pbar: # single step # 1 # 1.1 Take a training sample v (if labels provided, add them to v) random_selected_training_data_idx = 0, #epoch % len(training_data); # MULTI PROCESSING TAKES ALWAYS THE SAME idx FOR COMPARISON! if self.num_labels == 0: v = training_data[random_selected_training_data_idx]; else: v = np.hstack((training_data[random_selected_training_data_idx], training_labels[random_selected_training_data_idx])) old_v = v; """ if batch_size is not None: if epoch % batch_size != 0: old_v = v_prim; print('old_v set to v_prim') """ # Sample every item in batch: for i in range(0, len(training_data)): #print('DEBUG:',i,'/',len(training_data)); if self.num_labels == 0: v = training_data[i]; else: v = np.hstack((training_data[i], training_labels[i])) # sample: pbar.set_postfix({'MSE': mse, 'MSE(best)':best_mse,'INFO':"["+str(i+1)+"/"+str(len(training_data))+"][1/3] sampling probabilities of hidden units"}); h = self.sample_opposite_layer_pyqubo(v, self.visible_bias, self.w, self.hidden_bias); pbar.set_postfix({'MSE': mse, 'MSE(best)':best_mse,'INFO':"["+str(i+1)+"/"+str(len(training_data))+"][2/3] sampling reconstruction v' from visible units"}); v_prim = self.sample_opposite_layer_pyqubo(h, self.hidden_bias, self.w.T, self.visible_bias); pbar.set_postfix({'MSE': mse, 'MSE(best)':best_mse,'INFO':"["+str(i+1)+"/"+str(len(training_data))+"][3/3] resampling hidden activations h' from v'"}); h_prim = self.sample_opposite_layer_pyqubo(v_prim, self.visible_bias, self.w, self.hidden_bias) # update weights: momentum_w = np.zeros((len(self.visible_bias), len(self.hidden_bias))) momentum_v = np.zeros(len(self.visible_bias)) momentum_h = np.zeros(len(self.hidden_bias)) pos_grad = np.outer(v, h); neg_grad = np.outer(v_prim, h_prim); momentum_w += momentum * momentum_w + lr * (pos_grad - neg_grad) momentum_v += momentum * momentum_v + lr * (np.array(v) - np.array(v_prim)) momentum_h += momentum * momentum_h + lr * (np.array(h) - np.array(h_prim)) self.w += momentum_w; self.visible_bias += momentum_v; self.hidden_bias += momentum_h; """ # Parallel sampling of one entire batch: v_tmp = []; for batch in range(0,batch_size): v_tmp.append(np.hstack((training_data[batch], training_labels[batch]))) pbar.set_postfix({'MSE': mse, 'MSE(best)':best_mse,'INFO':"[1/3] sampling probabilities of hidden units"}); h_tmp = self.sample_opposite_layer_pyqubo_batch(v_tmp, self.visible_bias, self.w, self.hidden_bias) pbar.set_postfix({'MSE': mse, 'MSE(best)':best_mse,'INFO':"[2/3] sampling reconstruction v' from visible units"}); v_prim_tmp = self.sample_opposite_layer_pyqubo_batch(h_tmp, self.hidden_bias, self.w.T, self.visible_bias) pbar.set_postfix({'MSE': mse, 'MSE(best)':best_mse,'INFO':"[3/3] resampling hidden activations h' from v'"}); h_prim_tmp = self.sample_opposite_layer_pyqubo_batch(v_prim_tmp, self.visible_bias, self.w, self.hidden_bias) """ """ # testing parallel processing, but sampling step by step: v_tmp = []; h_tmp = []; v_prim_tmp = []; h_prim_tmp = []; for batch in tqdm(range(batch_size), leave=False): v_ = np.hstack((training_data[batch], training_labels[batch])); h_ = self.sample_opposite_layer_pyqubo(v_, self.visible_bias, self.w, self.hidden_bias) v_prim_ = self.sample_opposite_layer_pyqubo(h_, self.hidden_bias, self.w.T, self.visible_bias) h_prim_ = self.sample_opposite_layer_pyqubo(v_prim_, self.visible_bias, self.w, self.hidden_bias) v_tmp.append(v_); h_tmp.append(h_); v_prim_tmp.append(v_prim_); h_prim_tmp.append(h_prim_); """ """ # weights and biases are updated with the average of the entire batch in one step: pbar.set_postfix({'MSE': mse, 'MSE(best)':best_mse,'INFO':"updating weights and biases"}); momentum_w = np.zeros((len(self.visible_bias), len(self.hidden_bias))) momentum_v = np.zeros(len(self.visible_bias)) momentum_h = np.zeros(len(self.hidden_bias)) for i in range(0,batch_size): pos_grad = np.outer(v_tmp[i], h_tmp[i]); neg_grad = np.outer(v_prim_tmp[i], h_prim_tmp[i]); momentum_w += momentum * momentum_w + lr * (pos_grad - neg_grad) momentum_v += momentum * momentum_v + lr * (np.array(v_tmp[i]) - np.array(v_prim_tmp[i])) momentum_h += momentum * momentum_h + lr * (np.array(h_tmp[i]) - np.array(h_prim_tmp[i])) self.w += momentum_w / batch_size; #average of batch self.visible_bias += momentum_v / batch_size; #average of batch self.hidden_bias += momentum_h / batch_size; #average of batch """ """ # # 1.2 compute the probabilities of the hidden units # persistent CD takes v from previous iterations infomsg = 'sampling probabilities of hidden units'; msg = {'MSE': mse, 'MSE(best)':best_mse,'INFO':infomsg}; pbar.set_postfix(msg); h = self.sample_opposite_layer_pyqubo(old_v, self.visible_bias, self.w, self.hidden_bias, ) # 2 Compute the outer product of v and h and call this the positive gradient. pos_grad = np.outer(v, h) # 3 # 3.1 From h, sample a reconstruction v' of the visible units, infomsg = "sampling reconstruction v' from visible units"; msg = {'MSE': mse, 'MSE(best)':best_mse,'INFO':infomsg}; pbar.set_postfix(msg); v_prim = self.sample_opposite_layer_pyqubo(h, self.hidden_bias, self.w.T, self.visible_bias, ) # 3.2 then resample the hidden activations h' from this. (Gibbs sampling step) infomsg = "resampling hidden activations h' from v'"; msg = {'MSE': mse, 'MSE(best)':best_mse,'INFO':infomsg}; pbar.set_postfix(msg); h_prim = self.sample_opposite_layer_pyqubo(v_prim, self.visible_bias, self.w, self.hidden_bias, ) # 4 Compute the outer product of v' and h' and call this the negative gradient. neg_grad = np.outer(v_prim, h_prim) # 5 Let the update to the weight matrix W be the positive gradient minus the negative gradient, # times some learning rate #this is for momentum (default value 0 doesn't change anything) momentum_w = momentum * momentum_w + lr * (pos_grad - neg_grad) self.w += momentum_w # 6 Update the biases a and b analogously: a=epsilon (v-v'), b=epsilon (h-h') # momentum here momentum_v = momentum * momentum_v + lr * (np.array(v) - np.array(v_prim)) momentum_h = momentum * momentum_h + lr * (np.array(h) - np.array(h_prim)) self.visible_bias += momentum_v self.hidden_bias += momentum_h """ if epoch % epoch_drop == (epoch_drop-1): #learning_rate_decay lr *= (1 - lr_decay) infomsg = "learning rate set to "+str(lr); msg = {'MSE': mse, 'MSE(best)':best_mse,'INFO':infomsg}; pbar.set_postfix(msg); if epoch % mse_every_epochs == 0: # Reconstruct data and calculate MSE: infomsg = "reconstructing data to calculate MSE"; msg = {'MSE': mse, 'MSE(best)':best_mse,'INFO':infomsg}; pbar.set_postfix(msg); gen_data, gen_labels = self.generate(v[:self.num_features]); if plot_reconstructed_images: plt.figure() plt.axis('off') plt.title("Image reconstructed after training", y=1.03) plt.imshow(np.array(gen_data).reshape(28, -1)) # MSE (based on data): mse = np.sum((np.array(v[:self.num_features]) - np.array(gen_data))**2 ) / (self.num_features); # normalized self.mse.append(mse); # better MSE? if mse < best_mse: best_mse = mse; if autosave: self.save('model_epoch_'+str(epoch)+'_mse_'+str(mse)+'.model'); # Check for early stop: if mse == last_mse: early_stop_cnt *= 1; else: early_stop_cnt = 0; last_mse = mse; if early_stop_cnt > early_stop_after: infomsg = "TRAINING STOPPED WITH EARLY STOP EPOCHS "+str(early_stop_cnt); msg = {'MSE': mse, 'MSE(best)':best_mse,'INFO':infomsg}; pbar.set_postfix(msg); break; # update progress bar: infomsg=""; msg = {'MSE': mse, 'MSE(best)':best_mse,'INFO':infomsg}; pbar.set_postfix(msg); # MSE stop? if mse <= stop_at_mse: infomsg = "TRAINING STOPPED WITH MSE "+str(stop_at_mse); msg = {'MSE': mse, 'MSE(best)':best_mse,'INFO':infomsg}; pbar.set_postfix(msg); break; return
[docs] def save(self, filename): """Save the model""" with np.printoptions(threshold=sys.maxsize): parameters = [str(self.n_hidden), str(self.n_visible), np.array_repr(self.visible_bias), np.array_repr(self.hidden_bias), np.array_repr(self.w)] with open(filename, 'w') as file: file.write('#'.join(parameters))
[docs] def load(self, filename): """Load the model""" with open(filename) as file: res = file.read() parameters = res.split('#') self.n_hidden = eval(parameters[0]) self.n_visible = eval(parameters[1]) self.visible_bias = eval('np.'+parameters[2]) self.hidden_bias = eval('np.'+parameters[3]) self.w = eval('np.'+parameters[4])