ParallelIO
Usage
use ParallelIO;
or
import ParallelIO;
Warning
The ‘ParallelIO’ module is unstable and subject to change in a future release.
Helper procedures for doing parallel I/O
This module provides a few procedures for reading a file’s contents into a distributed array in parallel. The procedures are designed to be used for cases where a large file contains a header followed by a continuous stream of delimited values of the same type. The procedures are:
- readLinesAsBlockArray: read each of the lines of a file as a string or bytes value
- readDelimitedAsBlockArray: read a file where each value is strictly separated by a delimiter, and the delimiter cannot be found in the value (e.g., CSV)
- readItemsAsBlockArray: read a file where values are separated by a delimiter, but the delimiter can be found in the value
There are also non-distributed versions of these procedures that return a default rectangular array instead of a block-distributed array. These tend to be a faster option if the file is small enough to fit in memory on a single locale.
Two parallel iterators are also provided:
- readLines: iterate over a file’s lines in parallel
- readDelimited: iterate over the values of a delimited file in parallel
Both iterators only work in a standalone context (i.e., they cannot be used for zippered iteration). Adding leader/follower support is a future goal.
This module also exposes some helper procedures used to break files into chunks. These could be used as building blocks to implement other parallel I/O routines:
- findDelimChunks: find a set of byte offsets that divide a file into roughly equal chunks where each chunk begins with a delimiter
- findItemOffsets: get a prefix sum of the number of items in each chunk of a file, where the chunks are defined by the byteOffsets array, and each item is strictly separated by the given delimiter
- findDelimChunksChecked: find a set of byte offsets that divide a file into roughly equal chunks where each chunk begins with a delimiter and each chunk starts with a deserializable value of the given type
- iter readLines(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, targetLocales: [] locale = [here]) : lineType where lineType == string || lineType == bytes
- Iterate over a file’s lines in parallel.

  This routine is similar to readLinesAsArray, except that it yields each line as it is read instead of returning an array of lines.

  Example:

      use ParallelIO;

      var sum = 0;
      forall line in readLines("ints.txt") with (+ reduce sum) do
        sum += line:int;

  Warning
  This routine will halt if the file cannot be opened or if an I/O error occurs while reading the file. This limitation is expected to be removed in a future release.

  Note
  Only serial and standalone-parallel iteration is supported. This iterator cannot yet be used in a zippered context.

  Arguments:
- filePath – the file to read 
- lineType – which type to return for each line: either string or bytes; defaults to string
- header – how to handle the file header (see headerPolicy)
- nTasks – the number of tasks used to read the file; defaults to here.maxTaskPar
- targetLocales – the locales to use for reading the file — by default, only the calling locale is used 
 
 
- iter readDelimited(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer, targetLocales: [] locale = [here]) : t where dt == string || dt == bytes
- Iterate over the values of a delimited file in parallel.

  This routine is similar to readDelimitedAsArray, except that it yields each value as it is read instead of returning an array of values.

  Example:

      use IO, ParallelIO;

      record color {
        var r, g, b: uint(8);

        proc ref deserialize(reader: fileReader(?), ref deserializer) throws {
          reader.read(this.r);
          reader.readLiteral(b",");
          reader.read(this.g);
          reader.readLiteral(b",");
          reader.read(this.b);
        }
      }

      forall c in readDelimited("colors.csv", color, header=headerPolicy.skipLines(1)) do
        processColor(c);

  Warning
  This routine will halt if the file cannot be opened or if an I/O error occurs while reading the file. This limitation is expected to be removed in a future release.

  Note
  Only serial and standalone-parallel iteration is supported. This iterator cannot yet be used in a zippered context.

  Arguments:
- filePath – the file to read 
- t – the type of value to read from the file 
- delim – the delimiter to use to separate t values in the file; defaults to the newline character
- header – how to handle the file header (see headerPolicy)
- nTasks – the number of tasks used to read the file; defaults to here.maxTaskPar
- deserializerType – the type of deserializer to use when reading values; defaults to the I/O module’s default deserializer
- targetLocales – the locales to use for reading the file — by default, only the calling locale is used 
 
 
- proc readLinesAsBlockArray(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = -1, targetLocales: [?d] locale = Locales) : [] lineType throws where lineType == string || lineType == bytes
- Read a file’s lines in parallel into a block-distributed array.

  This routine is similar to readDelimitedAsBlockArray, except that it reads each line as a string or bytes value.

  Arguments:
- filePath – the file to read 
- lineType – the element type of the returned array: either string or bytes; defaults to string
- nTasks – the number of tasks to use per locale; defaults to -1, meaning each locale should query here.maxTaskPar
- header – how to handle the file header (see headerPolicy)
- targetLocales – the locales to read the file on and the target locales for the returned block-distributed array 
 
- Returns:
- a block-distributed array of lineType values
- Throws:
- OffsetNotFoundError if a starting offset cannot be found in any of the chunks
- See open for other errors that could be thrown when attempting to open the file
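As a rough usage sketch, a call might look like the following. The file name and its contents are made up for illustration, the file is assumed to be visible at the same path from every locale, and nTasks=2 is an arbitrary choice:

```chapel
use IO, ParallelIO;

// Hypothetical input: written here only so the sketch is self-contained.
const path = "words.txt";
{
  var w = openWriter(path);
  for i in 1..1000 do w.writeln("word", i);
  w.close();
}

// Read each line as a bytes value, using two tasks on every locale.
// The result is block-distributed over Locales (the default targetLocales).
const lines = readLinesAsBlockArray(path, lineType=bytes, nTasks=2);

// Report how many elements of the distributed array each locale owns.
for loc in Locales do on loc do
  writeln("locale ", here.id, " owns ", lines.localSubdomain().size, " lines");
```

On a single locale, all of the elements are owned by locale 0; the per-locale counts only become interesting when the program is run with more than one locale.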
- proc readLinesAsArray(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar) : [] lineType throws where lineType == string || lineType == bytes
- Read a file’s lines in parallel into an array.

  This routine is essentially the same as readLinesAsBlockArray, except that it only executes on the calling locale. As such, it does not accept a targetLocales argument and returns a non-distributed array.

  Arguments:
- filePath – the file to read 
- lineType – the element type of the returned array: either string or bytes; defaults to string
- nTasks – the number of tasks to use; defaults to here.maxTaskPar
- header – how to handle the file header (see headerPolicy)
 
- Returns:
- a default rectangular array of lineType values
- Throws:
- OffsetNotFoundError if a starting offset cannot be found in one or more of the chunks
- See open for other errors that could be thrown when attempting to open the file
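A minimal sketch of the single-locale variant, assuming a small text file that is created on the spot (the file name, its contents, and the task count are all illustrative):

```chapel
use IO, ParallelIO;

// Hypothetical input: written here only so the sketch is self-contained.
const path = "greetings.txt";
{
  var w = openWriter(path);
  for i in 1..100 do w.writeln("hello ", i);
  w.close();
}

// Read every line as a string on the calling locale, using four tasks.
const lines = readLinesAsArray(path, nTasks=4);

writeln("read ", lines.size, " lines");
writeln("element type: ", lines.eltType:string);
```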
- proc readDelimitedAsBlockArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = -1, type deserializerType = defaultDeserializer, targetLocales: [?d] locale = Locales) : [] t throws where d.rank == 1 && (dt == bytes || dt == string)
- Read a delimited file in parallel into a block-distributed array.

  This routine assumes that the file is composed of a series of deserializable values of type t (optionally with a header at the beginning of the file). Each t must be separated by exactly one delimiter, which can be provided as either a string or bytes value.

  This routine will use the delimiter to split the file into d.size chunks of roughly equal size and read each chunk concurrently across the target locales. If multiple tasks are used per locale, each locale will further decompose its chunk into smaller chunks and read each of those in parallel. The chunks and corresponding array indices are computed using findDelimChunks and findItemOffsets, respectively.

  Note
  t must:
  - have a ‘deserialize’ method
  - have a default (zero argument) initializer
  - not contain the delimiter in its serialized form (if it does, consider using readItemsAsBlockArray instead)

  This procedure can be used for a variety of purposes, such as reading a CSV file. To do so, the delimiter should keep its default value of b"\n". The file will then be split by lines, and each line will be parsed as a t value. For CSV, the commas between t’s fields must be parsed by its deserialize method. This can be accomplished in one of two ways: (1) by using a custom deserialize method that parses the commas manually (as in the example below), or (2) by using a deserializer that handles commas appropriately with t’s default deserialize method.

  Example:

      use IO, ParallelIO;

      record color {
        var r, g, b: uint(8);
      }

      proc ref color.deserialize(reader, ref deserializer) throws {
        reader.read(this.r);
        reader.readLiteral(b",");
        reader.read(this.g);
        reader.readLiteral(b",");
        reader.read(this.b);
      }

      var colors = readDelimitedAsBlockArray(
        "colors.csv",
        t=color,
        header=headerPolicy.skipLines(1)
      );

  Arguments:
- filePath – the file to read 
- t – the type of value to read from the file 
- delim – the delimiter to use to separate t values in the file; defaults to the newline character
- nTasks – the number of tasks to use per locale; defaults to -1, meaning each locale should query here.maxTaskPar
- header – how to handle the file header (see headerPolicy)
- deserializerType – the type of deserializer to use; defaults to the I/O module’s default deserializer
- targetLocales – the locales to read the file on and the target locales for the returned block-distributed array 
 
- Returns:
- a block-distributed array of t values
- Throws:
- OffsetNotFoundError if a starting offset cannot be found in one or more of the chunks
- See open for other errors that could be thrown when attempting to open the file
- proc readDelimitedAsArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer) : [] t throws where dt == bytes || dt == string
- Read a delimited file in parallel into an array.

  This procedure is essentially the same as readDelimitedAsBlockArray, except that it only executes on the calling locale. As such, it does not accept a targetLocales argument and returns a non-distributed array.

  Arguments:
- filePath – the file to read 
- t – the type of value to read from the file 
- delim – the delimiter to use to separate t values in the file; defaults to the newline character
- nTasks – the number of tasks to use; defaults to here.maxTaskPar
- header – how to handle the file header (see headerPolicy)
- deserializerType – the type of deserializer to use; defaults to the I/O module’s default deserializer
 
- Returns:
- a default rectangular array of t values
- Throws:
- OffsetNotFoundError if a starting offset cannot be found in one or more of the chunks
- See open for other errors that could be thrown when attempting to open the file
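The sketch below shows one way this procedure might be used to load a small two-column CSV file on a single locale. The point record, the file name, and the file contents are hypothetical; the deserialize method follows the same pattern as the color example for readDelimitedAsBlockArray:

```chapel
use IO, ParallelIO;

// Hypothetical record type for one "x,y" row of the file.
record point {
  var x, y: int;

  // Parse "x,y"; the comma between the fields is consumed manually.
  proc ref deserialize(reader: fileReader(?), ref deserializer) throws {
    reader.read(this.x);
    reader.readLiteral(b",");
    reader.read(this.y);
  }
}

// Hypothetical input: a one-line header followed by 1000 rows.
const path = "points.csv";
{
  var w = openWriter(path);
  w.writeln("x,y");
  for i in 1..1000 do w.writeln(i, ",", 2*i);
  w.close();
}

// Skip the header line and parse each remaining line as a point.
const pts = readDelimitedAsArray(path, t=point,
                                 header=headerPolicy.skipLines(1),
                                 nTasks=4);

writeln("read ", pts.size, " points; first is ", pts.first);
```

The delimiter keeps its default value of a newline here, so each row is handed to point.deserialize as one value.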
- proc readItemsAsBlockArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = -1, type deserializerType = defaultDeserializer, targetLocales: [?d] locale = Locales) : [] t throws where d.rank == 1 && (dt == bytes || dt == string)
- Read items from a file in parallel into a block-distributed array.

  This routine assumes that the file is composed of a series of deserializable values of type t (optionally with a header at the beginning of the file). Each t must be separated by a delimiter, which can be provided as either a string or bytes value. Unlike readDelimitedAsBlockArray, the delimiter can also be found in the serialized form of t.

  This routine uses the following heuristic to split the file into chunks, which may not be accurate in all cases:

  A given byte offset is a valid offset for a task to start deserializing values of type t if:
  - it is preceded by, or begins with, the delimiter
  - a t can be deserialized at that offset (i.e., calling t.deserialize on the bytes starting at that offset does not throw an error)

  The heuristic, implemented in findDelimChunksChecked, will be used to split the file into d.size chunks with a roughly equal number of items per chunk. If multiple tasks per locale are used, each locale will further decompose its chunk into smaller chunks and read each of those in parallel.

  Note
  t must:
  - have a ‘deserialize’ method that throws when a valid t cannot be read
  - have a default (zero argument) initializer

  Arguments:
 - Arguments:
- filePath – the file to read 
- t – the type of value to read from the file 
- delim – the delimiter used to guide file chunking; defaults to the newline character
- nTasks – the number of tasks to use per locale; defaults to -1, meaning each locale should query here.maxTaskPar
- header – how to handle the file header (see headerPolicy)
- deserializerType – the type of deserializer to use; defaults to the I/O module’s default deserializer
- targetLocales – the locales to read the file on and the target locales for the returned block-distributed array 
 
- Returns:
- a block-distributed array of t values
- Throws:
- OffsetNotFoundError if a valid byte offset cannot be found in one or more of the chunks
- See open for other errors that could be thrown when attempting to open the file
- proc readItemsAsArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer) : [] t throws where dt == bytes || dt == string
- Read items from a file in parallel into an array.

  This procedure is essentially the same as readItemsAsBlockArray, except that it only executes on the calling locale. As such, it does not accept a targetLocales argument and returns a non-distributed array.

  Arguments:
- filePath – the file to read 
- t – the type of value to read from the file 
- delim – the delimiter used to guide file chunking; defaults to the newline character
- nTasks – the number of tasks to use; defaults to here.maxTaskPar
- header – how to handle the file header (see headerPolicy)
- deserializerType – the type of deserializer to use; defaults to the I/O module’s default deserializer
 
- Returns:
- a default rectangular array of t values
- Throws:
- OffsetNotFoundError if a valid byte offset cannot be found in one or more of the chunks
- See open for other errors that could be thrown when attempting to open the file
- proc findDelimChunks(const ref f: file, in delim: ?dt, n: int, bounds: range, header = headerPolicy.noHeader) : [] int throws where dt == bytes || dt == string
- Get an array of n+1 byte offsets that divide the file f into n roughly equally sized chunks, where each byte offset lines up with a delimiter.

  Arguments:
- f – the file to search 
- delim – the delimiter to use to separate the file into chunks 
- n – the number of chunks to find 
- bounds – a range of byte offsets to break into chunks 
- header – a header policy to use when searching for the first byte offset 
 
- Returns:
- a length n+1 array of byte offsets (the last offset is bounds.high)
- Throws:
- OffsetNotFoundError if a valid byte offset cannot be found in any of the chunks
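To make the idea of "offsets that line up with a delimiter" concrete, here is a simplified sketch over an in-memory bytes value. It is an illustration only and is not how the module implements findDelimChunks: delimChunks is a hypothetical helper, it ignores headers and error reporting, and it always uses 0 as the first offset:

```chapel
// Illustration only: 'delimChunks' is a hypothetical helper that works on an
// in-memory buffer rather than a file.
proc delimChunks(data: bytes, delim: uint(8), n: int) {
  var offsets: [0..n] int;               // n chunks need n+1 boundaries
  offsets[n] = data.size;                // last offset: end of the buffer
  for i in 1..<n {
    // Start from the evenly spaced position, then slide forward to the
    // next delimiter so that the chunk boundary lines up with it.
    var pos = i * data.size / n;
    while pos < data.size && data[pos] != delim do pos += 1;
    offsets[i] = pos;
  }
  return offsets;
}

const newline: uint(8) = 10;             // ASCII code of "\n"
const data = b"aa\nbbb\ncc\ndddd\n";     // 15 bytes, four lines
const offsets = delimChunks(data, newline, 2);
writeln(offsets);                        // 0 9 15
```

The even split would land at byte 7, in the middle of "cc", so the boundary slides forward to the newline at byte 9.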
 
- proc findDelimChunksChecked(const ref f: file, in delim: ?dt, n: int, type t, bounds: range, header = headerPolicy.noHeader, type deserializerType = defaultDeserializer) : [] int throws where dt == bytes || dt == string
- Get an array of n+1 byte offsets that divide the file f into n roughly equally sized chunks, where each byte offset lines up with a delimiter and a t can be deserialized at that offset.

  This procedure is similar to findDelimChunks, except that when it finds a delimiter, it confirms that a t can be deserialized at that offset before recording it. This way, the serialized values can also contain the delimiter.

  Arguments:
- f – the file to search 
- delim – the delimiter to use to separate the file into chunks 
- n – the number of chunks to find 
- t – the type of value to read from the file 
- bounds – a range of byte offsets to break into chunks 
- header – a header policy to use when searching for the first byte offset 
- deserializerType – the type of deserializer to use 
 
- Returns:
- a length n+1 array of byte offsets (the last offset is bounds.high)
- Throws:
- OffsetNotFoundError if a valid byte offset cannot be found in any of the chunks
 
- proc findItemOffsets(const ref f: file, in delim: ?dt, const ref byteOffsets: [?d] int) : [d] int throws where dt == bytes || dt == string
- Get a prefix sum of the number of items in each chunk of the file f, where the chunks are defined by the byteOffsets array, and each item is separated by the given delimiter.

  Arguments:
- f – the file to search 
- delim – the delimiter used to separate items in the file 
- byteOffsets – an array of byte offsets that divide the file into chunks 
 
- Returns:
- an array of length byteOffsets.size containing the number of items in the file before the start of each chunk. The last entry contains the total number of items in the file.
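The two chunking helpers are designed to be used together, which might look roughly like the sketch below. The file name and contents are made up, and passing 0..<f.size as the bounds is an assumption about how the byte range should be expressed:

```chapel
use IO, ParallelIO;

// Hypothetical input: one integer per line, written here so the sketch
// is self-contained.
const path = "numbers.txt";
{
  var w = openWriter(path);
  for i in 1..1000 do w.writeln(i);
  w.close();
}

const f = open(path, ioMode.r);
const nChunks = 4;

// Byte offsets that split the file at newline boundaries.
// Assumption: the range 0..<f.size covers the whole file.
const byteOffsets = findDelimChunks(f, b"\n", nChunks, 0..<f.size);

// Running item counts: each entry is the number of items that come
// before the chunk starting at the matching byte offset.
const itemOffsets = findItemOffsets(f, b"\n", byteOffsets);

// The two arrays share a domain, so they can be traversed together.
for (byteOff, itemOff) in zip(byteOffsets, itemOffsets) do
  writeln("byte offset ", byteOff, " -> ", itemOff, " items before it");
```

A parallel reader built on these helpers could give each task one chunk to read and use the matching pair of item counts as the range of array indices to fill.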
 
- class OffsetNotFoundError : Error
- An error thrown when a starting offset cannot be found in a chunk of a file.
- proc init()
- proc init(msg: string)
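Because the array-returning procedures throw rather than halt, callers can separate chunking failures from other I/O errors. The sketch below shows one possible pattern. countLines is a hypothetical helper, and the file name is assumed not to exist so that the general catch clause is exercised:

```chapel
use ParallelIO;

// Hypothetical helper: returns the number of lines in the file,
// or -1 if it could not be read.
proc countLines(path: string): int {
  try {
    return readLinesAsArray(path).size;
  } catch e: OffsetNotFoundError {
    // The file was opened, but it could not be split into chunks.
    writeln("could not split ", path, " into chunks: ", e.message());
  } catch e {
    // Any other error, for example a missing file.
    writeln("could not read ", path, ": ", e.message());
  }
  return -1;
}

writeln(countLines("no-such-file.txt"));
```

If OffsetNotFoundError is raised for a small file, retrying with a smaller nTasks is one option worth trying, since fewer chunks need a valid starting offset.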
 
- record headerPolicy
- A type describing how to handle the file header when reading a file in parallel. 
- proc type headerPolicy.skipLines(n: int)
- Skip the first n lines of the file
- proc type headerPolicy.skipBytes(n: int)
- Skip the first n bytes of the file
- proc type headerPolicy.findStart
- Find the first byte offset in the file that can be used to start reading 
- proc type headerPolicy.noHeader
- Don’t expect a header in the file
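As a small sketch of how a header policy changes what is read, the example below reads the same file twice, once with the default noHeader policy and once skipping two header lines. The file name and contents are hypothetical:

```chapel
use IO, ParallelIO;

// Hypothetical input: two comment lines followed by 50 data rows.
const path = "table.txt";
{
  var w = openWriter(path);
  w.writeln("# generated file");
  w.writeln("# name value");
  for i in 1..50 do w.writeln("row", i, " ", i);
  w.close();
}

// A header policy is an ordinary value, so it can be stored and reused.
const skipTwo = headerPolicy.skipLines(2);

// Default policy (noHeader): the comment lines are read like any other line.
const everyLine = readLinesAsArray(path, nTasks=2);

// With skipLines(2), reading starts after the two comment lines.
const dataLines = readLinesAsArray(path, header=skipTwo, nTasks=2);

writeln(everyLine.size - dataLines.size, " header lines were skipped");
```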