Working with Huge Training Data Files for PyTorch by Using a Streaming Data Loader

The most common approach for handling PyTorch training data is to write a custom Dataset class that loads the data into memory, and then serve up the data in batches using the built-in DataLoader class. This approach is simple but requires you to store all training data in memory. Therefore, if your source training data is huge and won’t fit into memory, you can’t use the Dataset plus DataLoader approach.

One simple way to deal with a huge training data file is to physically break it down into smaller, more manageable files and train using the sub-files. This approach is a bit messy and is a lot of work.
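Here is a minimal sketch of the file-splitting idea, using only the Python standard library. The function name split_file, the chunk file naming scheme, and the tiny 10-line stand-in file are all assumptions made for illustration.

```python
import os
import tempfile

def split_file(src_path, lines_per_chunk, out_dir):
    """Write consecutive runs of lines_per_chunk lines to numbered sub-files."""
    chunk_paths = []
    fout = None
    with open(src_path, "r") as fin:
        for i, line in enumerate(fin):
            if i % lines_per_chunk == 0:  # time to start a new sub-file
                if fout is not None:
                    fout.close()
                name = "chunk_%03d.txt" % len(chunk_paths)  # hypothetical naming
                path = os.path.join(out_dir, name)
                chunk_paths.append(path)
                fout = open(path, "w")
            fout.write(line)
    if fout is not None:
        fout.close()
    return chunk_paths

def count_lines(path):
    with open(path, "r") as f:
        return sum(1 for _ in f)

# tiny demonstration: a 10-line stand-in for a huge file, 4 lines per chunk
tmp_dir = tempfile.mkdtemp()
src = os.path.join(tmp_dir, "train.txt")
with open(src, "w") as f:
    for i in range(10):
        f.write("line %d\n" % i)

chunks = split_file(src, lines_per_chunk=4, out_dir=tmp_dir)
sizes = [count_lines(p) for p in chunks]
print(len(chunks), sizes)  # 3 [4, 4, 2]
```

Only one line is held in memory at a time, so the source file can be arbitrarily large. You would still have to write the logic that trains across the sub-files, which is where the messiness comes in.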

A bad approach that I’ve seen repeatedly suggested on the Internet is to use the PyTorch IterableDataset class. But this approach is fatally flawed because a DataLoader that uses an IterableDataset cannot specify shuffle=True, and therefore the technique is useless for working with training data. It is possible to write a helper class that wraps IterableDataset and allows shuffling, but this is a lot of work, and just doesn’t feel right.
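To give a sense of what such a shuffling helper involves, here is a rough sketch of one common technique: keep a small pool of items in memory and emit a randomly chosen one each time a new item arrives. This is plain Python and not part of the PyTorch API. The name shuffled_stream and its parameters are hypothetical, and pool_size is assumed to be at least 1.

```python
import random

def shuffled_stream(items, pool_size, seed=0):
    """Yield every item of an iterable exactly once, in roughly random order."""
    rnd = random.Random(seed)
    pool = []
    for item in items:
        if len(pool) < pool_size:
            pool.append(item)        # still filling the pool
            continue
        j = rnd.randrange(pool_size)
        yield pool[j]                # emit a random pooled item ...
        pool[j] = item               # ... and put the new item in its slot
    rnd.shuffle(pool)                # source exhausted: flush what is left
    for item in pool:
        yield item

out = list(shuffled_stream(range(20), pool_size=5))
print(out)
```

The shuffle is only approximate: an item can never be emitted before it has been read from the source, so the result depends on the pool size.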

A third approach, and the topic of this blog post, is to write a custom streaming data loader class that reads data from a file. I’ll illustrate with a concrete example. Suppose you have a huge text file of training data that looks like:

-1   0.39   0   0   1   0.5120   1
 1   0.24   1   0   0   0.2950   2
-1   0.22   1   0   0   0.3350   0
 1   0.50   0   1   0   0.5650   1
. . .

The data represents employees. The first value on each line is sex (male = -1, female = 1), then age (divided by 100), then company city (1 0 0 = anaheim, 0 1 0 = boulder, 0 0 1 = concord), then annual income (divided by 100,000), and then job type (0 = mgmt, 1 = supp, 2 = tech). You want to predict job type from sex, age, city, income.
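As an illustration of that layout, here is a small sketch that splits one line into the six predictor values and the job-type label. The helper name parse_line is an assumption for this example; the demo program does its parsing differently, with NumPy.

```python
def parse_line(line):
    """Split one line into (predictors, label) using the layout described above."""
    vals = line.split()                  # split on any run of whitespace
    x = [float(v) for v in vals[0:6]]    # sex, age, city (3 values), income
    y = int(vals[6])                     # job type: 0, 1 or 2
    return x, y

x, y = parse_line("-1   0.39   0   0   1   0.5120   1")
print(x)  # [-1.0, 0.39, 0.0, 0.0, 1.0, 0.512]
print(y)  # 1
```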



You can implement a streaming data loader class fairly easily. The basic idea is to maintain a buffer of data in memory (for example 10,000 lines of data) and fetch batches from the internal buffer. When the buffer is used up, you read more data from the source training data file and reload the buffer. You repeat this process until all lines of the training data have been used. You can shuffle the buffer data into random order after loading it into memory. Essentially, you programmatically break a huge file into smaller virtual files.
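The buffer idea can be sketched in a few lines of plain Python before any tensors are involved. This is a simplified illustration and not the demo class: the generator name stream_batches is hypothetical, batches are just lists of text lines, and an in-memory io.StringIO object stands in for a huge file.

```python
import io
import random

def stream_batches(fin, bat_size, buff_batches, shuffle=True, seed=1):
    """Yield batches (lists of lines) from an open text file, one buffer at a time."""
    rnd = random.Random(seed)
    buff_len = bat_size * buff_batches
    while True:
        buff = []
        for _ in range(buff_len):
            line = fin.readline()
            if line == "":       # EOF before the buffer filled
                return           # drop the partial buffer
            buff.append(line.strip())
        if shuffle:
            rnd.shuffle(buff)    # random order within this buffer only
        for i in range(0, buff_len, bat_size):
            yield buff[i:i + bat_size]

# 40 fake lines stand in for a huge training file
fake_file = io.StringIO("".join("row %d\n" % i for i in range(40)))
batches = list(stream_batches(fake_file, bat_size=3, buff_batches=4))
print(len(batches))  # 12
```

Notice that shuffling happens only within a buffer, so a line near the start of the file is always served before a line near the end. A larger buffer gives a better shuffle at the cost of more memory.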

Conceptually the idea is very simple, but you have to be quite careful when implementing a streaming data loader to avoid off-by-one errors.

I wrote a demo. My source data is just 40 lines of training data so I could see what was happening. The demo sets bat_size = 3 and buff_batches = 4. This means the internal buffer holds enough lines for 4 batches of data, which is 12 lines. The buffer fully loads three times (36 lines) when processing all 40 lines.

I designed the streaming data loader as a “drop_last=True” loader. Therefore, streaming through the entire 40-item file generates 12 batches of 3 items each, with the last 4 items not used because there aren’t enough lines left to fill the buffer a fourth time. To avoid dropping any data, choose bat_size and buff_batches so that bat_size * buff_batches evenly divides the number of lines of training data.
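The arithmetic is easy to check with a few lines of Python. The helper name batch_counts is an assumption for this example; it only mirrors the drop-the-partial-buffer behavior described above.

```python
def batch_counts(n_lines, bat_size, buff_batches):
    """Return (full buffer loads, batches served, lines left unused)."""
    buff_len = bat_size * buff_batches
    n_loads = n_lines // buff_len            # only full buffers are used
    n_batches = n_loads * buff_batches
    n_unused = n_lines - n_loads * buff_len
    return n_loads, n_batches, n_unused

print(batch_counts(40, 3, 4))  # (3, 12, 4)
print(batch_counts(40, 4, 5))  # (2, 10, 0)
```

With bat_size = 4 and buff_batches = 5, the buffer length of 20 evenly divides 40, so no lines are dropped.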

Every large-file scenario requires a custom streaming data loader so you won’t find a generic StreamingDataLoader anytime soon. Put another way, if you stumbled upon this blog post looking for examples of streaming a large data file, you can use the demo code as a general guide, but you’ll have to heavily customize the demo to work with your data. There are many tradeoffs between performance and simplicity.



Three menu art illustrations for “Trader Vic’s”. Trader Vic’s is a chain of Polynesian style restaurants and Tiki bars, founded by Victor Bergeron (1902-1984). Trader Vic, and his friendly competitor “Don the Beachcomber” (Donn Beach, 1907-1989) both claim to have invented my favorite drink, the Mai Tai. All three illustrations were done by artist William Kay (1891-1983).


# stream_loader_demo.py

# PyTorch 1.7.1-CPU Anaconda3-2020.02  Python 3.7.6
# Windows 10

import numpy as np
import torch as T

device = T.device("cpu")  # used by reload_data() when creating tensors

# -----------------------------------------------------------

# a Dataset cannot handle files that are too big for memory
# class EmployeeDataset(T.utils.data.Dataset):
  # sex age   city     income  job
  # -1  0.27  0  1  0  0.7610  2
  # +1  0.19  0  0  1  0.6550  0
# train_ds = EmployeeDataset("employee_train.txt")
# train_ldr = T.utils.data.DataLoader(train_ds,
#   batch_size=3, shuffle=True)
# for epoch in range(max_epochs):
#   for (batch_idx, batch) in enumerate(train_ldr):
#     X = batch[0]   # predictors
#     Y = batch[1]   # correct class/label/job

# an IterableDataset does not allow shuffle in DataLoader
# class EmployeeIterableDataset(T.utils.data.IterableDataset):
  # sex age   city     income  job
  # -1  0.27  0  1  0  0.7610  2
  # +1  0.19  0  0  1  0.6550  0

# -----------------------------------------------------------

class EmployeeStreamLoader():

  def __init__(self, fn, bat_size, buff_batches, shuffle=False):
    # if bat_size = 3 buff_batches = 4, buffers hold 12 items
    self.bat_size = bat_size
    self.buff_batches = buff_batches  
    self.buff_len = self.bat_size * self.buff_batches
    self.shuffle = shuffle

    self.ptr = 0              # points into x_data and y_data
    self.fin = open(fn, "r")  # line-based text file
    self.x_data = None
    self.y_data = None

    self.reload_data()  # store tensors into x_data, y_data

  def reload_data(self):
    xy_lst = []
    ct = 0      # number of lines read

    while ct < self.buff_len:
      line = self.fin.readline()
      if line == "":    # EOF so unable to fully reload
        # self.fin.close()
        self.fin.seek(0)  # reset file for another pass
        return False

      line = line.strip()  # remove trailing newline
      np_vec = np.fromstring(line, sep="\t")
      xy_lst.append(np_vec)  # list of numpy vectors
      ct += 1

    # assert: xy_lst has len() == self.buff_len
    if self.shuffle == True:
      np.random.shuffle(xy_lst)

    xy_mat = np.array(xy_lst)  # numpy matrix 
    self.x_data = T.tensor(xy_mat[:, 0:6], \
      dtype=T.float32).to(device)
    self.y_data = T.tensor(xy_mat[:, 6], \
      dtype=T.int64).to(device)
    return True  # successfully loaded

  def __iter__(self):
    return self

  def __next__(self):  # next batch as a tuple
    # must reload now?
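    if self.ptr + self.bat_size > self.buff_len:
      self.ptr = 0
      result = self.reload_data()
      if result == False:  # did not fully reload; file was rewound
        # refill from the start of the file so that a following
        # epoch does not begin by reusing the old buffer
        self.reload_data()
        raise StopIteration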

    start = self.ptr
    end = self.ptr + self.bat_size
    x = self.x_data[start:end, :]
    y = self.y_data[start:end]
    self.ptr += self.bat_size
    return (x,y)

def main():
  print("\nBegin streaming data loader demo \n")
  np.random.seed(1)

  fn = ".\\Data\\employee_train_40.txt"  # 40 lines of data

  # bat_size * buff_batches should evenly divide file length
  # to avoid dropping last few batch(es)
  # batch_size = 3 
  # buffer_batches = 4  # internally load 4 batches worth of data
  emp_ldr = EmployeeStreamLoader(fn, bat_size=3, \
    buff_batches=4, shuffle=True)  

  max_epochs = 1
  for epoch in range(max_epochs):
    print("epoch: " + str(epoch))
    for (b_idx, batch) in enumerate(emp_ldr):
      print("batch idx: " + str(b_idx))     # batch index
      print(batch[0])  # predictors
      print(batch[1])  # label
      print("")

  print("\nEnd demo ")

if __name__ == "__main__":
  main()

2 Responses to Working with Huge Training Data Files for PyTorch by Using a Streaming Data Loader

  1. B Isabel says:

    Thanks for this. How would you use this as a dataloader with pytorch as it doesn’t extend any of the Dataset classes? Did you mean to extend IterableDataset since you implement __iter__?

    • The __iter__() method is needed only to make the class iterable so that the __next__() method will work. The __iter__() method can be added to any class definition. You can use the EmployeeStreamLoader() just like a built-in PyTorch DataLoader. Both return a tuple where item[0] is a Tensor of predictor values and item[1] is a Tensor of values-to-predict. Put another way, EmployeeStreamLoader() replaces both a custom EmployeeDataset and also the DataLoader object that would use it.
