ParallelIO
Usage
use ParallelIO;
or
import ParallelIO;
Warning
The 'ParallelIO' module is unstable and subject to change in a future release.
Helper procedures for doing parallel I/O
This module provides a few procedures for reading a file’s contents into a distributed array in parallel. The procedures are designed to be used for cases where a large file contains a header followed by a continuous stream of delimited values of the same type. The procedures are:
readLinesAsBlockArray: read each of the lines of a file as a string or bytes value
readDelimitedAsBlockArray: read a file where each value is strictly separated by a delimiter, and the delimiter cannot be found in the value (e.g., CSV)
readItemsAsBlockArray: read a file where values are separated by a delimiter, but the delimiter can be found in the value
There are also non-distributed versions of these procedures that return a default rectangular array instead of a block-distributed array. These tend to be a faster option if the file is small enough to fit in memory on a single locale.
Two parallel iterators are also provided:
readLines: iterate over a file's lines in parallel
readDelimited: iterate over the values of a delimited file in parallel
Both iterators only work in a standalone context (i.e., they cannot be used for zippered iteration). Adding leader/follower support is a future goal.
This module also exposes some helper procedures used to break files into chunks. These could be used as building blocks to implement other parallel I/O routines:
findDelimChunks: find a set of byte offsets that divide a file into roughly equal chunks, where each chunk begins with a delimiter
findItemOffsets: get a prefix sum of the number of items in each chunk of a file, where the chunks are defined by the byteOffsets array, and each item is strictly separated by the given delimiter
findDelimChunksChecked: find a set of byte offsets that divide a file into roughly equal chunks, where each chunk begins with a delimiter and each chunk starts with a deserializable value of the given type
- iter readLines(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, targetLocales: [] locale = [here]) : lineType where lineType == string || lineType == bytes
Iterate over a file’s lines in parallel.
This routine is similar to readLinesAsArray, except that it yields each line as it is read instead of returning an array of lines.
Example:

use ParallelIO;

var sum = 0;
forall line in readLines("ints.txt") with (+ reduce sum) do
  sum += line:int;
Warning
This routine will halt if the file cannot be opened or if an I/O error occurs while reading the file. This limitation is expected to be removed in a future release.
Note
Only serial and standalone-parallel iteration is supported. This iterator cannot yet be used in a zippered context.
- Arguments:
filePath – the file to read
lineType – which type to return for each line: either string or bytes — defaults to string
header – how to handle the file header (see headerPolicy)
nTasks – the number of tasks used to read the file — defaults to here.maxTaskPar
targetLocales – the locales to use for reading the file — by default, only the calling locale is used
- iter readDelimited(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer, targetLocales: [] locale = [here]) : t where dt == string || dt == bytes
Iterate over the values of a delimited file in parallel.
This routine is similar to readDelimitedAsArray, except that it yields each value as it is read instead of returning an array of values.
Example:

use IO, ParallelIO;

record color {
  var r, g, b: uint(8);

  proc ref deserialize(reader: fileReader(?), ref deserializer) throws {
    reader.read(this.r);
    reader.readLiteral(b",");
    reader.read(this.g);
    reader.readLiteral(b",");
    reader.read(this.b);
  }
}

forall c in readDelimited("colors.csv", color, header=headerPolicy.skipLines(1)) do
  processColor(c);
Warning
This routine will halt if the file cannot be opened or if an I/O error occurs while reading the file. This limitation is expected to be removed in a future release.
Note
Only serial and standalone-parallel iteration is supported. This iterator cannot yet be used in a zippered context.
- Arguments:
filePath – the file to read
t – the type of value to read from the file
delim – the delimiter used to separate t values in the file — defaults to the newline character
header – how to handle the file header (see headerPolicy)
nTasks – the number of tasks used to read the file — defaults to here.maxTaskPar
deserializerType – the type of deserializer to use when reading values — defaults to the I/O module's default deserializer
targetLocales – the locales to use for reading the file — by default, only the calling locale is used
- proc readLinesAsBlockArray(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = -1, targetLocales: [?d] locale = Locales) : [] lineType throws where lineType == string || lineType == bytes
Read a file’s lines in parallel into a block-distributed array.
This routine is similar to readDelimitedAsBlockArray, except that it reads each line as a string or bytes value.
- Arguments:
filePath – the file to read
lineType – the element type of the returned array: either string or bytes — defaults to string
nTasks – the number of tasks to use per locale — defaults to -1, meaning each locale should query here.maxTaskPar
header – how to handle the file header (see headerPolicy)
targetLocales – the locales to read the file on and the target locales for the returned block-distributed array
- Returns:
a block-distributed array of lineType values
- Throws:
OffsetNotFoundError if a starting offset cannot be found in any of the chunks
See open for other errors that could be thrown when attempting to open the file
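As a usage sketch (the file name and the per-line computation are hypothetical, and errors are left to halt as the routine's throws propagate out of module-level code):

```chapel
use ParallelIO;

// Read every line of a large file into a block-distributed array
// spanning all locales, then count non-empty lines in parallel.
var lines = readLinesAsBlockArray("app.log");

var nonEmpty = 0;
forall l in lines with (+ reduce nonEmpty) do
  if l.size > 0 then nonEmpty += 1;

writeln("non-empty lines: ", nonEmpty);
```

Because the array is block-distributed over the target locales, the forall loop above executes on each element's owning locale, keeping the follow-up computation distributed as well.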
- proc readLinesAsArray(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar) : [] lineType throws where lineType == string || lineType == bytes
Read a file’s lines in parallel into an array.
This routine is essentially the same as readLinesAsBlockArray, except that it only executes on the calling locale. As such, it does not accept a targetLocales argument and returns a non-distributed array.
- Arguments:
filePath – the file to read
lineType – the element type of the returned array: either string or bytes — defaults to string
nTasks – the number of tasks to use — defaults to here.maxTaskPar
header – how to handle the file header (see headerPolicy)
- Returns:
a default rectangular array of lineType values
- Throws:
OffsetNotFoundError if a starting offset cannot be found in one or more of the chunks
See open for other errors that could be thrown when attempting to open the file
- proc readDelimitedAsBlockArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = -1, type deserializerType = defaultDeserializer, targetLocales: [?d] locale = Locales) : [] t throws where d.rank == 1 && (dt == bytes || dt == string)
Read a delimited file in parallel into a block-distributed array.
This routine assumes that the file is composed of a series of deserializable values of type t (optionally with a header at the beginning of the file). Each t must be separated by exactly one delimiter, which can be provided as either a string or bytes value.
This routine will use the delimiter to split the file into d.size chunks of roughly equal size and read each chunk concurrently across the target locales. If multiple tasks are used per locale, each locale will further decompose its chunk into smaller chunks and read each of those in parallel. The chunks and corresponding array indices are computed using findDelimChunks and findItemOffsets respectively.
Note
t must:
have a 'deserialize' method
have a default (zero-argument) initializer
not contain the delimiter in its serialized form (if it does, consider using readItemsAsBlockArray instead)
This procedure can be used for a variety of purposes, such as reading a CSV file. To do so, the delimiter should keep its default value of b"\n". The file will then be split by lines, where each line will be parsed as a t value. For CSV, the commas between t's fields must be parsed by its deserialize method. This can be accomplished in one of two ways: (1) by using a custom deserialize method that parses the comma-separated values manually (as in the example below), or (2) by using a deserializer that will handle commas appropriately with t's default deserialize method.
Example:

use IO, ParallelIO;

record color {
  var r, g, b: uint(8);
}

proc ref color.deserialize(reader, ref deserializer) throws {
  reader.read(this.r);
  reader.readLiteral(b",");
  reader.read(this.g);
  reader.readLiteral(b",");
  reader.read(this.b);
}

var colors = readDelimitedAsBlockArray(
  "colors.csv",
  t=color,
  header=headerPolicy.skipLines(1)
);
- Arguments:
filePath – the file to read
t – the type of value to read from the file
delim – the delimiter used to separate t values in the file — defaults to the newline character
nTasks – the number of tasks to use per locale — defaults to -1, meaning each locale should query here.maxTaskPar
header – how to handle the file header (see headerPolicy)
deserializerType – the type of deserializer to use — defaults to the I/O module's default deserializer
targetLocales – the locales to read the file on and the target locales for the returned block-distributed array
- Returns:
a block-distributed array of t values
- Throws:
OffsetNotFoundError if a starting offset cannot be found in one or more of the chunks
See open for other errors that could be thrown when attempting to open the file
- proc readDelimitedAsArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer) : [] t throws where dt == bytes || dt == string
Read a delimited file in parallel into an array.
This procedure is essentially the same as readDelimitedAsBlockArray, except that it only executes on the calling locale. As such, it does not accept a targetLocales argument and returns a non-distributed array.
- Arguments:
filePath – the file to read
t – the type of value to read from the file
delim – the delimiter used to separate t values in the file — defaults to the newline character
nTasks – the number of tasks to use — defaults to here.maxTaskPar
header – how to handle the file header (see headerPolicy)
deserializerType – the type of deserializer to use — defaults to the I/O module's default deserializer
- Returns:
a default rectangular array of t values
- Throws:
OffsetNotFoundError if a starting offset cannot be found in one or more of the chunks
See open for other errors that could be thrown when attempting to open the file
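A minimal usage sketch, assuming a hypothetical file "values.csv" holding comma-separated integers; since the returned array lives entirely on the calling locale, this suits files small enough to fit in local memory:

```chapel
use ParallelIO;

// Read comma-separated integers into a local (non-distributed) array.
// The delimiter is overridden from the default newline to a comma.
var values = readDelimitedAsArray("values.csv", t=int, delim=b",");

writeln("count: ", values.size, ", sum: ", + reduce values);
```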
- proc readItemsAsBlockArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = -1, type deserializerType = defaultDeserializer, targetLocales: [?d] locale = Locales) : [] t throws where d.rank == 1 && (dt == bytes || dt == string)
Read items from a file in parallel into a block-distributed array.
This routine assumes that the file is composed of a series of deserializable values of type t (optionally with a header at the beginning of the file). Each t must be separated by a delimiter, which can be provided as either a string or bytes value. Unlike readDelimitedAsBlockArray, the delimiter can also be found in the serialized form of t.
This routine uses the following heuristic to split the file into chunks, which may not be accurate in all cases. A given byte offset is a valid offset for a task to start deserializing values of type t if:
it is preceded by, or begins with, the delimiter
a t can be deserialized at that offset (i.e., calling t.deserialize on the bytes starting at that offset does not throw an error)
The heuristic, implemented in findDelimChunksChecked, will be used to split the file into d.size chunks with a roughly equal number of items per chunk. If multiple tasks are used per locale, each locale will further decompose its chunk into smaller chunks and read each of those in parallel.
Note
t must:
have a 'deserialize' method that throws when a valid t cannot be read
have a default (zero-argument) initializer
- Arguments:
filePath – the file to read
t – the type of value to read from the file
delim – the delimiter used to guide file chunking — defaults to the newline character
nTasks – the number of tasks to use per locale — defaults to -1, meaning each locale should query here.maxTaskPar
header – how to handle the file header (see headerPolicy)
deserializerType – the type of deserializer to use — defaults to the I/O module's default deserializer
targetLocales – the locales to read the file on and the target locales for the returned block-distributed array
- Returns:
a block-distributed array of t values
- Throws:
OffsetNotFoundError if a valid byte offset cannot be found in one or more of the chunks
See open for other errors that could be thrown when attempting to open the file
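One situation where the delimiter can legitimately appear inside a value is multi-line (pretty-printed) JSON, where objects span several lines. A sketch, assuming the JSON module's jsonDeserializer and a hypothetical file "readings.json" with one object per record:

```chapel
use JSON, ParallelIO;

record reading {
  var sensor: string;
  var value: real;
}

// Multi-line JSON objects contain the newline delimiter internally, so
// the checked chunking heuristic is needed to find offsets where a
// `reading` actually deserializes.
var readings = readItemsAsBlockArray("readings.json", t=reading,
                                     deserializerType=jsonDeserializer);
```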
- proc readItemsAsArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer) : [] t throws where dt == bytes || dt == string
Read items from a file in parallel into an array.
This procedure is essentially the same as readItemsAsBlockArray, except that it only executes on the calling locale. As such, it does not accept a targetLocales argument and returns a non-distributed array.
- Arguments:
filePath – the file to read
t – the type of value to read from the file
delim – the delimiter used to guide file chunking — defaults to the newline character
nTasks – the number of tasks to use — defaults to here.maxTaskPar
header – how to handle the file header (see headerPolicy)
deserializerType – the type of deserializer to use — defaults to the I/O module's default deserializer
- Returns:
a default rectangular array of t values
- Throws:
OffsetNotFoundError if a valid byte offset cannot be found in one or more of the chunks
See open for other errors that could be thrown when attempting to open the file
- proc findDelimChunks(const ref f: file, in delim: ?dt, n: int, bounds: range, header = headerPolicy.noHeader) : [] int throws where dt == bytes || dt == string
Get an array of n+1 byte offsets that divide the file f into n roughly equally sized chunks, where each byte offset lines up with a delimiter.
- Arguments:
f – the file to search
delim – the delimiter to use to separate the file into chunks
n – the number of chunks to find
bounds – a range of byte offsets to break into chunks
header – a header policy to use when searching for the first byte offset
- Returns:
a length n+1 array of byte offsets (the last offset is bounds.high)
- Throws:
OffsetNotFoundErrorif a valid byte offset cannot be found in any of the chunks
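As a building-block sketch of how the offsets could drive a manual task-parallel read (the file name is hypothetical, and the returned array is assumed to be 0-based so that chunk i spans offsets[i]..<offsets[i+1]):

```chapel
use IO, ParallelIO;

// Divide a newline-delimited file into one chunk per task and give
// each task its own fileReader over its chunk's byte region.
const f = open("data.txt", ioMode.r);
const nChunks = here.maxTaskPar;
const offsets = findDelimChunks(f, b"\n", nChunks, 0..<f.size);

coforall tid in 0..<nChunks {
  var r = f.reader(region=offsets[tid]..<offsets[tid+1]);
  var line: string;
  while r.readLine(line) do
    ; // process each line in this task's chunk
}
```

Because every chunk begins at a delimiter, no line straddles two chunks and each task can deserialize independently.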
- proc findDelimChunksChecked(const ref f: file, in delim: ?dt, n: int, type t, bounds: range, header = headerPolicy.noHeader, type deserializerType = defaultDeserializer) : [] int throws where dt == bytes || dt == string
Get an array of n+1 byte offsets that divide the file f into n roughly equally sized chunks, where each byte offset lines up with a delimiter and a t can be deserialized at that offset.
This procedure is similar to findDelimChunks, except that when it finds a delimiter, it confirms that a t can be deserialized at that offset before recording it. This way, the serialized values can also contain the delimiter.
- Arguments:
f – the file to search
delim – the delimiter to use to separate the file into chunks
n – the number of chunks to find
t – the type of value to read from the file
bounds – a range of byte offsets to break into chunks
header – a header policy to use when searching for the first byte offset
deserializerType – the type of deserializer to use
- Returns:
a length n+1 array of byte offsets (the last offset is bounds.high)
- Throws:
OffsetNotFoundErrorif a valid byte offset cannot be found in any of the chunks
- proc findItemOffsets(const ref f: file, in delim: ?dt, const ref byteOffsets: [?d] int) : [d] int throws where dt == bytes || dt == string
Get a prefix sum of the number of items in each chunk of the file f, where the chunks are defined by the byteOffsets array, and each item is separated by the given delimiter.
- Arguments:
f – the file to search
delim – the delimiter used to separate items in the file
byteOffsets – an array of byte offsets that divide the file into chunks
- Returns:
an array of length byteOffsets.size containing the number of items in the file before the start of each chunk. The last entry contains the total number of items in the file.
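Combining the two chunking helpers, a sketch (with a hypothetical file name, and assuming 0-based result arrays) that reports how many newline-separated items fall into each chunk:

```chapel
use IO, ParallelIO;

// Split the file into 4 chunks at newline boundaries, then compute the
// prefix sum of item counts over those chunks.
const f = open("items.txt", ioMode.r);
const byteOffsets = findDelimChunks(f, b"\n", 4, 0..<f.size);
const itemOffsets = findItemOffsets(f, b"\n", byteOffsets);

// itemOffsets[i] is the number of items before chunk i, and the last
// entry is the file's total, so chunk i holds
// itemOffsets[i+1] - itemOffsets[i] items.
for i in 0..<4 do
  writeln("chunk ", i, ": ", itemOffsets[i+1] - itemOffsets[i], " items");
```

This prefix sum is what lets each task compute the array indices for its chunk's items without communicating with the other tasks.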
- class OffsetNotFoundError : Error
An error thrown when a starting offset cannot be found in a chunk of a file.
- proc init()
- proc init(msg: string)
- record headerPolicy
A type describing how to handle the file header when reading a file in parallel.
- proc type headerPolicy.skipLines(n: int)
Skip the first n lines of the file
- proc type headerPolicy.skipBytes(n: int)
Skip the first n bytes of the file
- proc type headerPolicy.findStart
Find the first byte offset in the file that can be used to start reading
- proc type headerPolicy.noHeader
Don’t expect a header in the file
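For instance (the file name is hypothetical), a one-line column header can be skipped before a parallel read:

```chapel
use ParallelIO;

// Sum the integer lines of a file whose first line is a column header,
// skipping that header via headerPolicy.skipLines(1).
var total = 0;
forall line in readLines("scores.txt", header=headerPolicy.skipLines(1))
  with (+ reduce total) do
  total += line:int;

writeln("total: ", total);
```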