📰 Exercism - Rust - Parallel Letter Frequency

Posted on August 22, 2022 by Myoungjin Jeon
Tags: rust, parallel, concurrency, channel

Brief Introduction (not original)

Count the frequency of letters in a list of &str, i.e.:

pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize>

Moreover, we need to deploy as many workers (threads) as worker_count specifies. This requirement makes the task exciting and challenging, as I have little experience with concurrent programming. The original introduction also suggests:

Learn more about concurrency in Rust here: Concurrency

That chapter covers enough general knowledge about concurrency to understand and solve the task!

Working on concurrency

Briefly, we use concurrent programming in order to get:

The task is about the former case. The latter case applies to almost any program that deals with dynamic I/O to interact with a user or another service.

Now we should think about how to:

The first point is the most important here, because if the threads don't know how to communicate with each other, they cannot achieve the common goal: counting the whole input. So how do they talk to each other?

How To Make Thread and How To Reap them

Oh, wait… I found that we first need to talk about how to make threads and how to harvest them. This is the first example in the Rust documentation:

use std::thread;
use std::time::Duration;

fn main() {
    let _handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
        // note: there is no return value here: return value is not essential.
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

And the result:

hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!

oh no! we should wait for our friends!

Does it look good? Doesn't it? But if we look carefully, we can see that some output is missing: the spawned thread is supposed to print up to 9, but we only got up to 5 here.

hi number 5 from the spawned thread!

This is because the main thread reached the end of its block before the child thread finished its job.

We should have waited for our friend threads to see all the output.

// .. snip ..
    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    _handle.join().unwrap(); // wait for the child thread to finish and join the current thread.
}

What join() returns
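As a minimal sketch of what this heading is about (the helper name `spawn_and_sum` is hypothetical, used only for illustration): `JoinHandle::join()` blocks until the thread finishes and returns a `std::thread::Result<T>`. It is `Ok` with the closure's return value, or `Err` carrying the panic payload if the thread panicked.

```rust
use std::thread;

// Hypothetical helper: sums a vector on a separate thread and hands the
// result back through the thread's return value.
fn spawn_and_sum(numbers: Vec<u32>) -> u32 {
    let handle = thread::spawn(move || numbers.iter().sum::<u32>());
    // join() returns Result<u32, Box<dyn Any + Send>>.
    handle.join().expect("worker thread panicked")
}

fn main() {
    println!("sum = {}", spawn_and_sum(vec![1, 2, 3]));

    // A panicking thread shows up as Err on the joining side.
    let failed = thread::spawn(|| {
        panic!("something went wrong");
    })
    .join();
    println!("panicked thread returned Err: {}", failed.is_err());
}
```

So `.unwrap()` on `join()` is what turns a panic inside a worker into a panic in the thread that joins it.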

Communication Between threads in rust

return value from thread

After submitting my first solution, I saw that many other solutions on Exercism use the return value from each thread. I found the major pros and, maybe, cons of using the return value:

The latter point could be a con. Let's think about what a worst-case scenario might look like:

let sample = &["very long string. abcdefghijklmnopqrstuvxyz...",
               "",
               "some more text", ];


// snip ..
let mut total = HashMap::new();

// handles are joined in spawn order, so a slow first thread blocks the rest
for handle in handles {
    our_counting_fold_fn(&mut total, handle.join().unwrap());
}
// snip ..

So, clearly, the first employee needs to work harder than the others 😂. The larger the difference between the lengths of the strings, the more time the main thread wastes waiting for the first thread to finish its job, even though the last two are already done. In other words, we lose the chance to summarize the other results while we wait only for the first one.

So, if the main thread cannot figure out which thread to join first, the overall performance isn't great: we lose much of the benefit of using threads.

Shared memory

I found the passage shown below in the original documentation.

... access the same shared data. Consider this part of the slogan from the Go language documentation again: "do not communicate by sharing memory."

However, the Rust documentation does not say this with quite the same nuance, because Rust's ownership rules seem to deal with shared memory very well.

Management of mutexes can be incredibly tricky to get right, which is why so many people are enthusiastic about channels. However, thanks to Rust's type system and ownership rules, you can't get locking and unlocking wrong.

Sounds promising. I hope I can go further into this subject some time later.

By the way, Mutex is an abbreviation for mutual exclusion.
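For comparison, here is a rough sketch of what a shared-memory version could look like. This is not the approach used in this post; the helper name `count_with_mutex` is hypothetical, and `Arc<Mutex<...>>` is the standard-library combination the Rust book describes for sharing one value between threads.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper, not the approach used in this post: every worker
// locks one shared map and adds its own counts to it.
fn count_with_mutex(inputs: Vec<String>) -> HashMap<char, usize> {
    let shared = Arc::new(Mutex::new(HashMap::<char, usize>::new()));

    let handles: Vec<_> = inputs
        .into_iter()
        .map(|text| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                // Count locally first so the lock is held only briefly.
                let mut local: HashMap<char, usize> = HashMap::new();
                for ch in text.chars().filter(|c| c.is_alphabetic()) {
                    *local.entry(ch.to_ascii_lowercase()).or_insert(0) += 1;
                }
                let mut total = shared.lock().unwrap();
                for (ch, count) in local {
                    *total.entry(ch).or_insert(0) += count;
                }
                // The lock is released when `total` goes out of scope.
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // All workers are done, so a copy of the map is the final result.
    let result = shared.lock().unwrap().clone();
    result
}

fn main() {
    let counts = count_with_mutex(vec!["abc".to_string(), "ABd".to_string()]);
    println!("{:?}", counts.get(&'a'));
}
```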

Nevertheless, my first impressions of shared memory in concurrency are:

And the excuses 😱 why I'd like to skip this method in this task are:

Message Passing via channel

Let's check out why many people are enthusiastic about message passing.

The documentation on message passing (channels) starts with the sentence below:

One increasingly popular approach to ensuring safe concurrency is message passing

We can safely assume that message passing is a recommended method for communicating, can't we?

There is one more thing we need to think about before going further to solve the task.

ownership matters

The key difference between "message passing" and "shared memory" is the number of owners. Message passing allows only one owner at a time, which guarantees that the receiver gets a consistent value, because the sender is not allowed to access the value after sending it. It may sound tedious at first. What if the value could be touched after sending it? The answer is simple:

We cannot guarantee what will happen.

Unpredictability is the most dangerous thing in the programming world; nobody wants that unless we are throwing dice.

I felt that the Rust documentation is quite logical. Please read more in the original book.
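A small sketch of that guarantee (the helper name `send_and_receive` is hypothetical): once a value has been passed to `send()`, the sending side no longer owns it, and the compiler rejects any later use.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: sends one String through a channel and returns
// what the receiving side got.
fn send_and_receive(message: String) -> String {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        tx.send(message).unwrap();
        // `message` was moved into send(); uncommenting the next line is a
        // compile error (use of moved value).
        // println!("{}", message);
    });

    rx.recv().unwrap()
}

fn main() {
    println!("Got: {}", send_and_receive(String::from("hi")));
}
```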

How to make channel

one on one

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        // sending from the friend thread
        tx.send(val).unwrap();
    }); // the JoinHandle returned by spawn() is not needed here, so it is
        // discarded: no `let` binding and no method call after it.

    // receiving from the main thread
    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

many speakers and only one listener

Now I'd like to remind you that we actually have many workers who want to talk about what they have accomplished.

But if the tx variable is moved into one thread, we cannot use it in another thread. After all, this comes down to one simple principle in Rust: ownership.

So we need to clone it as many times as we need. This process is relatively simple:

// .. snip ..
let (tx_main, rx_main) = mpsc::channel();
let mut handles = vec![];

// .. snip ..
(0..worker_count)
    .for_each(|_| {
        let tx = tx_main.clone(); // this is it. simple and easy

        let handle = thread::spawn(move || {
            // now this tx is available only in this thread,
            // as its ownership has been moved here.
        });
        handles.push(handle);
    });
// .. snip ..

my first attempt at collecting the partial results did not work

The example code in the Rust book shows how to read from the receiver (rx_main). But for a reason I did not understand, the following code runs forever.

    rx_main
        .into_iter()
        .fold(HashMap::new(), |mut acc, partial_acc| {
            partial_acc
                .iter()
                .for_each(|(&ch, &count)| match acc.entry(ch) {
                    Occupied(o) => *o.into_mut() += count,
                    Vacant(v) => {
                        v.insert(count);
                    }
                });
            acc
        })
}

So I had to drop the code that iterates directly over the receiver, and went with the following approach instead:

// note: the following code assumes a single reader, the main thread.
(0..worker_count) // used only to count the iterations
    .for_each(|_id| {
        let _partial = rx_main
            .recv() // same as the single read above
            .unwrap();
        // ... summarizing (folding) `_partial` goes here
    });

I know this is not a perfect way of reading, because it takes only one value per thread. What if each thread sends more than one? It works so far. I apologize if this is not enough of an explanation; I will come back to this if I find the reason and a workaround.
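One likely explanation for the endless loop, assuming tx_main is still alive while the receiver is being iterated: the iterator over a `Receiver` ends only after every `Sender` has been dropped, including the original one that the clones were made from. A minimal sketch of that behaviour (the helper name `collect_all` is hypothetical):

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: each worker sends two messages, and the receiver is
// drained with an iterator instead of a fixed number of recv() calls.
fn collect_all(worker_count: usize) -> Vec<usize> {
    let (tx_main, rx_main) = mpsc::channel();

    for id in 0..worker_count {
        let tx = tx_main.clone();
        thread::spawn(move || {
            tx.send(id).unwrap();
            tx.send(id + 100).unwrap();
            // `tx` is dropped here when the thread finishes.
        });
    }

    // Without this line the original sender stays alive and the iterator
    // below waits forever for more messages.
    drop(tx_main);

    let mut received: Vec<usize> = rx_main.into_iter().collect();
    received.sort(); // arrival order is not deterministic
    received
}

fn main() {
    println!("{:?}", collect_all(2));
}
```

With the original sender dropped, the number of messages per worker no longer has to be known in advance.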

Dividing the tasks

It is time to give the threads something to work with. We need to divide our job into a certain number of smaller pieces so that we can hand them over to our workers (threads).

We'll get a fixed amount of input data, so I decided:

// example: &[&str] -> Vec<String> with two workers
&[ "abc", "def", "ghi", "jkl" ]
   // ->
 vec![
     "abcghi", // note: stored as a String
     "defjkl",
 ]
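The same round-robin split can be sketched as a small standalone function. The name `distribute` is hypothetical, and it uses `enumerate()` with a modulo where the post's code uses `zip` with `cycle()`; both deal the strings out in the same order.

```rust
// Hypothetical helper: deals the input strings out to `num_workers`
// buckets in round-robin order, concatenating each bucket into one String.
fn distribute(input: &[&str], num_workers: usize) -> Vec<String> {
    let num_workers = num_workers.max(1); // always keep at least one bucket
    let mut buckets = vec![String::new(); num_workers];
    for (i, s) in input.iter().enumerate() {
        buckets[i % num_workers].push_str(s);
    }
    buckets
}

fn main() {
    let buckets = distribute(&["abc", "def", "ghi", "jkl"], 2);
    println!("{:?}", buckets); // ["abcghi", "defjkl"]
}
```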

The partial code so far

pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
    let num_workers = if worker_count == 0 { 1 } else { worker_count };
    let mut handles = vec![];

    // make a channel for all the results collected
    let (tx_main, rx_main) = mpsc::channel();

    let mut input_per_worker = vec![String::from(""); num_workers];
    input
        .iter()
        .zip((0..num_workers).cycle()) // .cycle() is an easy way to rotate
                                       // through the worker indices
        .for_each(|(&s, i)| { // ( string as &str, index (or id) number )
            *input_per_worker.get_mut(i) // access the value at the index `i`
                .unwrap() // safe to unwrap because the vector already
                          // holds `num_workers` strings
                += s;     // appends the &str to this worker's String
        });

    input_per_worker.into_iter().for_each(|input_| {
        // prepare for sending
        let tx = tx_main.clone();
        // spawn the worker thread
        let handle = thread::spawn(move || {
            // collect count
            let mut subtotal = HashMap::<char, usize>::new();

            input_.chars().for_each(|ch| {
                // snip ..

benefits from memory access

input_per_worker.get_mut(i) takes advantage of direct, indexed memory access. In Haskell, generating the pieces with lazy evaluation might be a better approach for this kind of problem.

Counting the Letters

This is quite an easy task, and I didn't make a separate function for it.

// .. snip ..
input_per_worker.into_iter().for_each(|input_| {
    // note: .into_iter() used because I want to move the ownership into
    //       the for_each()

    // prepare for sending
    let tx = tx_main.clone();
    // spawn the worker thread
    let handle = thread::spawn(move || {
        // collect count
        let mut subtotal = HashMap::<char, usize>::new();

        input_.chars().for_each(|ch| {
            if ch.is_alphabetic() {
                // note: this task is actually case-insensitive only for
                // ASCII letters, and a plain to_ascii_lowercase() gave
                // about a 2x speed-up. Another option is to change
                // input_.chars() -> input_.to_lowercase().chars()
                // and use each char as it is.
                let lc = ch.to_ascii_lowercase();
                /*ch.to_lowercase().for_each(|lc|*/

                // the following pattern was posted in
                // https://jeongoon.github.io/posts/2022-05-20-exercism-org-raindrops.html#leave-it-as-basic
                match subtotal.entry(lc) {
                    Occupied(o) => *o.into_mut() += 1,
                    Vacant(v) => {
                        v.insert(1);
                    }
                } /*);*/
            };
        });
        // finally sending
        tx.send(subtotal).unwrap();
    });
    handles.push(handle);
});

to_lowercase() or to_ascii_lowercase()

As I mentioned in a comment, there are some issues with lowercasing. In short:

I used char::to_ascii_lowercase() because I'd like to filter first (is_alphabetic()), and then apply the conversion to each remaining character.
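To make the difference concrete, here is a small comparison of the two standard-library methods. The wrapper names `lower_ascii` and `lower_unicode` are hypothetical and exist only for this sketch.

```rust
// Hypothetical helpers, named only for this comparison.
fn lower_ascii(ch: char) -> char {
    // Returns one char; anything outside A-Z is returned unchanged.
    ch.to_ascii_lowercase()
}

fn lower_unicode(ch: char) -> String {
    // to_lowercase() returns an iterator, because a single char can
    // lowercase to more than one char in Unicode.
    ch.to_lowercase().collect()
}

fn main() {
    println!("{}", lower_ascii('A')); // a
    println!("{}", lower_ascii('\u{c4}')); // U+00C4 (A with diaeresis), unchanged
    println!("{}", lower_unicode('\u{c4}')); // U+00E4, the lowercase form
    // U+0130 (capital I with dot above) lowercases to two chars:
    // 'i' followed by a combining dot above (U+0307).
    println!("{}", lower_unicode('\u{130}').chars().count()); // 2
}
```

Because `to_lowercase()` yields an iterator, it needs the extra `for_each` that is commented out in the code above, while `to_ascii_lowercase()` fits directly into the `entry()` call.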

Finally Summarizing

fold() can be handy when you are only asked for the final result. And naturally we don't need to worry about ownership and lifetime of the accumulator, thanks to the nature of functional programming. I used fold() again here.

// .. snip
    handles.iter().fold(HashMap::new(), |mut acc, _handle| {
        rx_main
            .recv()
            .unwrap()
            .iter()
            .for_each(|(&ch, &count)| match acc.entry(ch) {
                Occupied(o) => *o.into_mut() += count,
                Vacant(v) => {
                    v.insert(count);
                }
            });
        acc
    })
} // end of pub fn frequency()
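Putting the pieces of this post together, a condensed sketch of the whole function might look like the following. It is not the submitted solution: it assumes `entry().or_insert()` in place of the Occupied/Vacant match, and it drops the original sender so that the receiver can simply be iterated until every worker is done.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
    let num_workers = worker_count.max(1);
    let (tx_main, rx_main) = mpsc::channel();

    // Round-robin the input into one owned String per worker.
    let mut input_per_worker = vec![String::new(); num_workers];
    for (i, s) in input.iter().enumerate() {
        input_per_worker[i % num_workers].push_str(s);
    }

    for text in input_per_worker {
        let tx = tx_main.clone();
        thread::spawn(move || {
            let mut subtotal: HashMap<char, usize> = HashMap::new();
            for ch in text.chars().filter(|c| c.is_alphabetic()) {
                *subtotal.entry(ch.to_ascii_lowercase()).or_insert(0) += 1;
            }
            tx.send(subtotal).unwrap();
        });
    }
    drop(tx_main); // only the workers' clones remain

    // Ends once every worker has sent its subtotal and dropped its sender.
    rx_main.into_iter().fold(HashMap::new(), |mut acc, partial| {
        for (ch, count) in partial {
            *acc.entry(ch).or_insert(0) += count;
        }
        acc
    })
}

fn main() {
    let counts = frequency(&["abc", "ABC", "a1!"], 2);
    println!("a = {:?}", counts.get(&'a')); // a = Some(3)
}
```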

Brief Benchmark

This task comes with benchmark code as well, and the following is one of my benchmark runs on a 9-year-old XPS.

   Compiling parallel-letter-frequency v0.0.0 (/home/myoungjin/exercism/rust/parallel-letter-frequency)
    Finished bench [optimized] target(s) in 1.48s
     Running unittests src/lib.rs (target/release/deps/parallel_letter_frequency-016160b8c250f033)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running benches/benchmark.rs (target/release/deps/benchmark-15a648eb6d4b6f2b)

running 6 tests
test bench_large_parallel   ... bench:     279,074 ns/iter (+/- 37,084)
test bench_large_sequential ... bench:     661,265 ns/iter (+/- 14,291)
test bench_small_parallel   ... bench:      68,663 ns/iter (+/- 3,665)
test bench_small_sequential ... bench:      22,776 ns/iter (+/- 489)
test bench_tiny_parallel    ... bench:      60,142 ns/iter (+/- 3,124)
test bench_tiny_sequential  ... bench:          78 ns/iter (+/- 3)

test result: ok. 0 passed; 0 failed; 0 ignored; 6 measured; 0 filtered out; finished in 21.73s

parallel is not always helpful (parallel processing)

We gained some time efficiency from concurrency in the bench_large_parallel test case, where the parallel version is about 2.4 times faster (279,074 ns vs 661,265 ns). But, well, the other benchmarks do not show any benefit at all: the small and tiny cases are slower in parallel.

Because there is some overhead in creating threads, we should be careful about when to apply parallelism or concurrency.

Benchmarking is a pretty wise way to learn what we gain or lose by using a particular method.
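One way to act on this observation, sketched under assumptions: decide up front whether the input is large enough to be worth spawning threads. The function name and the threshold below are hypothetical; the cut-off is not a measured value and would have to be tuned with the benchmark on the target machine.

```rust
// Hypothetical cut-off, not a measured value.
const PARALLEL_THRESHOLD_BYTES: usize = 16 * 1024;

// Hypothetical helper: returns true only when there is more than one
// worker and the total input size reaches the threshold.
fn should_go_parallel(input: &[&str], worker_count: usize) -> bool {
    let total_bytes: usize = input.iter().map(|s| s.len()).sum();
    worker_count > 1 && total_bytes >= PARALLEL_THRESHOLD_BYTES
}

fn main() {
    let big = "a".repeat(20_000);
    println!("tiny input:  {}", should_go_parallel(&["abc"], 4)); // false
    println!("large input: {}", should_go_parallel(&[big.as_str()], 4)); // true
}
```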

Wrapping Up

This is quite a long article, due to the subject. But I'd like to share my thoughts on concurrency: