Source code for dsipts.models.tft.sub_nn

import torch
import torch.nn as nn
from typing import Union

[docs] class embedding_cat_variables(nn.Module): # at the moment cat_past and cat_fut together
[docs] def __init__(self, seq_len: int, lag: int, d_model: int, emb_dims: list, device): """Class for embedding categorical variables, adding 3 positional variables during forward Args: seq_len (int): length of the sequence (sum of past and future steps) lag (int): number of future step to be predicted hiden_size (int): dimension of all variables after they are embedded emb_dims (list): size of the dictionary for embedding. One dimension for each categorical variable device : - """ super().__init__() self.seq_len = seq_len self.lag = lag self.device = device self.cat_embeds = emb_dims + [seq_len, lag+1, 2] # self.cat_n_embd = nn.ModuleList([ nn.Embedding(emb_dim, d_model) for emb_dim in self.cat_embeds ])
[docs] def forward(self, x: Union[torch.Tensor,int],device:torch.device) -> torch.Tensor: """All components of x are concatenated with 3 new variables for data augmentation, in the order: - pos_seq: assign at each step its time-position - pos_fut: assign at each step its future position. 0 if it is a past step - is_fut: explicit for each step if it is a future(1) or past one(0) Args: x (torch.Tensor): [bs, seq_len, num_vars] Returns: torch.Tensor: [bs, seq_len, num_vars+3, n_embd] """ if isinstance(x, int): no_emb = True B = x else: no_emb = False B, _, _ = x.shape pos_seq = self.get_pos_seq(bs=B).to(device) pos_fut = self.get_pos_fut(bs=B).to(device) is_fut = self.get_is_fut(bs=B).to(device) if no_emb: cat_vars = torch.cat((pos_seq, pos_fut, is_fut), dim=2) else: cat_vars = torch.cat((x, pos_seq, pos_fut, is_fut), dim=2) cat_n_embd = self.get_cat_n_embd(cat_vars) return cat_n_embd
[docs] def get_pos_seq(self, bs): pos_seq = torch.arange(0, self.seq_len) pos_seq = pos_seq.repeat(bs,1).unsqueeze(2).to(self.device) return pos_seq
[docs] def get_pos_fut(self, bs): pos_fut = torch.cat((torch.zeros((self.seq_len-self.lag), dtype=torch.long),torch.arange(1,self.lag+1))) pos_fut = pos_fut.repeat(bs,1).unsqueeze(2).to(self.device) return pos_fut
[docs] def get_is_fut(self, bs): is_fut = torch.cat((torch.zeros((self.seq_len-self.lag), dtype=torch.long),torch.ones((self.lag), dtype=torch.long))) is_fut = is_fut.repeat(bs,1).unsqueeze(2).to(self.device) return is_fut
[docs] def get_cat_n_embd(self, cat_vars): cat_n_embd = torch.Tensor().to(cat_vars.device) for index, layer in enumerate(self.cat_n_embd): emb = layer(cat_vars[:, :, index]) cat_n_embd = torch.cat((cat_n_embd, emb.unsqueeze(2)),dim=2) return cat_n_embd
class LSTM_Model(nn.Module):
    """LSTM followed by a linear head that predicts ``pred_step`` steps of ``num_var`` variables."""

    def __init__(self, num_var: int, d_model: int, pred_step: int, num_layers: int, dropout: float):
        """LSTM from [..., d_model] to [..., predicted_step, num_of_vars]

        Args:
            num_var (int): number of variables encoded in the input tensor
            d_model (int): encoding dimension of the tensor
            pred_step (int): step to be predicted by LSTM
            num_layers (int): number of layers of LSTM
            dropout (float): dropout value passed to ``nn.LSTM``
        """
        super().__init__()
        self.num_var = num_var
        self.d_model = d_model
        self.num_layers = num_layers
        self.pred_step = pred_step
        self.lstm = nn.LSTM(d_model, d_model, num_layers=num_layers, batch_first=True, dropout=dropout)
        self.linear = nn.Linear(d_model, pred_step*num_var)

    def forward(self, x):
        """LSTM process over the x tensor and reshaping according to pred_step and num_var to be predicted

        Args:
            x (torch.Tensor): input tensor, batch first ([B, seq_len, d_model])

        Returns:
            torch.Tensor: tensor resized to [B, pred_step, num_var]
        """
        # Zero initial states built with the input's dtype and device, so the module also
        # works when the model and its input are not float32 (e.g. after ``.double()``).
        state_shape = (self.num_layers, x.size(0), self.d_model)
        h0 = torch.zeros(state_shape, dtype=x.dtype, device=x.device)
        c0 = torch.zeros(state_shape, dtype=x.dtype, device=x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.linear(out[:, -1, :])  # Take the last output of the sequence
        out = out.view(-1, self.pred_step, self.num_var)
        return out
class GLU(nn.Module):
    """Gated Linear Unit: a sigmoid gate multiplied element-wise with a linear projection."""

    def __init__(self, d_model: int):
        """Gated Linear Unit

        Auxiliary subnet for sigmoid element-wise multiplication

        Args:
            d_model (int): input and output dimension of both projections
        """
        super().__init__()
        # Both projections are bias-free: linear1 feeds the sigmoid gate,
        # linear2 produces the values that get gated.
        self.linear1 = nn.Linear(d_model, d_model, bias=False)
        self.linear2 = nn.Linear(d_model, d_model, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``sigmoid(linear1(x)) * linear2(x)``.

        Args:
            x (torch.Tensor): input tensor whose last dimension is d_model

        Returns:
            torch.Tensor: gated tensor with the same shape as x
        """
        gate = self.sigmoid(self.linear1(x))
        return gate * self.linear2(x)
class GRN(nn.Module):
    """Gated Residual Network: Linear -> ELU -> Linear, then a gated residual connection."""

    def __init__(self, d_model: int, dropout_rate: float):
        """Gated Residual Network

        Auxiliary subnet for gating residual connections

        Args:
            d_model (int): input and output dimension of both linear layers
            dropout_rate (float): dropout rate handed to the residual connection
        """
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_model)
        self.elu = nn.ELU()
        self.linear2 = nn.Linear(d_model, d_model)
        self.res_conn = ResidualConnection(d_model, dropout_rate)

    def forward(self, x: torch.Tensor, using_norm:bool = True) -> torch.Tensor:
        """Transform x and combine it with the untouched x through the residual connection.

        Args:
            x (torch.Tensor): input tensor whose last dimension is d_model
            using_norm (bool, optional): forwarded to the residual connection. Defaults to True.

        Returns:
            torch.Tensor: output of the residual connection
        """
        hidden = self.linear2(self.elu(self.linear1(x)))
        # The transformed tensor goes through the gate; the original x is the skip branch.
        return self.res_conn(hidden, x, using_norm)
class ResidualConnection(nn.Module):
    """Residual connection computing ``res_conn + GLU(dropout(x))`` with optional LayerNorm."""

    def __init__(self, d_model, dropout_rate):
        """Residual Connection of res_conn with GLU(x)

        Auxiliary subnet for residual connections

        Args:
            d_model (int): dimension used by the GLU and the layer normalization
            dropout_rate (float): dropout probability applied to x before the GLU
        """
        super().__init__()
        self.dropout = nn.Dropout(dropout_rate)
        self.glu = GLU(d_model)
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x: torch.Tensor, res_conn: torch.Tensor, using_norm:bool = True) -> torch.Tensor:
        """Gate x, add the skip tensor and optionally normalize the sum.

        Args:
            x (torch.Tensor): tensor passed through dropout and then the GLU
            res_conn (torch.Tensor): tensor summed to the gated x before normalization
            using_norm (bool, optional): apply LayerNorm to the sum when True. Defaults to True.

        Returns:
            torch.Tensor: ``res_conn + GLU(dropout(x))``, layer-normalized if requested
        """
        gated = self.glu(self.dropout(x))
        summed = res_conn + gated
        return self.norm(summed) if using_norm else summed
class InterpretableMultiHead(nn.Module):
    """Multi-head attention with one shared value projection and head-averaged output."""

    def __init__(self, d_model, d_head, n_head):
        """Interpretable MultiHead Attention

        Similar to canonical MultiHead Attention with Query-Keys-Value structure
        Particularities are:
        - Only one common "Value"-Linear layer for all heads
        - output of all heads are summed together and then rescaled over the number of heads
        The final output tensor is re-embedded in the initial dimension

        Args:
            d_model (int): starting and ending dimension of the net
            d_head (int): hidden dimension of all heads
            n_head (int): number of heads
        """
        super().__init__()
        self.d_head = d_head
        self.n_head = n_head
        self.Q_layers = nn.ModuleList([nn.Linear(d_model,d_head) for _ in range(n_head)])
        self.K_layers = nn.ModuleList([nn.Linear(d_model,d_head) for _ in range(n_head)])
        self.Softmax_layers = nn.ModuleList([nn.Softmax(dim=-1) for _ in range(n_head)])
        self.V_layer = nn.Linear(d_model, d_head)
        self.out_layer = nn.Linear(d_head, d_model)

    def forward(self, query:torch.Tensor, key:torch.Tensor, value:torch.Tensor) -> torch.Tensor:
        """Average the attention output of all heads and project it back to d_model.

        Args:
            query (torch.Tensor): tensor projected by each head's query layer (last dim d_model)
            key (torch.Tensor): tensor projected by each head's key layer (last dim d_model)
            value (torch.Tensor): tensor projected by the shared value layer (last dim d_model)

        Returns:
            torch.Tensor: attention output whose last dimension is d_model

        Raises:
            RuntimeError: if the module was built without any head
        """
        # The value projection is shared by every head, so compute it once.
        V = self.V_layer(value)
        scale = self.d_head**-0.5

        out = None  # running sum of the per-head attention outputs
        for (q_layer, k_layer, softmax) in zip(self.Q_layers, self.K_layers, self.Softmax_layers):
            Q = q_layer(query)
            K = k_layer(key)
            wei = softmax(Q @ K.transpose(-2,-1) * scale)
            out_h = wei @ V
            out = out_h if out is None else out + out_h

        if out is None:
            raise RuntimeError("InterpretableMultiHead needs at least one head")

        out = out / self.n_head
        out = self.out_layer(out)  # come back to d_model dimension
        return out