A parallel-safe distributed multiset implementation that scales in terms of
nodes, processors per node (PPN), and workload. The more PPN, the more segments
we allocate to increase raw parallelism, and the larger the workload, the better
the locality (see distributedBagInitialBlockSize). This data structure is unordered
and employs its own work-stealing algorithm to balance work across nodes.
This package module is new in 1.16 and may contain bugs. The interface may change. The documentation is being incrementally revised and improved.
To use DistBag, the constructor must be invoked explicitly to
properly initialize the structure. Using the default state without initializing
will result in a halt.
var bag = new DistBag(int, targetLocales=ourTargetLocales);
While the bag is safe to use in a distributed manner, each node always operates on its privatized
instance. This means that it is easy to add data in bulk, expecting it to be distributed, when in
reality it is not; if another node needs data, it will steal work on demand. This may not always be
desired, and will likely increase memory consumption on a single node. We offer a way for the user to
invoke a more static load-balancing approach, called balance, which redistributes work.
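As a minimal sketch of the situation described above, the snippet below adds elements in bulk from a single locale and then calls balance. It uses only the constructor and the add, remove, and balance methods documented in this section; passing `Locales` as the target locales is an assumption made for illustration.

```chapel
use DistributedBag;

// Assumption: target every locale in the run.
var bag = new DistBag(int, targetLocales=Locales);

// Every add below executes on the current locale, so all 1000 elements
// start out in that locale's privatized instance.
for i in 1..1000 do bag.add(i);

// Statically redistribute the elements across the target locales.
// This is heavyweight and is called here from a single task only.
bag.balance();
```

Without the call to balance, other locales would only receive elements by stealing them on demand when their own bags run empty.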
- Dynamic work-stealing will require an overhaul to use a helper algorithm to keep down the number of tasks spawned. Currently, user tasks wait on the current work-stealer task, which spawns its own helper tasks that act as shepherds, which in turn spawn more tasks in a fork-join fashion. This leads to an excessive number of tasks being spawned at once. To make matters worse, neither the waiting tasks nor the work-stealing task receive any elements, which opens up the possibility of livelock, where nodes steal work back and forth before either can process it.
- Static work-stealing (a.k.a. balance) requires a rework that distributes memory in a faster and more distributed way, as currently 'excess' elements are shifted to a single node to be redistributed in the next pass. On that note, we need to collapse the passes for moving excess elements into a single pass, hopefully with zero-copy overhead.
The initial number of elements in an unroll block. Each successive unroll block is double the size of its predecessor, allowing for better locality when there are larger numbers of elements. The better the locality, the better the raw performance and the easier it is to redistribute work.
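The doubling rule can be illustrated with a short, self-contained sketch. The names `initialBlockSize` and `maxBlockSize` and their values are hypothetical stand-ins for the module's settings, and clamping at the maximum is an assumption about how the documented maximum block size is applied.

```chapel
// Hypothetical stand-ins for the initial and maximum unroll block sizes.
config const initialBlockSize = 1024;
config const maxBlockSize = 1024 * 1024;

// Size of the n-th unroll block (n = 0 is the first block), assuming each
// block doubles its predecessor until the maximum is reached.
proc blockSize(n: int): int {
  var size = initialBlockSize;
  for i in 1..n do size = min(size * 2, maxBlockSize);
  return size;
}

for n in 0..4 do
  writeln("block ", n, ": ", blockSize(n), " elements");
```

With these illustrative values, the first five blocks hold 1024, 2048, 4096, 8192, and 16384 elements.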
To prevent stealing too many elements (horizontally) from another node's segment (hence creating an artificial load imbalance), if the other node's segment has fewer elements than a certain threshold (see distributedBagWorkStealingMemCap) but more than another threshold (see distributedBagWorkStealingMinElems), we steal a percentage of its elements, leaving it with the majority of them. This way, the amount the other segment loses is proportional to how much it owns, ensuring a balance.
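The thresholds above can be modeled roughly as follows. This is a sketch of the policy as described in the text, not the module's actual code: `stealRatio`, `minElems`, and `maxElems` are hypothetical names with illustrative values, and taking the smaller of the proportional share and the cap is an assumption.

```chapel
// Hypothetical stand-ins for the module's tunables; values are illustrative.
config const stealRatio = 0.25;  // fraction taken from an eligible segment
config const minElems = 1;       // segments smaller than this are left alone
config const maxElems = 131072;  // cap on elements moved in a single steal

// Number of elements to take from a segment currently holding victimSize.
proc elementsToSteal(victimSize: int): int {
  if victimSize < minElems then return 0;
  // Assumption: steal a proportional share, never more than the cap.
  return min(maxElems, (victimSize * stealRatio): int);
}

writeln(elementsToSteal(1000));
```

Because the ratio is below one half, a segment that is stolen from always keeps the majority of its elements.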
distributedBagWorkStealingMemCap: real = 1.0
The maximum amount of work to steal from a horizontal node's segment. This should be set to a value, in megabytes, that determines the maximum amount of data that should be sent in bulk at once. The maximum number of elements is determined by: (distributedBagWorkStealingMemCap * 1024 * 1024) / sizeof(eltType). For example, if we are storing 8-byte integers and have a 1 MB limit, we would have a maximum of 131,072 elements stolen at once.
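The formula can be checked with a few lines of Chapel. The constant `memCapMB` is a hypothetical stand-in for distributedBagWorkStealingMemCap, and `numBytes` is used here as Chapel's counterpart to the `sizeof` in the formula.

```chapel
// Hypothetical stand-in for distributedBagWorkStealingMemCap, in megabytes.
config const memCapMB = 1.0;

// Maximum number of elements of the given type that fit under the cap.
proc maxStolenElements(type eltType): int {
  return ((memCapMB * 1024 * 1024) / numBytes(eltType)): int;
}

writeln(maxStolenElements(int(64)));  // 8-byte integers with a 1 MB cap
```

For 8-byte integers and a 1 MB cap this evaluates to 1,048,576 / 8 = 131,072 elements.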
The minimum number of elements a horizontal segment must have to become eligible to be stolen from. This may be useful if some segments produce fewer elements than others and should not be stolen from.
The maximum number of elements in an unroll block. This is crucial to ensure memory usage does not rapidly grow out of control.
A parallel-safe distributed multiset implementation that scales in terms of nodes, processors per node (PPN), and workload. The more PPN, the more segments we allocate to increase raw parallelism, and the larger the workload, the better the locality (see distributedBagInitialBlockSize). This data structure is unordered, employs its own work-stealing algorithm, and provides a means to obtain a privatized instance of the data structure for maximized performance.
The implementation of the Bag is forwarded. See
targetLocales: [targetLocDom] locale
The locales to allocate bags for and load balance across.
DistributedBagImpl(type eltType, targetLocales: [?targetLocDom] locale = Locales)
add(elt: eltType): bool
Insert an element into this node's bag. The ordering is not guaranteed to be preserved.
remove(): (bool, eltType)
Remove an element from this node's bag. The order in which elements are removed is not guaranteed to match the order in which they were inserted. If this node's bag is empty, it will attempt to steal elements from the bags of other nodes.
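A minimal usage sketch of add and remove, based only on the signatures listed above. It assumes that the boolean in the returned tuple reports whether an element was obtained, and it passes `Locales` as the target locales for illustration.

```chapel
use DistributedBag;

// Assumption: target every locale in the run.
var bag = new DistBag(int, targetLocales=Locales);

// Insert into the bag of the locale running this task.
bag.add(42);

// remove() returns a (bool, eltType) tuple; the flag is assumed to report
// whether an element was actually obtained.
var (hasElt, elt) = bag.remove();
if hasElt {
  writeln("removed ", elt);
} else {
  writeln("bag was empty");
}
```

Checking the flag before using the element matters because the second tuple component is only meaningful when the removal succeeded.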
Obtain the number of elements held in all bags across all nodes. This method is best-effort and can be non-deterministic for concurrent updates across nodes, and may miss elements or even count duplicates resulting from any concurrent insertion or removal operations.
contains(elt: eltType): bool
Performs a lookup to determine if the requested element exists in this bag. This method is best-effort and can be non-deterministic for concurrent updates across nodes, and may miss elements resulting from any concurrent insertion or removal operations.
Clear all bags across all nodes in a best-effort approach. Elements added or moved around from concurrent additions or removals may be missed while clearing.
Triggers a more static approach to load balancing, fairly redistributing all elements across the bags of all nodes. Afterward, all segments hold roughly the same number of elements.
This method is very heavyweight and should not be called too often. Dynamic work stealing handles cases where there is a relatively fair distribution across the majority of nodes, but this method should be called when you have a severe imbalance, or when you have a smaller number of elements to balance. Furthermore, while this operation is parallel-safe, it should be called from at most one task.
Iterate over each bag in each node. To avoid holding onto locks, we take a snapshot approach, increasing memory consumption but also increasing parallelism. This allows other concurrent, even mutating, operations while iterating, but opens the possibility of iterating over duplicates or missing elements because of concurrent operations.
zip iteration is not yet supported with rectangular data structures.
Iteration takes a snapshot approach, and as such can easily result in an out-of-memory issue. If the data structure is large, the user is doubly advised to use parallel iteration, for both performance and memory benefits.
these(param tag: iterKind)
these(param tag: iterKind, followThis)