Package: src/packages/pthreads.fdoc

Preemptive Threading Support

key file
__init__.flx share/lib/std/pthread/__init__.flx
pthread.flx share/lib/std/pthread/pthread.flx
pchannels.flx share/lib/std/pthread/pchannels.flx
ppipe.flx share/lib/std/pthread/ppipe.flx
forkjoin.flx share/lib/std/pthread/forkjoin.flx
mutex.flx share/lib/std/pthread/mutex.flx
semaphore.flx share/lib/std/pthread/semaphore.flx
condition_variable.flx share/lib/std/pthread/condition_variable.flx
ts_bound_queue.flx share/lib/std/pthread/ts_bound_queue.flx
atomic.flx share/lib/std/pthread/atomic.flx
threadpool.flx share/lib/std/pthread/threadpool.flx
threadpoolex1.flx share/demo/threadpoolex1.flx

Pthread Synopsis

//[__init__.flx]

// pthreads (portable)
include "std/pthread/pthread";
//include "std/pthread/pchannels";
include "std/pthread/mutex";
//include "std/pthread/ts_bound_queue";
//include "std/pthread/semaphore";
//include "std/pthread/condition_variable";
//include "std/pthread/ppipe";
//include "std/pthread/forkjoin";
//include "std/pthread/atomic";
//include "std/pthread/threadpool";

Pthreads.

General support for pre-emptive threading, aka shared memory concurrency. The core routines are based on the Posix C interface. Emulations are provided for Windows.

The core support routines are written in C++. Adaptation to the local platform operating system is done in C++ using configuration data provided by Felix configuration scripts.

Felix pthreads are always detached. It is not possible to directly wait on a pthread, kill a pthread, or join to a pthread. Pchannels or other devices such as mutex locks, semaphores or condition variables must be used for synchronisation instead.

//[pthread.flx]

header pthread_hxx = '#include "pthread_thread.hpp"';
header mutex_hxx = '#include "pthread_mutex.hpp"';
header condv_hxx = '#include "pthread_condv.hpp"';
header semaphore_hxx = '#include "pthread_semaphore.hpp"';
header monitor_hxx = '#include "pthread_monitor.hpp"';
header work_fifo_hxx = '#include "pthread_work_fifo.hpp"';

//$ This class provides access to the operating system's native
//$ threading routines. On systems with multiple cpus, this may
//$ increase performance as the operating system may schedule
//$ threads on different processors.
open class Pthread
{
  requires package "flx_pthread";

  //$ spawn a detached pthread.
  //$ The procedure p is converted to a continuation, wrapped in a
  //$ thread object, and handed to the scheduler via the
  //$ svc_spawn_pthread service call. The thread cannot be joined,
  //$ killed, or waited on: use pchannels, mutexes, semaphores or
  //$ condition variables to synchronise with it.
  proc spawn_pthread(p:1->0)
  {
      var con = start p;              // get continuation of p
      var fthr = mk_thread con;       // wrap continuation in a thread object
      svc$ svc_spawn_pthread fthr;    // ask the scheduler to launch it
  }
  //$ spawn a detached pthread sharing active list with spawner.
  //$ Identical to spawn_pthread except it uses the
  //$ svc_spawn_process service call.
  proc spawn_process(p:1->0)
  {
      var con = start p;              // get continuation of p
      var fthr = mk_thread con;       // wrap continuation in a thread object
      svc$ svc_spawn_process fthr;    // ask the scheduler to launch it
  }
  //$ Yield the current pthread via the collector's thread control object.
  //$ NOTE(review): presumably this lets a pending garbage collection
  //$ proceed while the thread pauses -- the exact semantics are defined
  //$ by yield() in pthread_thread.hpp; confirm there.
  proc thread_yield : 1 = "PTF gcp->collector->get_thread_control()->yield();";
}

Pchannels.

A <em>pchannel</em> is a <em>monitor</em> object, which is used to synchronise pthreads by use of read and write procedures which transfer a pointer to a heap allocated object. Ownership is transferred from the writer to the reader.

After initial synchronisation the reader gains control and takes possession of the data. The reader then signals that the writer may proceed. The control interlock ensures that the reader is able to capture the data from the writer without the writer interfering. This may be necessary if the value needs to be deep copied, for example. The monitor data exchange protocol is designed to permit transfer of data on the writer’s machine stack, or data which the writer may modify after regaining control. However the read/write operations on pchannels automatically copy the data onto the heap and perform the synchronisation.

Pchannels should be used carefully because they block the whole pthread, that is, all fibres. Unlike fibres, if a deadlock occurs it cannot be resolved and should generally be considered a programming error.

//[pchannels.flx]

//$ Pchannels are unbuffered synchronisation points
//$ for pre-emptive threads.
//$
//$ Similarly to schannels, paired reader-writer pthreads
//$ cannot proceed until both parties agree data exchange is complete.
//$ Unlike schannels, both reader and writer can subsequently
//$ continue concurrently after the exchange.
open class Pchannel
{
  requires package "flx_pthread";

  //$ Pre-emptive thread channels (monitor).
  //$ All three channel types share the same underlying C++ monitor;
  //$ the i/o variants exist only to restrict direction at the type level.
  type pchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;
  //$ Pre-emptive thread input channel.
  type ipchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;
  //$ Pre-emptive thread output channel.
  type opchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;

  //$ Make bidirectional pchannel.
  fun mk_pchannel[t]: 1->pchannel[t] = "new flx::pthread::monitor_t(PTF gcp->collector->get_thread_control())";

  //$ Safe cast from bidirectional to output pchannel.
  ctor[t] opchannel[t](x:pchannel[t]) => C_hack::cast[opchannel[t]] x;
  //$ Safe cast from bidirectional to input pchannel.
  ctor[t] ipchannel[t](x:pchannel[t]) => C_hack::cast[ipchannel[t]] x;

  //$ Make an input and an output pchannel out of a bidirectional channel.
  fun mk_iopchannel_pair[t](var ch:pchannel[t]) =>
    ipchannel[t] ch, opchannel[t] ch
  ;

  //$ Construct a connected input and output pchannel pair.
  fun mk_iopchannel_pair[t]() =>
    mk_iopchannel_pair[t]$ mk_pchannel[t] ()
  ;


  // NOTE: read/write on pchannels uses suspend/resume
  // to tell any pending collector it is safe to proceed
  // whilst it is doing the I/O (which may block),
  // to block returning from the I/O during a collection
  // AND, if the I/O completed before the collection got
  // going, to yield at this point.

  //$ Read from a pchannel (raw primitive).
  //$ Dequeues a heap pointer from the monitor into *$2, then removes
  //$ it as a GC root (the matching root was added by _write).
  proc _read[t]: pchannel[t] * &&t = """
    {
    //fprintf(stderr,"READ:DQ\\n");
    *$2 = (?1*)($1->dequeue());
    PTF gcp->collector->remove_root(*$2);
    //fprintf(stderr,"DONE READ:DQ\\n");
    }
  """ requires property "needs_ptf";

  //$ Read a value from a pchannel.
  //$ Typed wrapper over the _read primitive: fetches the heap
  //$ pointer then dereferences it to yield the value.
  noinline gen read[t] (chan:pchannel[t]) = {
    var p : &t;
    _read (chan,  &p);
    return *p;
  }
  //$ Read a value from an input pchannel.
  gen read[t] (chan:ipchannel[t]) => read$ C_hack::cast[pchannel[t]] chan;

  //$ Write to a pchannel (raw primitive).
  //$ Registers the pointer as a GC root so the data survives a
  //$ collection while queued, then enqueues it on the monitor.
  proc _write[t]: pchannel[t] * &t = """
    {
    //fprintf(stderr,"WRITE:NQ\\n");
    PTF gcp->collector->add_root($2);
    $1->enqueue((void*)$2);
    //fprintf(stderr,"DONE WRITE:NQ\\n");
    }
  """ requires property "needs_ptf";

  //$ Write a value to a pchannel.
  //$ The value is copied onto the heap first, so the writer's copy
  //$ may safely live on its machine stack.
  noinline proc write[t](chan:pchannel[t], v:t) {
    var ps = new v;
    _write (chan,ps);
  }
  //$ Write a value to an output pchannel.
  proc write[t] (chan:opchannel[t], v:t) { write$ C_hack::cast[pchannel[t]] chan,v; }
}

Ppipes.

//[ppipe.flx]

//$ Asynchronous Synchronous Pipe.
//$ Used to link pthreads.
open class Ppipe {

  //$ Send a stream down a channel.
  //$ Loops forever pulling values from the generator it
  //$ and writing them to out; never returns.
  proc psource[T] (var it:1 -> T) (out:opchannel[T])
  {
    while true do write (out,#it); done
  }

  //$ isrc converts a streamable data structure
  //$ such as an array into a source.
  //$ The stream is of option type: Some v per element,
  //$ then None when the iterator is exhausted.
  proc pisrc[V,T with Streamable[T,V]] (dat:T) (out:opchannel[opt[V]])
  {
    psource[opt[V]] (dat.iterator) out;
  }


  //$ Wire a source component to a sink.
  //$ Return coupled fibre ready to run.
  //$ When run, each end is spawned as its own detached pthread,
  //$ connected by a fresh pchannel pair.
  fun pipe[T]
    (w: opchannel[T] -> 0,
    r: ipchannel[T] -> 0)
  :
    1 -> 0
  =>
    {
      var chi,cho = mk_iopchannel_pair[T] ();
      spawn_pthread { (w cho); };
      spawn_pthread { (r chi); };
    }
  ;

  //$ Wire a source component to a transducer.
  //$ Return source.
  fun pipe[T,U]
    (w: opchannel[T] -> 0,
    t: ipchannel[T] * opchannel[U] -> 0)
  :
    opchannel[U] -> 0
  =>
    proc (out:opchannel[U])
    {
      var chi,cho = mk_iopchannel_pair[T] ();
      spawn_pthread { (w cho); };
      spawn_pthread { (t (chi, out)); };
    }
  ;

  //$ xpipe connects a streamable data structure
  //$ such as an array directly into a transducer.
  fun xpipe[V,T,U with Streamable[T,V]]
    (
      a:T,
      t: ipchannel[opt[V]] * opchannel[U] -> 0
    )
    : opchannel[U] -> 0 =>
    pipe (a.pisrc[V],t)
  ;


  //$ Wire a transducer into a transducer.
  //$ Return another transducer.
  fun pipe[T,U,V]
    (a: ipchannel[T] * opchannel[U] -> 0,
    b: ipchannel[U] * opchannel[V] -> 0)
  :
    ipchannel[T] * opchannel[V] -> 0
  =>
    proc (inp:ipchannel[T], out:opchannel[V])
    {
      var chi,cho = mk_iopchannel_pair[U] ();
      spawn_pthread { a (inp, cho); };
      spawn_pthread { b (chi, out); };
    }
  ;

  //$ Wire a transducer into a sink.
  //$ Return a sink.
  fun pipe[T,U]
    (a: ipchannel[T] * opchannel[U] -> 0,
    b: ipchannel[U] -> 0)
  :
    ipchannel[T]  -> 0
  =>
    proc (inp:ipchannel[T])
    {
      var chi,cho = mk_iopchannel_pair[U] ();
      spawn_pthread { a (inp, cho); };
      spawn_pthread { b (chi); };
    }
  ;


  //$ Stream sort using intermediate darray.
  //$ Requires stream of option type.
  //$ Accumulates Some values from r until None arrives,
  //$ sorts them, then streams them to w followed by a
  //$ terminating None.
  proc sort[T with Tord[T]] (r: ipchannel[opt[T]], w: opchannel[opt[T]])
  {
     var x = darray[T]();
     acquire:while true do
       match read r with
       | Some v => x+=v;
       | #None => break acquire;
       endmatch;
     done
     x.sort;
     for v in x do
       write (w, Some v);
     done
     write (w,None[T]);
  }
}

Fork/Join.

//[forkjoin.flx]
include "std/pthread/pchannels";

//$ Implement fork/join protocol.
open class ForkJoin
{
  //$ Launch a set of pthreads and wait
  //$ until all of them are finished.
  //$ The iterator it yields Some procedure per thread to launch,
  //$ then None. Each procedure is wrapped in a manager which writes
  //$ a token to a shared pchannel when the procedure returns; the
  //$ caller then reads one token per spawned thread, so it blocks
  //$ until all have completed.
  proc concurrently_by_iterator (var it:1 -> opt[1->0])
  {
     // Make a channel to signal termination.
     var iterm,oterm = mk_iopchannel_pair[int](); // should be unit but that bugs out at the moment
     noinline proc manager (var p: 1->0) () { p(); write (oterm, 1); }
     // Count the number of pthreads.
     var count = 0;
   again:>
     match #it with
     | Some p =>
       ++count;
       spawn_pthread$ manager p;
      goto again;

     | #None =>
       // all threads spawned: now join by absorbing one
       // termination token per thread
       while count > 0 do
         C_hack::ignore (read iterm);
         --count;
       done
     endmatch;
  }

  //$ Run all procedures streamed from d concurrently and
  //$ wait for them all to finish.
  proc concurrently[T with Streamable[T,1->0]] (d:T) => concurrently_by_iterator d.iterator;

}

Mutual Exclusion Lock (Mutex)

A mutex may be used to protect some region of memory conceptually associated with that mutex, by locking the mutex for a short period of time. The region may then be modified atomically.

A Felix mutex is created on the heap and must be destroyed manually after use; it is not garbage collected.

//[mutex.flx]

open class Mutex
{
  requires package "flx_pthread";
  // this needs to be fixed to work with gc but at the
  // moment the uglier solution will suffice
  //$ The type of a mutex: a raw C++ heap pointer, not garbage collected.
  type mutex = "::flx::pthread::flx_mutex_t*" requires mutex_hxx;
  //$ Construct a mutex on the C++ heap.
  ctor mutex: unit = "new ::flx::pthread::flx_mutex_t";
  //$ Acquire the mutex, blocking until it is available.
  proc lock: mutex = "$1->lock();";
  //$ Release the mutex.
  proc unlock: mutex = "$1->unlock();";
  //$ Delete the mutex. Must be called manually since it is not GC managed.
  proc destroy: mutex = "delete $1;";
}

Semaphores.

A semaphore is a counted lock. The sem_post procedure increments the counter, and the sem_wait procedure decrements it. However, the counter may not become negative so instead, if it were to become negative, the sem_wait procedure blocks the current pthread, and the pthread joins a set of pthreads waiting on the semaphore. When the counter is finally incremented by a call from some pthread to sem_post one of the pthreads waiting with sem_wait is allowed to proceed, again decrementing the counter to zero so the remaining pthreads waiting continue to do so.

The procedure sem_trywait instead returns a flag indicating whether it succeeded in decrementing the counter or not.

The term <em>post</em> is derived from the idea of posting a flag.

The counting feature of a semaphore is analogous to shoppers in a store. The sem_post function puts products on the shelf, whilst the sem_wait function represents an order on which the customer is waiting due to unavailable stock .. and sem_trywait is the customer that, seeing there is no available stock, decides to go elsewhere!

//[semaphore.flx]

open class Semaphore
{
  // FIXME: does not comply with GC friendly blocking protocol!

  // NOTE(review): was `package "pthread"`; every sibling class in this
  // file requires "flx_pthread", so this looked like a typo -- confirm
  // against the configuration database.
  requires package "flx_pthread";

  //$ The type of a semaphore: a raw C++ heap pointer, not garbage
  //$ collected -- call destroy when finished with it.
  type semaphore = "::flx::pthread::flx_semaphore_t*" requires semaphore_hxx;

  //$ Construct a semaphore with a zero counter.
  // BUG FIX: the constructor previously had no signature and used the
  // non-existent namespace ::flx_pthread:: (the type above is declared
  // in ::flx::pthread::).
  ctor semaphore: 1 = "new ::flx::pthread::flx_semaphore_t";

  //$ Construct a semaphore with the given initial counter value.
  // BUG FIX: was `ctor semaphore * int = ...` which is not a valid
  // constructor binding; also fixed the namespace as above.
  ctor semaphore: int = "new ::flx::pthread::flx_semaphore_t($1)";

  //$ Destroy the semaphore. Must be called manually.
  proc destroy : semaphore = "delete $1;";

  //$ Increment the counter (post a flag), releasing one waiter if any.
  proc post: semaphore = "$1->post();";

  //$ Decrement the counter, blocking while it is zero.
  proc wait: semaphore = "$1->wait();";

  //$ Try to decrement the counter without blocking;
  //$ the result reports whether the decrement succeeded.
  gen trywait: semaphore -> int = "$1->trywait()";

  //$ Fetch the current counter value.
  // BUG FIX: was `int get: semaphore = "$1->get();";` which is not a
  // valid Felix binding; rewritten as a gen matching trywait's style.
  gen get: semaphore -> int = "$1->get()";
}

Condition Variables.

//[condition_variable.flx]

//$ Condition Variable for pthread synchronisation.
open class Condition_Variable
{
  requires package "flx_pthread";

  //$ The type of a condition variable.
  //$ Heap allocated, not garbage collected: call destroy when done.
  type condition_variable = "::flx::pthread::flx_condv_t*" requires condv_hxx;

  //$ Condition variable constructor taking unit argument.
  ctor condition_variable: 1 = "new ::flx::pthread::flx_condv_t(PTF gcp->collector->get_thread_control())";

  //$ Function to release a condition variable.
  proc destroy: condition_variable = "delete $1;";

  //$ lock/unlock associated mutex
  proc lock : condition_variable = "$1->lock();";
  proc unlock : condition_variable = "$1->unlock();";

  //$ Function to wait until a signal is raised on
  //$ the condition variable by another thread.
  //$ NOTE(review): presumably the associated mutex must be held by
  //$ the caller, per usual condition variable semantics -- confirm
  //$ against pthread_condv.hpp.
  proc wait: condition_variable = "$1->wait();";

  //$ Function to raise a signal on a condition
  //$ variable which will allow at most one thread
  //$ waiting on it to proceed.
  proc signal: condition_variable = "$1->signal();";

  //$ Function to broadcast a signal releasing all
  //$ threads waiting on a condition variable.
  proc broadcast: condition_variable = "$1->broadcast();";

  //$ Timed wait for signal on condition variable.
  //$ Time in seconds. Resolution nanoseconds.
  // BUG FIX: the C++ body referenced $3, but this binding has only
  // two Felix arguments ($1 = the condition variable, $2 = timeout).
  gen timedwait: condition_variable * double -> int = "$1->timedwait($2)";
}

Thread Safe Bound Queue.

//[ts_bound_queue.flx]

open class TS_Bound_Queue
{
  // Opaque tag type naming the underlying C++ queue for the GC.
  private uncopyable type bQ_ = "::flx::pthread::bound_queue_t";
  //$ A thread-safe bounded queue of T, managed by the garbage
  //$ collector using the dedicated bound_queue_scanner to find
  //$ pointers held inside the queue.
  _gc_pointer _gc_type bQ_ type ts_bound_queue_t[T] = "::flx::pthread::bound_queue_t*"
    requires
     package "flx_bound_queue",
     scanner "::flx::pthread::bound_queue_scanner"
  ;
  //$ Construct a queue with the given capacity bound.
  //$ Allocated via the Felix GC (placement new into the gc profile).
  ctor[T] ts_bound_queue_t[T]: !ints =
    """
      new (*PTF gcp, @0, false) ::flx::pthread::bound_queue_t(
      PTF gcp->collector->get_thread_control(), (size_t)$1)
    """ requires property "needs_ptf";

  // NOTE: enqueue/dequeue on queues uses suspend/resume
  // to tell any pending collector it is safe to proceed
  // whilst it is doing the operations (which may block),
  // to block returning from the I/O during a collection
  // AND, if the I/O completed before the collection got
  // going, to yield at this point.


  //$ Raw enqueue of a heap pointer; blocks while the queue is full.
  private proc _enqueue[T]: ts_bound_queue_t[T] * &T = """
    FLX_SAVE_REGS;
//fprintf(stderr,"enqueue to ts_bound_queue q=%p starts, item=%p\\n", $1, $2);
    //PTF gcp->collector->get_thread_control()->suspend();
    $1->enqueue((void*)$2);
//fprintf(stderr,"enqueue to ts_bound_queue q=%p done, item=%p\\n", $1, $2);
    //PTF gcp->collector->get_thread_control()->resume();
  """;


  // Duh .. what happens if $2 storage location is set by
  // the dequeue in the middle of a collection?
  // it might be NULL when scanned, but by the time the queue
  // is scanned the value will be lost from the queue and
  // in the variable instead!
  // The RACE is on!
  //$ Raw dequeue of a heap pointer; blocks while the queue is empty.
  private proc _dequeue[T]: ts_bound_queue_t[T] * &&T = """
    FLX_SAVE_REGS;
//fprintf(stderr,"dequeue from ts_bound_queue %p starts\\n", $1);
    //PTF gcp->collector->get_thread_control()->suspend();
    *$2=(?1*)$1->dequeue();
//fprintf(stderr,"dequeue from ts_bound_queue done q=%p item=%p\\n",$1,*$2);
    //PTF gcp->collector->get_thread_control()->resume();
  """;

  //$ Enqueue a value: copies it onto the heap first so the queue
  //$ owns its own copy.
  proc enqueue[T] (Q:ts_bound_queue_t[T])  (elt:T) {
     _enqueue(Q, new elt);
  }

  //$ Dequeue a value, blocking until one is available.
  gen dequeue[T] (Q:ts_bound_queue_t[T]): T = {
    var x:&T;
    _dequeue (Q,&x);
    return *x;
  }


  //$ Block until the queue becomes empty.
  proc wait[T]: ts_bound_queue_t[T] = """
    FLX_SAVE_REGS;
    //PTF gcp->collector->get_thread_control()->suspend();
    $1->wait_until_empty();
    //PTF gcp->collector->get_thread_control()->resume();
  """;

  //$ Change the queue's capacity bound.
  proc resize[T]: ts_bound_queue_t[T] * !ints = "$1->resize((size_t)$2);";

}

Atomic operations

//[atomic.flx]
open class Atomic
{
  // note: only works for some types: constraints need to be added.
  // We have to use a pointer because atomics aren't copyable

  //$ An atomic value of type T, represented as a pointer to a
  //$ C++ std::atomic<T> on the C++ heap.
  type atomic[T]="::std::atomic<?1>*" requires Cxx11_headers::atomic;

  // FIXME: not managed by GC yet!
  // constructor
  //$ Construct an atomic from an initial value.
  ctor[T] atomic[T]: T = "(new ::std::atomic<?1>($1))";

  //$ Delete the atomic. Must be called manually (not GC managed, see above).
  proc delete[T] : atomic[T] = "delete $1;";

  // note: only works for even less types! Constraints needed.
  //$ Atomic pre-increment / pre-decrement of the stored value.
  proc pre_incr[T] : &atomic[T] = "++**$1;";
  proc pre_decr[T] : &atomic[T] = "--**$1;";
  //$ Atomically load the stored value.
  gen load[T] : atomic[T] -> T = "$1->load()";
  //$ Atomically store a new value.
  proc store[T] : atomic[T] * T = "$1->store($2);";
  //$ Curried form of store.
  proc store[T] (a:atomic[T]) (v:T) { store (a,v); }

  //$ Render an atomic as the string form of its current value.
  instance[T] Str[atomic[T]] {
    fun str (var x:atomic[T]) => x.load.str;
  }
  inherit[T] Str[atomic[T]];
}

Thread Pool

A thread pool is a global object containing a set of running threads and a queue. Instead of spawning a new thread, the client just queues the job. Each thread grabs a job from the queue and runs it; on completion it grabs another job.

The primary advantage of a global thread pool is that it prevents oversaturation of the set of processors and thus excess context switching. The main downside is monitoring the completion state of jobs.

Do not use the threadpool for quick jobs, there is a significant overhead posting a job.

//[threadpool.flx]

include "std/pthread/ts_bound_queue";
include "std/pthread/atomic";
include "std/io/faio";
include "std/pthread/condition_variable";
include "std/pthread/pchannels";

class ThreadPool
{
  // A job is just a unit procedure.
  typedef job_t = 1 -> 0;
  // Sentinel job: a NULL procedure pointer tells a worker to shut down.
  private const ThreadStop : job_t = "NULL";
  private fun isStop : job_t -> bool = "$1==NULL";
  // Alarm clock used by stop() to poll for worker exit.
  private var clock = #Faio::mk_alarm_clock;
  private var jobqueue = ts_bound_queue_t[job_t] 1024; // queue up to 1K jobs
  private var nthreads = 8; // great default for quad core i7 ?

  // number of threads actually running
  private var running = atomic 0;

  // number of threads blocked waiting on a barrier
  private var waiting = atomic 0;

  // barrier lock
  private var block = #condition_variable;

  // Report the configured number of worker threads.
  fun get_nthreads () => nthreads;

  // This is a flag used to protect against nested pfor loops.
  // If there is a nested pfor loop, it will just execute serially
  // in the calling thread.
  private var pforrunning = atomic 0;

  // Rendezvous: block the calling worker until nthreads workers
  // have reached the barrier. The last arrival resets the waiting
  // count and broadcasts; earlier arrivals loop on the condition
  // variable until the count is reset, which guards against
  // spurious wakeups.
  proc barrier() {
//println$ "Barrier";
    block.lock;
    ++waiting;
    if waiting.load == nthreads do
      waiting.store 0;
      block.broadcast;
    else
    again:>
      block.wait;
      if waiting.load != 0 goto again;
    done
    block.unlock;
  }

  // Spawn nthreads worker pthreads.
  proc start () {
//println$ "Thread pool start()";
     for i in 1..nthreads call spawn_pthread jobhandler;
//println$ "Threads spawned";
  }

  // Set the pool size then start the workers.
  proc start (n:int) {
     nthreads = n;
     #start;
  }

  // Worker: repeatedly dequeue and run jobs until a ThreadStop
  // sentinel is dequeued, yielding to the GC between jobs.
  private proc jobhandler () {
//println$ "Job handler thread #"+running.str+" started";
     var id = running;
     // NOTE(review): id aliases the atomic running counter (the atomic
     // type is a pointer), so the commented-out traces print the
     // current thread count, not a stable per-thread id -- confirm
     // this is intended.
     ++running;
     rpt:while true do
//println$ "Trying to dequeue a job id=" + id.str;
       var job = dequeue jobqueue;
//println$ "Job dequeued id="+id.str;
       if isStop job break rpt;
       job;
       thread_yield();
     done
     --running;
  }

  // Queue a job for execution. Lazily starts the pool if no worker
  // is running yet; with nthreads == 0 the job runs synchronously
  // in the caller instead.
  proc queue_job (job:job_t) {
//println$ "Queuing job";
    if running.load == 0 call start ();
    if nthreads > 0 do
      call enqueue jobqueue job;
    else
      call job;
    done
  }

  // Shut the pool down: post one stop sentinel per worker, then
  // poll until every worker has exited.
  proc stop () {
    for i in 1..nthreads
      call enqueue jobqueue ThreadStop;
    while running.load != 0
      call Faio::sleep(clock,0.001);
  }

  // Queue one barrier job per worker, forcing all workers to
  // rendezvous and hence drain all previously queued jobs first.
  proc post_barrier() {
    if nthreads > 0
      for i in 1..nthreads call queue_job barrier;
  }

  // A job which announces its execution by writing a token
  // to the given pchannel.
  proc notify (chan:opchannel[int]) () {
    write (chan,1);
  }

  // Wait until all jobs queued so far have completed: rendezvous
  // the workers with a barrier, then queue a notify job and block
  // reading its token.
  proc join () {
    if nthreads > 0 do
      post_barrier;
      var ip,op = #mk_iopchannel_pair[int];
      queue_job$ notify op;
      var x = read ip;
      C_hack::ignore(x);
    done
  }

  // Split the index range [first,last] into nthreads+1 roughly equal
  // segments; queue nthreads of them as jobs, run the final segment
  // inline in the caller, then join. Falls back to fully serial
  // execution when a pfor is already running (no nested parallelism),
  // the range is smaller than the thread count, or there are no workers.
  proc pfor_segment (first:int, last:int) (lbody: int * int -> 1 -> 0)
  {
//println$ "Pfor segment " + first.str + "," last.str;
    var N = last - first + 1;
    var nt = nthreads + 1;
    if pforrunning.load == 0 and N >= nthreads and nthreads > 0 do
      pforrunning.store 1;
      for var counter in 0 upto nt - 2 do
        var sfirst = first + (N * counter) / nt;
        var slast = first + (N * (counter + 1)) / nt - 1;
//println$ "QUEUE JOB: Counter = " + counter.str + ", sfirst=" + sfirst.str + ", slast=" + slast.str;
        ThreadPool::queue_job$ lbody (sfirst, slast);
      done
      // last segment is executed by the calling thread itself
      sfirst = first + (N * (nt - 1)) / nt;
      slast = last;
//println$ "UNQUEUED JOB: Counter = " + counter.str + ", sfirst=" + sfirst.str + ", slast=" + slast.str;
      lbody (sfirst, slast) ();
      join;
      pforrunning.store 0;
    else
      // Run serially
      lbody (first, last) ();
    done
  }

  // Adapt a per-index body to a per-segment job running indices
  // first..last inclusive.
  noinline proc forloop (lbody: int -> 0) (first:int, last:int) ()
  {
//println$ "forloop " + first.str + "," + last.str;
    for var i in first upto last call lbody i;
  }
  // Parallel for loop over first..last, applying lbody to each index.
  noinline proc pforloop (first: int) (last:int) (lbody: int -> 0)
  {
//println$ "Pfor segment " + first.str + "," last.str;
    pfor_segment (first, last)  (forloop lbody);
  }
  // Uncurried convenience wrapper for pforloop.
  inline proc tpfor (first:int, last:int, lbody: int-> 0)
  {
     pforloop first last lbody;
  }

}

Thread Pool Demo

//[threadpoolex1.flx]
include "std/pthread/threadpool";
open ThreadPool;

// Matrix multiply
// Demo: N x N double matrix multiply computed four ways --
// serial, one-job-per-row, pforloop, and pfor syntax -- with the
// serial result saved in s so the parallel results can be verified.
macro val N = 1000;
typedef N = 1000; // type-level copy of N used in the array bounds below

typedef vec_t = array[double, N];
typedef mx_t = array[vec_t,N];
var a : mx_t; // left operand
var b : mx_t; // right operand (columns accessed via rows, see note below)
var r : mx_t; // result of the multiply under test
var s : mx_t; // reference result from the serial multiply

// Zero out a matrix in place.
proc clear (mx:&mx_t) {
  for i in 0..<N
  for j in 0..<N
    perform mx . i . j <- 0.0;
}

// Fill a matrix with uniform random values in [0,1].
proc rinit (mx:&mx_t) {
  for i in 0..<N
  for j in 0..<N
    perform mx . i . j <- #rand.double / RAND_MAX.double;
}

// Compare result r against reference s, element by element.
fun check() = {
//println$ "Verification check";
  for i in 0..<N
  for j in 0..<N
    if r.i.j != s.i.j return false;
  return true;
}

// Print whether r matches the serial reference s.
proc verify() {
//println$ "Running verify";
  if #check do
    println$ "Verified";
  else
    println "Wrong!";
  done
//println$ "Verify ran";
}

clear &r;
clear &s;
rinit &a;
rinit &b;

// Dot product of row *pr of a with row *pc of b.
// NOTE(review): both arguments are rows, so this computes a * b^T
// rather than a * b -- harmless for a timing demo since every
// variant does the same thing, but confirm if reusing.
fun inner_product (pr: &vec_t, pc: &vec_t) =
{
  var sum = 0.0;
  for (var k=0; k<N; ++k;)
    perform sum = sum + *(pr.k) * *(pc.k);
  return sum;
}

// naive multiply
var start = #time;
begin
  for i in 0..<N
  for (var j=0; j<N; ++j;)
    perform &r . i . j <- inner_product (&a.i, &b.j);
  s = r; // keep the serial result as the verification reference
end
var fin = #time;
println$ "Naive mul elapsed " + (fin - start).str + " seconds";

//println$ "Starting thread pool";
ThreadPool::start 8;
//println$ "Thread pool started";

// naive parallel multiply
// Compute one output row; used directly as the pforloop body.
noinline proc inner_products_proc (var i:int)
{
  for (var j=0; j<N; ++j;)
    perform &r . i . j <- inner_product (&a.i, &b.j);
}

// Same computation curried to a unit procedure so each row
// can be queued as a thread-pool job.
noinline proc inner_products_job (var i:int) () {
  for (var j=0; j<N; ++j;)
    perform &r . i . j <- inner_product (&a.i, &b.j);
}

clear &r;
start = #time;
begin
  for i in 0..<N
    call ThreadPool::queue_job$ inner_products_job (i);
  ThreadPool::join;
end
fin = #time;
println$ "Naive Parallel mul elapsed " + (fin - start).str + " seconds";
verify;

// smart parallel multiply
clear &r;
start = #time;
begin
println$ "Using thread pool's pforloop";
  ThreadPool::pforloop 0 (N - 1) inner_products_proc;
end
fin = #time;
println$ "Smart Parallel mul elapsed " + (fin - start).str + " seconds";
verify;

// smart parallel multiply with syntax
clear &r;
start = #time;
begin
  pfor i in 0 upto (N - 1) do
  for (var j=0; j<N; ++j;)
    perform &r . i . j <- inner_product (&a.i, &b.j);
  done
end
fin = #time;
println$ "pfor mul elapsed " + (fin - start).str + " seconds";
verify;


ThreadPool::stop;