The need to analyze large datasets shows up everywhere: from scientific research to insurance risk analysis to modeling in healthcare, market research, economics, finance, and elsewhere. Regardless of the field, a common task is to scan through a large dataset, often stored in something like a CSV file, and process it — for example, by finding averages, running sums, etc. This type of computation has shown up recently in a viral coding competition called the “One Billion Row Challenge” (1BRC). In this blog post, we focus on creating a straightforward, yet parallel, implementation of the 1BRC in Chapel to execute on a multicore machine.

The goal of the 1BRC is to read data from a file that (aptly enough) contains one billion rows of temperature data, consisting of pairs of weather station names and temperature values. As the data is read in, it is aggregated to find the min, max, and average temperatures for each weather station.

On the official GitHub page, the challenge has contestants submit Java-based implementations. These implementations are evaluated in terms of their execution times, but to remove the cost of I/O overhead, the data file is first loaded onto a RAM disk before starting the timer. Competitive implementations are able to complete the task in less than 2 seconds, and do so by pulling every trick in the book: memory-mapping data, avoiding all unnecessary string copies, writing custom hashmaps, writing custom number parser functions with built-in assumptions that values have no more than three digits, using Java’s “Unsafe” API, etc. These implementations are clever and fascinating from a competitive-programming perspective. They demonstrate that it is possible to write low-level “bare-metal” style code in Java. That said, they’re less interesting from a more practical perspective; namely, they’re difficult to read and don’t demonstrate how easy and performant it would be to use a given language to conduct this sort of analysis in general.

So, rather than approaching this strictly from the competitive-programming perspective, this blog post tries to stay practical. I/O overhead is, of course, a big concern in the “real world,” so we’ll consider it. Let’s start by looking at a straightforward Python implementation of the 1BRC: a version that is simple to implement and easy to understand. Following that, we give a simple Chapel version, and then adapt it using Chapel’s parallel programming features to eliminate overhead and improve performance.

But before we get into code, let’s first discuss a bit more about the input format. The challenge’s input file consists of a billion rows of data, where each row is in the format:

<string: station name>;<double: measurement>

For example, the first few lines of the file might look like this:

Tel Aviv;15.6
Hong Kong;51.4
Dikson;-15.4
London;-4.0
Entebbe;7.6
Hong Kong;51.4
Port Moresby;16.2
Assab;35.6
Lusaka;26.6

Station names may repeat. The task is to calculate the minimum, maximum, and average temperature value for each weather station. The output should sort the station names and report these values, looking something like this (where for each station, the first number is the minimum temperature, the second the average, and the third the maximum):

       Abha:   -22.1    18.0    56.3
    Abidjan:   -17.3    26.0    67.8
     Abéché:    -7.9    29.4    71.9
      Accra:   -18.5    26.4    63.0
Addis Ababa:   -23.2    16.1    58.4

Writing a straightforward Python version

If someone were given this task for a work or school assignment, they might start by writing a simple, albeit non-performant, serial version of the 1BRC. Many programmers are familiar with Python, so let's start there. In Python, you might write this:

import time, csv, sys

filename = 'measurements.txt'

start_time = time.time()

mins   = {}
maxes  = {}
sums   = {}
counts = {}

with open(filename, newline='') as csvfile:
  reader = csv.reader(csvfile, delimiter=';')
  for row in reader:
    city_name = row[0]
    temperature = float(row[1])

    if city_name not in mins:
      # identity values that will be replaced by the station's first measurement
      mins [city_name]  = sys.float_info.max
      maxes[city_name]  = -sys.float_info.max
      sums [city_name]  = 0
      counts[city_name] = 0

    mins [city_name]  =  min(temperature, mins[city_name])
    maxes[city_name]  =  max(temperature, maxes[city_name])
    sums [city_name]  += temperature
    counts[city_name] += 1

elapsed = time.time() - start_time

# Report results:
for city in sorted(mins.keys()):
  print("%20s: %7.1f %7.1f %7.1f" %
        (city, mins[city], sums[city] / counts[city], maxes[city]))
print(f"elapsed time: {elapsed:.2f}")

This program is simple and easy to read. It uses Python's CSV parser to read the data, changing the separator from a comma to a semicolon. We store running results in dictionaries whose keys are the weather station names and whose values are the running statistics. The first time we encounter a city, we populate the dictionaries with "identity" values that will be replaced as soon as we process the city's first temperature. To calculate averages, we keep a running sum of temperatures as well as a count of how many entries exist for each weather station. At the end, we sort the station names and print the results, computing each average from its sum and count.

Writing a serial Chapel version

Now, let’s try doing the same in Chapel:

use IO, Time, Map, Sort;

config const filename = "measurements.txt";

var mins, maxes, sums, counts: map(bytes, real);
var watch: stopwatch;

watch.start();

// open the file and read one <city>;<temperature> pair at a time
var reader = open(filename, ioMode.r).reader(locking=false);
var ct: cityTemp;
while reader.read(ct) {
  const (city, temp) = (ct.city, ct.temp);
  if !mins.contains(city) {
    mins[city]   = max(real);
    maxes[city]  = min(real);
    sums[city]   = 0;
    counts[city] = 0;
  }
  mins[city]   =  min(temp, mins[city]);
  maxes[city]  =  max(temp, maxes[city]);
  sums[city]   += temp;
  counts[city] += 1;
}

watch.stop();

// Report results
var cities = mins.keys();
sort(cities);
for city in cities {
  writef("%20s: %7.1dr %7.1dr %7.1dr\n",
         city.decode(), mins[city], sums[city] / counts[city],  maxes[city]);
}
writeln("elapsed time: ", watch.elapsed());

// record representing a <city>;<temperature> pair and how to read one from a file
record cityTemp: readDeserializable {
  var city: bytes;
  var temp: real;

  proc ref deserialize(reader: fileReader(?), ref deserializer) throws {
    this.city = reader.readThrough(b";", stripSeparator=true);
    this.temp = reader.read(real);
    reader.readNewline();
  }
}

The Chapel implementation isn’t quite as compact as the Python version, though it is still fairly straightforward.

One notable difference is that in the Chapel version, we represent the data read in from the file in a special cityTemp record. We supply this record with a deserialize method that specifies how to read the value from a file. For more information about deserializers see this technical note.

When running the Python and Chapel versions, we can already see a difference in terms of performance. On a 64-core (AMD EPYC 7513) machine, the Python version takes 1312 seconds (21 minutes, 52 seconds) while the serial Chapel version takes 908 seconds (15 minutes, 8 seconds). Both versions are running in serial, though, so despite running this on a beefy 64-core machine, we’re only using one of its cores.

Writing a parallel Chapel version

Now let’s try and make better use of our hardware and make the Chapel version faster by introducing parallelism.

Given the large size of the input file, it makes sense to read and process its data in parallel. We would like each task to process whole, unsplit lines of data, but figuring out the exact byte offsets at which to split the file can be tricky. Thankfully, Chapel has a ParallelIO module that can handle these details for us.

Using the ParallelIO module, one simple approach would be to read all the rows of the file into an array, and then use a forall loop to iterate over that array in parallel. This can be expressed quite easily as:

var cityTemps = readItemsAsArray(filePath=filename, delim="\n", t=cityTemp);
forall ct in cityTemps do ...

An even better approach would be to avoid loading the entire file into memory by processing it a line at a time, and this is possible using the ParallelIO module’s readDelimited iterator.

So, we could replace our original loop:

while reader.read(ct) {

with:

forall ct in readDelimited(filename, t=cityTemp) {

where the t argument specifies the type of data to load. Note that we're not just swapping the serial read method for the parallel readDelimited iterator; we're also changing from a serial while loop to a parallel forall loop. This approach will keep all of our cores busy reading and processing the file.
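
For concreteness, here's roughly what the updated loop looks like with the serial loop body dropped in unchanged (this also requires adding ParallelIO to the program's use statement); note that this is a sketch of the intermediate step rather than code from the final program:

forall ct in readDelimited(filename, t=cityTemp) {
  const (city, temp) = (ct.city, ct.temp);
  if !mins.contains(city) {
    mins[city]   = max(real);
    maxes[city]  = min(real);
    sums[city]   = 0;
    counts[city] = 0;
  }
  mins[city]   =  min(temp, mins[city]);
  maxes[city]  =  max(temp, maxes[city]);
  sums[city]   += temp;
  counts[city] += 1;
}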

By itself, this is a pretty trivial change; but alas, if you try to compile it, you'll soon run into an error:

error: const actual is passed to 'ref' formal 'this' of addOrReplace()
note: to formal of type map(bytes,real(64),false)
note: The shadow variable 'mins' is constant due to task intents in this loop

The final note on the error is telling you that the mins map is constant within the loop due to task intents. In parallel loops (like forall), variables used in the loop that come from outside the loop (like mins in this case) are given a “task intent”, which defines how the variable enters and can be used in the loop. For example, variables with ref intent can be modified in the loop while variables with const intent cannot. By default, objects like maps are given const intent, since modifying the map in a parallel loop can lead to bugs.
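
As a quick aside, here is a minimal sketch (not part of the 1BRC program) of how task intents play out for a simple scalar in a forall loop:

var sum = 0;

// 'sum' gets a 'const' task intent by default, so this would not compile:
//   forall i in 1..1000000 do sum += i;

// a 'ref' intent makes it compile, but the unsynchronized updates are a data race:
forall i in 1..1000000 with (ref sum) do
  sum += i;

// a 'reduce' intent gives each task its own partial sum and combines them safely:
forall i in 1..1000000 with (+ reduce sum) do
  sum += i;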

You might find this error annoying, but by defaulting to const intent, Chapel saved us from accidentally introducing one of the worst kinds of bugs: a race condition. Race conditions can be particularly difficult to debug since, by definition, their behavior might not be consistent from run to run. We could silence the error message by giving the map a ref intent using a with clause like this:

forall ct in readDelimited(filename, t=cityTemp) with (ref mins)

But, unfortunately, silencing the error message doesn't remove that nasty race condition. So what should we do? Well, Chapel has another map type that's designed for concurrent updates like this: ConcurrentMap. Let's use that!

The way to do a parallel-safe update with ConcurrentMap is to use its update method. In our implementation, we’ll call update like this:

cityTempStats.update(ct.city, new updater(ct.temp), token);

In this case, ct.city is the key to update in the cityTempStats map. But, you may be asking, what’s up with the updater and token arguments?

The updater argument takes an object that indicates how to update the key's value. This object defines a method named this (making the object callable like a function) that takes a single argument: a reference to the stored element to be updated. We can create a simple updater to do our aggregation like this:

record updater {
  var temp: real;
  proc this(ref td: tempData) {
    td.min = Math.min(td.min, temp);
    td.max = Math.max(td.max, temp);
    td.total += temp;
    td.count += 1;
  }
}

And since our updater can modify several values at once, this version no longer needs four separate maps for min, max, total, and count; instead, we store those values together in a single tempData record and use a single ConcurrentMap object, as shown below.
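
Concretely, those declarations look like this in the full program further below (simplified slightly here; the full version also specifies how a tempData should be written to output):

// record to store temperature stats for a particular city
record tempData {
  var min: real = Types.max(real);
  var max: real = Types.min(real);
  var total: real = 0.0;
  var count: int = 0;
}

// map to store temperature statistics for each city
var cityTempStats = new ConcurrentMap(bytes, tempData);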

The update method also takes a token, which is a special value used to coordinate between tasks. We can get the task-specific instance of this token by calling the getToken() method on ConcurrentMap and assigning it to a task-specific variable in the with clause of our forall loop. Putting it all together looks like this:

forall ct in readDelimited(fileName, t=cityTemp, delim="\n", nTasks=nTasks)
    with (var token = cityTempStats.getToken())
        do cityTempStats.update(ct.city, new updater(ct.temp), token);

And in its entirety, the program is:

use IO, Math, ParallelIO, ConcurrentMap, Sort, Time;

config const fileName = "measurements.txt",
             nTasks = here.maxTaskPar,
             printOutput = false,
             timeExecution = false;

proc main() {
  var t = new stopwatch();
  if timeExecution then t.start();

  // map to store temperature statistics for each city
  var cityTempStats = new ConcurrentMap(bytes, tempData);

  // read and aggregate temperature data for each city
  forall ct in readDelimited(fileName, t=cityTemp, delim="\n", nTasks=nTasks)
    with (var token = cityTempStats.getToken())
      do cityTempStats.update(ct.city, new updater(ct.temp), token);

  if timeExecution then writeln("elapsed time: ", t.elapsed());

  // print temperature statistics for each city
  if printOutput {
    var results = cityTempStats.toArray();
    sort(results, new comparator());
    for (city, td) in results do writef("%20s: %?", city.decode().strip(), td);
  }
}

// record to store temperature stats for a particular city
record tempData: writeSerializable {
  var min: real = Types.max(real);
  var max: real = Types.min(real);
  var total: real = 0.0;
  var count: int = 0;
}

// how to write a tempData record to a file (min/avg/max)
proc tempData.serialize(writer: fileWriter(?), ref serializer) throws {
  writer.writef("%7.1dr %7.1dr %7.1dr\n",
                this.min, this.total / this.count, this.max);
}

// record representing a <city>;<temperature> pair
record cityTemp: readDeserializable {
  var city: bytes;
  var temp: real;
}

// how to read a cityTemp record from a file
proc ref cityTemp.deserialize(reader: fileReader(?), ref deserializer) throws {
  this.city = reader.readThrough(b";", stripSeparator=true);
  this.temp = reader.read(real);
}

// used to facilitate atomic updates to tempData records
record updater {
  var temp: real;
  proc this(ref td: tempData) {
    td.min = Math.min(td.min, temp);
    td.max = Math.max(td.max, temp);
    td.total += temp;
    td.count += 1;
  }
}

// used to sort tempData records by city name
record comparator : keyComparator { }
proc comparator.key(k: (bytes, tempData)) do return k[0];


And sure enough, this version blows all our previous results out of the water. While the Python version took 1312 seconds (21 minutes, 52 seconds), and the serial Chapel version took 908 seconds (15 minutes, 8 seconds), when running the parallel Chapel version on all 64 cores of this machine, it took only 24 seconds!
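
If you'd like to try this yourself, building and running the parallel version looks something like the following (onebrc.chpl is just a hypothetical name for the source file; --fast enables optimizations, and config consts such as fileName, nTasks, printOutput, and timeExecution can be overridden on the command line):

chpl --fast onebrc.chpl
./onebrc --nTasks=64 --timeExecution=true --printOutput=true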

Summary

In this post, we used the billion-row challenge to demonstrate Chapel’s ability to process a massive amount of data on a multicore machine in parallel.

Some key points to take away include:

- A serial Chapel version of the challenge is nearly as simple to write as the Python version, yet runs noticeably faster.
- Chapel's ParallelIO module, combined with a forall loop, makes it easy to read and process a huge file using all of a machine's cores.
- Chapel's default of const task intents caught a would-be race condition at compile time instead of letting it silently corrupt our results.
- The ConcurrentMap type's update method supports parallel-safe updates to shared aggregation state from many tasks at once.

Given that Chapel supports distributed-memory parallelism in addition to the shared-memory, multicore parallelism that we used here, it’d be interesting to consider what a distributed version of this code would look like; but we’ll leave that question for another day.

Altogether, in this post, we focused on creating a straightforward, yet parallel, implementation of the one billion row challenge in Chapel. The resulting code executes in 24 seconds on a multicore CPU, which is orders of magnitude faster than a similarly straightforward Python implementation.