ParallelIO
Usage
use ParallelIO;
or
import ParallelIO;
Warning
The ‘ParallelIO’ module is unstable and subject to change in a future release.
Helper procedures for doing parallel I/O
This module provides a few procedures for reading a file’s contents into a distributed array in parallel. The procedures are designed to be used for cases where a large file contains a header followed by a continuous stream of delimited values of the same type. The procedures are:
readLinesAsBlockArray
: read each of the lines of a file as a string or bytes value
readDelimitedAsBlockArray
: read a file where each value is strictly separated by a delimiter, and the delimiter cannot be found in the value (e.g., CSV)
readItemsAsBlockArray
: read a file where values are separated by a delimiter, but the delimiter can be found in the value
There are also non-distributed versions of these procedures that return a default rectangular array instead of a block-distributed array. These tend to be a faster option if the file is small enough to fit in memory on a single locale.
Two parallel iterators are also provided:
readLines
: iterate over a file’s lines in parallel
readDelimited
: iterate over the values of a delimited file in parallel
Both iterators only work in a standalone context (i.e., they cannot be used for zippered iteration). Adding leader/follower support is a future goal.
This module also exposes some helper procedures used to break files into chunks. These could be used as building blocks to implement other parallel I/O routines:
findDelimChunks
: find a set of byte offsets that divide a file into roughly equal chunks where each chunk begins with a delimiter
findItemOffsets
: get a prefix sum of the number of items in each chunk of a file, where the chunks are defined by the byteOffsets array, and each item is strictly separated by the given delimiter
findDelimChunksChecked
: find a set of byte offsets that divide a file into roughly equal chunks where each chunk begins with a delimiter and each chunk starts with a deserializable value of the given type
- iter readLines(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, targetLocales: [] locale = [here]) : lineType where lineType == string || lineType == bytes
Iterate over a file’s lines in parallel.
This routine is similar to readLinesAsArray, except that it yields each line as it is read instead of returning an array of lines.
Example:
    use ParallelIO;

    var sum = 0;
    forall line in readLines("ints.txt") with (+ reduce sum) do
      sum += line:int;
Warning
This routine will halt if the file cannot be opened or if an I/O error occurs while reading the file. This limitation is expected to be removed in a future release.
Note
Only serial and standalone-parallel iteration is supported. This iterator cannot yet be used in a zippered context.
- Arguments:
filePath – the file to read
lineType – which type to return for each line: either string or bytes (defaults to string)
header – how to handle the file header (see headerPolicy)
nTasks – the number of tasks used to read the file (defaults to here.maxTaskPar)
targetLocales – the locales to use for reading the file; by default, only the calling locale is used
- iter readDelimited(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer, targetLocales: [] locale = [here]) : t where dt == string || dt == bytes
Iterate over the values of a delimited file in parallel.
This routine is similar to readDelimitedAsArray, except that it yields each value as it is read instead of returning an array of values.
Example:
    use IO, ParallelIO;

    record color {
      var r, g, b: uint(8);

      // Parse one "r,g,b" record; the commas are consumed here.
      proc ref deserialize(reader: fileReader(?), ref deserializer) throws {
        reader.read(this.r);
        reader.readLiteral(b",");
        reader.read(this.g);
        reader.readLiteral(b",");
        reader.read(this.b);
      }
    }

    // processColor is assumed to be defined elsewhere by the caller.
    forall c in readDelimited("colors.csv", color, header=headerPolicy.skipLines(1)) do
      processColor(c);
Warning
This routine will halt if the file cannot be opened or if an I/O error occurs while reading the file. This limitation is expected to be removed in a future release.
Note
Only serial and standalone-parallel iteration is supported. This iterator cannot yet be used in a zippered context.
- Arguments:
filePath – the file to read
t – the type of value to read from the file
delim – the delimiter to use to separate t values in the file (defaults to the newline character)
header – how to handle the file header (see headerPolicy)
nTasks – the number of tasks used to read the file (defaults to here.maxTaskPar)
deserializerType – the type of deserializer to use when reading values (defaults to the I/O module’s default deserializer)
targetLocales – the locales to use for reading the file; by default, only the calling locale is used
- proc readLinesAsBlockArray(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = -1, targetLocales: [?d] locale = Locales) : [] lineType throws where lineType == string || lineType == bytes
Read a file’s lines in parallel into a block-distributed array.
This routine is similar to readDelimitedAsBlockArray, except that it reads each line as a string or bytes value.
- Arguments:
filePath – the file to read
lineType – the element type of the returned array: either string or bytes (defaults to string)
nTasks – the number of tasks to use per locale (defaults to -1, meaning each locale should query here.maxTaskPar)
header – how to handle the file header (see headerPolicy)
targetLocales – the locales to read the file on and the target locales for the returned block-distributed array
- Returns:
a block-distributed array of lineType values
- Throws:
OffsetNotFoundError if a starting offset cannot be found in any of the chunks
See open for other errors that could be thrown when attempting to open the file
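A minimal usage sketch for readLinesAsBlockArray follows. The file name lines.txt, its contents, and the choice of nTasks=2 are illustrative assumptions, and the sample file is written first only so that the sketch is self-contained. On a multi-locale run the file would need to be readable from every target locale.

```chapel
use IO, ParallelIO;

// Write a sample file so the sketch is self-contained.
// "lines.txt" and its contents are hypothetical.
{
  var w = openWriter("lines.txt", locking=false);
  for i in 1..1000 do
    w.writeln("line ", i);
  w.close();
}

// Read every line into a block-distributed array of strings.
// nTasks=2 is an arbitrary choice; the default (-1) lets each locale
// query here.maxTaskPar instead.
var lines = readLinesAsBlockArray("lines.txt", lineType=string, nTasks=2);

// Because the array is block-distributed, a forall over it runs each
// iteration on the locale that owns the corresponding element.
var totalBytes = 0;
forall line in lines with (+ reduce totalBytes) do
  totalBytes += line.numBytes;

writeln("read ", lines.size, " lines, ", totalBytes, " bytes");
```

The call is made at module level here, so a thrown error halts the program; inside a procedure it would need to be wrapped in try or the procedure marked throws.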
- proc readLinesAsArray(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar) : [] lineType throws where lineType == string || lineType == bytes
Read a file’s lines in parallel into an array.
This routine is essentially the same as readLinesAsBlockArray, except that it only executes on the calling locale. As such, it does not accept a targetLocales argument and returns a non-distributed array.
- Arguments:
filePath – the file to read
lineType – the element type of the returned array: either string or bytes (defaults to string)
nTasks – the number of tasks to use (defaults to here.maxTaskPar)
header – how to handle the file header (see headerPolicy)
- Returns:
a default rectangular array of lineType values
- Throws:
OffsetNotFoundError if a starting offset cannot be found in one or more of the chunks
See open for other errors that could be thrown when attempting to open the file
- proc readDelimitedAsBlockArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = -1, type deserializerType = defaultDeserializer, targetLocales: [?d] locale = Locales) : [] t throws where d.rank == 1 && (dt == bytes || dt == string)
Read a delimited file in parallel into a block-distributed array.
This routine assumes that the file is composed of a series of deserializable values of type t (optionally with a header at the beginning of the file). Each t must be separated by exactly one delimiter, which can be provided as either a string or bytes value.
This routine will use the delimiter to split the file into d.size chunks of roughly equal size and read each chunk concurrently across the target locales. If multiple tasks are used per locale, each locale will further decompose its chunk into smaller chunks and read each of those in parallel. The chunks and corresponding array indices are computed using findDelimChunks and findItemOffsets respectively.
Note
t must:
have a ‘deserialize’ method
have a default (zero argument) initializer
not contain the delimiter in its serialized form (if it does, consider using readItemsAsBlockArray instead)
This procedure can be used for a variety of purposes, such as reading a CSV file. To do so, the delimiter should keep its default value of b"\n". The file will then be split by lines, where each line will be parsed as a t value. For CSV, the commas between t’s fields must be parsed by its deserialize method. This can be accomplished in one of two ways: (1) by using a custom deserialize method that parses the comma values manually (like in the example below), or (2) by using a deserializer that will handle commas appropriately with t’s default deserialize method.
Example:
    use IO, ParallelIO;

    record color {
      var r, g, b: uint(8);
    }

    proc ref color.deserialize(reader, ref deserializer) throws {
      reader.read(this.r);
      reader.readLiteral(b",");
      reader.read(this.g);
      reader.readLiteral(b",");
      reader.read(this.b);
    }

    var colors = readDelimitedAsBlockArray(
      "colors.csv",
      t=color,
      header=headerPolicy.skipLines(1)
    );
- Arguments:
filePath – the file to read
t – the type of value to read from the file
delim – the delimiter to use to separate t values in the file (defaults to the newline character)
nTasks – the number of tasks to use per locale (defaults to -1, meaning each locale should query here.maxTaskPar)
header – how to handle the file header (see headerPolicy)
deserializerType – the type of deserializer to use (defaults to the I/O module’s default deserializer)
targetLocales – the locales to read the file on and the target locales for the returned block-distributed array
- Returns:
a block-distributed array of t values
- Throws:
OffsetNotFoundError if a starting offset cannot be found in one or more of the chunks
See open for other errors that could be thrown when attempting to open the file
- proc readDelimitedAsArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer) : [] t throws where dt == bytes || dt == string
Read a delimited file in parallel into an array.
This procedure is essentially the same as readDelimitedAsBlockArray, except that it only executes on the calling locale. As such, it does not accept a targetLocales argument and returns a non-distributed array.
- Arguments:
filePath – the file to read
t – the type of value to read from the file
delim – the delimiter to use to separate t values in the file (defaults to the newline character)
nTasks – the number of tasks to use (defaults to here.maxTaskPar)
header – how to handle the file header (see headerPolicy)
deserializerType – the type of deserializer to use (defaults to the I/O module’s default deserializer)
- Returns:
a default rectangular array of t values
- Throws:
OffsetNotFoundError if a starting offset cannot be found in one or more of the chunks
See open for other errors that could be thrown when attempting to open the file
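As a rough illustration of the single-locale variant, the sketch below reads a small CSV file into a default rectangular array. It reuses the color record pattern from the readDelimitedAsBlockArray example; the generated file contents and nTasks=2 are assumptions made only so the sketch is self-contained.

```chapel
use IO, ParallelIO;

record color {
  var r, g, b: uint(8);
}

// Parse one "r,g,b" record. The commas are consumed here; the newline
// delimiter between records is handled by readDelimitedAsArray.
proc ref color.deserialize(reader, ref deserializer) throws {
  reader.read(this.r);
  reader.readLiteral(b",");
  reader.read(this.g);
  reader.readLiteral(b",");
  reader.read(this.b);
}

// Write a sample CSV file (hypothetical name and contents).
{
  var w = openWriter("colors.csv", locking=false);
  w.writeln("r,g,b");                    // one-line header
  for i in 0..255 do
    w.writeln(i, ",", 255 - i, ",", 0);  // one color per line
  w.close();
}

// Read on the calling locale only, skipping the header line.
// nTasks=2 is an arbitrary choice for this small file.
var colors = readDelimitedAsArray("colors.csv", t=color,
                                  header=headerPolicy.skipLines(1),
                                  nTasks=2);

writeln("read ", colors.size, " colors");
```

Switching to readDelimitedAsBlockArray would only require changing the procedure name (and optionally passing targetLocales), since the remaining arguments are shared.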
- proc readItemsAsBlockArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = -1, type deserializerType = defaultDeserializer, targetLocales: [?d] locale = Locales) : [] t throws where d.rank == 1 && (dt == bytes || dt == string)
Read items from a file in parallel into a block-distributed array.
This routine assumes that the file is composed of a series of deserializable values of type t (optionally with a header at the beginning of the file). Each t must be separated by a delimiter, which can be provided as either a string or bytes value. Unlike readDelimitedAsBlockArray, the delimiter can also be found in the serialized form of t.
This routine uses the following heuristic to split the file into chunks, which may not be accurate in all cases:
A given byte offset is a valid offset for a task to start deserializing values of type t if:
it is preceded by, or begins with, the delimiter
a t can be deserialized at that offset (i.e., calling t.deserialize on the bytes starting at that offset does not throw an error)
The heuristic, implemented in findDelimChunksChecked, will be used to split the file into d.size chunks with a roughly equal number of items per chunk. If multiple tasks per locale are used, each locale will further decompose its chunk into smaller chunks and read each of those in parallel.
Note
t must:
have a ‘deserialize’ method that throws when a valid t cannot be read
have a default (zero argument) initializer
- Arguments:
filePath – the file to read
t – the type of value to read from the file
delim – the delimiter used to guide file chunking (defaults to the newline character)
nTasks – the number of tasks to use per locale (defaults to -1, meaning each locale should query here.maxTaskPar)
header – how to handle the file header (see headerPolicy)
deserializerType – the type of deserializer to use (defaults to the I/O module’s default deserializer)
targetLocales – the locales to read the file on and the target locales for the returned block-distributed array
- Returns:
a block-distributed array of t values
- Throws:
OffsetNotFoundError if a valid byte offset cannot be found in one or more of the chunks
See open for other errors that could be thrown when attempting to open the file
- proc readItemsAsArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer) : [] t throws where dt == bytes || dt == string
Read items from a file in parallel into an array.
This procedure is essentially the same as readItemsAsBlockArray, except that it only executes on the calling locale. As such, it does not accept a targetLocales argument and returns a non-distributed array.
- Arguments:
filePath – the file to read
t – the type of value to read from the file
delim – the delimiter used to guide file chunking (defaults to the newline character)
nTasks – the number of tasks to use (defaults to here.maxTaskPar)
header – how to handle the file header (see headerPolicy)
deserializerType – the type of deserializer to use (defaults to the I/O module’s default deserializer)
- Returns:
a default rectangular array of t values
- Throws:
OffsetNotFoundError if a valid byte offset cannot be found in one or more of the chunks
See open for other errors that could be thrown when attempting to open the file
- proc findDelimChunks(const ref f: file, in delim: ?dt, n: int, bounds: range, header = headerPolicy.noHeader) : [] int throws where dt == bytes || dt == string
Get an array of n+1 byte offsets that divide the file f into n roughly equally sized chunks, where each byte offset lines up with a delimiter.
- Arguments:
f – the file to search
delim – the delimiter to use to separate the file into chunks
n – the number of chunks to find
bounds – a range of byte offsets to break into chunks
header – a header policy to use when searching for the first byte offset
- Returns:
a length n+1 array of byte offsets (the last offset is bounds.high)
- Throws:
OffsetNotFoundError if a valid byte offset cannot be found in any of the chunks
- proc findDelimChunksChecked(const ref f: file, in delim: ?dt, n: int, type t, bounds: range, header = headerPolicy.noHeader, type deserializerType = defaultDeserializer) : [] int throws where dt == bytes || dt == string
Get an array of n+1 byte offsets that divide the file f into n roughly equally sized chunks, where each byte offset lines up with a delimiter and a t can be deserialized at that offset.
This procedure is similar to findDelimChunks, except that when it finds a delimiter, it confirms that a t can be deserialized at that offset before recording it. This way, the serialized values can also contain the delimiter.
- Arguments:
f – the file to search
delim – the delimiter to use to separate the file into chunks
n – the number of chunks to find
t – the type of value to read from the file
bounds – a range of byte offsets to break into chunks
header – a header policy to use when searching for the first byte offset
deserializerType – the type of deserializer to use
- Returns:
a length n+1 array of byte offsets (the last offset is bounds.high)
- Throws:
OffsetNotFoundError if a valid byte offset cannot be found in any of the chunks
- proc findItemOffsets(const ref f: file, in delim: ?dt, const ref byteOffsets: [?d] int) : [d] int throws where dt == bytes || dt == string
Get a prefix sum of the number of items in each chunk of the file f, where the chunks are defined by the byteOffsets array, and each item is separated by the given delimiter.
- Arguments:
f – the file to search
delim – the delimiter used to separate items in the file
byteOffsets – an array of byte offsets that divide the file into chunks
- Returns:
an array of length byteOffsets.size containing the number of items in the file before the start of each chunk. The last entry contains the total number of items in the file.
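The two chunking helpers can be combined roughly as sketched below to plan a custom parallel read. This is an illustration under assumptions rather than the module's own implementation: the file name, the chunk count of 4, and in particular the use of 0..<f.size as the bounds argument for "the whole file" are all choices made for this sketch.

```chapel
use IO, ParallelIO;

// Write a sample newline-delimited file (hypothetical name and contents).
{
  var w = openWriter("chunks.txt", locking=false);
  for i in 1..1000 do
    w.writeln(i);
  w.close();
}

const f = open("chunks.txt", ioMode.r);
const nChunks = 4;  // arbitrary chunk count for illustration

// Assumption: the whole file is expressed as the byte range 0..<f.size.
// The result holds nChunks+1 offsets that line up with delimiters.
const byteOffsets = findDelimChunks(f, b"\n", nChunks, 0..<f.size);

// Prefix sum of item counts: entry i is the number of items that appear
// before chunk i starts, so it can serve as that chunk's first array index.
const itemOffsets = findItemOffsets(f, b"\n", byteOffsets);

writeln("byte offsets: ", byteOffsets);
writeln("item offsets: ", itemOffsets);
```

With these two arrays, each task could open its own reader over one byte range and write its items into a shared result array starting at the matching item offset.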
- class OffsetNotFoundError : Error
An error thrown when a starting offset cannot be found in a chunk of a file.
- proc init()
- proc init(msg: string)
- record headerPolicy
A type describing how to handle the file header when reading a file in parallel.
- proc type headerPolicy.skipLines(n: int)
Skip the first n lines of the file
- proc type headerPolicy.skipBytes(n: int)
Skip the first n bytes of the file
- proc type headerPolicy.findStart
Find the first byte offset in the file that can be used to start reading
- proc type headerPolicy.noHeader
Don’t expect a header in the file
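To make the effect of a header policy concrete, the following sketch reads the same file twice with readLinesAsArray: once with the default noHeader policy and once with skipLines(1). The file name, its contents, and nTasks=2 are assumptions chosen to keep the sketch self-contained.

```chapel
use IO, ParallelIO;

// Write a sample file with a one-line header (hypothetical contents).
{
  var w = openWriter("with_header.txt", locking=false);
  w.writeln("# sample data");  // header line
  for i in 1..1000 do
    w.writeln(i);
  w.close();
}

// Default policy (noHeader): the header line is treated as data.
var allLines = readLinesAsArray("with_header.txt", nTasks=2);

// skipLines(1): the first line is skipped before reading begins.
var body = readLinesAsArray("with_header.txt",
                            header=headerPolicy.skipLines(1),
                            nTasks=2);

writeln(allLines.size, " lines with noHeader, ",
        body.size, " lines with skipLines(1)");
```

skipBytes(n) is passed the same way when the header has a known fixed size, and findStart and noHeader are written without parentheses.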