DistributedBag

Usage

use DistributedBag;

or

import DistributedBag;

Implements a highly parallel segmented multi-pool specialized for depth-first search (DFS), sometimes referenced as DistBag_DFS.

Summary

A parallel-safe distributed multi-pool implementation specialized for depth-first search (DFS), that scales in terms of nodes, processors per node (PPN), and workload; the more PPN, the more segments we allocate to increase raw parallelism, and the larger the workload the better locality (see distributedBagInitialSegmentCap). This data structure is locally ordered (ensuring DFS), encapsulates a dynamic work stealing mechanism to balance work across nodes, and provides a means to obtain a privatized instance of the data structure for maximized performance.

Note

This module is a work in progress and may change in future releases.

Usage

To use distBag, the initializer must be invoked explicitly to properly initialize the data structure. By default, one bag instance is initialized per locale, and one segment per task.

var bag = new distBag(int, targetLocales=ourTargetLocales);

The basic methods that distBag supports require a taskId argument. This taskId will serve as an index to the segment to be updated and it must be in 0..<here.maskTaskPar. More precisely, it is used to map each task to a segment, which ensures the parallel-safety of the data structure, as well as the local DFS ordering.

bag.add(0, taskId);
bag.addBulk(1..100, taskId);
var (hasElt, elt) = bag.remove(taskId)

While the bag is safe to use in a distributed manner, each locale always operates on its privatized instance. This means that it is easy to add data in bulk, expecting it to be distributed, when in reality it is not; if another locale needs data, it will steal work on-demand. Here is an example of concurrent operations on distBag across multiple locales and tasks:

coforall loc in Locales do on loc {
  coforall taskId in 0..<here.maxTaskPar {
    var (hasElt, elt) = bag.remove(taskId);
    if hasElt {
      elt += 1;
      bag.add(elt, taskId);
    }
  }
}

Finally, distBag supports serial and parallel iteration, as well as a set of global operations. Here is an example of a distributed parallel iteration and a few global operations working with a distBag:

forall elts in bag do
  body();

const size = bag.getSize();
const foundElt = bag.contains(elt);
bag.clear();

Methods

config const distributedBagInitialSegmentCap : int = 1024

The initial capacity of each segment. When a segment is full, we double its capacity.

config const distributedBagMaxSegmentCap : int = 1024 * 1024

The maximum capacity of each segment. This is crucial to ensure memory usage does not grow out of control.

config const distributedBagWorkStealingMinElts : int = 1

The minimum number of elements a segment must have to become eligible to be stolen from. This may be useful if some segments contain fewer elements than others and should not be stolen from.

record distBag : serializable

A parallel-safe distributed multi-pool implementation specialized for depth-first search (DFS), that scales in terms of nodes, processors per node (PPN), and workload; the more PPN, the more segments we allocate to increase raw parallelism, and the larger the workload the better locality (see distributedBagInitialSegmentCap). This data structure is locally DFS ordered, encapsulates a dynamic work stealing mechanism to balance work across nodes, and provides a means to obtain a privatized instance of the data structure for maximized performance.

type eltType

The type of the elements contained in this distBag.

var _impl : unmanaged DistributedBagImpl(eltType)?

The implementation of the Bag is forwarded. See DistributedBagImpl for documentation.

class DistributedBagImpl : CollectionImpl(?)
var targetLocales : [targetLocDom] locale

The locales to allocate bags for and load balance across. targetLocDom represents the corresponding range of locales.

proc init(type eltType, targetLocales: [?targetLocDom] locale = Locales)

Initialize an empty distBag.

proc add(elt: eltType, taskId: int) : bool

Insert an element in segment taskId. The ordering is guaranteed to be preserved. If distributedBagMaxSegmentCap is reached, the insertion fails and returns false.

Arguments:
  • elt : eltType – The element to insert.

  • taskId : int – The index of the segment into which the element is inserted.

Returns:

true if elt is successfully inserted in segment taskId.

Return type:

bool

proc addBulk(elts, taskId: int) : int

Insert elements in bulk in segment taskId. If the bag instance rejects an element (e.g., when distributedBagMaxSegmentCap is reached), we cease to offer more. We return the number of elements successfully inserted.

Arguments:
  • elts – The elements to insert.

  • taskId : int – The index of the segment into which the element is inserted.

Returns:

The number of elements successfully inserted in segment taskId.

Return type:

int

proc remove(taskId: int) : (bool, eltType)

Remove an element from segment taskId. The order in which elements are removed is guaranteed to be the same order they have been inserted. If this bag instance is empty, it will attempt to steal elements from bags of other nodes.

Arguments:

taskId : int – The index of the segment from which the element is removed.

Returns:

Depending on the scenarios: (true, elt) if we successfully removed element elt from distBag; (false, defaultOf(eltType)) otherwise.

Return type:

(bool, eltType)

override proc getSize() : int

Obtain the number of elements held in this distBag.

Returns:

The current number of elements contained in this distBag.

Return type:

int

Warning

This method is best-effort and can be non-deterministic for concurrent updates across nodes, and may miss elements resulting from any concurrent insertion or removal operations.

override proc contains(elt: eltType) : bool

Perform a lookup to determine if the requested element exists in this distBag.

Arguments:

elt : eltType – An element to search for.

Returns:

true if this distBag contains elt.

Return type:

bool

Warning

This method is best-effort and can be non-deterministic for concurrent updates across nodes, and may miss elements resulting from any concurrent insertion or removal operations.

override proc clear() : void

Clear this distBag.

Warning

This method is best-effort and can be non-deterministic for concurrent updates across nodes, and may miss elements resulting from any concurrent insertion or removal operations.

override iter these() : eltType

Iterate over the elements of this distBag. To avoid holding onto locks, we take a snapshot approach, increasing memory consumption but also increasing parallelism. This allows other concurrent, even mutating, operations while iterating, but opens the possibility to iterating over duplicates or missing elements from concurrent operations.

Yields:

A reference to one of the elements contained in this distBag.

Warning

Iteration takes a snapshot approach, and as such can easily result in a Out-Of-Memory issue. If the data structure is large, the user is doubly advised to use parallel iteration, for both performance and memory benefit.