Transformer Based Anomaly Detection for Tabular Data

I recently explored an anomaly detection system based on a Transformer encoder and reconstruction error. The inputs for my example were the UCI Digits dataset items. Each data item is 64 pixel values between 0 and 16. I used UCI Digits items because they easily mapped to a Transformer encoder.

The idea is that a Transformer model was originally designed for input items that are words, such as “I think therefore I am”. Each word/token is mapped to an integer ID such as [17, 283, 167, 17, 35]. Then each word/token ID is mapped to a word embedding such as 17 = [0.1234, 0.9876, 0.2468, 0.1357]. For UCI Digits data items, each pixel value corresponds to a word/token.

But what about tabular data? For example, suppose you have a dataset of employee information like so:

 1   0.24   1   0   0   0.2950   0   0   1
-1   0.39   0   0   1   0.5120   0   1   0
 1   0.63   0   1   0   0.7580   1   0   0
-1   0.36   1   0   0   0.4450   0   1   0
. . .

The columns represent sex (male = -1, female = +1), age (divided by 100), city (one-hot encoded), income (divided by 100,000) and job type (one-hot encoded).

I coded up a demo. Because the employee data aren’t integers, I skipped the standard embedding layer and replaced it with a Linear layer. I was mildly surprised when the demo system seemed to work quite well.

Very interesting.



In real life, man-eating plants are anomalous. But in old comic book covers, man-eating plants are common.


Demo code. Replace “lt”, “gt”, “lte”, “gte” with Boolean operator symbols. The Employee data can be found at:

jamesmccaffrey.wordpress.com/2022/05/17/autoencoder-anomaly-detection-using-pytorch-1-10-on-windows-11/

# employee_trans_anomaly.py
# Transformer based reconstruction error anomaly detection
# PyTorch 1.10.0-CPU Anaconda3-2020.02  Python 3.7.6
# Windows 10/11

import numpy as np
import torch as T

device = T.device('cpu') 
T.set_num_threads(1)

# -----------------------------------------------------------

class EmployeeDataset(T.utils.data.Dataset):
  # sex  age   city     income  job
  # -1   0.27  0  1  0  0.7610  0  0  1
  # +1   0.19  0  0  1  0.6550  0  1  0
  # sex: -1 = male, +1 = female
  # city: anaheim, boulder, concord
  # job: mgmt, supp, tech

  def __init__(self, src_file):
    tmp_x = np.loadtxt(src_file, usecols=range(0,9),
      delimiter="\t", comments="#", dtype=np.float32)
    self.x_data = T.tensor(tmp_x, dtype=T.float32).to(device)

  def __len__(self):
    return len(self.x_data)

  def __getitem__(self, idx):
    preds = self.x_data[idx, :]  # row idx, all cols
    sample = { 'predictors' : preds }  # as Dictionary
    return sample  

# -----------------------------------------------------------

class PositionalEncoding(T.nn.Module):  # documentation code
  def __init__(self, d_model: int, dropout: float=0.1,
   max_len: int=5000):
    super(PositionalEncoding, self).__init__()  # old syntax
    self.dropout = T.nn.Dropout(p=dropout)
    pe = T.zeros(max_len, d_model)  # like 10x4
    position = \
      T.arange(0, max_len, dtype=T.float).unsqueeze(1)
    div_term = T.exp(T.arange(0, d_model, 2).float() * \
      (-np.log(10_000.0) / d_model))
    pe[:, 0::2] = T.sin(position * div_term)
    pe[:, 1::2] = T.cos(position * div_term)
    pe = pe.unsqueeze(0).transpose(0, 1)
    self.register_buffer('pe', pe)  # allows state-save

  def forward(self, x):
    x = x + self.pe[:x.size(0), :]
    return self.dropout(x)

# -----------------------------------------------------------

class Transformer_Net(T.nn.Module):
  def __init__(self):
    # 9 numeric inputs: no exact word embedding equivalent
    # pseudo embed_dim = 4
    # seq_len = 9
    super(Transformer_Net, self).__init__()

    self.fc1 = T.nn.Linear(9, 9*4)  # pseudo-embedding

    self.pos_enc = \
      PositionalEncoding(4, dropout=0.00)  # positional

    self.enc_layer = T.nn.TransformerEncoderLayer(d_model=4,
      nhead=2, dim_feedforward=100, 
      batch_first=True)  # d_model divisible by nhead

    self.trans_enc = T.nn.TransformerEncoder(self.enc_layer,
      num_layers=6)

    self.dec1 = T.nn.Linear(36, 18)
    self.dec2 = T.nn.Linear(18, 9)

    # use default weight initialization

  def forward(self, x):
    # x is Size([bs, 9])
    z = T.tanh(self.fc1(x))   # [bs, 36]
    z = z.reshape(-1, 9, 4)   # [bs, 9, 4] 
    z = self.pos_enc(z)       # [bs, 9, 4]
    z = self.trans_enc(z)     # [bs, 9, 4]

    z = z.reshape(-1, 36)              # [bs, 36]
    z = T.tanh(self.dec1(z))           # [bs, 18]
    z = self.dec2(z)  # no activation  # [bs, 9]
  
    return z

# -----------------------------------------------------------

def analyze_error(model, ds):
  largest_err = 0.0
  worst_x = None
  worst_y = None
  n_features = len(ds[0]['predictors'])

  for i in range(len(ds)):
    X = ds[i]['predictors']
    with T.no_grad():
      Y = model(X)  # should be same as X
    err = T.sum((X-Y)*(X-Y)).item()  # SSE all features
    err = err / n_features           # sort of norm'ed SSE 

    if err "gt" largest_err:  # replace here
      largest_err = err
      worst_x = X
      worst_y = Y

  np.set_printoptions(formatter={'float': '{: 0.4f}'.format})
  print("Largest reconstruction error: %0.4f" % largest_err)
  print("Worst data item    = ")
  print(worst_x.numpy())
  print("Its reconstruction = " )
  print(worst_y.numpy())

# -----------------------------------------------------------

def main():
  # 0. get started
  print("\nBegin Employee transformer based anomaly detect ")
  T.manual_seed(0)
  np.random.seed(0)
  
  # 1. create DataLoader objects
  print("\nCreating Employee Dataset ")

  data_file = ".\\Data\\employee_all.txt"
  data_ds = EmployeeDataset(data_file)  # 240 rows

  bat_size = 10
  data_ldr = T.utils.data.DataLoader(data_ds,
    batch_size=bat_size, shuffle=True)

  # 2. create network
  print("\nCreating Transformer encoder-decoder network ")
  net = Transformer_Net().to(device)

# -----------------------------------------------------------

  # 3. train autoencoder model
  max_epochs = 100
  ep_log_interval = 10
  lrn_rate = 0.005

  loss_func = T.nn.MSELoss()
  optimizer = T.optim.Adam(net.parameters(), lr=lrn_rate)

  print("\nbat_size = %3d " % bat_size)
  print("loss = " + str(loss_func))
  print("optimizer = Adam")
  print("lrn_rate = %0.3f " % lrn_rate)
  print("max_epochs = %3d " % max_epochs)
  
  print("\nStarting training")
  net.train()
  for epoch in range(0, max_epochs):
    epoch_loss = 0  # for one full epoch

    for (batch_idx, batch) in enumerate(data_ldr):
      X = batch['predictors'] 
      Y = batch['predictors'] 

      optimizer.zero_grad()
      oupt = net(X)
      loss_val = loss_func(oupt, Y)  # a tensor
      epoch_loss += loss_val.item()  # accumulate
      loss_val.backward()
      optimizer.step()

    if epoch % ep_log_interval == 0:
      print("epoch = %4d  |  loss = %0.4f" % \
       (epoch, epoch_loss))
  print("Done ")

# -----------------------------------------------------------

  # 4. find item with largest reconstruction error
  print("\nAnalyzing data for largest reconstruction error \n")
  net.eval()
  analyze_error(net, data_ds)

  print("\nEnd transformer autoencoder anomaly demo ")

if __name__ == "__main__":
  main()
This entry was posted in Transformers. Bookmark the permalink.