Concurrent Rust

March 22, 2023

Coming from a primarily JS/TS background, I've never had to think about concurrency, since JS is single-threaded. That simplicity makes a lot of sense for the majority of JS developers, who don't want to think about the runtime or GC; they want to focus on the user experience first and performance second.

Initial Implementation

Since I've been re-reading the Rust book, I've been re-implementing the concepts from each chapter in code I've written myself. For threads, I decided to add concurrency to a recent Advent of Code problem that I'd completed.

The solution to the day's part two problem came down to maintaining a stack of chars and popping off the stack whenever the expected closing char was encountered. My full solution can be found here.

My initial solution read through the input and created a Line struct, which was really just a wrapper around the line so that I could functionally filter out the content I didn't want:

// snipped content
impl Line {
    fn remove_invalid(&self) -> Option<Vec<char>> {
        let mut stack = Vec::new();
        for c in self.line.chars() {
            // Match on the current char: if the last char on the stack is the
            // matching opener, pop it; otherwise return None and the Line will
            // be filtered out in the next step below. Comparing against
            // Some(&'<') instead of calling unwrap() avoids a panic when a
            // line starts with a closing char (i.e., the stack is empty).
            match c {
                '>' => {
                    if stack.last() == Some(&'<') {
                        stack.pop();
                    } else {
                        return None;
                    }
                }
                '}' => {
                    if stack.last() == Some(&'{') {
                        stack.pop();
                    } else {
                        return None;
                    }
                }
                ')' => {
                    if stack.last() == Some(&'(') {
                        stack.pop();
                    } else {
                        return None;
                    }
                }
                ']' => {
                    if stack.last() == Some(&'[') {
                        stack.pop();
                    } else {
                        return None;
                    }
                }
                _ => stack.push(c),
            }
        }
        Some(stack)
    }
}

fn part_two(path: &str) -> Result<usize> {
    let mut scores = read_one_at_a_time::<Line>(path)?
        .iter()
        .filter_map(|line| line.remove_invalid())
        // Once invalid lines are filtered out, calculate the lines score.
        .map(get_score)
        .collect::<Vec<_>>();

    scores.sort();
    Ok(scores[scores.len() / 2])
}

fn main() -> Result<()> {
    println!("Part Two: {}", part_two("./data/10.data")?);

    Ok(())
}
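As an aside before adding threads: the four closing-bracket arms above are nearly identical, so a more compact variant of remove_invalid could map each closer to its expected opener. This is just an illustrative sketch (written as a free function on &str for self-containment), not the code from my actual solution:

// An illustrative sketch, not my submitted solution: map each closing char to
// the opener it requires, and treat everything else as an opener to push.
fn remove_invalid(line: &str) -> Option<Vec<char>> {
    let mut stack = Vec::new();
    for c in line.chars() {
        let expected = match c {
            '>' => '<',
            '}' => '{',
            ')' => '(',
            ']' => '[',
            // Any other char is an opener; push it and move on.
            _ => {
                stack.push(c);
                continue;
            }
        };
        // Popping unconditionally is fine: on a mismatch we bail out anyway.
        if stack.pop() != Some(expected) {
            return None;
        }
    }
    Some(stack)
}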

I thought this was a good candidate for threading (most Advent of Code problems follow this pattern of parsing lines), because for each line in my vec I'm doing a fair bit of work, and that work is entirely independent of every other line's work until it comes time to find the actual answer, which is sorting the scores of all the lines and taking the middle value.

Let's Add Some Threads

So let's look at the threading solution:

use std::sync::{Arc, Mutex};
use std::thread;

fn threading(lines: Vec<Line>) -> Vec<usize> {
    // I create a vec that will be shared across the threads to hold all the 
    // lines' scores.
    let total = Arc::new(Mutex::new(Vec::new()));
    let mut handles = Vec::new();
    for line in lines {
        let total = Arc::clone(&total);
        let handle = thread::spawn(move || {
            // If the current line is valid, grab the mutex's lock and push the
            // Line's score onto the totals vec.
            if let Some(l) = line.remove_invalid() {
                let mut scores = total.lock().unwrap();
                scores.push(get_score(l));
            };
        });
        handles.push(handle);
    }

    // Collecting the handles into a vector lets me join each thread, ensuring
    // every thread finishes before we read the final scores below.
    for handle in handles {
        handle.join().unwrap();
    }

    let total = total.lock().unwrap();
    total.to_vec()
}

fn part_two(path: &str) -> Result<usize> {
    let lines = read_one_at_a_time::<Line>(path)?;
    let mut scores = threading(lines);

    scores.sort();

    Ok(scores[scores.len() / 2])
}

The interesting part here is that instead of functionally iterating through each Line, I pass my vec of Lines to the threading function, which spawns a thread per Line to calculate its score and push it onto the shared vec of scores inside the Arc<Mutex<...>>.
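For comparison, the same fan-out can be done with channels instead of a shared Arc<Mutex<Vec<_>>>. This is a sketch under the same assumptions (Line, remove_invalid, and get_score as defined above), not what I originally wrote:

use std::sync::mpsc;
use std::thread;

fn threading_with_channels(lines: Vec<Line>) -> Vec<usize> {
    let (tx, rx) = mpsc::channel();
    for line in lines {
        let tx = tx.clone();
        thread::spawn(move || {
            // Each thread sends its score instead of locking a shared vec.
            if let Some(l) = line.remove_invalid() {
                tx.send(get_score(l)).unwrap();
            }
            // The thread's tx clone is dropped when the closure ends.
        });
    }
    // Drop the original sender so the receiver stops once every thread is done.
    drop(tx);
    rx.iter().collect()
}

One nice property here: there's no explicit join loop, because rx.iter() only ends once every sender has been dropped.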

This threading version adds some complexity to the solution, but optimizes the code. Or does it?

In this simple example, it doesn't. In my benchmark runs, the no_thread version was actually ~2ms faster on average, even with an outlier. The "optimization" really didn't matter in this context: the input file only has about 100 Lines, each Line is cheap to process, and spawning a thread per Line has its own overhead.
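If the per-Line work stayed cheap but the line count grew large, one common mitigation (a sketch I'm adding for illustration, not something from my original solution) is to spawn a small, fixed number of threads and hand each a chunk of lines, amortizing the spawn cost:

use std::thread;

// A hedged sketch (not from my original solution): a fixed number of worker
// threads, each scoring one chunk of lines. std::thread::scope (stable since
// Rust 1.63) lets the workers borrow `lines` directly, so no Arc is needed.
// Assumes num_threads >= 1.
fn threading_chunked(lines: &[Line], num_threads: usize) -> Vec<usize> {
    let chunk_size = ((lines.len() + num_threads - 1) / num_threads).max(1);
    thread::scope(|s| {
        let handles: Vec<_> = lines
            .chunks(chunk_size)
            .map(|chunk| {
                s.spawn(move || {
                    chunk
                        .iter()
                        .filter_map(|line| line.remove_invalid())
                        .map(get_score)
                        .collect::<Vec<usize>>()
                })
            })
            .collect();
        // Join every worker and flatten the per-chunk scores together.
        handles
            .into_iter()
            .flat_map(|handle| handle.join().unwrap())
            .collect()
    })
}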

Optimization in Action

So I decided to add some sleeps to simulate what would happen if the computation behind each Line's score were actually expensive.

In the threading example I added a full second of sleep to each thread:

// ...
    for line in lines {
        let total = Arc::clone(&total);
        let handle = thread::spawn(move || {

            // THIS LINE HERE.
            thread::sleep(Duration::from_secs(1));

            if let Some(l) = line.remove_invalid() {
                let mut scores = total.lock().unwrap();
                scores.push(get_score(l));
            };
        });
        handles.push(handle);
    }
// ...

In the single-threaded solution I added 100ms of sleep to each calculation:

// ...
    fn remove_invalid(&self) -> Option<Vec<char>> {
        std::thread::sleep(Duration::from_millis(100));
        let mut stack = Vec::new();
        for c in self.line.chars() {
            match c {
// ...
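For timing the two versions, a minimal harness like the following is enough (a sketch; the names `label` and `run` are my own, and `run` can wrap either part_two implementation above):

use std::time::Instant;

// A minimal timing sketch: run a closure and print how long it took.
fn time_it<T>(label: &str, run: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let result = run();
    println!("{label}: {:?}", start.elapsed());
    result
}

// Usage (hypothetical): let score = time_it("threaded", || part_two("./data/10.data"))?;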

So the threaded version now ostensibly waits 10x longer per line. How much do the threads actually help here?

SIGNIFICANTLY!

Even though each thread sleeps a whole second, all of those sleeps happen in parallel, so the threaded run takes roughly 1 second total. The single-threaded solution only waits 100ms per Line, but it waits serially: with ~100 Lines that's roughly 100 × 100ms = 10 seconds, which is why the threaded version comes out ~10x faster.
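If you want to convince yourself of that, here's a tiny standalone demo (my own illustrative sketch) showing that ten 1-second sleeps running on ten threads take about 1 second of wall-clock time, not 10:

use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let start = Instant::now();
    let handles: Vec<_> = (0..10)
        .map(|_| thread::spawn(|| thread::sleep(Duration::from_secs(1))))
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Prints roughly 1s: the sleeps overlap instead of running back to back.
    println!("elapsed: {:?}", start.elapsed());
}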

I think that's a really awesome result: Rust gives you safety (the Arc/Mutex) and performance, and it isn't overly complicated to implement or grok.