6.3 Rayon

Rayon is a data-parallelism library for Rust. It is extremely lightweight, makes it easy to convert a sequential computation into a parallel one, and guarantees data-race freedom. Its goal is to make adding parallelism to existing sequential code as simple as possible.

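In practice, this means taking existing `for` loops or iterators and making them run in parallel. The example below uses `rayon::join` to parallelize a recursive quicksort, calls it from R through rustr, and benchmarks it against a sequential version and R's built-in `sort()`.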

extern crate rayon;

/// Strategy for running two independent closures and collecting both results.
///
/// Implementors decide whether the two closures may run concurrently
/// (`Parallel`) or strictly one after the other (`Sequential`);
/// `is_parallel` reports which strategy is in use.
trait Joiner {
    /// Reports whether `join` may run its two closures concurrently.
    fn is_parallel() -> bool;

    /// Runs `oper_a` and `oper_b` and returns their results as `(a, b)`.
    fn join<FA, RA, FB, RB>(oper_a: FA, oper_b: FB) -> (RA, RB)
    where
        FA: FnOnce() -> RA + Send,
        FB: FnOnce() -> RB + Send,
        RA: Send,
        RB: Send;
}

struct Parallel;
impl Joiner for Parallel {
    #[inline]
    fn is_parallel() -> bool {
        true
    }
    #[inline]
    fn join<A,R_A,B,R_B>(oper_a: A, oper_b: B) -> (R_A, R_B)
        where A: FnOnce() -> R_A + Send, B: FnOnce() -> R_B + Send,
              R_A: Send, R_B: Send
    {
        rayon::join(oper_a, oper_b)
    }
}

struct Sequential;
impl Joiner for Sequential {
    #[inline]
    fn is_parallel() -> bool {
        false
    }
    #[inline]
    fn join<A,R_A,B,R_B>(oper_a: A, oper_b: B) -> (R_A, R_B)
        where A: FnOnce() -> R_A + Send, B: FnOnce() -> R_B + Send,
              R_A: Send, R_B: Send
    {
        let a = oper_a();
        let b = oper_b();
        (a, b)
    }
}

/// Sorts `v` in place with quicksort, handing the two recursive calls to `J::join`.
///
/// When `J` is parallel, slices of at most 5 * 1024 elements switch to the
/// `Sequential` joiner (presumably because handing such small slices to the
/// thread pool costs more than it saves).
fn quick_sort<J: Joiner, T: PartialOrd + Send>(v: &mut [T]) {
    if v.len() <= 1 {
        return;
    }

    if J::is_parallel() && v.len() <= 5 * 1024 {
        return quick_sort::<Sequential, T>(v);
    }

    let mid = partition(v);
    let (lo, rest) = v.split_at_mut(mid);
    // `partition` leaves the pivot at `mid`; exclude it from both halves so every
    // recursive call receives a strictly shorter slice. Keeping it in the upper
    // half could recurse forever: with a NaN pivot nothing compares `<=` to it,
    // `mid` is 0, and the upper half is the whole slice again.
    let (_pivot, hi) = rest.split_at_mut(1);
    J::join(|| quick_sort::<J, T>(lo), || quick_sort::<J, T>(hi));
}

/// Partitions `v` around a pivot and returns the pivot's final index.
///
/// The pivot is the median of the first, middle and last elements; it is moved
/// to the end and a Lomuto pass follows. On return the pivot sits at the
/// returned index. Every element before it compared `<=` to the pivot, and
/// every element after it did not. An empty slice returns 0.
///
/// Always pivoting on the last element made already-sorted or reverse-sorted
/// input peel off one element per pass. That gives quadratic time and one
/// quicksort recursion level per element, which can overflow the stack on
/// large inputs. The median-of-three pivot splits such input in the middle.
fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize {
    if v.is_empty() {
        return 0;
    }
    let last = v.len() - 1;
    let middle = last / 2;

    // Order v[0] <= v[middle] <= v[last] so the median of the three lands at `middle`.
    if v[middle] < v[0] {
        v.swap(middle, 0);
    }
    if v[last] < v[0] {
        v.swap(last, 0);
    }
    if v[last] < v[middle] {
        v.swap(last, middle);
    }
    // Move the median into the pivot slot used by the Lomuto scan below.
    v.swap(middle, last);

    let mut store = 0;
    for j in 0..last {
        if v[j] <= v[last] {
            v.swap(store, j);
            store += 1;
        }
    }
    v.swap(store, last);
    store
}

// #[rustr_export]
pub fn quick_f64(mut x: Vec<f64>) -> Vec<f64> {
    // Sorts the owned vector in place with the rayon-backed `Parallel` joiner and
    // returns the same buffer, so the data is never copied.
    quick_sort::<Parallel, f64>(&mut x);
    x
}

// #[rustr_export]
pub fn quick_f64s(mut x: Vec<f64>) -> Vec<f64> {
    // Sequential counterpart of `quick_f64`: sorts the owned vector in place on
    // the calling thread and returns the same buffer.
    quick_sort::<Sequential, f64>(&mut x);
    x
}
# Compile the Rust quicksort below with rustr; functions marked
# `// #[rustr_export]` become callable from R. The Rust source is passed as a
# single-quoted R string, so it must not contain single quotes.
rust(code = '
extern crate rayon;

// Runs two closures and returns both results, either in parallel or sequentially.
trait Joiner {
    fn is_parallel() -> bool;
    fn join<A,R_A,B,R_B>(oper_a: A, oper_b: B) -> (R_A, R_B)
        where A: FnOnce() -> R_A + Send, B: FnOnce() -> R_B + Send,
              R_A: Send, R_B: Send;
}

struct Parallel;
impl Joiner for Parallel {
    #[inline]
    fn is_parallel() -> bool {
        true
    }
    #[inline]
    fn join<A,R_A,B,R_B>(oper_a: A, oper_b: B) -> (R_A, R_B)
        where A: FnOnce() -> R_A + Send, B: FnOnce() -> R_B + Send,
              R_A: Send, R_B: Send
    {
        rayon::join(oper_a, oper_b)
    }
}

struct Sequential;
impl Joiner for Sequential {
    #[inline]
    fn is_parallel() -> bool {
        false
    }
    #[inline]
    fn join<A,R_A,B,R_B>(oper_a: A, oper_b: B) -> (R_A, R_B)
        where A: FnOnce() -> R_A + Send, B: FnOnce() -> R_B + Send,
              R_A: Send, R_B: Send
    {
        let a = oper_a();
        let b = oper_b();
        (a, b)
    }
}

fn quick_sort<J:Joiner, T:PartialOrd+Send>(v: &mut [T]) {
    if v.len() <= 1 {
        return;
    }

    if J::is_parallel() && v.len() <= 5*1024 {
        return quick_sort::<Sequential, T>(v);
    }

    let mid = partition(v);
    // partition leaves the pivot at index mid; keep it out of both halves so
    // every recursive call gets a strictly shorter slice. Without this, a NaN
    // pivot (nothing compares <= NaN) makes mid 0 and the upper half is the
    // whole slice again, which can recurse forever on input containing NA.
    let (lo, rest) = v.split_at_mut(mid);
    let (_pivot, hi) = rest.split_at_mut(1);
    J::join(|| quick_sort::<J,T>(lo),
            || quick_sort::<J,T>(hi));
}

// Lomuto partition around the median of the first, middle and last elements;
// returns the final index of the pivot. The median pivot keeps already sorted
// input from costing quadratic time and one recursion level per element.
fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
    if v.is_empty() {
        return 0;
    }
    let pivot = v.len() - 1;
    let middle = pivot / 2;
    if v[middle] < v[0] {
        v.swap(middle, 0);
    }
    if v[pivot] < v[0] {
        v.swap(pivot, 0);
    }
    if v[pivot] < v[middle] {
        v.swap(pivot, middle);
    }
    v.swap(middle, pivot);
    let mut i = 0;
    for j in 0..pivot {
        if v[j] <= v[pivot] {
            v.swap(i, j);
            i += 1;
        }
    }
    v.swap(i, pivot);
    i
}

// #[rustr_export]
pub fn quick_f64(x:Vec<f64>)->Vec<f64>{
    let mut y = x;
    { quick_sort::<Parallel, _>(&mut y);}
    y
}

// #[rustr_export]
pub fn quick_f64s(x:Vec<f64>)->Vec<f64>{
    let mut y = x;
    { quick_sort::<Sequential, _>(&mut y);}
    y
}

',depend = '
[dependencies]
rustr = "*"
rayon = "0.3.1"
')
#> updating cached Cargo.toml.

# Benchmark input: 100,000 uniform draws between 1 and 1e7.
rand <- runif(1e5, min = 1, max = 1e7)

# Timings below come from a 2-core CPU on travis-ci: parallel Rust quicksort,
# sequential Rust quicksort, and base R sort().
microbenchmark(quick_f64(rand), quick_f64s(rand), sort(rand))
#> Unit: milliseconds
#>              expr   min    lq  mean median    uq  max neval
#>   quick_f64(rand)  7.40  7.53  8.48   7.61  8.19 55.8   100
#>  quick_f64s(rand)  9.51  9.69 10.51   9.84 11.15 14.7   100
#>        sort(rand) 11.73 11.92 13.51  12.81 13.75 59.3   100