Source code for dsipts.models.Diffusion

import torch
import torch.nn as nn
import numpy as np
from .tft import sub_nn
from .base import  Base
from typing import List, Union
from ..data_structure.utils import beauty_string
from .utils import  get_scope

class Diffusion(Base):
    """Denoising Diffusion Probabilistic Model for time-series forecasting."""

    # Capability flags of this model. In this block they are only used as the
    # arguments of ``get_scope`` to build ``description``.
    # NOTE(review): presumably the training framework also reads them to decide
    # which inputs/losses the model supports -- confirm against ``Base``.
    handle_multivariate = False
    handle_future_covariates = True
    handle_categorical_variables = True
    handle_quantile_loss = False
    # Description derived from the four flags above.
    description = get_scope(handle_multivariate,handle_future_covariates,handle_categorical_variables,handle_quantile_loss)
def __init__(self,
             d_model: int,
             out_channels: int,
             past_steps: int,
             future_steps: int,
             past_channels: int,
             future_channels: int,
             embs: List[int],
             learn_var: bool,
             cosine_alpha: bool,
             diffusion_steps: int,
             beta: float,
             gamma: float,
             # for subnet
             n_layers_RNN: int,
             d_head: int,
             n_head: int,
             dropout_rate: float,
             activation: str,
             subnet: int,
             perc_subnet_learning_for_step: float,
             persistence_weight: float = 0.0,
             loss_type: str = 'l1',
             quantiles: List[float] = [],
             optim: Union[str, None] = None,
             optim_config: Union[dict, None] = None,
             scheduler_config: Union[dict, None] = None,
             **kwargs) -> None:
    """Denoising Diffusion Probabilistic Model

    Args:
        d_model (int): hidden dimension used by the embeddings and by every subnet
        out_channels (int): number of target variables
        past_steps (int): size of past window
        future_steps (int): size of future window to be predicted
        past_channels (int): number of variables available for the past context
        future_channels (int): number of variables known in the future, available for forecasting
        embs (list[int]): categorical variables dimensions for embeddings
        learn_var (bool): Flag to make the model train the posterior variance (if True) or use the variance of posterior distribution
        cosine_alpha (bool): Flag for the generation of alphas and betas
        diffusion_steps (int): number of noising steps for the initial sample
        beta (float): starting variable to generate the diffusion perturbations. Ignored if cosine_alpha == True
        gamma (float): trade_off variable to balance loss over noise prediction and NegativeLikelihood/KL_Divergence.
        n_layers_RNN (int): param for subnet
        d_head (int): param for subnet
        n_head (int): param for subnet
        dropout_rate (float): param for subnet
        activation (str): param for subnet
        subnet (int): =1 for attention subnet (SubNet1), =2 for linear subnet (SubNet2), =3 for the RNN + attention subnet (SubNet3).
        perc_subnet_learning_for_step (float): percentage to choose how many subnet has to be trained for every batch. Decrease this value if the loss blows up. The resulting number of subnets is clamped to [1, diffusion_steps].
        persistence_weight (float, optional): Defaults to 0.0.
        loss_type (str, optional): 'mse' selects nn.MSELoss, anything else nn.L1Loss. Defaults to 'l1'.
        quantiles (List[float], optional): Only [] accepted, any other value is ignored. Defaults to [].
        optim (Union[str,None], optional): Defaults to None.
        optim_config (Union[dict,None], optional): Defaults to None.
        scheduler_config (Union[dict,None], optional): Defaults to None.

    Raises:
        ValueError: if ``subnet`` is not 1, 2 or 3.
    """
    super().__init__(**kwargs)
    # must stay right after super().__init__: it records the constructor arguments
    self.save_hyperparameters(logger=False)
    self.dropout = dropout_rate
    self.persistence_weight = persistence_weight
    self.loss_type = loss_type
    self.optim = optim
    self.optim_config = optim_config
    self.scheduler_config = scheduler_config

    # ----- losses
    # With respect to other models, here quantiles are not used.
    # self.loss compares predicted and actual noise; the distribution losses
    # (likelihood / KL) are methods of this class.
    # gamma is the trade off between noise loss and distribution loss
    self.gamma = gamma
    # this model can not handle quantiles: any requested value is dropped.
    # The (mutable) default list is never mutated, only rebound locally.
    if len(quantiles) > 0:
        quantiles = []
    self.mul = 1
    self.use_quantiles = False
    if self.loss_type == 'mse':
        self.loss = nn.MSELoss()
    else:
        self.loss = nn.L1Loss()

    # ----- canonical data parameters
    # dimension of the model, number of variables and sequence length info
    self.d_model = d_model
    self.past_steps = past_steps
    self.future_steps = future_steps
    self.past_channels = past_channels
    self.future_channels = future_channels
    self.output_channels = out_channels

    # ----- specific model parameters
    # learn the variance instead of using the standard posterior variance
    self.learn_var = learn_var
    # number of noising steps
    self.T = diffusion_steps
    # sampling weights of the steps, used to avoid less trained subnets
    self.multinomial_step_weights = np.ones(diffusion_steps)
    # number of subnets trained for every batch. It is clamped to
    # [1, diffusion_steps] because the steps are drawn without replacement,
    # so asking for more than diffusion_steps of them would fail in forward.
    self.simultaneous_steps = min(
        max(int(diffusion_steps * perc_subnet_learning_for_step), 1),
        diffusion_steps,
    )

    # ----- specific diffusion setup
    # offset to avoid problems with computations near 0 (value found by trial and error)
    self.s = (100 * self.T) ** (-1)

    # betas and cumulative products of alphas are the main values of the diffusion model:
    # - betas_t          -> variance added at t-th step
    # - alphas_t         = 1 - betas_t
    # - alphas_cumprod_t -> remaining signal of the starting input at t-th step
    if cosine_alpha:
        # COSINE schedule: the cumulative products are generated first,
        # alphas and betas are derived from them
        f_cos_t = np.array([
            (np.cos((t / self.T + self.s) / (1 + self.s) * np.pi / 2)) ** 2
            for t in range(self.T)
        ])
        # scaled cumulative product of alphas; the first entry is 1 - s
        self.alphas_cumprod = np.append(1 - self.s, f_cos_t[1:] / f_cos_t[0])
        # shifted copy, so that index t gives alpha_cumprod_{t-1}
        self.alphas_cumprod_prev = np.append(1.0, self.alphas_cumprod[:-1])
        self.alphas = self.alphas_cumprod * (self.alphas_cumprod_prev) ** (-1)
        self.betas = 1 - self.alphas
    else:
        # STANDARD schedule: beta constant in [0,1) for all steps (good values near 0.03),
        # everything else is derived from betas
        self.betas = np.array([beta] * self.T)
        self.alphas = 1 - self.betas
        self.alphas_cumprod = np.cumprod(self.alphas)
        # shifted copy, so that index t gives alpha_cumprod_{t-1}
        self.alphas_cumprod_prev = np.append(1.0, self.alphas_cumprod[:-1])

    # values of the posterior distribution, i.e. the target distribution of each subnet.
    # They are numpy arrays, converted to tensors by _extract_into_tensor when used
    self.posterior_mean_coef1 = self.betas * np.sqrt(self.alphas_cumprod_prev) / (1.0 - self.alphas_cumprod)
    self.posterior_mean_coef2 = (1.0 - self.alphas_cumprod_prev) * np.sqrt(self.alphas) / (1.0 - self.alphas_cumprod)
    # the first entry is replaced by s
    self.posterior_variance = np.append(
        self.s,
        self.betas[1:] * (1.0 - self.alphas_cumprod_prev[1:]) / (1.0 - self.alphas_cumprod[1:]),
    )
    self.posterior_log_variance = np.log(self.posterior_variance)

    # ----- layers
    # one linear embedding for every non-target numerical variable of the past
    self.aux_past_channels = past_channels - out_channels
    self.linear_aux_past = nn.ModuleList([nn.Linear(1, d_model) for _ in range(self.aux_past_channels)])

    # one linear embedding for every numerical variable of the future
    self.aux_fut_channels = future_channels
    self.linear_aux_fut = nn.ModuleList([nn.Linear(1, d_model) for _ in range(self.aux_fut_channels)])

    # embedding of categorical variables for both past and future (ASSUMING BOTH AVAILABLE OR NO ONE)
    self.seq_len = past_steps + future_steps
    self.emb_cat_var = sub_nn.embedding_cat_variables(self.seq_len, future_steps, d_model, embs, self.device)

    # diffusion subnets, one for each step
    if subnet == 1:
        self.sub_nets = nn.ModuleList([
            SubNet1(self.aux_past_channels, self.aux_fut_channels, learn_var, out_channels, d_model, d_head, n_head, activation, dropout_rate)
            for _ in range(diffusion_steps)
        ])
    elif subnet == 2:
        self.sub_nets = nn.ModuleList([
            SubNet2(self.aux_past_channels, self.aux_fut_channels, learn_var, past_steps, future_steps, out_channels, d_model, activation, dropout_rate)
            for _ in range(diffusion_steps)
        ])
    elif subnet == 3:
        # if we have numerical vars, use them
        aux_num_available = self.aux_past_channels > 0 or self.aux_fut_channels > 0
        self.sub_nets = nn.ModuleList([
            SubNet3(learn_var, aux_num_available, out_channels, d_model, future_steps, n_layers_RNN, d_head, n_head, dropout_rate)
            for _ in range(diffusion_steps)
        ])
    else:
        raise ValueError("Wrong number for Subnet. Not yet implemented!")
def forward(self, batch: dict) -> torch.Tensor:
    """Training process of the diffusion network

    A subset of the T step-specific subnets is drawn (step 0 is always
    included). For every drawn step t the target is noised with ``q_sample``,
    the subnet predicts the noise (and, if ``learn_var`` is set, a coefficient
    used to interpolate the variance) and the step loss is
    ``noise_loss + gamma * distribution_loss``. The distribution loss is the
    negative log-likelihood at t == 0 and KL( q(x_{t-1}|x_t, x_0) || p(x_{t-1}|x_t) )
    otherwise.

    Args:
        batch (dict): variables loaded. Keys read here: 'y', 'x_num_past',
            'idx_target' and, when ``aux_fut_channels > 0``, 'x_num_future'.
            Categorical keys are read by ``cat_categorical_vars``.

    Returns:
        torch.Tensor: total loss summed over all the drawn subnets
    """
    # LOADING TARGET VARIABLES
    y_to_be_pred = batch['y'].to(self.device)

    # LOADING AUTOREGRESSIVE CONTEXT OF TARGET VARIABLES
    num_past = batch['x_num_past'].to(self.device)
    idx_target = batch['idx_target'][0]
    y_past = num_past[:, :, idx_target]

    # LOADING EMBEDDING CATEGORICAL VARIABLES (averaged over the variables axis)
    emb_cat_past, emb_cat_fut = self.cat_categorical_vars(batch)
    emb_cat_past = torch.mean(emb_cat_past, dim=2)
    emb_cat_fut = torch.mean(emb_cat_fut, dim=2)

    ### LOADING PAST AND FUTURE NUMERICAL VARIABLES
    if self.aux_past_channels > 0:
        # drop the autoregressive (target) variables
        aux_num_past = self.remove_var(num_past, idx_target, 2)
        # check that we are using the expected number of variables about past
        assert self.aux_past_channels == aux_num_past.size(2), beauty_string(f"{self.aux_past_channels} LAYERS FOR PAST VARS AND {aux_num_past.size(2)} VARS",'section',True)
        # embed every variable with its own layer, then average the embeddings
        aux_emb_num_past = torch.stack(
            [layer(aux_num_past[:, :, [i]]) for i, layer in enumerate(self.linear_aux_past)],
            dim=2,
        ).mean(dim=2)
    else:
        aux_emb_num_past = None  # non available vars

    if self.aux_fut_channels > 0:
        # AUX means AUXILIARY variables
        aux_num_fut = batch['x_num_future'].to(self.device)
        # check that we are using the expected number of variables about future
        assert self.aux_fut_channels == aux_num_fut.size(2), beauty_string(f"{self.aux_fut_channels} LAYERS FOR FUTURE VARS AND {aux_num_fut.size(2)} VARS",'section',True)
        # embed every variable with its own layer, then average the embeddings
        aux_emb_num_fut = torch.stack(
            [layer(aux_num_fut[:, :, [j]]) for j, layer in enumerate(self.linear_aux_fut)],
            dim=2,
        ).mean(dim=2)
    else:
        aux_emb_num_fut = None  # non available vars

    ### actual DIFFUSION process ----------------------------------------------

    ##* CHOOSE THE t SUBNETS
    # We have T subnets: [0, 1, ..., T-1].
    values = list(range(self.T))

    # avoid exploding step_weights going on with trainings
    self.improving_weight_during_training()
    # normalize the weights to get a probability vector
    t_wei = self.multinomial_step_weights / np.sum(self.multinomial_step_weights)
    # extract the steps (without replacement); step 0 is always trained
    drawn_t = np.random.choice(values, size=self.simultaneous_steps, replace=False, p=t_wei)  # type: ignore
    if 0 not in drawn_t:
        drawn_t = np.append(drawn_t, 0)

    # increase the weights of the non-extracted subnets
    non_draw_val = np.delete(values, drawn_t)  # type: ignore
    self.multinomial_step_weights[non_draw_val] += 1

    # None marks "no step processed yet"; a numeric sentinel could collide
    # with a legitimate loss value
    tot_loss = None
    for t in drawn_t:
        # LOADING THE SUBNET
        sub_net = self.sub_nets[t]

        # Get y and noise it
        y_noised, true_mean, true_log_var, actual_noise = self.q_sample(y_to_be_pred, t)

        # output of the subnet: noise predicted and, if learn_var=True, vector for variances
        if self.learn_var:
            eps_pred, var_aux_out = sub_net(y_noised, y_past, emb_cat_past, emb_cat_fut, aux_emb_num_past, aux_emb_num_fut)
            # variance interpolated (in log space) between beta_t and the posterior variance
            pre_var_t = self._extract_into_tensor(self.betas, t, eps_pred.shape)
            post_var_t = self._extract_into_tensor(self.posterior_variance, t, eps_pred.shape)
            post_sigma = torch.exp(var_aux_out * torch.log(pre_var_t) + (1 - var_aux_out) * torch.log(post_var_t))  # variance, not log_var
        else:
            eps_pred = sub_net(y_noised, y_past, emb_cat_past, emb_cat_fut, aux_emb_num_past, aux_emb_num_fut)
            post_sigma = self._extract_into_tensor(self.posterior_variance, t, eps_pred.shape)

        # posterior mean assuming the predicted noise is the actual one
        out_mean = self._extract_into_tensor(np.sqrt(1 / self.alphas), t, eps_pred.shape) * (
            y_noised - self._extract_into_tensor(self.betas / np.sqrt(1 - self.alphas_cumprod), t, eps_pred.shape) * eps_pred
        )

        if t == 0:
            # At the first timestep use the negative log-likelihood. It is computed
            # directly in log space: taking the log of the likelihood underflows to
            # log(0) = inf when the variance is small.
            neg_likelihoods = -self.gaussian_log_likelihood(y_to_be_pred, out_mean, post_sigma)  # (values to be predicted, mean of values predicted, variance)
            distribution_loss = torch.mean(neg_likelihoods)
        else:
            # otherwise use KL( q(x_{t-1}|x_t, x_0) || p(x_{t-1}|x_t) )
            kl_divergence = self.normal_kl(true_mean, true_log_var, out_mean, torch.log(post_sigma))  # (true mean, true log var, mean of values predicted, log var predicted)
            distribution_loss = torch.mean(kl_divergence)

        # always compute the loss about the straight prediction of the noise,
        # then add the distribution loss scaled according to gamma
        noise_loss = self.loss(eps_pred, actual_noise)
        step_loss = noise_loss + self.gamma * distribution_loss

        # update the total loss
        tot_loss = step_loss if tot_loss is None else tot_loss + step_loss
    return tot_loss
def training_step(self, batch, batch_idx):
    """Return the training loss of ``batch``.

    The loss is entirely computed by calling the module on the batch (i.e. by
    ``forward``); ``batch_idx`` is not used.
    """
    return self(batch)
def inference(self, batch: dict) -> torch.Tensor:
    """Forecast the future target values by running the reverse diffusion.

    White noise of shape [batch_size, future_steps, output_channels] is passed
    through every subnet, from step T-1 down to step 0 included.

    Args:
        batch (dict): Keys checked ['x_num_past, 'idx_target', 'x_num_future', 'x_cat_past', 'x_cat_future']

    Returns:
        torch.Tensor: generated sequence reshaped to [batch_size, future_steps, num_var, 1]
    """

    def _mean_embedding(values, layers):
        # one linear layer per variable (last axis), embeddings averaged over the variables
        embedded = [layer(values[:, :, [k]]) for k, layer in enumerate(layers)]
        return torch.stack(embedded, dim=2).mean(dim=2)

    # autoregressive context of the target variables
    num_past = batch['x_num_past'].to(self.device)
    batch_size = num_past.shape[0]
    idx_target = batch['idx_target'][0]
    y_past = num_past[:, :, idx_target]

    # embedded categorical context, averaged over the variables axis
    emb_cat_past, emb_cat_fut = self.cat_categorical_vars(batch)
    emb_cat_past = emb_cat_past.mean(dim=2)
    emb_cat_fut = emb_cat_fut.mean(dim=2)

    # auxiliary numerical variables of the past (None when not available)
    aux_emb_num_past = None
    if self.aux_past_channels > 0:
        aux_num_past = self.remove_var(num_past, idx_target, 2)  # drop the autoregressive variables
        assert self.aux_past_channels == aux_num_past.size(2), beauty_string(f"{self.aux_past_channels} LAYERS FOR PAST VARS AND {aux_num_past.size(2)} VARS",'section',True)
        aux_emb_num_past = _mean_embedding(aux_num_past, self.linear_aux_past)

    # auxiliary numerical variables of the future (None when not available)
    aux_emb_num_fut = None
    if self.aux_fut_channels > 0:
        aux_num_fut = batch['x_num_future'].to(self.device)
        assert self.aux_fut_channels == aux_num_fut.size(2), beauty_string(f"{self.aux_fut_channels} LAYERS FOR PAST VARS AND {aux_num_fut.size(2)} VARS",'section',True)
        aux_emb_num_fut = _mean_embedding(aux_num_fut, self.linear_aux_fut)

    # DIFFUSION INFERENCE: start from white noise
    y_noised = torch.randn((batch_size, self.future_steps, self.output_channels)).to(self.device)

    # reverse cycle over all the subnets: T-1, T-2, ..., 0
    for t in reversed(range(self.T)):
        context = (y_past, emb_cat_past, emb_cat_fut, aux_emb_num_past, aux_emb_num_fut)
        step_net = self.sub_nets[t]
        if self.learn_var:
            eps_pred, var_aux_out = step_net(y_noised, *context)
            # variance interpolated (in log space) between beta_t and the posterior variance
            log_pre_var = torch.log(self._extract_into_tensor(self.betas, t, eps_pred.shape))
            log_post_var = torch.log(self._extract_into_tensor(self.posterior_variance, t, eps_pred.shape))
            post_sigma = torch.exp(var_aux_out * log_pre_var + (1 - var_aux_out) * log_post_var)
        else:
            eps_pred = step_net(y_noised, *context)
            post_sigma = self._extract_into_tensor(self.posterior_variance, t, eps_pred.shape)

        # remove the predicted noise and rescale; no fresh noise is added between steps
        rescale = 1 / torch.sqrt(1 - post_sigma)
        y_noised = rescale * (y_noised - torch.sqrt(post_sigma) * eps_pred)

    return y_noised.view(-1, self.future_steps, self.output_channels, 1)
# for validation extract the output from the self.inference method
def validation_step(self, batch, batch_idx):
    """Return the validation loss: ``compute_loss`` applied to the output of ``inference``.

    ``batch_idx`` is not used.
    """
    return self.compute_loss(batch, self.inference(batch))

# function to concat embedded categorical variables
def cat_categorical_vars(self, batch: dict):
    """Extracting categorical context about past and future

    When both 'x_cat_past' and 'x_cat_future' are in the batch they are
    concatenated along the time axis and embedded together; otherwise the
    embedding layer is called with the batch size only.

    Args:
        batch (dict): Keys checked -> ['x_cat_past', 'x_cat_future']. 'x_num_past'
            is read to get the batch size when the categorical keys are missing.

    Returns:
        Tuple[torch.Tensor, torch.Tensor]: cat_emb_past, cat_emb_fut, i.e. the first
        ``past_steps`` and the last ``future_steps`` entries (axis 1) of the embedding
    """
    # GET AVAILABLE CATEGORICAL CONTEXT
    if 'x_cat_past' in batch and 'x_cat_future' in batch:  # if we have both
        # HERE WE ASSUME SAME NUMBER AND KIND OF VARIABLES IN PAST AND FUTURE
        cat_past = batch['x_cat_past'].to(self.device)
        cat_fut = batch['x_cat_future'].to(self.device)
        cat_full = torch.cat((cat_past, cat_fut), dim=1)
        # EMB CATEGORICAL VARIABLES AND THEN SPLIT IN PAST AND FUTURE
        emb_cat_full = self.emb_cat_var(cat_full, self.device)
    else:
        # no categorical variables: the embedding layer only needs the batch size.
        # The embedding must NOT be recomputed from cat_past/cat_fut after this
        # branch, because those names do not exist here.
        emb_cat_full = self.emb_cat_var(batch['x_num_past'].shape[0], self.device)

    # split past and future categorical embedded variables
    cat_emb_past = emb_cat_full[:, :self.past_steps, :, :]
    cat_emb_fut = emb_cat_full[:, -self.future_steps:, :, :]
    return cat_emb_past, cat_emb_fut
# Function to extract from batch['x_num_past'] all the variables except the autoregressive (target) ones
def remove_var(self, tensor: torch.Tensor, indexes_to_exclude: list, dimension: int) -> torch.Tensor:
    """Function to remove variables from tensors in chosen dimension and position

    Args:
        tensor (torch.Tensor): starting tensor
        indexes_to_exclude (list): indexes of the chosen dimension we want to exclude
        dimension (int): dimension of the tensor on which we want to work (not list of dims!!)

    Returns:
        torch.Tensor: new tensor without the chosen variables. If every index is
        excluded the result has size 0 along ``dimension``.
    """
    kept = [i for i in range(tensor.size(dimension)) if i not in indexes_to_exclude]
    # explicit integer dtype: an empty list would otherwise become a float tensor,
    # which index_select rejects
    remaining_idx = torch.tensor(kept, dtype=torch.long, device=tensor.device)
    # Select the desired sub-tensor
    return torch.index_select(tensor, dim=dimension, index=remaining_idx)
def improving_weight_during_training(self):
    """Shift the step sampling weights so that their minimum becomes 1.

    Called before every multinomial draw of the steps: the weights only grow
    during training, so the common offset is removed (in place) to keep the
    differences among subnets meaningful. Weights must stay > 0 to be usable
    as probabilities, hence ``min - 1`` is subtracted instead of ``min``.
    """
    weights = self.multinomial_step_weights
    offset = weights.min() - 1
    np.subtract(weights, offset, out=weights)  # in place, same array object
    return
### >>>>>>>>>>>>> AUXILIARY MODEL FUNCS
def q_sample(self, x_start: torch.Tensor, t: int) -> List[torch.Tensor]:
    """Sample from q(x_t | x_0), i.e. diffuse ``x_start`` for ``t`` steps.

    The mean and log variance of the diffusion posterior q(x_{t-1} | x_t, x_0),
    which are the quantities the subnet has to match, are returned as well.

    Args:
        x_start (torch.Tensor): values to be predicted
        t (int): diffusion step

    Returns:
        List[torch.Tensor]: [noised sample, posterior mean, posterior log variance, noise used]
    """
    shape = x_start.shape
    # noise from normal distribution
    eps = torch.randn_like(x_start)

    # direct diffusion at t-th step: sqrt(acp_t) * x_0 + sqrt(1 - acp_t) * eps
    signal_scale = self._extract_into_tensor(np.sqrt(self.alphas_cumprod), t, shape)
    noise_scale = self._extract_into_tensor(np.sqrt(1 - self.alphas_cumprod), t, shape)
    x_t = signal_scale * x_start + noise_scale * eps

    # posterior mean: coef1_t * x_0 + coef2_t * x_t
    coef_start = self._extract_into_tensor(self.posterior_mean_coef1, t, x_t.shape)
    coef_noised = self._extract_into_tensor(self.posterior_mean_coef2, t, x_t.shape)
    post_mean = coef_start * x_start + coef_noised * x_t

    # posterior log variance
    post_log_var = self._extract_into_tensor(self.posterior_log_variance, t, x_t.shape)

    return [x_t, post_mean, post_log_var, eps]
def normal_kl(self, mean1, logvar1, mean2, logvar2):
    """Element-wise KL divergence KL(N1 || N2) between two gaussians.

    N1 has mean ``mean1`` and log variance ``logvar1``, N2 has mean ``mean2``
    and log variance ``logvar2``. The value returned is

        0.5 * ( -1 + logvar2 - logvar1 + exp(logvar1 - logvar2)
                + (mean1 - mean2)^2 * exp(-logvar2) )

    Shapes are automatically broadcasted, so batches can be compared to
    scalars, among other use cases.
    """
    variance_ratio = torch.exp(logvar1 - logvar2)
    scaled_sq_dist = ((mean1 - mean2) ** 2) * torch.exp(-logvar2)
    return 0.5 * (-1.0 + logvar2 - logvar1 + variance_ratio + scaled_sq_dist)
def gaussian_likelihood(self, x, mean, var):
    """Element-wise density of N(mean, var) evaluated at ``x``.

    ``var`` is a variance (not a standard deviation) and must be a tensor.
    """
    normalizer = 1.0 / torch.sqrt(2 * np.pi * var)
    exponent = -0.5 * ((x - mean) ** 2 / var)
    return normalizer * torch.exp(exponent)
def gaussian_log_likelihood(self, x, mean, var):
    """Element-wise log density of N(mean, var) evaluated at ``x``.

    ``var`` is a variance (not a standard deviation) and must be a tensor.
    """
    standardized = (x - mean) / torch.sqrt(var)
    quadratic_term = -0.5 * standardized ** 2
    log_normalizer = -0.5 * torch.log(2 * torch.tensor(np.pi) * var)
    return quadratic_term + log_normalizer
def _extract_into_tensor(self, arr, timesteps, broadcast_shape): """ Extract values from a 1-D numpy array for a batch of indices. :param arr: the 1-D numpy array. :param timesteps: a tensor of indices into the array to extract. :param broadcast_shape: a larger shape of K dimensions with the batch dimension equal to the length of timesteps. :return: a tensor of shape 'broadcast_shape' where the shape has K dims. """ ten = torch.tensor(arr[timesteps]) return ten.expand(broadcast_shape).to(self.device)
### >>>>>>>>>>>>> SUB NET
class SubNet1(nn.Module):
    def __init__(self, aux_past_ch, aux_fut_ch, learn_var:bool, output_channel:int, d_model:int, d_head:int, n_head:int, activation:str, dropout_rate:float) -> None:
        """ATTENTION subnet of the DIFFUSION MODEL (DDPM)

        The past context (embedded target, categorical embedding and, when
        available, numerical embedding) and the future context (embedded noised
        target, categorical embedding and, when available, numerical embedding)
        are each reduced to ``d_model`` features by a small MLP. A single
        attention block with { Q = future, K = past, V = embedded y_past }
        produces the features from which the noise is predicted.

        Args:
            aux_past_ch (int): number of auxiliary numerical variables in the past (only checked for > 0)
            aux_fut_ch (int): number of auxiliary numerical variables in the future (only checked for > 0)
            learn_var (bool): set if the network has to return also the output of the variance head
            output_channel (int): number of variables to be predicted
            d_model (int): hidden dimension of the model
            d_head (int): forwarded to the attention layer
            n_head (int): forwarded to the attention layer
            activation (str): expression evaluated to get the activation class
            dropout_rate (float): not used by this subnet
        """
        super().__init__()
        self.aux_past_channels = aux_past_ch
        self.aux_fut_channels = aux_fut_ch
        self.learn_var = learn_var
        # NOTE(review): eval of a configuration string. Safe only as long as
        # `activation` comes from trusted configuration files.
        activation_fun = eval(activation)

        # embeddings of the (noised) target variables
        self.y_noised_linear = nn.Linear(output_channel, d_model)
        self.y_past_linear = nn.Linear(output_channel, d_model)

        # the context MLPs take 3 concatenated embeddings when the numerical
        # variables are available, 2 otherwise
        past_in_features = d_model*3 if self.aux_past_channels>0 else d_model*2
        self.past_sequential = nn.Sequential(
            nn.Linear(past_in_features, d_model*2),
            activation_fun(),
            nn.Linear(d_model*2, d_model),
        )

        fut_in_features = d_model*3 if self.aux_fut_channels>0 else d_model*2
        self.fut_sequential = nn.Sequential(
            nn.Linear(fut_in_features, d_model*2),
            activation_fun(),
            nn.Linear(d_model*2, d_model),
        )

        # defined but not used in forward
        self.y_sequential = nn.Sequential(
            nn.Linear(d_model*2, d_model),
            activation_fun(),
            nn.Linear(d_model, d_model),
        )

        self.attention = sub_nn.InterpretableMultiHead(d_model, d_head, n_head)

        # just an intermediate dimension for the output heads
        hidden_size = int(d_model/3)

        # head predicting the noise
        self.eps_out_sequential = nn.Sequential(
            nn.Linear(d_model, hidden_size),
            activation_fun(),
            nn.Linear(hidden_size, output_channel),
        )

        # head predicting the additional variable for the variance (used if learn_var == True)
        self.var_out_sequential = nn.Sequential(
            nn.Linear(output_channel, hidden_size),
            nn.Linear(hidden_size, d_model),
            activation_fun(),
            nn.Linear(d_model, d_model),
            activation_fun(),
            nn.Linear(d_model, hidden_size),
            nn.Linear(hidden_size, output_channel),
        )

    def forward(self, y_noised:torch.Tensor, y_past:torch.Tensor, cat_past:torch.Tensor, cat_fut:torch.Tensor, num_past:Union[torch.Tensor,None] = None, num_fut:Union[torch.Tensor,None] = None):
        """DIFFUSION SUBNET

        Args:
            y_noised (torch.Tensor): [B, future_step, num_var]
            y_past (torch.Tensor): [B, past_step, num_var]
            cat_past (torch.Tensor): [B, past_step, d_model]
            cat_fut (torch.Tensor): [B, future_step, d_model]
            num_past (torch.Tensor, optional): [B, past_step, d_model]. Used only if the subnet was built with aux_past_ch > 0. Defaults to None.
            num_fut (torch.Tensor, optional): [B, future_step, d_model]. Used only if the subnet was built with aux_fut_ch > 0. Defaults to None.

        Returns:
            torch.Tensor: predicted noise [B, future_step, num_var]. According to 'learn_var' param in initialization, the subnet returns another tensor of same size about the variance
        """
        emb_y_noised = self.y_noised_linear(y_noised.float())
        emb_y_past = self.y_past_linear(y_past)

        # PAST context -> [B, past_step, d_model]
        past_parts = [emb_y_past, cat_past]
        if self.aux_past_channels>0:
            past_parts.append(num_past)
        past_seq = self.past_sequential(torch.cat(past_parts, dim=2)) # type: ignore

        # FUTURE context -> [B, future_step, d_model]
        fut_parts = [emb_y_noised, cat_fut]
        if self.aux_fut_channels>0:
            fut_parts.append(num_fut)
        fut_seq = self.fut_sequential(torch.cat(fut_parts, dim=2)) # type: ignore

        # ATTENTION: Q = future, K = past, V = embedded past target
        attended = self.attention(fut_seq, past_seq, emb_y_past)

        # OUTPUT
        eps_out = self.eps_out_sequential(attended)
        if not self.learn_var:
            return eps_out

        # the variance head works on the detached noise prediction
        var_out = self.var_out_sequential(eps_out.detach())
        return eps_out, var_out
class SubNet2(nn.Module):
    def __init__(self, aux_past_ch, aux_fut_ch, learn_var:bool, past_steps, future_steps, output_channel:int, d_model:int, activation:str, dropout_rate:float):
        """LINEAR subnet of the DIFFUSION MODEL

        All the available embeddings are flattened over time and concatenated in
        a single vector, from which two MLPs predict the noise and (used only
        if ``learn_var``) the additional variable for the variance.

        Args:
            aux_past_ch (int): number of auxiliary numerical variables in the past (only its truthiness is used)
            aux_fut_ch (int): number of auxiliary numerical variables in the future (only its truthiness is used)
            learn_var (bool): set if forward has to return also the output of the variance MLP
            past_steps (int): size of past window
            future_steps (int): size of future window
            output_channel (int): number of variables to be predicted
            d_model (int): dimension of every embedding
            activation (str): expression evaluated to get the activation class
            dropout_rate (float): dropout probability used inside the MLPs
        """
        super().__init__()
        self.aux_past_channels = aux_past_ch
        self.aux_fut_channels = aux_fut_ch
        self.learn_var = learn_var

        # number of flattened embeddings: target + categorical are always
        # there, numerical ones only when available
        n_past_blocks = 2 + bool(aux_past_ch)
        n_fut_blocks = 2 + bool(aux_fut_ch)
        in_size = (past_steps*n_past_blocks + future_steps*n_fut_blocks) * d_model
        out_size = output_channel * future_steps

        # NOTE(review): eval of a configuration string. Safe only as long as
        # `activation` comes from trusted configuration files.
        activation_fun = eval(activation)

        self.y_noised_linear = nn.Linear(output_channel, d_model)
        self.y_past_linear = nn.Linear(output_channel, d_model)

        hidden_size = int( (output_channel + d_model)/2 )

        # MLP predicting the noise
        eps_layers = [
            nn.Linear(in_size, hidden_size),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_size, d_model),
            activation_fun(),
            nn.Dropout(dropout_rate),
            nn.Linear(d_model, hidden_size),
            activation_fun(),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_size, out_size),
        ]
        self.eps_out_sequential = nn.Sequential(*eps_layers)

        # MLP predicting the additional variable for the variance
        var_layers = [
            nn.Linear(in_size, hidden_size),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_size, hidden_size),
            activation_fun(),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_size, out_size),
        ]
        self.var_out_sequential = nn.Sequential(*var_layers)

    def forward(self, y_noised:torch.Tensor, y_past:torch.Tensor, cat_past:torch.Tensor, cat_fut:torch.Tensor, num_past:Union[torch.Tensor,None] = None, num_fut:Union[torch.Tensor,None] = None):
        """Predict the noise from the flattened context.

        Args:
            y_noised (torch.Tensor): [B, future_step, num_var]
            y_past (torch.Tensor): target values of the past window
            cat_past (torch.Tensor): categorical embedding of the past
            cat_fut (torch.Tensor): categorical embedding of the future
            num_past (torch.Tensor, optional): numerical embedding of the past. Defaults to None.
            num_fut (torch.Tensor, optional): numerical embedding of the future. Defaults to None.

        Returns:
            torch.Tensor: predicted noise [B, future_step, num_var]; if ``learn_var``
            a tuple with a second tensor of the same shape for the variance
        """
        B, fut_step, n_var = y_noised.shape

        # autoregressive variables and categorical ones are always available
        flat_parts = [
            self.y_noised_linear(y_noised.float()).view(B, -1),
            self.y_past_linear(y_past).view(B, -1),
            cat_past.view(B, -1),
            cat_fut.view(B, -1),
        ]
        # numerical vars are appended when available
        if num_past is not None:
            assert self.aux_past_channels>0 # check with flag in subnet init
            flat_parts.append(num_past.view(B, -1))
        if num_fut is not None:
            assert self.aux_fut_channels>0 # check with flag in subnet init
            flat_parts.append(num_fut.view(B, -1))
        full_concat = torch.cat(flat_parts, dim = 1)

        eps_out = self.eps_out_sequential(full_concat).view(B, fut_step, n_var)
        if not self.learn_var:
            return eps_out

        # the variance MLP works on the detached input
        var_out = self.var_out_sequential(full_concat.detach()).view(B, fut_step, n_var)
        return eps_out, var_out
class SubNet3(nn.Module):
    def __init__(self, learn_var, flag_aux_num, num_var, d_model, pred_step, num_layers, d_head, n_head, dropout):
        """RNN + ATTENTION subnet of the DIFFUSION MODEL

        A first estimate of the noise comes from an autoregressive RNN
        prediction compared with the noised target. The estimate is then
        updated through residual connections by two attention blocks, one on
        categorical context and (if ``flag_aux_num``) one on numerical
        context. Both attentions use { Q = *_future, K = *_past, V = embedded y_past }.

        Args:
            learn_var (bool): set if the network has to return also a prediction for the variance
            flag_aux_num (bool): set if auxiliary numerical variables are available
            num_var (int): number of variables to be predicted
            d_model (int): hidden dimension of the model
            pred_step (int): forwarded to the RNN; the caller in this file passes the number of future steps
            num_layers (int): forwarded to the RNN
            d_head (int): forwarded to the attention layers
            n_head (int): forwarded to the attention layers
            dropout (float): forwarded to the RNN, the GRNs and the residual connections
        """
        super().__init__()
        self.learn_var = learn_var
        self.flag_aux_num = flag_aux_num

        # Autoregressive with RNN
        self.y_d_model = nn.Linear(num_var, d_model)
        self.rnn = sub_nn.LSTM_Model(num_var, d_model, pred_step, num_layers, dropout)
        self.eps_pred_grn = sub_nn.GRN(d_model, dropout)

        # categorical
        self.cat_MHA = sub_nn.InterpretableMultiHead(d_model, d_head, n_head)
        self.cat_grn = sub_nn.GRN(d_model, dropout)
        self.cat_res_conn = sub_nn.ResidualConnection(d_model, dropout)

        # numerical
        if flag_aux_num:
            self.num_MHA = sub_nn.InterpretableMultiHead(d_model, d_head, n_head)
            self.num_grn = sub_nn.GRN(d_model, dropout)
            self.num_res_conn = sub_nn.ResidualConnection(d_model, dropout)

        # EPS PREDICTION
        self.eps_final_grn = sub_nn.GRN(d_model, dropout)
        self.eps_out_linear = nn.Linear(d_model, num_var)

        # variance prediction
        if learn_var:
            self.emb_eps_pred = nn.Linear(num_var, d_model)
            self.var_att = sub_nn.InterpretableMultiHead(d_model, d_head, n_head)
            self.var_grn = sub_nn.GRN(d_model, dropout)
            self.var_out = nn.Linear(d_model, num_var)

    def forward(self, y_noised:torch.Tensor, y_past:torch.Tensor, cat_past:torch.Tensor, cat_fut:torch.Tensor, num_past:Union[torch.Tensor,None] = None, num_fut:Union[torch.Tensor,None] = None):
        """Predict the noise added to ``y_noised``.

        Args:
            y_noised (torch.Tensor): noised target values of the future window
            y_past (torch.Tensor): target values of the past window
            cat_past (torch.Tensor): categorical embedding of the past
            cat_fut (torch.Tensor): categorical embedding of the future
            num_past (torch.Tensor, optional): numerical embedding of the past. When the
                subnet uses numerical context and this is None, a tensor of ones shaped
                like ``cat_past`` is used. Defaults to None.
            num_fut (torch.Tensor, optional): numerical embedding of the future. When the
                subnet uses numerical context and this is None, a tensor of ones shaped
                like ``cat_fut`` is used. Defaults to None.

        Returns:
            torch.Tensor: predicted noise; if ``learn_var`` a tuple
            (predicted noise, predicted variance variable)
        """
        # Autoregressive
        emb_y_past = self.y_d_model(y_past)
        pred_y_fut = self.rnn(emb_y_past)
        # re-embedding future
        emb_pred_y_fut = self.y_d_model(pred_y_fut)
        emb_y_noised = self.y_d_model(y_noised.float())
        eps_pred = self.eps_pred_grn(emb_pred_y_fut - emb_y_noised, using_norm=False)

        # Categorical contribute
        cat_att = self.cat_MHA(cat_fut, cat_past, emb_y_past)
        cat_att = self.cat_grn(cat_att, using_norm=False)
        eps_pred = self.cat_res_conn(cat_att, eps_pred, using_norm=False)

        # Numerical contribute
        if self.flag_aux_num:
            if num_past is None:
                num_past = torch.ones_like(cat_past)
            if num_fut is None:
                num_fut = torch.ones_like(cat_fut)
            # K must be the numerical past context (not the categorical one),
            # otherwise num_past would never be used
            num_att = self.num_MHA(num_fut, num_past, emb_y_past)
            num_att = self.num_grn(num_att, using_norm=False)
            # dedicated residual connection of the numerical branch
            eps_pred = self.num_res_conn(num_att, eps_pred, using_norm=False)

        eps_pred = self.eps_final_grn(eps_pred, using_norm=False)
        eps_pred = self.eps_out_linear(eps_pred)

        if self.learn_var:
            # the variance branch works on detached tensors only
            emb_eps_pred = self.emb_eps_pred(eps_pred.detach())
            emb_eps_pred = self.var_att(emb_y_noised.detach(), emb_pred_y_fut.detach(), emb_eps_pred)
            emb_var_pred = self.var_grn(emb_eps_pred, using_norm=False)
            var_pred = self.var_out(emb_var_pred)
            return eps_pred, var_pred
        return eps_pred