.. default-domain:: chpl

.. module:: ParallelIO
   :synopsis: Helper procedures for doing parallel I/O

ParallelIO
==========
**Usage**

.. code-block:: chapel

   use ParallelIO;


or

.. code-block:: chapel

   import ParallelIO;

.. warning::

   The 'ParallelIO' module is unstable and subject to change in a future
   release.

Helper procedures for doing parallel I/O

This module provides a few procedures for reading a file's contents into a
distributed array in parallel. The procedures are designed to be used for
cases where a large file contains a header followed by a continuous stream
of delimited values of the same type. The procedures are:

* :proc:`readLinesAsBlockArray`: read each of the lines of a file as a
  ``string`` or ``bytes`` value
* :proc:`readDelimitedAsBlockArray`: read a file where each value is strictly
  separated by a delimiter, and the delimiter cannot be found in the value
  (e.g., CSV)
* :proc:`readItemsAsBlockArray`: read a file where values are separated by a
  delimiter, but the delimiter can be found in the value

There are also non-distributed versions of these procedures that return a
default rectangular array instead of a block-distributed array. These tend
to be a faster option if the file is small enough to fit in memory on a
single locale.

Two parallel iterators are also provided:

* :proc:`readLines`: iterate over a file's lines in parallel
* :proc:`readDelimited`: iterate over the values of a delimited file in
  parallel

Both iterators only work in a standalone context (i.e., they cannot be used
for zippered iteration). Adding leader/follower support is a future goal.

This module also exposes some helper procedures used to break files into
chunks.
These could be used as building blocks to implement other parallel I/O
routines:

* :proc:`findDelimChunks`: find a set of byte offsets that divide a file into
  roughly equal chunks where each chunk begins with a delimiter
* :proc:`findItemOffsets`: get a prefix sum of the number of items in each
  chunk of a file, where the chunks are defined by the ``byteOffsets`` array,
  and each item is strictly separated by the given delimiter
* :proc:`findDelimChunksChecked`: find a set of byte offsets that divide a
  file into roughly equal chunks where each chunk begins with a delimiter and
  each chunk starts with a deserializable value of the given type

.. iterfunction:: iter readLines(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, targetLocales: [] locale = [here]): lineType where lineType == string || lineType == bytes

   Iterate over a file's lines in parallel.

   This routine is similar to :proc:`readLinesAsArray`, except that it yields
   each line as it is read instead of returning an array of lines.

   **Example**:

   .. code-block:: chapel

      use ParallelIO;

      var sum = 0;
      forall line in readLines("ints.txt") with (+ reduce sum) do
        sum += line:int;

   .. warning::

      This routine will halt if the file cannot be opened or if an I/O error
      occurs while reading the file. This limitation is expected to be
      removed in a future release.

   .. note::

      Only serial and standalone-parallel iteration is supported. This
      iterator cannot yet be used in a zippered context.

   :arg filePath: the file to read
   :arg lineType: which type to return for each line: either ``string`` or
                  ``bytes`` — defaults to ``string``
   :arg header: how to handle the file header (see :record:`headerPolicy`)
   :arg nTasks: the number of tasks used to read the file — defaults to
                ``here.maxTaskPar``
   :arg targetLocales: the locales to use for reading the file — by default,
                       only the calling locale is used

.. iterfunction:: iter readDelimited(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer, targetLocales: [] locale = [here]): t where dt == string || dt == bytes

   Iterate over the values of a delimited file in parallel.

   This routine is similar to :proc:`readDelimitedAsArray`, except that it
   yields each value as it is read instead of returning an array of values.

   **Example**:

   .. code-block:: chapel

      use IO, ParallelIO;

      record color {
        var r, g, b: uint(8);

        proc ref deserialize(reader: fileReader(?), ref deserializer) throws {
          reader.read(this.r);
          reader.readLiteral(b",");
          reader.read(this.g);
          reader.readLiteral(b",");
          reader.read(this.b);
        }
      }

      forall c in readDelimited("colors.csv", color, header=headerPolicy.skipLines(1)) do
        processColor(c);

   .. warning::

      This routine will halt if the file cannot be opened or if an I/O error
      occurs while reading the file. This limitation is expected to be
      removed in a future release.

   .. note::

      Only serial and standalone-parallel iteration is supported. This
      iterator cannot yet be used in a zippered context.

   :arg filePath: the file to read
   :arg t: the type of value to read from the file
   :arg delim: the delimiter to use to separate ``t`` values in the file —
               defaults to the newline character
   :arg header: how to handle the file header (see :record:`headerPolicy`)
   :arg nTasks: the number of tasks used to read the file — defaults to
                ``here.maxTaskPar``
   :arg deserializerType: The type of deserializer to use when reading values
                          — defaults to the I/O module's default deserializer
   :arg targetLocales: the locales to use for reading the file — by default,
                       only the calling locale is used

.. function:: proc readLinesAsBlockArray(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = -1, targetLocales: [?d] locale = Locales): [] lineType throws where lineType == string || lineType == bytes

   Read a file's lines in parallel into a block-distributed array.

   This routine is similar to :proc:`readDelimitedAsBlockArray`, except that
   it reads each line as a :type:`~String.string` or :type:`~Bytes.bytes`
   value.

   :arg filePath: the file to read
   :arg lineType: the element type of the returned array: either ``string``
                  or ``bytes`` — defaults to ``string``
   :arg nTasks: the number of tasks to use per locale — defaults to ``-1``,
                meaning each locale should query ``here.maxTaskPar``
   :arg header: how to handle the file header (see :record:`headerPolicy`)
   :arg targetLocales: the locales to read the file on and the target locales
                       for the returned block-distributed array

   :returns: a block-distributed array of ``lineType`` values

   :throws: :class:`OffsetNotFoundError` if a starting offset cannot be found
            in any of the chunks

   See :proc:`~IO.open` for other errors that could be thrown when attempting
   to open the file

.. function:: proc readLinesAsArray(filePath: string, type lineType = string, header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar): [] lineType throws where lineType == string || lineType == bytes

   Read a file's lines in parallel into an array.

   This routine is essentially the same as :proc:`readLinesAsBlockArray`,
   except that it only executes on the calling locale. As such, it does not
   accept a ``targetLocales`` argument and returns a non-distributed array.
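
   A minimal usage sketch, based only on the signature above. The file name
   ``log.txt``, its one-line header, and the task count are assumptions made
   for illustration:

   .. code-block:: chapel

      use ParallelIO;

      // "log.txt" is a hypothetical input file with a one-line header.
      // try! halts the program if the file cannot be opened or chunked.
      var lines = try! readLinesAsArray("log.txt", lineType=bytes,
                                        header=headerPolicy.skipLines(1),
                                        nTasks=4);

      writeln("read ", lines.size, " lines");
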
   :arg filePath: the file to read
   :arg lineType: the element type of the returned array: either ``string``
                  or ``bytes`` — defaults to ``string``
   :arg nTasks: the number of tasks to use — defaults to ``here.maxTaskPar``
   :arg header: how to handle the file header (see :record:`headerPolicy`)

   :returns: a default rectangular array of ``lineType`` values

   :throws: :class:`OffsetNotFoundError` if a starting offset cannot be found
            in one or more of the chunks

   See :proc:`~IO.open` for other errors that could be thrown when attempting
   to open the file

.. function:: proc readDelimitedAsBlockArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = -1, type deserializerType = defaultDeserializer, targetLocales: [?d] locale = Locales): [] t throws where d.rank == 1 && (dt == bytes || dt == string)

   Read a delimited file in parallel into a block-distributed array.

   This routine assumes that the file is composed of a series of
   deserializable values of type ``t`` (optionally with a header at the
   beginning of the file). Each ``t`` must be separated by exactly one
   delimiter which can either be provided as a ``string`` or ``bytes`` value.

   This routine will use the delimiter to split the file into ``d.size``
   chunks of roughly equal size and read each chunk concurrently across the
   target locales. If multiple tasks are used per locale, each locale will
   further decompose its chunk into smaller chunks and read each of those in
   parallel. The chunks and corresponding array indices are computed using
   :proc:`findDelimChunks` and :proc:`findItemOffsets` respectively.

   .. note::

      ``t`` must:

      * have a 'deserialize' method
      * have a default (zero argument) initializer
      * not contain the delimiter in its serialized form (if it does,
        consider using :proc:`readItemsAsBlockArray` instead)

   This procedure can be used for a variety of purposes, such as reading a
   CSV file. To do so, the delimiter should keep its default value of
   ``b"\n"``.
   The file will then be split by lines, where each line will be parsed as a
   ``t`` value. For CSV, the commas between ``t``'s fields must be parsed by
   its ``deserialize`` method. This can be accomplished in one of two ways:
   (1) by using a custom deserialize method that parses the comma values
   manually (like in the example below), or (2) by using a deserializer that
   will handle commas appropriately with ``t``'s default ``deserialize``
   method.

   **Example:**

   .. code-block:: chapel

      use IO, ParallelIO;

      record color {
        var r, g, b: uint(8);
      }

      proc ref color.deserialize(reader, ref deserializer) throws {
        reader.read(this.r);
        reader.readLiteral(b",");
        reader.read(this.g);
        reader.readLiteral(b",");
        reader.read(this.b);
      }

      var colors = readDelimitedAsBlockArray(
        "colors.csv",
        t=color,
        header=headerPolicy.skipLines(1)
      );

   :arg filePath: the file to read
   :arg t: the type of value to read from the file
   :arg delim: the delimiter to use to separate ``t`` values in the file —
               defaults to the newline character
   :arg nTasks: the number of tasks to use per locale — defaults to ``-1``,
                meaning each locale should query ``here.maxTaskPar``
   :arg header: how to handle the file header (see :record:`headerPolicy`)
   :arg deserializerType: the type of deserializer to use — defaults to the
                          I/O module's default deserializer
   :arg targetLocales: the locales to read the file on and the target locales
                       for the returned block-distributed array

   :returns: a block-distributed array of ``t`` values

   :throws: :class:`OffsetNotFoundError` if a starting offset cannot be found
            in one or more of the chunks

   See :proc:`~IO.open` for other errors that could be thrown when attempting
   to open the file

.. function:: proc readDelimitedAsArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer): [] t throws where dt == bytes || dt == string

   Read a delimited file in parallel into an array.
   This procedure is essentially the same as
   :proc:`readDelimitedAsBlockArray`, except that it only executes on the
   calling locale. As such, it does not accept a ``targetLocales`` argument
   and returns a non-distributed array.

   :arg filePath: the file to read
   :arg t: the type of value to read from the file
   :arg delim: the delimiter to use to separate ``t`` values in the file —
               defaults to the newline character
   :arg nTasks: the number of tasks to use — defaults to ``here.maxTaskPar``
   :arg header: how to handle the file header (see :record:`headerPolicy`)
   :arg deserializerType: the type of deserializer to use — defaults to the
                          I/O module's default deserializer

   :returns: a default rectangular array of ``t`` values

   :throws: :class:`OffsetNotFoundError` if a starting offset cannot be found
            in one or more of the chunks

   See :proc:`~IO.open` for other errors that could be thrown when attempting
   to open the file

.. function:: proc readItemsAsBlockArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = -1, type deserializerType = defaultDeserializer, targetLocales: [?d] locale = Locales): [] t throws where d.rank == 1 && (dt == bytes || dt == string)

   Read items from a file in parallel into a block-distributed array.

   This routine assumes that the file is composed of a series of
   deserializable values of type ``t`` (optionally with a header at the
   beginning of the file). Each ``t`` must be separated by a delimiter which
   can either be provided as a ``string`` or ``bytes`` value. Unlike
   :proc:`readDelimitedAsBlockArray`, the delimiter can also be found in the
   serialized form of ``t``.
   This routine uses the following heuristic to split the file into chunks,
   which may not be accurate in all cases:

   A given byte offset is a valid offset for a task to start deserializing
   values of type ``t`` if:

   * it is preceded by, or begins with the delimiter
   * a ``t`` can be deserialized at that offset (i.e., calling
     ``t.deserialize`` on the bytes starting at that offset does not throw
     an error)

   The heuristic, implemented in :proc:`findDelimChunksChecked`, will be used
   to split the file into ``d.size`` chunks with a roughly equal number of
   items per chunk. If multiple tasks per locale are used, each locale will
   further decompose its chunk into smaller chunks and read each of those in
   parallel.

   .. note::

      ``t`` must:

      * have a 'deserialize' method that throws when a valid ``t`` cannot be
        read
      * have a default (zero argument) initializer

   :arg filePath: the file to read
   :arg t: the type of value to read from the file
   :arg delim: the delimiter used to guide file chunking - defaults to the
               newline character
   :arg nTasks: the number of tasks to use per locale — defaults to ``-1``,
                meaning each locale should query ``here.maxTaskPar``
   :arg header: how to handle the file header (see :record:`headerPolicy`)
   :arg deserializerType: the type of deserializer to use — defaults to the
                          I/O module's default deserializer
   :arg targetLocales: the locales to read the file on and the target locales
                       for the returned block-distributed array

   :returns: a block-distributed array of ``t`` values

   :throws: :class:`OffsetNotFoundError` if a valid byte offset cannot be
            found in one or more of the chunks

   See :proc:`~IO.open` for other errors that could be thrown when attempting
   to open the file

.. function:: proc readItemsAsArray(filePath: string, type t, in delim: ?dt = b"\n", header = headerPolicy.noHeader, nTasks: int = here.maxTaskPar, type deserializerType = defaultDeserializer): [] t throws where dt == bytes || dt == string

   Read items from a file in parallel into an array.
   This procedure is essentially the same as :proc:`readItemsAsBlockArray`,
   except that it only executes on the calling locale. As such, it does not
   accept a ``targetLocales`` argument and returns a non-distributed array.

   :arg filePath: the file to read
   :arg t: the type of value to read from the file
   :arg delim: the delimiter used to guide file chunking - defaults to the
               newline character
   :arg nTasks: the number of tasks to use — defaults to ``here.maxTaskPar``
   :arg header: how to handle the file header (see :record:`headerPolicy`)
   :arg deserializerType: the type of deserializer to use — defaults to the
                          I/O module's default deserializer

   :returns: a default rectangular array of ``t`` values

   :throws: :class:`OffsetNotFoundError` if a valid byte offset cannot be
            found in one or more of the chunks

   See :proc:`~IO.open` for other errors that could be thrown when attempting
   to open the file

.. function:: proc findDelimChunks(const ref f: file, in delim: ?dt, n: int, bounds: range, header = headerPolicy.noHeader): [] int throws where dt == bytes || dt == string

   Get an array of ``n+1`` byte offsets that divide the file ``f`` into ``n``
   roughly equally sized chunks, where each byte offset lines up with a
   delimiter.

   :arg f: the file to search
   :arg delim: the delimiter to use to separate the file into chunks
   :arg n: the number of chunks to find
   :arg bounds: a range of byte offsets to break into chunks
   :arg header: a header policy to use when searching for the first byte
                offset

   :returns: a length ``n+1`` array of byte offsets (the last offset is
             ``bounds.high``)

   :throws: :class:`OffsetNotFoundError` if a valid byte offset cannot be
            found in any of the chunks

.. function:: proc findDelimChunksChecked(const ref f: file, in delim: ?dt, n: int, type t, bounds: range, header = headerPolicy.noHeader, type deserializerType = defaultDeserializer): [] int throws where dt == bytes || dt == string

   Get an array of ``n+1`` byte offsets that divide the file ``f`` into ``n``
   roughly equally sized chunks, where each byte offset lines up with a
   delimiter and a ``t`` can be deserialized at that offset.

   This procedure is similar to :proc:`findDelimChunks`, except that when it
   finds a delimiter, it confirms that a ``t`` can be deserialized at that
   offset before recording it. This way, the serialized values can also
   contain the delimiter.

   :arg f: the file to search
   :arg delim: the delimiter to use to separate the file into chunks
   :arg n: the number of chunks to find
   :arg t: the type of value to read from the file
   :arg bounds: a range of byte offsets to break into chunks
   :arg header: a header policy to use when searching for the first byte
                offset
   :arg deserializerType: the type of deserializer to use

   :returns: a length ``n+1`` array of byte offsets (the last offset is
             ``bounds.high``)

   :throws: :class:`OffsetNotFoundError` if a valid byte offset cannot be
            found in any of the chunks

.. function:: proc findItemOffsets(const ref f: file, in delim: ?dt, const ref byteOffsets: [?d] int): [d] int throws where dt == bytes || dt == string

   Get a prefix sum of the number of items in each chunk of the file ``f``,
   where the chunks are defined by the ``byteOffsets`` array, and each item
   is separated by the given delimiter.

   :arg f: the file to search
   :arg delim: the delimiter used to separate items in the file
   :arg byteOffsets: an array of byte offsets that divide the file into
                     chunks

   :returns: an array of length ``byteOffsets.size`` containing the number of
             items in the file before the start of each chunk. The last entry
             contains the total number of items in the file.

.. class:: OffsetNotFoundError : Error

   An error thrown when a starting offset cannot be found in a chunk of a
   file.

   .. method:: proc init()

   .. method:: proc init(msg: string)

.. record:: headerPolicy

   A type describing how to handle the file header when reading a file in
   parallel.

.. method:: proc type headerPolicy.skipLines(n: int)

   Skip the first ``n`` lines of the file

.. method:: proc type headerPolicy.skipBytes(n: int)

   Skip the first ``n`` bytes of the file

.. method:: proc type headerPolicy.findStart

   Find the first byte offset in the file that can be used to start reading

.. method:: proc type headerPolicy.noHeader

   Don't expect a header in the file
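
The chunking helpers and header policies documented above can be combined by
hand. The following is a minimal sketch, not the module's internal
implementation: the file name ``values.txt``, its one-line header, the chunk
count, and the choice of ``0..<f.size`` as the ``bounds`` argument are all
assumptions made for illustration.

.. code-block:: chapel

   use IO, ParallelIO;

   // "values.txt" is a hypothetical newline-delimited file with a
   // one-line header. try! halts the program if any call throws.
   const f = try! open("values.txt", ioMode.r);
   const nChunks = 4;

   // nChunks+1 byte offsets, each lined up with a newline delimiter.
   // Assumption: the whole file is searched, written here as 0..<f.size.
   const byteOffsets = try! findDelimChunks(f, b"\n", nChunks, 0..<f.size,
                                            headerPolicy.skipLines(1));

   // Prefix sum of item counts per chunk; per the documentation above,
   // the last entry holds the total number of items.
   const itemOffsets = try! findItemOffsets(f, b"\n", byteOffsets);

   writeln("total items: ", itemOffsets[itemOffsets.domain.high]);

A custom reader could then assign each task one ``byteOffsets`` interval and
write its values starting at the matching ``itemOffsets`` entry.
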