Package: src/packages/pthreads.fdoc
Preemptive Threading Support¶
key | file |
---|---|
__init__.flx | share/lib/std/pthread/__init__.flx |
pthread.flx | share/lib/std/pthread/pthread.flx |
pchannels.flx | share/lib/std/pthread/pchannels.flx |
ppipe.flx | share/lib/std/pthread/ppipe.flx |
forkjoin.flx | share/lib/std/pthread/forkjoin.flx |
mutex.flx | share/lib/std/pthread/mutex.flx |
semaphore.flx | share/lib/std/pthread/semaphore.flx |
condition_variable.flx | share/lib/std/pthread/condition_variable.flx |
ts_bound_queue.flx | share/lib/std/pthread/ts_bound_queue.flx |
atomic.flx | share/lib/std/pthread/atomic.flx |
threadpool.flx | share/lib/std/pthread/threadpool.flx |
threadpoolex1.flx | share/demo/threadpoolex1.flx |
Pthread Synopsis¶
//[__init__.flx]
// pthreads (portable)
include "std/pthread/pthread";
//include "std/pthread/pchannels";
include "std/pthread/mutex";
//include "std/pthread/ts_bound_queue";
//include "std/pthread/semaphore";
//include "std/pthread/condition_variable";
//include "std/pthread/ppipe";
//include "std/pthread/forkjoin";
//include "std/pthread/atomic";
//include "std/pthread/threadpool";
Pthreads.¶
General support for pre-emptive threading, aka shared memory concurrency. The core routines are based on the Posix C interface. Emulations are provided for Windows.
The core support routines are written in C++. Adaptation to the local platform operating system is done in C++ using configuration data provided by Felix configuration scripts.
Felix pthreads are always detached. It is not possible to directly wait on a pthread, kill a pthread, or join to a pthread. Pchannels or other devices such as mutex locks, semaphores or condition variables must be used for synchronisation instead.
//[pthread.flx]
header pthread_hxx = '#include "pthread_thread.hpp"';
header mutex_hxx = '#include "pthread_mutex.hpp"';
header condv_hxx = '#include "pthread_condv.hpp"';
header semaphore_hxx = '#include "pthread_semaphore.hpp"';
header monitor_hxx = '#include "pthread_monitor.hpp"';
header work_fifo_hxx = '#include "pthread_work_fifo.hpp"';
//$ This class provides access to the operating system's native
//$ threading routines. On systems with multiple cpus, this may
//$ increase performance as the operating system may schedule
//$ threads on different processors.
open class Pthread
{
requires package "flx_pthread";
//$ Spawn a detached pthread running procedure p.
//$ The thread cannot be joined or killed; use pchannels,
//$ mutexes, semaphores or condition variables to synchronise.
proc spawn_pthread(p:1->0)
{
var con = start p; // get continuation of p
var fthr = mk_thread con;
svc$ svc_spawn_pthread fthr; // ask the scheduler to launch the thread
}
//$ Spawn a detached pthread sharing the active fibre list
//$ with the spawner (a "process" in Felix terminology).
proc spawn_process(p:1->0)
{
var con = start p; // get continuation of p
var fthr = mk_thread con;
svc$ svc_spawn_process fthr; // scheduler service call, as above
}
//$ Hint the OS scheduler to let another thread run;
//$ also gives a pending garbage collection a chance to proceed.
proc thread_yield : 1 = "PTF gcp->collector->get_thread_control()->yield();";
}
Pchannels.¶
A <em>pchannel</em> is a <em>monitor</em> object, which is used to synchronise pthreads by use of read and write procedures which transfer a pointer to a heap allocated object. Ownership is transferred from the writer to the reader.
After initial synchronisation the reader gains control and takes possession of the data. The reader then signals that the writer may proceed. The control interlock ensures that the reader is able to capture the data from the writer without the writer interfering. This may be necessary if the value needs to be deep copied, for example. The monitor data exchange protocol is designed to permit transfer of data on the writer’s machine stack, or data which the writer may modify after regaining control. However the read/write operations on pchannels automatically copy the data onto the heap and perform the synchronisation.
Pchannels should be used carefully because they block the whole pthread, that is, all fibres. Unlike fibres, if a deadlock occurs it cannot be resolved and should generally be considered a programming error.
//[pchannels.flx]
//$ Pchannels are unbuffered synchronisation points
//$ for pre-emptive threads.
//$
//$ Similarly to schannels, paired reader-writer pthreads
//$ cannot proceed until both parties agree data exchange is complete.
//$ Unlike schannels, both reader and writer can subsequently
//$ continue concurrently after the exchange.
open class Pchannel
{
requires package "flx_pthread";
//$ Pre-emptive thread channels (monitor).
type pchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;
//$ Pre-emptive thread input channel.
//$ Same underlying monitor; the phantom type restricts use to reading.
type ipchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;
//$ Pre-emptive thread output channel.
//$ Same underlying monitor; the phantom type restricts use to writing.
type opchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;
//$ Make bidirectional pchannel.
fun mk_pchannel[t]: 1->pchannel[t] = "new flx::pthread::monitor_t(PTF gcp->collector->get_thread_control())";
//$ Safe cast from bidirectional to output pchannel.
ctor[t] opchannel[t](x:pchannel[t]) => C_hack::cast[opchannel[t]] x;
//$ Safe cast from bidirectional to input pchannel.
ctor[t] ipchannel[t](x:pchannel[t]) => C_hack::cast[ipchannel[t]] x;
//$ Make an input and an output pchannel out of a bidirectional channel.
fun mk_iopchannel_pair[t](var ch:pchannel[t]) =>
ipchannel[t] ch, opchannel[t] ch
;
//$ Construct a connected input and output pchannel pair.
fun mk_iopchannel_pair[t]() =>
mk_iopchannel_pair[t]$ mk_pchannel[t] ()
;
// NOTE: read/write on pchannels uses suspend/resume
// to tell any pending collector it is safe to proceed
// whilst it is doing the I/O (which may block),
// to block returning from the I/O during a collection
// AND, if the I/O completed before the collection got
// going, to yield at this point.
//$ Low level read from a pchannel: dequeues the pointer
//$ and removes it as a GC root (it was rooted by _write).
proc _read[t]: pchannel[t] * &&t = """
{
//fprintf(stderr,"READ:DQ\\n");
*$2 = (?1*)($1->dequeue());
PTF gcp->collector->remove_root(*$2);
//fprintf(stderr,"DONE READ:DQ\\n");
}
""" requires property "needs_ptf";
//$ Read a value from a pchannel.
//$ Blocks until a writer supplies one.
noinline gen read[t] (chan:pchannel[t]) = {
var p : &t;
_read (chan, &p);
return *p;
}
//$ Read from an input-only pchannel.
gen read[t] (chan:ipchannel[t]) => read$ C_hack::cast[pchannel[t]] chan;
//$ Low level write to a pchannel: roots the pointer so the
//$ value survives collection whilst in transit, then enqueues it.
proc _write[t]: pchannel[t] * &t = """
{
//fprintf(stderr,"WRITE:NQ\\n");
PTF gcp->collector->add_root($2);
$1->enqueue((void*)$2);
//fprintf(stderr,"DONE WRITE:NQ\\n");
}
""" requires property "needs_ptf";
//$ Write a value to a pchannel.
//$ The value is copied to the heap; blocks until a reader takes it.
noinline proc write[t](chan:pchannel[t], v:t) {
var ps = new v;
_write (chan,ps);
}
//$ Write to an output-only pchannel.
proc write[t] (chan:opchannel[t], v:t) { write$ C_hack::cast[pchannel[t]] chan,v; }
}
Ppipes.¶
//[ppipe.flx]
//$ Asynchronous Synchronous Pipe.
//$ Used to link pthreads.
open class Ppipe {
//$ Send a stream down a channel.
proc psource[T] (var it:1 -> T) (out:opchannel[T])
{
while true do write (out,#it); done
}
//$ pisrc converts a streamable data structure
//$ such as an array into a source.
proc pisrc[V,T with Streamable[T,V]] (dat:T) (out:opchannel[opt[V]])
{
psource[opt[V]] (dat.iterator) out;
}
//$ Wire a source component to a sink.
//$ Returns a procedure which, when run, spawns both
//$ ends as pthreads connected by a fresh pchannel.
fun pipe[T]
(w: opchannel[T] -> 0,
r: ipchannel[T] -> 0)
:
1 -> 0
=>
{
var chi,cho = mk_iopchannel_pair[T] ();
spawn_pthread { (w cho); };
spawn_pthread { (r chi); };
}
;
//$ Wire a source component to a transducer.
//$ Return source.
fun pipe[T,U]
(w: opchannel[T] -> 0,
t: ipchannel[T] * opchannel[U] -> 0)
:
opchannel[U] -> 0
=>
proc (out:opchannel[U])
{
var chi,cho = mk_iopchannel_pair[T] ();
spawn_pthread { (w cho); };
spawn_pthread { (t (chi, out)); };
}
;
//$ xpipe connects a streamable data structure
//$ such as an array directly into a transducer.
fun xpipe[V,T,U with Streamable[T,V]]
(
a:T,
t: ipchannel[opt[V]] * opchannel[U] -> 0
)
: opchannel[U] -> 0 =>
pipe (a.pisrc[V],t)
;
//$ Wire a transducer into a transducer.
//$ Return another transducer.
fun pipe[T,U,V]
(a: ipchannel[T] * opchannel[U] -> 0,
b: ipchannel[U] * opchannel[V] -> 0)
:
ipchannel[T] * opchannel[V] -> 0
=>
proc (inp:ipchannel[T], out:opchannel[V])
{
var chi,cho = mk_iopchannel_pair[U] ();
spawn_pthread { a (inp, cho); };
spawn_pthread { b (chi, out); };
}
;
//$ Wire a transducer into a sink.
//$ Return a sink.
fun pipe[T,U]
(a: ipchannel[T] * opchannel[U] -> 0,
b: ipchannel[U] -> 0)
:
ipchannel[T] -> 0
=>
proc (inp:ipchannel[T])
{
var chi,cho = mk_iopchannel_pair[U] ();
spawn_pthread { a (inp, cho); };
spawn_pthread { b (chi); };
}
;
//$ Stream sort using intermediate darray.
//$ Requires stream of option type: reads until None,
//$ sorts, then emits the sorted values followed by None.
proc sort[T with Tord[T]] (r: ipchannel[opt[T]], w: opchannel[opt[T]])
{
var x = darray[T]();
acquire:while true do
match read r with
| Some v => x+=v;
| #None => break acquire;
endmatch;
done
x.sort;
for v in x do
write (w, Some v);
done
write (w,None[T]); // end-of-stream marker
}
}
Fork/Join.¶
//[forkjoin.flx]
include "std/pthread/pchannels";
//$ Implement fork/join protocol.
open class ForkJoin
{
//$ Launch a set of pthreads and wait
//$ until all of them are finished.
//$ Each job is wrapped in a manager which signals completion
//$ on a shared pchannel; we then read one signal per job.
proc concurrently_by_iterator (var it:1 -> opt[1->0])
{
// Make a channel to signal termination.
var iterm,oterm = mk_iopchannel_pair[int](); // should be unit but that bugs out at the moment
noinline proc manager (var p: 1->0) () { p(); write (oterm, 1); }
// Count the number of pthreads.
var count = 0;
again:>
match #it with
| Some p =>
++count;
spawn_pthread$ manager p;
goto again;
| #None =>
// iterator exhausted: wait for one completion signal per spawn
while count > 0 do
C_hack::ignore (read iterm);
--count;
done
endmatch;
}
//$ Run all the jobs in a streamable container concurrently,
//$ returning when every job has completed.
proc concurrently[T with Streamable[T,1->0]] (d:T) => concurrently_by_iterator d.iterator;
}
Mutual Exclusion Lock (Mutex)¶
A mutex may be used to protect some region of memory conceptually associated with that mutex, by locking the mutex for a short period of time. The region may then be modified atomically.
A Felix mutex is created on the heap and must be destroyed after use manually; mutexes are not garbage collected.
//[mutex.flx]
open class Mutex
{
requires package "flx_pthread";
// this needs to be fixed to work with gc but at the
// moment the uglier solution will suffice
//$ The type of a mutex: a raw heap pointer, not garbage collected.
type mutex = "::flx::pthread::flx_mutex_t*" requires mutex_hxx;
//$ Construct a mutex on the C++ heap.
ctor mutex: unit = "new ::flx::pthread::flx_mutex_t";
//$ Acquire the mutex, blocking until available.
proc lock: mutex = "$1->lock();";
//$ Release the mutex.
proc unlock: mutex = "$1->unlock();";
//$ Delete the mutex. Must be called manually: not garbage collected.
proc destroy: mutex = "delete $1;";
}
Semaphores.¶
A semaphore is a counted lock. The sem_post
procedure
increments the counter, and the sem_wait
procedure decrements it.
However, the counter may not become negative so instead, if it
were to become negative, the sem_wait
procedure blocks the current
pthread, and the pthread joins a set of pthreads waiting on the
semaphore. When the counter is finally incremented by a call
from some pthread to sem_post
one of the pthreads waiting
with sem_wait
is allowed to proceed, again decrementing
the counter to zero so the remaining pthreads waiting continue
to do so.
The procedure sem_trywait
instead returns a flag indicating
whether it succeeded in decrementing the counter or not.
The term <em>post</em> is derived from the idea of posting a flag.
The counting feature of a semaphore is analogous to shoppers
in a store. The sem_post
function puts products on the shelf,
whilst the sem_wait
function represents an order on which
the customer is waiting due to unavailable stock .. and sem_trywait
is the customer that, seeing there is no available stock, decides
to go elsewhere!
//[semaphore.flx]
//$ Counted lock: post increments the counter, wait decrements it,
//$ blocking rather than letting the counter go negative.
open class Semaphore
{
// FIXME: does not comply with GC friendly blocking protocol!
// BUG FIX: package was "pthread", inconsistent with every other
// class in this library which requires "flx_pthread".
requires package "flx_pthread";
//$ The type of a semaphore: a raw heap pointer, not garbage collected.
type semaphore = "::flx::pthread::flx_semaphore_t*" requires semaphore_hxx;
//$ Construct a semaphore with an initial count of zero.
// BUG FIX: namespace was ::flx_pthread:: which does not match the
// type binding above (::flx::pthread::); ctor also lacked its
// argument type (cf. ctor mutex: unit, ctor condition_variable: 1).
ctor semaphore: 1 = "new ::flx::pthread::flx_semaphore_t";
//$ Construct a semaphore with the given initial count.
ctor semaphore: int = "new ::flx::pthread::flx_semaphore_t($1)";
//$ Delete the semaphore. Must be called manually: not garbage collected.
proc destroy : semaphore = "delete $1;";
//$ Increment the counter, releasing at most one waiting thread.
proc post: semaphore = "$1->post();";
//$ Block until the counter can be decremented without going negative,
//$ then decrement it.
proc wait: semaphore = "$1->wait();";
//$ Non-blocking decrement attempt: returns a flag indicating
//$ whether the counter was successfully decremented.
gen trywait: semaphore -> int = "$1->trywait()";
//$ Fetch the current counter value.
// BUG FIX: was "int get: semaphore = ..." which is not a valid
// Felix binding; a value-returning binding is a gen with an
// expression (no trailing semicolon) on the C++ side.
gen get: semaphore -> int = "$1->get()";
}
Condition Variables.¶
//[condition_variable.flx]
//$ Condition Variable for pthread synchronisation.
//$ Condition Variable for pthread synchronisation.
open class Condition_Variable
{
requires package "flx_pthread";
//$ The type of a condition variable: a raw heap pointer,
//$ not garbage collected.
type condition_variable = "::flx::pthread::flx_condv_t*" requires condv_hxx;
//$ Condition variable constructor taking unit argument.
ctor condition_variable: 1 = "new ::flx::pthread::flx_condv_t(PTF gcp->collector->get_thread_control())";
//$ Function to release a condition variable.
//$ Must be called manually: not garbage collected.
proc destroy: condition_variable = "delete $1;";
//$ Lock/unlock the mutex associated with the condition variable.
proc lock : condition_variable = "$1->lock();";
proc unlock : condition_variable = "$1->unlock();";
//$ Function to wait until a signal is raised on
//$ the condition variable by another thread.
proc wait: condition_variable = "$1->wait();";
//$ Function to raise a signal on a condition
//$ variable which will allow at most one thread
//$ waiting on it to proceed.
proc signal: condition_variable = "$1->signal();";
//$ Function to broadcast a signal releasing all
//$ threads waiting on a condition variable.
proc broadcast: condition_variable = "$1->broadcast();";
//$ Timed wait for signal on condition variable.
//$ Time in seconds. Resolution nanoseconds.
// BUG FIX: the timeout argument is $2; the original binding
// referenced $3, but this function has only two parameters.
gen timedwait: condition_variable * double -> int = "$1->timedwait($2)";
}
Thread Safe Bound Queue.¶
//[ts_bound_queue.flx]
open class TS_Bound_Queue
{
//$ The underlying C++ queue object (not directly usable from Felix).
private uncopyable type bQ_ = "::flx::pthread::bound_queue_t";
//$ GC-managed pointer to a bounded thread-safe queue; the scanner
//$ lets the collector trace values held inside the queue.
_gc_pointer _gc_type bQ_ type ts_bound_queue_t[T] = "::flx::pthread::bound_queue_t*"
requires
package "flx_bound_queue",
scanner "::flx::pthread::bound_queue_scanner"
;
//$ Construct a bounded queue with the given capacity.
//$ Allocated via the Felix collector so it is garbage collected.
ctor[T] ts_bound_queue_t[T]: !ints =
"""
new (*PTF gcp, @0, false) ::flx::pthread::bound_queue_t(
PTF gcp->collector->get_thread_control(), (size_t)$1)
""" requires property "needs_ptf";
// NOTE: enqueue/dequeue on queues uses suspend/resume
// to tell any pending collector it is safe to proceed
// whilst it is doing the operations (which may block),
// to block returning from the I/O during a collection
// AND, if the I/O completed before the collection got
// going, to yield at this point.
//$ Low level enqueue of a heap pointer; blocks when the queue is full.
private proc _enqueue[T]: ts_bound_queue_t[T] * &T = """
FLX_SAVE_REGS;
//fprintf(stderr,"enqueue to ts_bound_queue q=%p starts, item=%p\\n", $1, $2);
//PTF gcp->collector->get_thread_control()->suspend();
$1->enqueue((void*)$2);
//fprintf(stderr,"enqueue to ts_bound_queue q=%p done, item=%p\\n", $1, $2);
//PTF gcp->collector->get_thread_control()->resume();
""";
// Duh .. what happens if $2 storage location is set by
// the dequeue in the middle of a collection?
// it might be NULL when scanned, but by the time the queue
// is scanned the value will be lost from the queue and
// in the variable instead!
// The RACE is on!
//$ Low level dequeue of a heap pointer; blocks when the queue is empty.
private proc _dequeue[T]: ts_bound_queue_t[T] * &&T = """
FLX_SAVE_REGS;
//fprintf(stderr,"dequeue from ts_bound_queue %p starts\\n", $1);
//PTF gcp->collector->get_thread_control()->suspend();
*$2=(?1*)$1->dequeue();
//fprintf(stderr,"dequeue from ts_bound_queue done q=%p item=%p\\n",$1,*$2);
//PTF gcp->collector->get_thread_control()->resume();
""";
//$ Enqueue a value: copies it to the heap first.
proc enqueue[T] (Q:ts_bound_queue_t[T]) (elt:T) {
_enqueue(Q, new elt);
}
//$ Dequeue a value, blocking until one is available.
gen dequeue[T] (Q:ts_bound_queue_t[T]): T = {
var x:&T;
_dequeue (Q,&x);
return *x;
}
//$ Block until the queue becomes empty.
proc wait[T]: ts_bound_queue_t[T] = """
FLX_SAVE_REGS;
//PTF gcp->collector->get_thread_control()->suspend();
$1->wait_until_empty();
//PTF gcp->collector->get_thread_control()->resume();
""";
//$ Change the queue's capacity.
proc resize[T]: ts_bound_queue_t[T] * !ints = "$1->resize((size_t)$2);";
}
Atomic operations¶
//[atomic.flx]
open class Atomic
{
// note: only works for some types: constraints need to be added.
// We have to use a pointer because atomics aren't copyable
type atomic[T]="::std::atomic<?1>*" requires Cxx11_headers::atomic;
// FIXME: not managed by GC yet!
//$ Construct an atomic holding the given initial value.
ctor[T] atomic[T]: T = "(new ::std::atomic<?1>($1))";
//$ Delete the atomic. Must be called manually: not garbage collected.
proc delete[T] : atomic[T] = "delete $1;";
// note: only works for even less types! Constraints needed.
//$ Atomic pre-increment/pre-decrement of the stored value.
proc pre_incr[T] : &atomic[T] = "++**$1;";
proc pre_decr[T] : &atomic[T] = "--**$1;";
//$ Atomically load the stored value.
gen load[T] : atomic[T] -> T = "$1->load()";
//$ Atomically store a new value.
proc store[T] : atomic[T] * T = "$1->store($2);";
//$ Curried form of store.
proc store[T] (a:atomic[T]) (v:T) { store (a,v); }
instance[T] Str[atomic[T]] {
fun str (var x:atomic[T]) => x.load.str;
}
// NOTE(review): this inherit makes the Str instance's str visible
// in this class's scope — presumably intentional; verify.
inherit[T] Str[atomic[T]];
}
Thread Pool¶
A thread pool is a global object containing a set of running threads and a queue. Instead of spawning a new thread, the client just queues the job instead. Each thread grabs a job from the queue and runs it; on completion it grabs another job.
The primary advantage of a global thread pool is that it prevents oversaturation of the set of processors and thus excess context switching. The main downside is monitoring the completed state of jobs.
Do not use the threadpool for quick jobs, there is a significant overhead posting a job.
//[threadpool.flx]
include "std/pthread/ts_bound_queue";
include "std/pthread/atomic";
include "std/io/faio";
include "std/pthread/condition_variable";
include "std/pthread/pchannels";
class ThreadPool
{
//$ A job is just a procedure of no arguments.
typedef job_t = 1 -> 0;
// Sentinel job: a NULL pointer tells a worker thread to exit.
private const ThreadStop : job_t = "NULL";
private fun isStop : job_t -> bool = "$1==NULL";
private var clock = #Faio::mk_alarm_clock;
private var jobqueue = ts_bound_queue_t[job_t] 1024; // queue up to 1K jobs
private var nthreads = 8; // great default for quad core i7 ?
// number of threads actually running
private var running = atomic 0;
// number of threads blocked waiting on a barrier
private var waiting = atomic 0;
// barrier lock
private var block = #condition_variable;
//$ Number of worker threads the pool will use.
fun get_nthreads () => nthreads;
// This is a flag used to protect against nested pfor loops.
// If there is a nested pfor loop, it will just execute serially
// in the calling thread.
private var pforrunning = atomic 0;
//$ Rendezvous: blocks until all nthreads workers have arrived.
//$ The last arrival resets the count and broadcasts; the others
//$ loop on wait until the count has been reset.
proc barrier() {
//println$ "Barrier";
block.lock;
++waiting;
if waiting.load == nthreads do
waiting.store 0;
block.broadcast;
else
again:>
block.wait;
if waiting.load != 0 goto again;
done
block.unlock;
}
//$ Spawn the worker threads.
proc start () {
//println$ "Thread pool start()";
for i in 1..nthreads call spawn_pthread jobhandler;
//println$ "Threads spawned";
}
//$ Set the thread count then spawn the workers.
proc start (n:int) {
nthreads = n;
#start;
}
//$ Worker loop: dequeue and run jobs until a stop sentinel arrives.
private proc jobhandler () {
//println$ "Job handler thread #"+running.str+" started";
// NOTE(review): this copies the atomic handle, not its value,
// so id tracks the live running count — confirm if a stable
// per-thread id was intended.
var id = running;
++running;
rpt:while true do
//println$ "Trying to dequeue a job id=" + id.str;
var job = dequeue jobqueue;
//println$ "Job dequeued id="+id.str;
if isStop job break rpt;
job;
thread_yield(); // give a pending collection a chance to run
done
--running;
}
//$ Queue a job for execution, lazily starting the pool.
//$ If the pool has zero threads the job runs synchronously.
proc queue_job (job:job_t) {
//println$ "Queuing job";
if running.load == 0 call start ();
if nthreads > 0 do
call enqueue jobqueue job;
else
call job;
done
}
//$ Shut the pool down: post one stop sentinel per worker,
//$ then poll until all workers have exited.
proc stop () {
for i in 1..nthreads
call enqueue jobqueue ThreadStop;
while running.load != 0
call Faio::sleep(clock,0.001);
}
//$ Queue a barrier job on every worker so all of them
//$ synchronise before taking further jobs.
proc post_barrier() {
if nthreads > 0
for i in 1..nthreads call queue_job barrier;
}
//$ Job which signals completion on the given channel.
proc notify (chan:opchannel[int]) () {
write (chan,1);
}
//$ Wait until all currently queued jobs have completed:
//$ post a barrier, then queue a notify job and block on its signal.
proc join () {
if nthreads > 0 do
post_barrier;
var ip,op = #mk_iopchannel_pair[int];
queue_job$ notify op;
var x = read ip;
C_hack::ignore(x);
done
}
//$ Split the range [first,last] into nthreads+1 segments,
//$ queue a job for each of the first nthreads segments and run
//$ the final segment in the calling thread, then join.
//$ Nested calls (or too-small ranges) run serially instead.
proc pfor_segment (first:int, last:int) (lbody: int * int -> 1 -> 0)
{
//println$ "Pfor segment " + first.str + "," last.str;
var N = last - first + 1;
var nt = nthreads + 1;
if pforrunning.load == 0 and N >= nthreads and nthreads > 0 do
pforrunning.store 1;
for var counter in 0 upto nt - 2 do
var sfirst = first + (N * counter) / nt;
var slast = first + (N * (counter + 1)) / nt - 1;
//println$ "QUEUE JOB: Counter = " + counter.str + ", sfirst=" + sfirst.str + ", slast=" + slast.str;
ThreadPool::queue_job$ lbody (sfirst, slast);
done
// run the last segment in the calling thread
sfirst = first + (N * (nt - 1)) / nt;
slast = last;
//println$ "UNQUEUED JOB: Counter = " + counter.str + ", sfirst=" + sfirst.str + ", slast=" + slast.str;
lbody (sfirst, slast) ();
join;
pforrunning.store 0;
else
// Run serially
lbody (first, last) ();
done
}
//$ Adapt a per-index body to a segment body for pfor_segment.
noinline proc forloop (lbody: int -> 0) (first:int, last:int) ()
{
//println$ "forloop " + first.str + "," + last.str;
for var i in first upto last call lbody i;
}
//$ Parallel for loop over [first,last] running lbody on each index.
noinline proc pforloop (first: int) (last:int) (lbody: int -> 0)
{
//println$ "Pfor segment " + first.str + "," last.str;
pfor_segment (first, last) (forloop lbody);
}
//$ Tuple-argument convenience wrapper for pforloop.
inline proc tpfor (first:int, last:int, lbody: int-> 0)
{
pforloop first last lbody;
}
}
Thread Pool Demo¶
//[threadpoolex1.flx]
// Demo: N x N matrix multiply done naively, then with the thread
// pool (one job per row), then with pforloop, then with pfor syntax.
// Each parallel result is verified against the serial result.
include "std/pthread/threadpool";
open ThreadPool;
// Matrix multiply
// N is both a macro value (for expressions) and a type-level
// integer (for the array sizes below).
macro val N = 1000;
typedef N = 1000;
typedef vec_t = array[double, N];
typedef mx_t = array[vec_t,N];
var a : mx_t; // left operand
var b : mx_t; // right operand (accessed by column)
var r : mx_t; // result under test
var s : mx_t; // reference result from the serial multiply
// Zero out a matrix in place.
proc clear (mx:&mx_t) {
for i in 0..<N
for j in 0..<N
perform mx . i . j <- 0.0;
}
// Fill a matrix with random values in [0,1].
proc rinit (mx:&mx_t) {
for i in 0..<N
for j in 0..<N
perform mx . i . j <- #rand.double / RAND_MAX.double;
}
// True iff the parallel result r matches the serial result s exactly.
fun check() = {
//println$ "Verification check";
for i in 0..<N
for j in 0..<N
if r.i.j != s.i.j return false;
return true;
}
// Print whether the last parallel run matched the serial reference.
proc verify() {
//println$ "Running verify";
if #check do
println$ "Verified";
else
println "Wrong!";
done
//println$ "Verify ran";
}
clear &r;
clear &s;
rinit &a;
rinit &b;
// Dot product of row pr with row pc.
// NOTE(review): both arguments are rows of their matrices, so this
// computes a * b^T rather than a * b — consistent across all the
// variants here, so verification still holds.
fun inner_product (pr: &vec_t, pc: &vec_t) =
{
var sum = 0.0;
for (var k=0; k<N; ++k;)
perform sum = sum + *(pr.k) * *(pc.k);
return sum;
}
// naive multiply
var start = #time;
begin
for i in 0..<N
for (var j=0; j<N; ++j;)
perform &r . i . j <- inner_product (&a.i, &b.j);
s = r; // keep the serial result as the reference
end
var fin = #time;
println$ "Naive mul elapsed " + (fin - start).str + " seconds";
//println$ "Starting thread pool";
ThreadPool::start 8;
//println$ "Thread pool started";
// naive parallel multiply
// Computes one row of the result; used by pforloop below.
noinline proc inner_products_proc (var i:int)
{
for (var j=0; j<N; ++j;)
perform &r . i . j <- inner_product (&a.i, &b.j);
}
// Same row computation curried to a job (unit procedure).
noinline proc inner_products_job (var i:int) () {
for (var j=0; j<N; ++j;)
perform &r . i . j <- inner_product (&a.i, &b.j);
}
clear &r;
start = #time;
begin
// one queued job per row, then wait for all of them
for i in 0..<N
call ThreadPool::queue_job$ inner_products_job (i);
ThreadPool::join;
end
fin = #time;
println$ "Naive Parallel mul elapsed " + (fin - start).str + " seconds";
verify;
// smart parallel multiply
clear &r;
start = #time;
begin
println$ "Using thread pool's pforloop";
ThreadPool::pforloop 0 (N - 1) inner_products_proc;
end
fin = #time;
println$ "Smart Parallel mul elapsed " + (fin - start).str + " seconds";
verify;
// smart parallel multiply with syntax
clear &r;
start = #time;
begin
pfor i in 0 upto (N - 1) do
for (var j=0; j<N; ++j;)
perform &r . i . j <- inner_product (&a.i, &b.j);
done
end
fin = #time;
println$ "pfor mul elapsed " + (fin - start).str + " seconds";
verify;
ThreadPool::stop;