Source code for dsipts.data_structure.modifiers
from abc import abstractmethod, ABC
from typing import Tuple
from sklearn.cluster import BisectingKMeans
from scipy.stats import bootstrap
from torch.utils.data import Dataset
import torch
import numpy as np
import logging
from .utils import MyDataset
class VVADataset(Dataset):
    """Torch `Dataset` of tokenized sequences used to train the GPT model of the VVA approach."""
    def __init__(self, x, y, y_orig, t, length_in, length_out, num_digits):
        self.length_in = length_in           # number of input (past) tokens
        self.length_out = length_out         # number of output (future) tokens
        self.num_digits = num_digits         # vocabulary size (number of clusters)
        self.x_emb = torch.tensor(x).long()  # tokenized past windows
        self.y_emb = torch.tensor(y).long()  # tokenized future windows
        self.y = torch.tensor(y_orig)        # original (untokenized) target values
        self.t = t                           # time indices

    def get_vocab_size(self):
        """
        :meta private:
        """
        return self.num_digits

    def get_block_size(self):
        """
        :meta private:
        """
        # the concatenated (input + output) sequence is consumed with a one-step offset,
        # so the model never sees more than length_in + length_out - 1 tokens
        return self.length_in + self.length_out - 1

    def __getitem__(self, idx):
        """
        :meta private:
        """
        inp = self.x_emb[idx]
        sol = self.y_emb[idx]
        cat = torch.cat((inp, sol), dim=0)
        # the inputs to the transformer will be the offset sequence
        x = cat[:-1].clone()
        y = cat[1:].clone()
        # we only want to predict at output locations: mask out the loss at the input locations
        y[:self.length_in - 1] = -1
        return {'x_emb': x, 'y_emb': y, 'y': self.y[idx]}
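

# A minimal sketch (not part of the library) of how the offset sequence above works.
# With 3 input tokens [t0, t1, t2] and 2 output tokens [f0, f1]:
#   cat = [t0, t1, t2, f0, f1]
#   x   = [t0, t1, t2, f0]        (cat[:-1])
#   y   = [-1, -1, f0, f1]        (cat[1:], input positions masked to -1)
# so the loss is computed only where the model must predict future tokens.
# All sizes below are hypothetical.
def _vva_dataset_example():
    x = np.random.randint(0, 16, size=(4, 3))   # 4 samples, 3 past tokens, vocabulary of 16
    y = np.random.randint(0, 16, size=(4, 2))   # 4 samples, 2 future tokens
    y_orig = np.random.randn(4, 10)             # untokenized targets (e.g. token_split=5)
    ds = VVADataset(x, y, y_orig, t=None, length_in=3, length_out=2, num_digits=16)
    sample = ds[0]
    assert sample['x_emb'].shape[0] == ds.get_block_size()  # 3 + 2 - 1 = 4
    assert (sample['y_emb'][:2] == -1).all()                # input positions carry no loss
    return sample
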
class Modifier(ABC):
    def __init__(self, **kwargs):
        """In the constructor you can store parameters of the modifier; they will be saved when the time series is saved.
        """
        super(Modifier, self).__init__()
        self.__dict__.update(kwargs)

    @abstractmethod
    def fit_transform(self, train: MyDataset, val: MyDataset) -> Tuple[Dataset, Dataset]:
        """This function is called before the training procedure and it should transform the standard `Dataset` into the new `Dataset`.

        Args:
            train (MyDataset): initial train `Dataset`
            val (MyDataset): initial validation `Dataset`

        Returns:
            Tuple[Dataset, Dataset]: transformed train and validation `Datasets`
        """
        return train, val

    @abstractmethod
    def transform(self, test: MyDataset) -> Dataset:
        """Similar to `fit_transform`, but only the transformation step is performed; it is used in the inference function before calling the inference method.

        Args:
            test (MyDataset): initial test `Dataset`

        Returns:
            Dataset: transformed test `Dataset`
        """
        return test

    @abstractmethod
    def inverse_transform(self, res: np.ndarray, real: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """The results must be mapped back to the original space of the prediction task.

        Args:
            res (np.ndarray): raw predictions
            real (np.ndarray): raw real data

        Returns:
            Tuple[np.ndarray, np.ndarray]: inverse transformation of the predictions and of the real data
        """
        return res, real
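

# A minimal sketch (not part of the library) of a concrete Modifier: an identity
# modifier that leaves the datasets untouched. It only illustrates the contract a
# subclass must honor; the name `_ExampleIdentityModifier` is hypothetical.
class _ExampleIdentityModifier(Modifier):
    def fit_transform(self, train: MyDataset, val: MyDataset) -> Tuple[Dataset, Dataset]:
        return train, val

    def transform(self, test: MyDataset) -> Dataset:
        return test

    def inverse_transform(self, res: np.ndarray, real: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        return res, real
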
class ModifierVVA(Modifier):
    """This modifier is used for the custom model VVA. The initial data are divided into smaller segments and then tokenized using a clustering procedure (`fit_transform`).
    The centroids of the clusters are stored. A GPT model is then trained on the tokens and the predictions are reverted using the centroid information.
    """
    def fit_transform(self, train: MyDataset, val: MyDataset) -> Tuple[Dataset, Dataset]:
        """`BisectingKMeans` is used on segments of length `token_split`.

        Args:
            train (MyDataset): initial train `Dataset`
            val (MyDataset): initial validation `Dataset`

        Returns:
            Tuple[Dataset, Dataset]: transformed train and validation `Datasets`
        """
        idx_target = train.idx_target
        assert len(idx_target) == 1, 'This works only with single channel prediction'
        samples, length, _ = train.data['y'].shape
        # split the past window of the target channel into segments of length token_split
        tmp = train.data['x_num_past'][:, :, idx_target[0]].reshape(samples, -1, self.token_split)
        _, length_in, _ = tmp.shape
        length_out = length // self.token_split
        tmp = tmp.reshape(-1, self.token_split)
        # tokenize the segments: each cluster id becomes a token of the vocabulary
        cl = BisectingKMeans(n_clusters=self.max_voc_size)
        clusters = cl.fit_predict(tmp)
        self.cl = cl
        self.centroids = []
        cls, counts = np.unique(clusters, return_counts=True)
        logging.info(counts)
        # for each cluster store, pointwise, a bootstrapped confidence interval of the median
        for i in cls:
            res = []
            data = tmp[np.where(clusters == i)[0]]
            if len(data) > 1:
                for j in range(data.shape[1]):
                    bootstrap_ci = bootstrap((data[:, j],), np.median, n_resamples=50, confidence_level=0.9, random_state=1, method='percentile')
                    res.append([bootstrap_ci.confidence_interval.low, np.median(data[:, j]), bootstrap_ci.confidence_interval.high])
                self.centroids.append(np.array(res))
            else:
                # degenerate cluster with a single member: repeat it as low/median/high
                self.centroids.append(np.repeat(data.T, 3, axis=1))
        self.centroids = np.array(self.centroids)  # clusters x token_split x 3
        x_train = clusters.reshape(-1, length_in)
        samples = train.data['y'].shape[0]
        y_train = cl.predict(train.data['y'].squeeze().reshape(samples, -1, self.token_split).reshape(-1, self.token_split)).reshape(-1, length_out)
        samples = val.data['y'].shape[0]
        y_validation = cl.predict(val.data['y'].squeeze().reshape(samples, -1, self.token_split).reshape(-1, self.token_split)).reshape(-1, length_out)
        x_validation = cl.predict(val.data['x_num_past'][:, :, idx_target[0]].reshape(samples, -1, self.token_split).reshape(-1, self.token_split)).reshape(-1, length_in)
        train_dataset = VVADataset(x_train, y_train, train.data['y'].squeeze(), train.t, length_in, length_out, self.max_voc_size)
        validation_dataset = VVADataset(x_validation, y_validation, val.data['y'].squeeze(), val.t, length_in, length_out, self.max_voc_size)
        return train_dataset, validation_dataset
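
    # Shape bookkeeping for fit_transform, as a worked sketch with hypothetical
    # numbers: with a past window of 40 steps, a future window of 20 steps,
    # token_split=5 and max_voc_size=16, each sample becomes
    #   length_in  = 40 / 5 = 8 past tokens,
    #   length_out = 20 / 5 = 4 future tokens,
    # where each token is the BisectingKMeans cluster id of one 5-point segment;
    # token_split must therefore divide both window lengths exactly.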

    def transform(self, test: MyDataset) -> Dataset:
        """Similar to `fit_transform`, but only the transformation step is performed.

        Args:
            test (MyDataset): initial test `Dataset`

        Returns:
            Dataset: transformed test `Dataset`
        """
        idx_target = test.idx_target
        samples, length, _ = test.data['y'].shape
        tmp = test.data['x_num_past'][:, :, idx_target[0]].reshape(samples, -1, self.token_split)
        _, length_in, _ = tmp.shape
        length_out = length // self.token_split
        tmp = tmp.reshape(-1, self.token_split)
        # reuse the clustering fitted on the training data to tokenize the test segments
        clusters = self.cl.predict(tmp)
        x = clusters.reshape(-1, length_in)
        y = self.cl.predict(test.data['y'].squeeze().reshape(samples, -1, self.token_split).reshape(-1, self.token_split)).reshape(-1, length_out)
        return VVADataset(x, y, test.data['y'].squeeze(), test.t, length_in, length_out, self.max_voc_size)

    def inverse_transform(self, res: np.ndarray, real: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """The results must be mapped back to the original space of the prediction task.

        Args:
            res (np.ndarray): raw predictions (cluster token ids)
            real (np.ndarray): raw real data

        Returns:
            Tuple[np.ndarray, np.ndarray]: inverse transformation of the predictions and of the real data
        """
        tot = []
        for sample in res:
            tmp_sample = []
            for index in sample:
                tmp = []
                # replace each candidate token with the (low, median, high) profile of its centroid
                for i in index:
                    tmp.append(self.centroids[i])
                tmp = np.array(tmp)
                if tmp.shape[0] == 1:
                    tmp2 = tmp[0, :, :]
                else:
                    # average over the candidate tokens and widen the interval by their spread
                    tmp2 = tmp.mean(axis=0)
                    tmp2[:, 0] -= 1.96 * tmp.std(axis=0)[:, 0]  # approximate confidence interval
                    tmp2[:, 2] += 1.96 * tmp.std(axis=0)[:, 2]
                tmp_sample.append(tmp2)
            tot.append(np.vstack(tmp_sample))
        return np.expand_dims(np.stack(tot), 2), np.expand_dims(real, 2)
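

# End-to-end sketch (not part of the library) of how a ModifierVVA is typically
# driven. `train`, `val` and `test` are assumed to be MyDataset instances with the
# field layout used above (data['x_num_past'], data['y'], idx_target, t); the GPT
# training step is elided and the predicted tokens below are random placeholders
# with a hypothetical shape of (samples, output tokens, generated candidates).
def _modifier_vva_example(train, val, test):
    modifier = ModifierVVA(token_split=5, max_voc_size=16)
    train_ds, val_ds = modifier.fit_transform(train, val)  # fit clustering + tokenize
    test_ds = modifier.transform(test)                     # tokenize with the fitted clustering
    # ... here train_ds/val_ds would feed the GPT model, which then samples
    # candidate token sequences for test_ds ...
    samples = test.data['y'].shape[0]
    predicted_tokens = np.random.randint(0, 16, size=(samples, test_ds.length_out, 10))
    real = test.data['y'].squeeze()
    return modifier.inverse_transform(predicted_tokens, real)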