ParallelIO

Usage

use ParallelIO;

or

import ParallelIO;

Warning

The ‘ParallelIO’ module is unstable and subject to change in a future release.

Helper procedures for doing parallel I/O

This module provides a few procedures for reading a file’s contents into a distributed array in parallel. The procedures are designed to be used for cases where a large file contains a header followed by a continuous stream of delimited values of the same type. The procedures are:

  • readLinesAsBlockArray: read each of the lines of a file as a string or bytes value

  • readDelimitedAsBlockArray: read a file where each value is strictly separated by a delimiter, and the delimiter cannot be found in the value (e.g., CSV)

  • readItemsAsBlockArray: read a file where values are separated by a delimiter, but the delimiter can be found in the value

There are also non-distributed versions of these procedures that return a default rectangular array instead of a block-distributed array. These tend to be a faster option if the file is small enough to fit in memory on a single locale.
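
For example, a minimal sketch of the non-distributed path (the file name "input.txt" is hypothetical):

use ParallelIO;

// read the file's lines on the current locale, then count the
// non-empty ones with a parallel reduction
var lines = try! readLinesAsArray("input.txt");

var nonEmpty = 0;
forall l in lines with (+ reduce nonEmpty) do
  if !l.isEmpty() then nonEmpty += 1;

writeln(nonEmpty, " non-empty lines");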

Two parallel iterators are also provided:

  • readLines: iterate over a file’s lines in parallel

  • readDelimited: iterate over the values of a delimited file in parallel

Both iterators support only serial and standalone parallel iteration (i.e., they cannot be used for zippered iteration). Adding leader/follower support is a future goal.

This module also exposes some helper procedures used to break files into chunks. These could be used as building blocks to implement other parallel I/O routines:

  • findDelimChunks: find a set of byte offsets that divide a file into roughly equal chunks where each chunk begins with a delimiter

  • findItemOffsets: get a prefix sum of the number of items in each chunk of a file, where the chunks are defined by the byteOffsets array, and each item is strictly separated by the given delimiter

  • findDelimChunksChecked: find a set of byte offsets that divide a file into roughly equal chunks where each chunk begins with a delimiter and each chunk starts with a deserializable value of the given type

iter readLines(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar) : lineType  where lineType == string || lineType == bytes

Iterate over a file’s lines in parallel.

This routine is similar to readLinesAsArray, except that it yields each line as it is read instead of returning an array of lines.

Warning

This routine will halt if the file cannot be opened or if an I/O error occurs while reading the file. This limitation is expected to be removed in a future release.

Note

Only serial and standalone-parallel iteration is supported. This iterator cannot yet be used in a zippered context.

Arguments:
  • filePath – a path to the file to read from

  • lineType – which type to return for each line: either string or bytes

  • header – how to handle the file header (see headerPolicy)

  • nTasks – the number of tasks used to read the file
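
For example, a sketch that streams over a hypothetical "words.txt" without materializing an array (note that this iterator halts, rather than throws, on I/O errors):

use ParallelIO;

// find the longest line using a max-reduce intent
var longest = 0;
forall line in readLines("words.txt", nTasks=4) with (max reduce longest) do
  longest = max(longest, line.size);

writeln("longest line: ", longest, " codepoints");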

iter readDelimited(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer) : t  where dt == string || dt == bytes

Iterate over the values of a delimited file in parallel.

This routine is similar to readDelimitedAsArray, except that it yields each value as it is read instead of returning an array of values.

Warning

This routine will halt if the file cannot be opened or if an I/O error occurs while reading the file. This limitation is expected to be removed in a future release.

Note

Only serial and standalone-parallel iteration is supported. This iterator cannot yet be used in a zippered context.

Arguments:
  • filePath – a path to the file to read from

  • t – the type of value to read from the file

  • delim – the delimiter to use to separate t values in the file

  • header – how to handle the file header (see headerPolicy)

  • nTasks – the number of tasks used to read the file

  • deserializerType – the type of deserializer to use when reading values
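
For example, a sketch that sums a hypothetical "numbers.txt" containing one int per line:

use ParallelIO;

var sum = 0;
forall n in readDelimited("numbers.txt", int) with (+ reduce sum) do
  sum += n;

writeln("total: ", sum);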

proc readLinesAsBlockArray(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = -1, targetLocales: [?d] locale = Locales) : [] lineType throws  where lineType == string || lineType == bytes

Read a file’s lines in parallel into a block-distributed array.

This routine is similar to readDelimitedAsBlockArray, except that it reads each line as a string or bytes value.

Arguments:
  • filePath – a path to the file to read from

  • lineType – which type to represent a line: either string or bytes

  • nTasks – the number of tasks to use per locale (if -1, query here.maxTaskPar on each locale)

  • header – how to handle the file header (see headerPolicy)

  • targetLocales – the locales to read the file on

Returns:

a block-distributed array of lineType values

Throws:

OffsetNotFoundError if a starting offset cannot be found in any of the chunks

See open for other errors that could be thrown when attempting to open the file
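
For example, a sketch that searches a large, hypothetical "app.log" across all locales:

use ParallelIO;

// the returned array is block-distributed, so the forall below
// runs on the locales that own each portion of the data
var lines = try! readLinesAsBlockArray("app.log");

var errors = 0;
forall l in lines with (+ reduce errors) do
  if l.startsWith("ERROR") then errors += 1;

writeln(errors, " error lines out of ", lines.size);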

proc readLinesAsArray(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar) : [] lineType throws  where lineType == string || lineType == bytes

Read a file’s lines in parallel into an array.

This routine is essentially the same as readLinesAsBlockArray, except that it only executes on the calling locale. As such, it does not accept a targetLocales argument and returns a non-distributed array.

Arguments:
  • filePath – a path to the file to read from

  • lineType – which type to represent a line: either string or bytes

  • nTasks – the number of tasks to use

  • header – how to handle the file header (see headerPolicy)

Returns:

a default rectangular array of lineType values

Throws:

OffsetNotFoundError if a starting offset cannot be found in any of the chunks

See open for other errors that could be thrown when attempting to open the file

proc readDelimitedAsBlockArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = -1, type deserializerType = defaultDeserializer, targetLocales: [?d] locale = Locales) : [] t throws  where d.rank == 1 && (dt == bytes || dt == string)

Read a delimited file in parallel into a block-distributed array.

This routine assumes that the file is composed of a series of deserializable values of type t (optionally with a header at the beginning of the file). Each t must be separated by exactly one delimiter, which can be provided as either a string or bytes value.

This routine will use the delimiter to split the file into d.size chunks of roughly equal size and read each chunk concurrently across the target locales. If multiple tasks are used per locale, each locale will further decompose its chunk into smaller chunks and read each of those in parallel. The chunks and corresponding array indices are computed using findDelimChunks and findItemOffsets respectively.

Note

t must:

  • have a ‘deserialize’ method

  • have a default (zero argument) initializer

  • not contain the delimiter in its serialized form (if it does, consider using readItemsAsBlockArray instead)

This procedure can be used for a variety of purposes, such as reading a CSV file. To do so, the delimiter should keep its default value of b"\n". The file will then be split by lines, where each line is parsed as a t value. For CSV, the commas between t’s fields must be parsed by its deserialize method. This can be accomplished in one of two ways: (1) by using a custom deserialize method that parses the comma-separated values manually, e.g.,

record color {
  var r, g, b: uint(8);
}

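// a custom deserialize method that parses the commas explicitly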
proc ref color.deserialize(reader, ref deserializer) throws {
  reader.read(this.r);
  reader.readLiteral(b",");
  reader.read(this.g);
  reader.readLiteral(b",");
  reader.read(this.b);
}

or (2) by using a deserializer that will handle commas appropriately with t’s default deserialize method.
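
For example, with the custom method above, a hypothetical "colors.csv" containing "r,g,b" lines could be read into a block-distributed array as:

var colors = try! readDelimitedAsBlockArray("colors.csv", color);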

Arguments:
  • filePath – a path to the file to read from

  • delim – the delimiter to use to separate t values in the file

  • t – the type of value to read from the file

  • nTasks – the number of tasks to use per locale (if -1, query here.maxTaskPar on each locale)

  • header – how to handle the file header (see headerPolicy)

  • deserializerType – the type of deserializer to use

  • targetLocales – the locales to read the file on

Returns:

a block-distributed array of t values

Throws:

OffsetNotFoundError if a starting offset cannot be found in any of the chunks

See open for other errors that could be thrown when attempting to open the file

proc readDelimitedAsArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer) : [] t throws  where dt == bytes || dt == string

Read a delimited file in parallel into an array.

This procedure is essentially the same as readDelimitedAsBlockArray, except that it only executes on the calling locale. As such, it does not accept a targetLocales argument and returns a non-distributed array.

Arguments:
  • filePath – a path to the file to read from

  • delim – the delimiter to use to separate t values in the file

  • t – the type of value to read from the file

  • nTasks – the number of tasks to use

  • header – how to handle the file header (see headerPolicy)

  • deserializerType – the type of deserializer to use

Returns:

a default rectangular array of t values

Throws:

OffsetNotFoundError if a starting offset cannot be found in any of the chunks

See open for other errors that could be thrown when attempting to open the file

proc readItemsAsBlockArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = -1, type deserializerType = defaultDeserializer, targetLocales: [?d] locale = Locales) : [] t throws  where d.rank == 1 && (dt == bytes || dt == string)

Read items from a file in parallel into a block-distributed array.

This routine assumes that the file is composed of a series of deserializable values of type t (optionally with a header at the beginning of the file). Each t must be separated by a delimiter, which can be provided as either a string or bytes value. Unlike with readDelimitedAsBlockArray, the delimiter may also appear in the serialized form of t.

This routine uses the following heuristic to split the file into chunks, which may not be accurate in all cases:

A given byte offset is a valid offset for a task to start deserializing values of type t if:

  • it is preceded by, or begins with, the delimiter

  • a t can be deserialized at that offset (i.e., calling t.deserialize on the bytes starting at that offset does not throw an error)

The heuristic, implemented in findDelimChunksChecked, will be used to split the file into d.size chunks with a roughly equal number of items per chunk. If multiple tasks are used per locale, each locale will further decompose its chunk into smaller chunks and read each of those in parallel.

Note

t must:

  • have a ‘deserialize’ method that throws when a valid t cannot be read

  • have a default (zero argument) initializer
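
For example, a sketch of a type meeting these requirements (the kv record and "pairs.txt" file are hypothetical):

record kv {
  var key: string;
  var val: int;
}

// reads a "key=value" pair; 'readThrough' and 'read' throw on
// malformed input, allowing invalid start offsets to be rejected
proc ref kv.deserialize(reader, ref deserializer) throws {
  this.key = reader.readThrough("=", stripSeparator=true);
  reader.read(this.val);
}

var pairs = try! readItemsAsBlockArray("pairs.txt", kv);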

Arguments:
  • filePath – a path to the file to read from

  • delim – the delimiter used to guide file chunking

  • t – the type of value to read from the file

  • nTasks – the number of tasks to use per locale (if -1, query here.maxTaskPar on each locale)

  • header – how to handle the file header (see headerPolicy)

  • deserializerType – the type of deserializer to use

  • targetLocales – the locales to read the file on

Returns:

a block-distributed array of t values

Throws:

OffsetNotFoundError if a valid byte offset cannot be found in any of the chunks

See open for other errors that could be thrown when attempting to open the file

proc readItemsAsArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer) : [] t throws  where dt == bytes || dt == string

Read items from a file in parallel into an array.

This procedure is essentially the same as readItemsAsBlockArray, except that it only executes on the calling locale. As such, it does not accept a targetLocales argument and returns a non-distributed array.

Arguments:
  • filePath – a path to the file to read from

  • delim – the delimiter used to guide file chunking

  • t – the type of value to read from the file

  • nTasks – the number of tasks to use

  • header – how to handle the file header (see headerPolicy)

  • deserializerType – the type of deserializer to use

Returns:

a default rectangular array of t values

Throws:

OffsetNotFoundError if a valid byte offset cannot be found in any of the chunks

See open for other errors that could be thrown when attempting to open the file

proc findDelimChunks(const ref f: file, in delim: ?dt, n: int, bounds: range, header = headerPolicy.noHeader) : [] int throws  where dt == bytes || dt == string

Get an array of n+1 byte offsets that divide the file f into n roughly equally sized chunks, where each byte offset lines up with a delimiter.

Arguments:
  • f – the file to search

  • delim – the delimiter to use to separate the file into chunks

  • n – the number of chunks to find

  • bounds – a range of byte offsets to break into chunks

  • header – a header policy to use when searching for the first byte offset

Returns:

a length n+1 array of byte offsets (the last offset is bounds.high)

Throws:

OffsetNotFoundError if a valid byte offset cannot be found in any of the chunks
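
For example, a sketch that uses findDelimChunks directly to read per-task chunks of a hypothetical "data.txt":

use ParallelIO, IO;

const f = try! open("data.txt", ioMode.r),
      nChunks = here.maxTaskPar;

// nChunks+1 offsets; adjacent pairs bound each chunk
const offsets = try! findDelimChunks(f, b"\n", nChunks, 0..<(try! f.size));

coforall c in offsets.domain.low..<offsets.domain.high {
  var r = try! f.reader(locking=false, region=offsets[c]..<offsets[c+1]);
  // ... each task deserializes values from its own region ...
}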

proc findDelimChunksChecked(const ref f: file, in delim: ?dt, n: int, type t, bounds: range, header = headerPolicy.noHeader, type deserializerType = defaultDeserializer) : [] int throws  where dt == bytes || dt == string

Get an array of n+1 byte offsets that divide the file f into n roughly equally sized chunks, where each byte offset lines up with a delimiter and a t can be deserialized at that offset.

This procedure is similar to findDelimChunks, except that when it finds a delimiter, it confirms that a t can be deserialized at that offset before recording it. This way, the serialized values can also contain the delimiter.

Arguments:
  • f – the file to search

  • delim – the delimiter to use to separate the file into chunks

  • n – the number of chunks to find

  • t – the type of value to read from the file

  • bounds – a range of byte offsets to break into chunks

  • header – a header policy to use when searching for the first byte offset

  • deserializerType – the type of deserializer to use

Returns:

a length n+1 array of byte offsets (the last offset is bounds.high)

Throws:

OffsetNotFoundError if a valid byte offset cannot be found in any of the chunks

proc findItemOffsets(const ref f: file, in delim: ?dt, const ref byteOffsets: [?d] int) : [d] int throws  where dt == bytes || dt == string

Get a prefix sum of the number of items in each chunk of the file f, where the chunks are defined by the byteOffsets array, and each item is separated by the given delimiter.

Arguments:
  • f – the file to search

  • delim – the delimiter used to separate items in the file

  • byteOffsets – an array of byte offsets that divide the file into chunks

Returns:

an array of length byteOffsets.size containing the number of items in the file before the start of each chunk. The last entry contains the total number of items in the file.
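
For example, if byteOffsets divides the file into three chunks containing 3, 5, and 2 items respectively, the returned array would be [0, 3, 8, 10].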

class OffsetNotFoundError : Error

An error thrown when a starting offset cannot be found in a chunk of a file.

proc init()

proc init(msg: string)

record headerPolicy

A type describing how to handle the file header when reading a file in parallel.

proc type headerPolicy.skipLines(n: int)

Skip the first n lines of the file

proc type headerPolicy.skipBytes(n: int)

Skip the first n bytes of the file

proc type headerPolicy.findStart

Find the first byte offset in the file that can be used to start reading

proc type headerPolicy.noHeader

Don’t expect a header in the file
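
For example, a sketch that skips a one-line header in a hypothetical "table.csv" before reading:

use ParallelIO;

var lines = try! readLinesAsArray("table.csv", header=headerPolicy.skipLines(1));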