Package: src/packages/fibres.fdoc
Synchronous threads¶
key | file |
---|---|
fibres.flx | share/lib/std/control/fibres.flx |
schannels.flx | share/lib/std/control/schannels.flx |
mux.flx | share/lib/std/control/mux.flx |
spipes.flx | share/lib/std/control/spipes.flx |
Fibres (fthreads)¶
//[fibres.flx]
//$ Low level management of Felix fthreads (fibres).
open class Fibres
{
private gen _start[t]: (t->0)*t->cont = "$1->clone()->call(0,$2)";
//$ Function to start a continution with argument type t.
gen start[t] (p:t->0) (x:t) = { return _start (p,x); }
private fun _start0: (1->0)->cont = "$1->clone()->call(0)";
//$ Function to start a contiuation without an argument.
gen start (p:1->0) = { return _start0 (p); }
//$ Function to make a fibre out of a continuation.
gen mk_thread: cont->fthread = "new(*PTF gcp,::flx::rtl::_fthread_ptr_map,false) ::flx::rtl::fthread_t($1)";
// Spawn a fibre on this fibres scheduler.
// uses a supervisor call so can't be used in a function
proc spawn_fthread(p:1->0)
{
var con = start p; // get continuation of p
var fthr = mk_thread con;
svc$ svc_spawn_fthread fthr;
}
proc schedule_fthread(p:1->0)
{
var con = start p; // get continuation of p
var fthr = mk_thread con;
svc$ svc_schedule_fthread fthr;
}
proc suicide: 1 = "throw (con_t*)NULL;";
proc chain : cont = "return $1;" requires property "heap_closure";
// *********************************************************
// NESTED SYNC SCHEDULER
// NOTE: deprecated in favour of async scheduler below
// *********************************************************
//$ The type of a fibre scheduler.
type fibre_scheduler = "::flx::run::sync_sched*" requires header '#include "flx_sync.hpp"';
//$ Construct a fibre scheduler.
//$ NOTE: NOW GARBAGE COLLECTED!
ctor fibre_scheduler: bool = """
new(*PTF gcp,::flx::run::sync_sched_ptr_map,false)
::flx::run::sync_sched
(
$1,
PTF gcp,
new(*PTF gcp, ::flx::run::fthread_list_ptr_map, false) ::flx::run::fthread_list(PTF gcp)
)
"""
;
ctor fibre_scheduler () =>
fibre_scheduler (Env::getenv "FLX_DEBUG_DRIVER" != "")
;
//$ Spawn a fibre on a given scheduler with a given continuation.
//$ Note: does NOT run it!
//$ FIXME: no mutex guard!!
proc spawn_fibre: fibre_scheduler * fthread = """
$1->active->push_back($2);
""";
proc frun: (1->0) = "::flx::rtl::executil::frun (PTF gcp, $1);"
requires header '#include "flx_executil.hpp"'
;
proc run: fibre_scheduler = "$1->frun();";
proc run (p: 1 -> 0) {
var s = fibre_scheduler();
spawn_fthread s p;
s.run;
}
//$ The type of the stop state of the fibre scheduler.
//$ terminated: the scheduler is terminated.
//$ blocked: the scheduler is out of threads to run.
//$ delegated: the scheduler has been issued a service
//$ request by a thread which it cannot satisfy.
//$ The scheduler is put in delegated state and awaits
//$ for another service to satisfy the request and put
//$ it back in operation.
//$
//$ Note: there is no "operating" state because the
//$ stop state can only be queried by the schedulers caller
//$ when the scheduler returns control to it.
enum fibre_scheduler_state {
terminated,
blocked,
delegated
};
fun get_state : fibre_scheduler -> fibre_scheduler_state = "$1->fs";
//$ Core user procedure for launching a fibre.
proc spawn_fthread (fs:fibre_scheduler) (p:1->0) { spawn_fibre (fs,p.start.mk_thread); }
// *********************************************************
// ASYNC SCHEDULER
// *********************************************************
// FIXME: it is leaked .. to be fixed shortly
// async scheduler type
type async_scheduler = "::flx::run::async_sched*"
requires header '#include "flx_async.hpp"',
package "flx_arun"
;
// async scheduler constructor
ctor async_scheduler: bool = """
new
::flx::run::async_sched
(
PTF world, // world object
$1, // debug driver flag
PTF gcp, // GC profile object
new(*PTF gcp, ::flx::run::fthread_list_ptr_map, false) ::flx::run::fthread_list(PTF gcp),
::flx::run::async_sched::mainline // temporary hack! thread kind (should be inherited)
)
"""
;
// async scheduler constructor wrapper
ctor async_scheduler () =>
async_scheduler (Env::getenv "FLX_DEBUG_DRIVER" != "")
;
// spawn fibre on async scheduler from fthread object
proc spawn_fibre: async_scheduler * fthread = """
$1->ss->active->push_back($2);
""";
// spawn fibre on async scheduler from procedure
proc spawn_fthread (fs:async_scheduler) (p:1->0) { spawn_fibre (fs,p.start.mk_thread); }
proc prun: async_scheduler = "$1->prun();";
proc async_run (p: 1 -> 0) {
var s = async_scheduler();
spawn_fthread s p;
s.prun;
}
// *********************************************************
// MISC STUFF THAT MAY NOT BE USED, CONSIDER DELETING IT
// UNRELIABLE ANYHOW .. CHECK PLUGINS ...
// *********************************************************
//$ Execute a single step of a fibre.
gen step: cont -> cont = "$1->resume()";
//$ Schedule death of a fibre.
proc kill: fthread = "$1->cc = 0;";
//$ Run a continuation until it terminates.
//$ Do not use this proc if the underlying
//$ procedure attempts to read messages.
//$ This is a low level primitive, bypassing fthreads.
proc run: cont = "::flx::rtl::executil::run($1);" requires package "flx_executil";
private proc _send[t]: &cont * t =
"""
{
using namespace ::flx::rtl;
con_t *tmp = *(con_t**)$1.get_data();
// run target until it reaches a service request (or death)
while(tmp && (!tmp->p_svc || tmp->p_svc->variant == svc_yield)) {
try { tmp=tmp->resume(); }
catch (con_t *x) { tmp = x; }
}
// check it is alive and making the expected service request
if (!tmp)
throw flx_exec_failure_t (__FILE__,"send","Send to terminated procedure");
if (!tmp->p_svc)
throw flx_exec_failure_t (__FILE__,"send","Send to unready Procedure");
if (tmp->p_svc->variant != svc_read)
throw flx_exec_failure_t (__FILE__,"send","Send to Procedure which is not trying to read");
// store the message
**(?1**)tmp->p_svc->data= $2;
// clear the service request
tmp->p_svc = 0;
// run the target until the next service request (or death)
while(tmp && (!tmp->p_svc || tmp->p_svc->variant == svc_yield)) {
try { tmp=tmp->resume(); }
catch (con_t *x) { tmp = x; }
}
// save the new continuation
*(con_t**)$1.get_data() = tmp;
}
""";
//$ Send a message to a continuation.
//$ There is no type checking on the message type.
//$ The procedure is executed until
//$ the next wait_state, then the message is stored.
//$ Low level primitive, bypassing fthreads.
proc send[t] (p:&cont) (x:t)
{
_send (p,x);
}
}
Synchronous Channels¶
//[schannels.flx]
//$ Sychronous Channels.
//$ Used to exchange control and possibly data
//$ between Felix f-threads (aka fibres).
open class Schannel
{
//$ The type of a bidirectional synchronous channel.
_gc_pointer type schannel[t] = "::flx::rtl::schannel_t*";
//$ The type of an input synchronous channel.
_gc_pointer type ischannel[t] = "::flx::rtl::schannel_t*";
//$ The type of an output synchronous channel.
_gc_pointer type oschannel[t] = "::flx::rtl::schannel_t*";
gen mk_untyped_schannel: 1 -> address =
"new(*PTF gcp,::flx::rtl::schannel_ptr_map,false) ::flx::rtl::schannel_t()"
requires property "needs_gc"
;
//$ Create a bidirectional synchronous channel.
gen mk_schannel[t]():schannel[t] =>
C_hack::cast[schannel[t]] #mk_untyped_schannel
;
//$ Model a NULL pointer as an schannel.
//$ Necessary for killing off schannels,
//$ so as to make them unreachable, so the gc can reap them.
//$ Note: null_schannels are safe.
gen mk_null_schannel[t]: 1->schannel[t] = "NULL";
//$ Model a NULL pointer as an ischannel.
//$ Necessary for killing off schannels,
//$ so as to make them unreachable, so the gc can reap them.
gen mk_null_ischannel[t]: 1->ischannel[t] = "NULL";
//$ Model a NULL pointer as an oschannel.
//$ Necessary for killing off schannels,
//$ so as to make them unreachable, so the gc can reap them.
gen mk_null_oschannel[t]: 1->oschannel[t] = "NULL";
ctor[T] address: oschannel[T] = "$1";
ctor[T] address: ischannel[T] = "$1";
//$ Check if an schannel is NULL.
fun isNULL[T] :schannel[T] -> bool = "NULL==$1";
//$ Check if an ischannel is NULL.
fun isNULL[T] :ischannel[T] -> bool = "NULL==$1";
//$ Check if an oschannel is NULL.
fun isNULL[T] :oschannel[T] -> bool = "NULL==$1";
//$ Safe cast from bidirectional to ouput synchronous channel.
ctor[t] oschannel[t](x:schannel[t]) => C_hack::cast[oschannel[t]] x;
//$ Safe cast from bidirectional to input synchronous channel.
ctor[t] ischannel[t](x:schannel[t]) => C_hack::cast[ischannel[t]] x;
//$ Make an input and an output channel out of a bidirectional channel.
gen mk_ioschannel_pair[t](var ch:schannel[t]) =>
ischannel[t] ch, oschannel[t] ch
;
//$ Construct a connected input and output channel pair.
gen mk_ioschannel_pair[t]() =>
mk_ioschannel_pair[t]$ mk_schannel[t] ()
;
// pass in address of location to put the pointer to the T data
proc read[T] (chan:schannel[T], loc: &&T) {
svc$ svc_sread$ C_hack::cast[_schannel] chan, C_hack::reinterpret[&root::address] (loc);
}
// pass in address of location to put the T data
proc read[T] (chan:schannel[T], p: &T) {
var loc: &T;
read (chan, &loc);
p <- *loc;
}
//$ Read an item from a bidirectional channel.
inline gen read[T] (chan:schannel[T]) = {
var loc: &T;
read (chan, &loc);
return *loc;
}
proc read[T] (chan:ischannel[T], loc: &&T) { read (C_hack::cast[schannel[T]] chan, loc); }
proc read[T] (chan:ischannel[T], p: &T) { read (C_hack::cast[schannel[T]] chan, p); }
//$ Read an item from an input channel.
inline gen read[T] (chan:ischannel[T]) => read$ C_hack::cast[schannel[T]] chan;
//$ Test if channel is read for a read.
inline gen ready[T] :ischannel[T] -> bool = "$1->top!=nullptr && !(uintptr_t)$1->top &1u)";
inline gen ready[T] : schannel[T] -> bool = "$1->top!=nullptr && (uintptr_t)$1->top &1u)";
//$ Return Some value if ready, otherwise None
inline gen maybe_read[T] (chan:ischannel[T]) =>
if chan.ready then Some chan.read else None[T]
;
inline gen maybe_read[T] (chan:schannel[T]) =>
if chan.ready then Some chan.read else None[T]
;
//$ Write an item to a bidirectional channel.
proc write[T] (chan:schannel[T], v:T) {
var ps = C_hack::cast[root::address]$ new v;
svc$ svc_swrite$ C_hack::cast[_schannel] chan, &ps;
}
proc write[T] (chan:oschannel[T], v:T) {
write (C_hack::cast[schannel[T]] chan, v);
}
//$ Multi Write an item to a bidirectional channel.
proc broadcast[T] (chan:schannel[T], v:T) {
var ps = C_hack::cast[root::address]$ new v;
svc$ svc_multi_swrite$ C_hack::cast[_schannel] chan, &ps;
}
//$ Multi Write an item to an output channel.
proc broadcast[T] (chan:oschannel[T], v:T) {
broadcast (C_hack::cast[schannel[T]] chan, v);
}
// Very high power though not very efficient conversion
// from ischannel to iterator.
// Given i: ischannel[T] you can just write
// for j in i do .. done
gen iterator[T] (i:ischannel[T]) () : opt[T] = {
next:>
var y = None[T];
frun { var x = read i; y = Some x; };
match y do
| Some _ => yield y; goto next;
| None => return y;
done
}
// Here is a subroutine call, assuming the
// fibre is already created
inline gen subcall[r,w] (chout:%>w, chin:%<r) (arg:w):r =
{
write (chout,arg);
return read chin;
}
// Now, we can use the channels AS a function:
inline fun apply[r,w] (ch:(%>w * %<r), arg:w):r =>
subcall ch arg
;
}
Synchronous multiplexor¶
The following device acts like a select, that is, the reader get all the input data, but the order is indeterminate.
[Not clear how this is useful .. ]
//[mux.flx]
//$ Schannel multiplexor.
//$ Read multiple input schannels, write to an output schannel.
open class Multiplexor
{
//$ Schannel copy.
noinline proc copy[T] (i:ischannel[T],o:oschannel[T]) ()
{
while true do
var x = read i;
write (o,x);
done
}
//$ Schannel multiplexor based on iterator argument.
//$ Accepts stream of input schannels.
//$ Writes to output schannel.
proc mux[T] (inp:1->opt[ischannel[T]], out:oschannel[T]) ()
{
for i in inp do
spawn_fthread$ copy(i,out);
done
}
//$ Schannel multiplexor based on streamable data structure.
//$ Creates stream of input schannels.
//$ Writes to output schannel.
fun mux[C,T with Streamable[C,ischannel[T]]] (a:C, out:oschannel[T]) =>
mux (iterator a, out)
;
}
Schannel and Pipe syntax¶
Special syntax for both pipes and also abbreviation for schannel types.
//[schannels.flx]
open class DuplexSchannels
{
_gc_pointer type duplex_schannel[r,w] = "::flx::rtl::schannel_t*";
inline gen read[r,w] (chan:duplex_schannel[r,w]) : r =>
read (C_hack::cast[ischannel[r]] chan)
;
inline proc write[r,w] (chan:duplex_schannel[r,w], v:w) =>
write (C_hack::cast[oschannel[w]] chan, v)
;
ctor[r,w] duplex_schannel[r,w] () =>
C_hack::cast[duplex_schannel[r,w]] #mk_untyped_schannel
;
// NOTE: assuming the mainline want to read an r
// after passing a w to the subroutine, it must
// use the second channel of the pair to do so.
// passing the first one to the subroutine.
gen mk_duplex_schannel_pair[r,w] () =>
let c = #mk_untyped_schannel in
C_hack::cast[duplex_schannel[w,r]] c,
C_hack::cast[duplex_schannel[r,w]] c
;
// Here is our subroutine call, assuming the
// fibre is already created
inline gen subcall[r,w] (ch:duplex_schannel[r,w]) (arg:w):r =
{
write (ch,arg);
return read ch;
}
// Now, we can use the duplex channel AS a function:
inline fun apply[r,w] (ch:duplex_schannel[r,w], arg:w):r =>
subcall ch arg
;
// Here is a self contained subcall that spawns the fibre
// and creates the channel too. This model is for a one shot.
inline gen subcall[r,w]
(fib: duplex_schannel[w,r] -> 1 -> 0)
(arg: w)
: r =
{
var wr,rw = mk_duplex_schannel_pair[r,w]();
spawn_fthread$ fib wr;
write (rw,arg);
return read rw;
}
inline gen apply[r,w] (
fib: duplex_schannel[w,r] -> 1 -> 0,
arg: w)
: r =>
subcall fib arg
;
} // class DuplexSchannels
Let’s now rewrite our example:
//[subrout-02.flx]
proc int_to_string (ch: %<int%>string) ()
{
var x = read ch;
var r = x.str;
write(ch, r);
}
var wr, rw = mk_duplex_schannel_pair[string,int]();
spawn_fthread$ int_to_string wr;
println$ rw 42;
42
Even more compactly:
//[subrout-03.flx]
proc int_to_string (ch: %<int%>string) ()
{
var x = read ch;
var r = x.str;
write(ch, r);
}
println$ int_to_string 42;
42