Package: src/packages/fibres.fdoc

Synchronous threads

key file
fibres.flx share/lib/std/control/fibres.flx
schannels.flx share/lib/std/control/schannels.flx
mux.flx share/lib/std/control/mux.flx
spipes.flx share/lib/std/control/spipes.flx

Fibres (fthreads)

//[fibres.flx]

//$ Low level management of Felix fthreads (fibres).
open class Fibres
{
  private gen _start[t]: (t->0)*t->cont = "$1->clone()->call(0,$2)";

  //$ Function to start a continution with argument type t.
  gen start[t] (p:t->0) (x:t) = { return _start (p,x); }

  private fun _start0: (1->0)->cont = "$1->clone()->call(0)";

  //$ Function to start a contiuation without an argument.
  gen start (p:1->0) = { return _start0 (p); }

  //$ Function to make a fibre out of a continuation.
  gen mk_thread: cont->fthread = "new(*PTF gcp,::flx::rtl::_fthread_ptr_map,false) ::flx::rtl::fthread_t($1)";

  // Spawn a fibre on this fibres scheduler.
  // uses a supervisor call so can't be used in a function
  proc spawn_fthread(p:1->0)
  {
      var con = start p;              // get continuation of p
      var fthr = mk_thread con;
      svc$ svc_spawn_fthread fthr;
  }

  proc schedule_fthread(p:1->0)
  {
      var con = start p;              // get continuation of p
      var fthr = mk_thread con;
      svc$ svc_schedule_fthread fthr;
  }

  proc suicide: 1 = "throw (con_t*)NULL;";

  proc chain : cont = "return $1;" requires property "heap_closure";

  // *********************************************************
  // NESTED SYNC SCHEDULER
  // NOTE: deprecated in favour of async scheduler below
  // *********************************************************
  //$ The type of a fibre scheduler.
  type fibre_scheduler = "::flx::run::sync_sched*" requires header '#include "flx_sync.hpp"';

  //$ Construct a fibre scheduler.
  //$  NOTE: NOW GARBAGE COLLECTED!
  ctor fibre_scheduler: bool = """
    new(*PTF gcp,::flx::run::sync_sched_ptr_map,false)
      ::flx::run::sync_sched
      (
        $1,
        PTF gcp,
        new(*PTF gcp, ::flx::run::fthread_list_ptr_map, false) ::flx::run::fthread_list(PTF gcp)
      )
    """
  ;
  ctor fibre_scheduler () =>
    fibre_scheduler (Env::getenv "FLX_DEBUG_DRIVER" != "")
  ;


  //$ Spawn a fibre on a given scheduler with a given continuation.
  //$ Note: does NOT run it!
  //$ FIXME: no mutex guard!!
  proc spawn_fibre: fibre_scheduler * fthread = """
    $1->active->push_back($2);
  """;

  proc frun: (1->0) = "::flx::rtl::executil::frun (PTF gcp, $1);"
    requires header '#include "flx_executil.hpp"'
  ;

  proc run: fibre_scheduler = "$1->frun();";

  proc run (p: 1 -> 0) {
    var s = fibre_scheduler();
    spawn_fthread s p;
    s.run;
  }


  //$ The type of the stop state of the fibre scheduler.
  //$ terminated: the scheduler is terminated.
  //$ blocked: the scheduler is out of threads to run.
  //$ delegated: the scheduler has been issued a service
  //$  request by a thread which it cannot satisfy.
  //$  The scheduler is put in delegated state and awaits
  //$  for another service to satisfy the request and put
  //$  it back in operation.
  //$
  //$ Note: there is no "operating" state because the
  //$ stop state can only be queried by the schedulers caller
  //$ when the scheduler returns control to it.
  enum fibre_scheduler_state {
    terminated,
    blocked,
    delegated
  };
  fun get_state : fibre_scheduler -> fibre_scheduler_state = "$1->fs";


  //$ Core user procedure for launching a fibre.
  proc spawn_fthread (fs:fibre_scheduler) (p:1->0) { spawn_fibre (fs,p.start.mk_thread); }

  // *********************************************************
  // ASYNC SCHEDULER
  // *********************************************************
  // FIXME: it is leaked .. to be fixed shortly

  // async scheduler type
  type async_scheduler = "::flx::run::async_sched*"
    requires header '#include "flx_async.hpp"',
    package "flx_arun"
  ;

  // async scheduler constructor
  ctor async_scheduler: bool = """
    new
    ::flx::run::async_sched
        (
          PTF world, // world object
          $1, // debug driver flag
          PTF gcp,  // GC profile object
          new(*PTF gcp, ::flx::run::fthread_list_ptr_map, false) ::flx::run::fthread_list(PTF gcp),
          ::flx::run::async_sched::mainline // temporary hack! thread kind (should be inherited)
        )
      """
    ;

  // async scheduler constructor wrapper
  ctor async_scheduler () =>
    async_scheduler (Env::getenv "FLX_DEBUG_DRIVER" != "")
  ;

  // spawn fibre on async scheduler from fthread object
  proc spawn_fibre: async_scheduler * fthread = """
      $1->ss->active->push_back($2);
  """;

  // spawn fibre on async scheduler from procedure
  proc spawn_fthread (fs:async_scheduler) (p:1->0) { spawn_fibre (fs,p.start.mk_thread); }

  proc prun: async_scheduler = "$1->prun();";


  proc async_run (p: 1 -> 0) {
    var s = async_scheduler();
    spawn_fthread s p;
    s.prun;
  }

// *********************************************************
// MISC STUFF THAT MAY NOT BE USED, CONSIDER DELETING IT
// UNRELIABLE ANYHOW .. CHECK PLUGINS ...
// *********************************************************


  //$ Execute a single step of a fibre.
  gen step: cont -> cont = "$1->resume()";

  //$ Schedule death of a fibre.
  proc kill: fthread = "$1->cc = 0;";

  //$ Run a continuation until it terminates.
  //$ Do not use this proc if the underlying
  //$ procedure attempts to read messages.
  //$ This is a low level primitive, bypassing fthreads.
  proc run: cont = "::flx::rtl::executil::run($1);" requires package "flx_executil";

  private proc _send[t]: &cont * t =
  """
  {
    using namespace ::flx::rtl;
    con_t *tmp = *(con_t**)$1.get_data();
    // run target until it reaches a service request (or death)
    while(tmp && (!tmp->p_svc || tmp->p_svc->variant == svc_yield)) {
      try { tmp=tmp->resume(); }
      catch (con_t *x) { tmp = x; }
    }
    // check it is alive and making the expected service request
    if (!tmp)
      throw flx_exec_failure_t (__FILE__,"send","Send to terminated procedure");
    if (!tmp->p_svc)
      throw flx_exec_failure_t (__FILE__,"send","Send to unready Procedure");
    if (tmp->p_svc->variant != svc_read)
      throw flx_exec_failure_t (__FILE__,"send","Send to Procedure which is not trying to read");
    // store the message
    **(?1**)tmp->p_svc->data= $2;
    // clear the service request
    tmp->p_svc = 0;
    // run the target until the next service request (or death)
    while(tmp && (!tmp->p_svc || tmp->p_svc->variant == svc_yield)) {
      try { tmp=tmp->resume(); }
      catch (con_t *x) { tmp = x; }
    }
    // save the new continuation
    *(con_t**)$1.get_data() = tmp;

  }
  """;

  //$ Send a message to a continuation.
  //$ There is no type checking on the message type.
  //$ The procedure is executed until
  //$ the next wait_state, then the message is stored.
  //$ Low level primitive, bypassing fthreads.
  proc send[t] (p:&cont) (x:t)
  {
    _send (p,x);
  }

}

Synchronous Channels

//[schannels.flx]

//$ Sychronous Channels.
//$ Used to exchange control and possibly data
//$ between Felix f-threads (aka fibres).

open class Schannel
{
  //$ The type of a bidirectional synchronous channel.
  _gc_pointer type schannel[t] = "::flx::rtl::schannel_t*";

  //$ The type of an input synchronous channel.
  _gc_pointer type ischannel[t] = "::flx::rtl::schannel_t*";

  //$ The type of an output synchronous channel.
  _gc_pointer type oschannel[t] = "::flx::rtl::schannel_t*";

  gen mk_untyped_schannel: 1 -> address =
    "new(*PTF gcp,::flx::rtl::schannel_ptr_map,false) ::flx::rtl::schannel_t()"
    requires property "needs_gc"
  ;
  //$ Create a bidirectional synchronous channel.
  gen mk_schannel[t]():schannel[t] =>
    C_hack::cast[schannel[t]] #mk_untyped_schannel
  ;

  //$ Model a NULL pointer as an schannel.
  //$ Necessary for killing off schannels,
  //$ so as to make them unreachable, so the gc can reap them.
  //$ Note: null_schannels are safe.
  gen mk_null_schannel[t]: 1->schannel[t] = "NULL";

  //$ Model a NULL pointer as an ischannel.
  //$ Necessary for killing off schannels,
  //$ so as to make them unreachable, so the gc can reap them.
  gen mk_null_ischannel[t]: 1->ischannel[t] = "NULL";

  //$ Model a NULL pointer as an oschannel.
  //$ Necessary for killing off schannels,
  //$ so as to make them unreachable, so the gc can reap them.
  gen mk_null_oschannel[t]: 1->oschannel[t] = "NULL";

  ctor[T] address: oschannel[T] = "$1";
  ctor[T] address: ischannel[T] = "$1";

  //$ Check if an schannel is NULL.
  fun isNULL[T] :schannel[T] -> bool = "NULL==$1";

  //$ Check if an ischannel is NULL.
  fun isNULL[T] :ischannel[T] -> bool = "NULL==$1";

  //$ Check if an oschannel is NULL.
  fun isNULL[T] :oschannel[T] -> bool = "NULL==$1";

  //$ Safe cast from bidirectional to ouput synchronous channel.
  ctor[t] oschannel[t](x:schannel[t]) => C_hack::cast[oschannel[t]] x;

  //$ Safe cast from bidirectional to input synchronous channel.
  ctor[t] ischannel[t](x:schannel[t]) => C_hack::cast[ischannel[t]] x;

  //$ Make an input and an output channel out of a bidirectional channel.
  gen mk_ioschannel_pair[t](var ch:schannel[t]) =>
    ischannel[t] ch, oschannel[t] ch
  ;

  //$ Construct a connected input and output channel pair.
  gen mk_ioschannel_pair[t]() =>
    mk_ioschannel_pair[t]$ mk_schannel[t] ()
  ;

  // pass in address of location to put the pointer to the T data
  proc read[T] (chan:schannel[T], loc: &&T) {
    svc$ svc_sread$ C_hack::cast[_schannel] chan, C_hack::reinterpret[&root::address] (loc);
  }

  // pass in address of location to put the T data
  proc read[T] (chan:schannel[T], p: &T) {
    var loc: &T;
    read (chan, &loc);
    p <- *loc;
  }

  //$ Read an item from a bidirectional channel.
  inline gen read[T] (chan:schannel[T]) = {
    var loc: &T;
    read (chan, &loc);
    return *loc;
  }
  proc read[T] (chan:ischannel[T], loc: &&T) { read (C_hack::cast[schannel[T]] chan, loc); }
  proc read[T] (chan:ischannel[T], p: &T) { read (C_hack::cast[schannel[T]] chan, p); }

  //$ Read an item from an input channel.
  inline gen read[T] (chan:ischannel[T]) => read$ C_hack::cast[schannel[T]] chan;

  //$ Test if channel is read for a read.
  inline gen ready[T] :ischannel[T] -> bool = "$1->top!=nullptr && !(uintptr_t)$1->top &1u)";
  inline gen ready[T] : schannel[T] -> bool = "$1->top!=nullptr && (uintptr_t)$1->top &1u)";

  //$ Return Some value if ready, otherwise None
  inline gen maybe_read[T] (chan:ischannel[T]) =>
    if chan.ready then Some chan.read else None[T]
  ;

  inline gen maybe_read[T] (chan:schannel[T]) =>
    if chan.ready then Some chan.read else None[T]
  ;

  //$ Write an item to a bidirectional channel.
  proc write[T] (chan:schannel[T], v:T) {
    var ps = C_hack::cast[root::address]$ new v;
    svc$ svc_swrite$ C_hack::cast[_schannel] chan, &ps;
  }

  proc write[T] (chan:oschannel[T], v:T) {
    write (C_hack::cast[schannel[T]] chan, v);
  }

  //$ Multi Write an item to a bidirectional channel.
  proc broadcast[T] (chan:schannel[T], v:T) {
    var ps = C_hack::cast[root::address]$ new v;
    svc$ svc_multi_swrite$ C_hack::cast[_schannel] chan, &ps;
  }

  //$ Multi Write an item to an output channel.
  proc broadcast[T] (chan:oschannel[T], v:T) {
    broadcast (C_hack::cast[schannel[T]] chan, v);
  }

  // Very high power though not very efficient conversion
  // from ischannel to iterator.
  // Given i: ischannel[T] you can just write
  // for j in i do .. done
  gen iterator[T] (i:ischannel[T]) () : opt[T] = {
  next:>
    var y = None[T];
    frun { var x = read i; y = Some x; };
    match y do
    | Some _ => yield y; goto next;
    | None => return y;
    done
  }

  // Here is a subroutine call, assuming the
  // fibre is already created
  inline gen subcall[r,w] (chout:%>w, chin:%<r) (arg:w):r =
  {
    write (chout,arg);
    return read chin;
  }

  // Now, we can use the channels AS a function:
  inline fun apply[r,w] (ch:(%>w * %<r), arg:w):r =>
    subcall ch arg
  ;

}

Synchronous multiplexor

The following device acts like a select, that is, the reader get all the input data, but the order is indeterminate.

[Not clear how this is useful .. ]

//[mux.flx]

//$ Schannel multiplexor.
//$ Read multiple input schannels, write to an output schannel.
open class Multiplexor
{
  //$ Schannel copy.
  noinline proc copy[T] (i:ischannel[T],o:oschannel[T]) ()
  {
    while true do
      var x = read i;
      write (o,x);
    done
  }

  //$ Schannel multiplexor based on iterator argument.
  //$ Accepts stream of input schannels.
  //$ Writes to output schannel.
  proc mux[T] (inp:1->opt[ischannel[T]], out:oschannel[T]) ()
  {
    for i in inp do
      spawn_fthread$ copy(i,out);
    done
  }


  //$ Schannel multiplexor based on streamable data structure.
  //$ Creates stream of input schannels.
  //$ Writes to output schannel.
  fun mux[C,T with Streamable[C,ischannel[T]]] (a:C, out:oschannel[T]) =>
    mux (iterator a, out)
  ;
}

Schannel and Pipe syntax

Special syntax for both pipes and also abbreviation for schannel types.

//[schannels.flx]

open class DuplexSchannels
{
_gc_pointer type duplex_schannel[r,w] = "::flx::rtl::schannel_t*";

inline gen read[r,w] (chan:duplex_schannel[r,w]) : r =>
  read (C_hack::cast[ischannel[r]] chan)
;

inline proc write[r,w] (chan:duplex_schannel[r,w], v:w)  =>
  write (C_hack::cast[oschannel[w]] chan, v)
;

ctor[r,w] duplex_schannel[r,w] () =>
  C_hack::cast[duplex_schannel[r,w]] #mk_untyped_schannel
;

// NOTE: assuming the mainline want to read an r
// after passing a w to the subroutine, it must
// use the second channel of the pair to do so.
// passing the first one to the subroutine.
gen mk_duplex_schannel_pair[r,w] () =>
  let c = #mk_untyped_schannel in
  C_hack::cast[duplex_schannel[w,r]] c,
  C_hack::cast[duplex_schannel[r,w]] c
;

// Here is our subroutine call, assuming the
// fibre is already created
inline gen subcall[r,w] (ch:duplex_schannel[r,w]) (arg:w):r =
{
  write (ch,arg);
  return read ch;
}

// Now, we can use the duplex channel AS a function:
inline fun apply[r,w] (ch:duplex_schannel[r,w], arg:w):r =>
  subcall ch arg
;

// Here is a self contained subcall that spawns the fibre
// and creates the channel too. This model is for a one shot.
inline gen subcall[r,w]
  (fib: duplex_schannel[w,r] -> 1 -> 0)
  (arg: w)
: r =
{
  var wr,rw = mk_duplex_schannel_pair[r,w]();
  spawn_fthread$ fib wr;
  write (rw,arg);
  return read rw;
}

inline gen apply[r,w] (
  fib: duplex_schannel[w,r] -> 1 -> 0,
  arg: w)
: r =>
  subcall fib arg
;

} // class DuplexSchannels

Let’s now rewrite our example:

//[subrout-02.flx]
proc int_to_string (ch: %<int%>string)  ()
{
  var x = read ch;
  var r = x.str;
  write(ch, r);
}
var wr, rw = mk_duplex_schannel_pair[string,int]();
spawn_fthread$ int_to_string wr;
println$ rw 42;
42

Even more compactly:

//[subrout-03.flx]
proc int_to_string (ch: %<int%>string)  ()
{
  var x = read ch;
  var r = x.str;
  write(ch, r);
}
println$ int_to_string 42;
42