Package: src/packages/async.fdoc

Asynchronous I/O and thread scheduling

key file
flx_async_world.hpp share/lib/rtl/flx_async_world.hpp
flx_async_world.cpp share/src/rtl/flx_async_world.cpp
flx_async.hpp share/lib/rtl/flx_async.hpp
flx_async.cpp share/src/flx_async/flx_async.cpp
flx_async.py $PWD/buildsystem/flx_async.py
unix_flx_async.fpc $PWD/src/config/unix/flx_async.fpc
win_flx_async.fpc $PWD/src/config/win/flx_async.fpc

The Asynchronous Support System

//[flx_async_world.hpp]

#ifndef __flx_async_world_H_
#define __flx_async_world_H_

#include "flx_gc.hpp"
#include "flx_collector.hpp"
#include "flx_sync.hpp"

namespace flx { namespace run {

// This class handles pthreads and asynchronous I/O.
// It shares operations with sync_sched by interleaving
// based on state variables: prun() repeatedly runs the synchronous
// scheduler and then services whatever state it stopped in.
//
// NOTE: the original remark here said async_sched is NOT garbage
// collected, and hence that the synchronous scheduler it creates must
// be made a GC root.
// NOTE(review): that remark may be stale -- spawn_impl() allocates
// async_sched objects through the GC allocator using async_sched_ptr_map,
// whose offset table lists `ss`. Confirm which ownership model is current.
struct RTL_EXTERN async_sched
{
  // What kind of OS-level context is driving this scheduler; prun()
  // chooses its "out of fibres" policy from this value.
  enum thread_kind_t {mainline,embedded,pthread,process};
  thread_kind_t thread_kind;
  // Printable name of a thread kind (used in debug traces).
  static char const *str(thread_kind_t);

  // weak pointer
  struct flx_world *world;

  // When true, the scheduler writes trace messages to stderr.
  bool debug_driver;


  // weak pointer
  ::flx::gc::generic::gc_profile_t *gcp;

  // Strong pointer
  sync_sched *ss;  // (d, gcp, active), (ft, request), (pc, fs)

  // Creates the synchronous scheduler over fibre list `a` and bumps that
  // list's thread_count and busy_count.
  async_sched(
    flx_world *world_arg,
    bool d,
    ::flx::gc::generic::gc_profile_t *g,
    fthread_list *a, thread_kind_t
  );
  // Decrements the active list's thread_count.
  ~async_sched();

  // Master scheduler loop; see the definition for the return protocol.
  int prun();
  // Service handlers for requests delegated by the synchronous scheduler.
  void do_spawn_pthread();
  void do_spawn_process();
  void spawn_impl(fthread_list*, thread_kind_t);
  void do_general();

  // Forwards to sync_sched::external_multi_swrite.
  void external_multi_swrite(::flx::rtl::schannel_t *, void *data);
private:
  // Moves every already-completed async fibre back onto the sync
  // scheduler without blocking; returns true if at least one was moved.
  bool nonblocking_schedule_queued_fthreads();
};

RTL_EXTERN extern ::flx::gc::generic::gc_shape_t async_sched_ptr_map;


}} // namespaces
#endif //__flx_async_world_H_
//[flx_async_world.cpp ]


#include "flx_world.hpp"
#include "flx_async_world.hpp"
#include "flx_sync.hpp"
#include <assert.h>

using namespace ::flx::rtl;
using namespace ::flx::pthread;

namespace flx { namespace run {

// ********************************************************
// SHAPE for async_sched
// ********************************************************

// Byte offsets of the pointer members of async_sched that are listed for
// the collector. Only `ss` appears; `world` and `gcp` are documented as
// weak pointers in the class declaration and are not listed.
static const std::size_t async_sched_offsets[1]={
    offsetof(async_sched,ss)
};

// Offset table descriptor: entry count followed by the offsets array.
static ::flx::gc::generic::offset_data_t const async_sched_offset_data = { 1, async_sched_offsets };

// GC shape record for async_sched, passed to the GC placement-new in
// spawn_impl(). Field meanings below follow the inline remarks; the
// remaining positional values are presumably count/flag slots of
// gc_shape_t -- TODO confirm against flx_gc.hpp.
::flx::gc::generic::gc_shape_t async_sched_ptr_map = {
  "flx::run::async_sched",
  1,sizeof(async_sched),
  0, // no finaliser,
  0, // fcops
  &async_sched_offset_data,
  ::flx::gc::generic::scan_by_offsets,
  0,0, // no serialisation as yet
  ::flx::gc::generic::gc_flags_default,
  0UL, 0UL
};

// ***************************************************
// Async_sched: Thread kind pretty printer
// ***************************************************


// Returns a static, printable name for a thread kind; used only to label
// debug traces. The returned pointer refers to a string literal and must
// not be freed.
char const *async_sched::str(thread_kind_t k) {
  switch (k) {
  case mainline: return "mainline";
  case embedded: return "embedded";
  case pthread: return "pthread";
  case process: return "process";
  }
  // Only reachable if k holds a value outside the enumeration (e.g. an
  // uninitialised or corrupted thread_kind). Falling off the end of a
  // value-returning function is undefined behaviour, so return a marker.
  return "unknown";
}

// ***************************************************
// Async_sched: CONSTRUCTOR
// ***************************************************

// Builds an asynchronous scheduler for one OS-level context.
//   world_arg : owning world (weak pointer, stored as-is)
//   d         : enable debug tracing to stderr
//   g         : GC profile used for allocations made by this scheduler
//   a         : fibre list the new synchronous scheduler will run
//   k         : kind of thread driving this scheduler
// Side effects: allocates a sync_sched through the GC allocator and
// increments a->thread_count and a->busy_count.
async_sched::async_sched(
    flx_world *world_arg,
    bool d,
    ::flx::gc::generic::gc_profile_t *g,
    fthread_list *a, thread_kind_t k
  ) :
    // Members are listed in declaration order (thread_kind is declared
    // first). Initialisation always happens in declaration order, so the
    // list now says what actually happens and -Wreorder stays quiet.
    thread_kind(k),
    world(world_arg),
    debug_driver(d),
    gcp(g)
  {
    // ss is assigned in the body, after gcp and debug_driver are set.
    ss = new(*gcp,sync_sched_ptr_map, false) sync_sched(debug_driver, gcp, a);
    ++a->thread_count;
    ++a->busy_count;
    if (debug_driver)
      fprintf(stderr, "prun %p: async scheduler, creating and rooting synchronous scheduler! threads=1,busy=1\n",(void*)mythrid());
  }


// ***************************************************
// Async_sched: DESTRUCTOR
// ***************************************************
// Unregisters this scheduler from its active fibre list by decrementing
// thread_count, and optionally traces the final queue state.
// The destructor never lets an exception escape.
async_sched::~async_sched() {
  try
  {
    --ss->active->thread_count;
    if (debug_driver)
    {
      fprintf(stderr, "prun %p: Terminating async scheduler, threads=%d\n",(void*)mythrid(), ss->active->thread_count.load());
      // async_count is printed with %zu at the other trace sites in this
      // file; both values are cast explicitly so the arguments always
      // match the conversion specifiers.
      fprintf(stderr, "prun %p: async scheduler, fibre queue length %zu, async_count=%zu\n",
         (void*)mythrid(),
         static_cast<std::size_t>(ss->active->size()),
         static_cast<std::size_t>(ss->active->async_count));
      fprintf(stderr, "prun %p: async scheduler returns!\n",(void*)mythrid());
    }
  }
  catch (...) { fprintf(stderr, "Unknown exception deleting async!\n"); }
}

// ***************************************************
// Async_sched: Thread procedure
// ***************************************************
static void prun_pthread_entry(void *data) {
  async_sched *d = (async_sched*)data;
  d->prun();
}

// ***************************************************
// Async_sched: SPAWNING
// ***************************************************

// SPAWNING A NEW FELIX PTHREAD
// CREATES ITS OWN PRIVATE ASYNC SCHEDULER
// CREATES ITS OWN PRIVATE SYNC SCHEDULER
// SHARES WORLD INCLUDING COLLECTOR
// REGISTERS IN THREAD_CONTROL
void async_sched::do_spawn_process()
{
  // this is safe (at the moment) because, if the active list
  // is already in use by other processes, we're just overwriting
  // the lock pointer with its existing value. If the list isn't
  // in use by other processes, the lock pointer is NULL,
  // but this thread is the one running the current process,
  // so it can't race with itself.
  ss->active->lockneeded = true;
  spawn_impl (ss->active,process);
}
// Services svc_spawn_pthread: the new scheduler gets a fresh, private
// fibre list allocated through the GC allocator.
void async_sched::do_spawn_pthread()
{
  spawn_impl(
    new(*gcp, ::flx::run::fthread_list_ptr_map,false) fthread_list(gcp),
    pthread
  );
}

// Common implementation of the two spawn services.
//   pactive : fibre list the new scheduler will run (shared or private)
//   k       : thread kind recorded in the new async_sched
// Steps: take the fibre named in the pending service request, push it on
// pactive, allocate a new async_sched over pactive, start a detached OS
// thread running prun_pthread_entry on it, then block until the
// spawner_flag handed to the thread starter becomes true.
void async_sched::spawn_impl(fthread_list *pactive, thread_kind_t k) {

  // The fibre to run first in the new thread comes from the request that
  // the synchronous scheduler delegated to us.
  fthread_t *ftx = ss->request->svc_fthread_req.fthread;
  if (debug_driver)
    fprintf(stderr, "[prun %p: spawn_pthread] Spawn pthread %p\n", (void*)mythrid(), ftx);
  {
    // Guarded by the list's spin lock when pactive->lockneeded is set.
    spinguard dummy(pactive->lockneeded, &(pactive->active_lock));
    // SHOULD THIS BE HERE?? The async scheduler isn't created yet.
    // maybe we should do this "properly" after it is (in the next statement!)
    // NO NO! This is ALL BAD! Some OTHER thread might run this routine!
    pactive->push_front(ftx);
  }
  // The scheduler object is passed to the new thread as an opaque pointer.
  void *data = new  (*gcp, async_sched_ptr_map, false) async_sched(world,debug_driver, gcp, pactive,k);

  if (debug_driver)
    fprintf(stderr, "[prun %p: spawn_pthread] Starting new pthread, thread counter= %zu\n",
      (void*)mythrid(), gcp->collector->get_thread_control()->thread_count());

  {
    // We use a hard (not Felix aware) lock here
    // because the Felix system is in an incoherent state
    // between the OS thread spawn, and the thread's registration
    ::std::mutex spawner_lock;
    ::std::condition_variable_any spawner_cond;
    bool spawner_flag = false;
    // The mutex is held from before the thread is started until the wait
    // below releases it.
    ::std::unique_lock< ::std::mutex> locktite(spawner_lock);
    flx_detached_thread_t().init(prun_pthread_entry, data, gcp->collector->get_thread_control(),
      &spawner_lock, &spawner_cond,
      &spawner_flag
    );

    if (debug_driver)
      fprintf(stderr,
        "[prun: spawn_pthread] Thread %p waiting for spawned thread to register itself\n",
        (void*)get_current_native_thread());

    // Presumably the spawned thread sets spawner_flag and signals
    // spawner_cond once registered -- that code is not in this file.
    // The loop guards against spurious wake-ups.
    while (!spawner_flag)
      spawner_cond.wait(spawner_lock);

    if (debug_driver)
      fprintf(stderr,
        "[prun: spawn_pthread] Thread %p notes spawned thread has registered itself\n",
        (void*)get_current_native_thread());
  }
}
// ***************************************************
// Async_sched: ASYNC REQUEST DISPATCH
// ***************************************************
// Services svc_general: hands the current fibre's driver request to the
// asynchronous subsystem.
// Steps: lazily create the async hooker for the active list, bump the
// pending-request counter, pass the request plus the requesting fibre to
// the hooker, register the fibre as a GC root, and detach it from the
// synchronous scheduler (ss->ft = 0).
void async_sched::do_general()
{
  if (debug_driver)
    fprintf(stderr, "[prun %p: svc_general] from fthread=%p\n", (void*)mythrid(),ss->ft);

  if(debug_driver)
    fprintf(stderr, "[prun %p: svc_general] async=%p, ptr_create_async_hooker=%p\n",
      (void*)mythrid(), ss->active-> async,
      world->c->ptr_create_async_hooker)
    ;
  // The demux/hooker is created on first use only.
  if (!ss->active->async)
  {
    ss->active->async = world->create_demux();
  }
  ++ss->active->async_count;
  if (debug_driver)
    fprintf(stderr,
       "[prun: svc_general] Async system created: %p, count %zu\n",
       ss->active->async,ss->active->async_count);
  ::flx::async::flx_driver_request_base *dreq =  ss->request->svc_general_req.pgeneral;
  if (debug_driver)
    fprintf(stderr, "[prun: svc_general] Request object %p\n", dreq);

  // requests are now ALWAYS considered asynchronous
  // even if the request handler reschedules them immediately
  ss->active->async->handle_request(dreq, ss->ft);
  // (A second, identical "Request object" trace used to follow this one;
  // it was a copy-paste duplicate and has been removed.)
  if (debug_driver)
    fprintf(stderr, "[prun: svc_general] Request object %p captured fthread %p \n", dreq, ss->ft);
  // NOTE(review): the fibre is rooted only AFTER handle_request(); if the
  // request completes at once and another scheduler dequeues the fibre
  // and calls remove_root() first, the two calls would be reordered.
  // Confirm whether the collector tolerates that.
  gcp->collector->add_root(ss->ft);
  ss->ft = 0;
  if(debug_driver)
    fprintf(stderr,"[prun: svc_general] request dispatched..\n");
}

// calls thread_control_t::yield which does a world stop check
// Back-off helper for idle scheduler threads.
// Calls thread_control_t::yield (which does a world stop check), then
// pauses the calling OS thread and finally yields the CPU.
// NOTE: the `ns` argument is currently ignored; the pause is a fixed
// one second ("temporarily", per the original author).
static void sleep(thread_control_base_t *tc, size_t ns)
{
  assert(tc);
  (void)ns; // requested delay is not honoured yet
  tc->yield();
  ::std::this_thread::sleep_for(::std::chrono::seconds(1));
  ::std::this_thread::yield();
}


// ***************************************************
// Async_sched:  MASTER SCHEDULER
// ***************************************************
// Master scheduler loop for one OS-level context.
//
// Repeatedly runs the synchronous scheduler (ss->frun()) and then acts on
// the state it stopped in:
//   * delegated : a fibre issued a service request the synchronous
//                 scheduler cannot handle (spawn pthread / spawn process /
//                 general async request); service it and run again.
//   * blocked   : no runnable fibres remain; what happens next depends on
//                 thread_kind (see the inner switch).
// Returns 0 when a mainline/pthread/process scheduler finds no pending
// async requests and no busy workers; an embedded scheduler returns the
// number of still-pending async requests instead of waiting.
// Any other frun() state, or an unknown service code, aborts the program.
int async_sched::prun() {
sync_run:
  // RUN SYNCHRONOUS SCHEDULER
  if (debug_driver)
    fprintf(stderr, "prun %s %p: sync_run\n", str(thread_kind),(void*)mythrid());

  if (debug_driver)
    fprintf(stderr, "prun %s %p: Before running: Sync state is %s\n", str(thread_kind),(void*)mythrid(),
      ss->get_fpc_desc());

  sync_sched::fstate_t fs = ss->frun();

  if (debug_driver)
    fprintf(stderr, "prun %s %p: After running: Sync state is %s/%s\n", str(thread_kind),(void*)mythrid(),
      ss->get_fstate_desc(fs), ss->get_fpc_desc());

  switch(fs)
  {
    // HANDLE DELEGATED SERVICE REQUESTS
    case sync_sched::delegated:
      // The format previously had two conversions for three arguments
      // (%p received the kind string, %d received the thread id pointer);
      // a %s for the thread kind has been added.
      if (debug_driver)
        fprintf(stderr, "sync_sched %s %p:delegated request %d\n", str(thread_kind),(void*)mythrid(), ss->request->svc_req);
      switch (ss->request->svc_req)
      {
        case svc_spawn_pthread: do_spawn_pthread(); goto sync_run;
        case svc_spawn_process: do_spawn_process(); goto sync_run;

        case svc_general: do_general(); goto sync_run;

        default:
          fprintf(stderr,
            "prun: Unknown service request code 0x%x\n", ss->request->svc_req);
          abort();
      }

    // SCHEDULE ANY ASYNCHRONOUSLY QUEUED FTHREADS
    case sync_sched::blocked: // ran out of active threads - are there any in the async queue?
      // This scheduler is idle until it finds more work.
      --ss->active->busy_count;
      switch (thread_kind)
      {
        case mainline:
        case pthread:
          // gain exclusive control
          // NOTE(review): if qisblocked is a std::atomic_flag,
          // test_and_set() returns the PREVIOUS value, so this loop exits
          // as soon as the flag is seen already set (at most two calls)
          // rather than spinning until it was clear. Confirm the intended
          // polarity before relying on this for mutual exclusion.
          while(!ss->active->qisblocked.test_and_set());
          if (ss->active->async_count > 0)
          {
            if (debug_driver)
              fprintf(stderr, "prun: %s %p Async blocking\n", str(thread_kind), (void*)mythrid());
            ss->ft = ss->active->async->dequeue(); // get fibre
            gcp->collector->remove_root(ss->ft); // transfer ownership
            --ss->active->async_count; // accounting
            ss->active->qisblocked.clear(); // release control
            ++ss->active->busy_count;
            goto sync_run; // do work
          }
          if (ss->active->busy_count.load() == 0) {
            // no work to do, no jobs pending, and no workers to make work, so return
            // (qisblocked is not cleared on this path)
            if (debug_driver)
              fprintf(stderr, "prun: %s %p Async returning\n", str(thread_kind), (void*)mythrid());
            return 0;
          }
          else  // some processes are busy, they might make work so delay and retry later
          {
            ss->active->qisblocked.clear(); // release control
            // DELAY
            if (debug_driver)
              fprintf(stderr, "prun: %s %p Async delaying thread_count=%d, busy_count=%d\n",
                str(thread_kind), (void*)mythrid(), ss->active->thread_count.load(),ss->active->busy_count.load());
            // The delay argument is currently ignored by sleep(), which
            // always pauses for one second.
            sleep(gcp->collector->get_thread_control(), 10.00); // nanoseconds
            ++ss->active->busy_count;
            goto sync_run;
          }

        case process:
          // NOTE(review): same polarity question as above -- this branch
          // is entered only when the flag was ALREADY set.
          if (ss->active->qisblocked.test_and_set())
          {
            if (ss->active->async_count > 0)
            {
              if (debug_driver)
                fprintf(stderr, "prun: %s %p Async WAIT\n", str(thread_kind), (void*)mythrid());
              // Non-blocking fetch: another process sharing the list may
              // already have taken the completed fibre.
              auto ftp = ss->active->async->maybe_dequeue(); // get fibre
              if(ftp != nullptr) {
                ss->push_front(ftp);
                gcp->collector->remove_root(ftp); // transfer ownership
                --ss->active->async_count; // accounting
              }
              ss->active->qisblocked.clear(); // release control
              ++ss->active->busy_count;
              goto sync_run; // do work
            }
            if (ss->active->busy_count.load() == 0) {
              // no work to do, no jobs pending, and no workers to make work, so return
              if (debug_driver)
                fprintf(stderr, "prun: %s %p Async returning\n", str(thread_kind), (void*)mythrid());
              return 0;
            }
          }

          // DELAY
          sleep(gcp->collector->get_thread_control(), 10.00); // nanoseconds
          ++ss->active->busy_count;
          goto sync_run;

        case embedded:
          // Never waits: pull in whatever has already completed, otherwise
          // report the number of outstanding requests to the embedder.
          if (ss->active->qisblocked.test_and_set())
            if(nonblocking_schedule_queued_fthreads()) goto sync_run;
          return ss->active->async_count;
       }

    default:
      fprintf(stderr, "prun: Unknown frun return status 0x%4x\n", fs);
      abort();
  } // switch

}

// ***************************************************
// Async_sched:  COMPLETED ASYNC RETRIEVAL
// ***************************************************

// this routine is used when there are no fthreads left on the
// sync scheduler list
//
// assuming async is enabled, it checks to see if there are
// pending async jobs. If so and the block flag is set,
// it blocks the pthread until at least one of the pending jobs completes.
// The routine returns true if some async jobs completed and were put on
// the sync scheduler active list.

// As it is, this routine cannot be called with block_flag set
// by multiple threads. First, critical sections are not protected.
// However even if they were, if two threads block with async->dequeue,
// then one might empty all the pending fibres out and return,
// leaving all the rest of the thread locked up.

// One solution is to simply poll to see if there's anything
// ready to fetch. If so, fetch it. If not, return, wait a bit,
// and try again. This introduces an uncomfortable lag though.
//
// Another solution is to have the first thread block,
// and then have the other threads suspend with a condition variable.
// They check:
//  (a) there is no stuff on the active list
//  (b) there is stuff on the async list
//  (c) there is no thread already waiting on the async list
// If these conditions are met the thread goes to sleep and waits
// for a signal.
//
// Note if there is no thread waiting but (a) and (b) are met,
// the thread can dive in and become the waiter.


bool async_sched::nonblocking_schedule_queued_fthreads() {
  if (debug_driver) {
    fprintf(stderr,
      "prun %s %p: out of active synchronous threads, trying async, pending=%zu\n", str(thread_kind), (void*)mythrid(), ss->active->async_count);
  }
  int scheduled_some = 0;
  if (ss->active->async && ss->active->async_count > 0) {
      fthread_t* ftp = ss->active->async->maybe_dequeue();
      while (ftp) {
        if (debug_driver)
          fprintf(stderr, "prun %p:ret mode: Async Retrieving fthread %p\n", (void*)mythrid(), ftp);
        gcp->collector->remove_root(ftp);
        ss->push_front(ftp);
        --ss->active->async_count;
        ++scheduled_some;
        ftp = ss->active->async->maybe_dequeue();
    }
  }
  return scheduled_some != 0;
}


// ***************************************************
// Async_sched:  EXTERNAL MULTIWRITE HOOK
// ***************************************************

// Hook for external code: forwards the write of `data` on `chan`
// unchanged to the synchronous scheduler owned by this object.
void async_sched::external_multi_swrite(::flx::rtl::schannel_t *chan, void *data)
{
  sync_sched *const sync = ss;
  sync->external_multi_swrite(chan, data);
}


}} // namespaces

The Asynchronous I/O interface.

The embedding system depends on the interface but not the implementation.

//[flx_async.hpp]
#ifndef __FLX_ASYNC_H__
#define __FLX_ASYNC_H__
#include "flx_rtl_config.hpp"
#include "flx_rtl.hpp"
#include "pthread_bound_queue.hpp"

#ifdef BUILD_ASYNC
#define ASYNC_EXTERN FLX_EXPORT
#else
#define ASYNC_EXTERN FLX_IMPORT
#endif

// GLOBAL NAMESPACE!

// Abstract interface the scheduler uses to talk to the asynchronous
// subsystem. The embedding system depends only on this interface.
class ASYNC_EXTERN async_hooker {
public:
  // Fetch a fibre whose async request has completed. The scheduler calls
  // this when it has decided to wait for one (presumably blocking --
  // the implementation forwards to bound_queue_t::dequeue).
  virtual flx::rtl::fthread_t *dequeue()=0;
  // Non-waiting variant: the scheduler treats a null result as
  // "nothing ready yet".
  virtual flx::rtl::fthread_t *maybe_dequeue()=0;
  // Start the async operation described by `data` on behalf of fibre `ss`.
  virtual void handle_request(::flx::async::flx_driver_request_base *data, flx::rtl::fthread_t *ss)=0;
  virtual ~async_hooker();
};

// Function type of the async hooker factory (same parameter list as
// create_async_hooker below).
using create_async_hooker_t =
  async_hooker *(
    ::flx::pthread::thread_control_base_t*,
    int n0,   // bound on resumable thread queue
    int n1,   // bound on general input job queue
    int m1,   // number of threads in job pool
    int n2,   // bound on async fileio job queue
    int m2    // number of threads doing async fileio
  );

// Factory for the default async hooker. Declared with C linkage so the
// symbol name is unmangled. The caller receives a heap-allocated object
// (the definition uses plain `new`).
extern "C" {
ASYNC_EXTERN async_hooker *
create_async_hooker
(
  ::flx::pthread::thread_control_base_t*,
  int n0,   // bound on resumable thread queue
  int n1,   // bound on general input job queue
  int m1,   // number of threads in job pool
  int n2,   // bound on async fileio job queue
  int m2    // number of threads doing async fileio
);
}

namespace flx { namespace async {
// "Finished notifier": abstract callback object invoked once when an
// async operation completes (see flx_driver_request_base::notify_finished).
struct ASYNC_EXTERN finote_t
{
  // Called to announce completion of the operation.
  virtual void signal()=0;
  virtual ~finote_t();
};

// Notifier that wakes a fibre by placing it on a ready queue.
class ASYNC_EXTERN wakeup_fthread_t : public finote_t
{
  ::flx::rtl::fthread_t *f;          // fibre to wake (not owned here)
  ::flx::pthread::bound_queue_t *q;  // queue the fibre is enqueued on
public:
  wakeup_fthread_t(::flx::pthread::bound_queue_t *q_a, ::flx::rtl::fthread_t *f_a);
  // Completion callback: enqueue the fibre on the ready queue.
  void signal () { q->enqueue(f); }
};


// Base class for all driver (async) requests.
// Derived classes implement start_async_op_impl(); the base class keeps
// the completion notifier and fires it exactly once.
class ASYNC_EXTERN flx_driver_request_base {
    finote_t *fn;  // pending completion notifier, 0 when none is installed
    // Starts the operation; returns the "finished" flag: true means the
    // operation already completed (async may fail or immediately finish).
    virtual bool start_async_op_impl() = 0;
public:
    flx_driver_request_base();
    virtual ~flx_driver_request_base(); // so destructors work

    // Installs fn_a as the notifier and starts the operation. Returns
    // nothing; if the implementation reports immediate completion,
    // notify_finished() is called before this function returns.
    void start_async_op(finote_t *fn_a);
    // Signals the installed notifier, then deletes it. A notifier must be
    // installed (asserted in the definition).
    void notify_finished();
};

}}

#endif
//[flx_async.cpp]
#include "flx_async.hpp"
#include "pthread_bound_queue.hpp"
#include "flx_rtl.hpp"
#include <cassert>
#include <stdio.h>

using namespace ::flx::rtl;
using namespace ::flx::pthread;
using namespace ::flx::async;

// Out-of-line definition of the virtual destructor; nothing to release.
async_hooker::~async_hooker(){ }

namespace flx { namespace async {

// FINISHED NOTIFIER
// Out-of-line definition of the virtual destructor; nothing to release.
finote_t::~finote_t(){}

// DERIVED NOTIFIER WHICH DOES FTHREAD WAKEUP
// BY ENQUEUING THE FTHREAD INTO THE READY QUEUE
// Stores the queue and the fibre; neither pointer is owned by the notifier
// (its destructor releases nothing).
wakeup_fthread_t::wakeup_fthread_t(
  ::flx::pthread::bound_queue_t *q_a,
  ::flx::rtl::fthread_t *f_a)
: f(f_a), q(q_a) {}

// ASYNC HOOKER IMPLEMENTATION STAGE 1
// Introduces new virtual get_ready_queue().
class async_hooker_impl : public async_hooker {
public:
  virtual bound_queue_t *get_ready_queue()=0;
  ~async_hooker_impl() {}
  void handle_request(flx::async::flx_driver_request_base *pgeneral,fthread_t *ss)
  {
    flx::async::flx_driver_request_base* dreq = pgeneral;
    finote_t *fn = new wakeup_fthread_t(get_ready_queue(),ss);
    dreq->start_async_op(fn);
  }
};


// ASYNC HOOKER IMPLEMENTATION STAGE 2
// Owns the ready queue and implements both dequeue operations on it.
class proto_async : public async_hooker_impl
{
  bound_queue_t ready_fibres;  // fibres whose async request has completed

public:
  // Only tc and n0 (the ready-queue bound) are used; the four remaining
  // arguments are accepted for interface compatibility and ignored.
  proto_async(thread_control_base_t *tc, int n0, int, int, int, int)
    : ready_fibres(tc,n0)
  {}

  ~proto_async() {}

  bound_queue_t *get_ready_queue()
  {
    return &ready_fibres;
  }

  // Forwards to bound_queue_t::dequeue and casts the result to a fibre.
  fthread_t* dequeue()
  {
    return (fthread_t*)ready_fibres.dequeue();
  }

  // Forwards to bound_queue_t::maybe_dequeue and casts the result.
  fthread_t* maybe_dequeue()
  {
    return (fthread_t*)ready_fibres.maybe_dequeue();
  }
};


// DRIVER REQUEST BASE
// THIS IS USED TO BUILD REQUESTS
// PROVIDES DEFAULT NOTIFY_FINISHED ROUTINE WHICH USES FINOTE SIGNAL
// START_ASYNC_OP JUST CALLS THE DERIVED CLASS START_ASYNC_OP_IMPL
// A new request starts with no notifier installed (fn == 0).
flx_driver_request_base::flx_driver_request_base() : fn(0) {}
flx_driver_request_base::~flx_driver_request_base() {}       // so destructors work

// Installs fn_a as the completion notifier and starts the operation.
// Precondition (asserted): no notifier is currently installed.
// If the derived implementation reports that the operation finished
// immediately, completion is announced before returning; otherwise the
// request stays pending until notify_finished() is called later.
void flx_driver_request_base:: start_async_op(finote_t *fn_a)
{
  assert(fn==0);
  fn = fn_a;

  if (!start_async_op_impl())
    return; // still pending

  fprintf(stderr,"instant complete\n");
  notify_finished();
}

void flx_driver_request_base:: notify_finished()
{
  //fprintf(stderr, "faio_req=%p, Notify finished %p\n", this,fn);
  assert(fn!=0);
  finote_t *fin = fn;
  fn=0;
  fin->signal();
  delete fin;
  //fprintf(stderr, "faio_req=%p, FINISHED\n",this);
}

}}

// Factory entry point: builds the default hooker (proto_async) on the heap
// and returns it through the abstract interface. All arguments are passed
// straight through to the proto_async constructor.
async_hooker *create_async_hooker(thread_control_base_t *tc, int n0,int n1,int m1,int n2,int m2) {
  async_hooker *hooker = new ::flx::async::proto_async(tc,n0,n1,m1,n2,m2);
  return hooker;
}

Config

//[unix_flx_async.fpc]
Name: flx_async
Description: Async hook
provides_dlib: -lflx_async_dynamic
provides_slib: -lflx_async_static
includes: '"flx_async.hpp"'
Requires: flx_pthread flx_gc flx
macros: BUILD_ASYNC
library: flx_async
srcdir: src/flx_async
src: .*\.cpp
//[win_flx_async.fpc]
Name: flx_async
Description: Async hook
provides_dlib: /DEFAULTLIB:flx_async_dynamic
provides_slib: /DEFAULTLIB:flx_async_static
includes: '"flx_async.hpp"'
Requires: flx_pthread flx_gc flx
macros: BUILD_ASYNC
library: flx_async
srcdir: src/flx_async
src: .*\.cpp
#[flx_async.py]
import fbuild
from fbuild.functools import call
from fbuild.path import Path
from fbuild.record import Record
from fbuild.builders.file import copy

import buildsystem

# ------------------------------------------------------------------------------

def build_runtime(phase):
    """Build the flx_async runtime library as both a static and a shared lib.

    `phase` is the fbuild build phase; its `ctx.buildroot` locates the
    source file and the include directories.

    Returns a Record with two fields, `static` and `shared`, holding the
    results of buildsystem.build_cxx_static_lib and
    buildsystem.build_cxx_shared_lib respectively. Each is linked against
    the matching flavour of the flx_pthread and flx_gc runtime builds.
    """
    # Header copying is currently disabled:
    #buildsystem.copy_hpps_to_rtl(phase.ctx,
    #    phase.ctx.buildroot/'share'/'src/flx_async' / 'flx_async.hpp',
    #)

    dst = 'host/lib/rtl/flx_async'
    srcs = [phase.ctx.buildroot/'share'/'src/flx_async/flx_async.cpp']
    includes = [
        phase.ctx.buildroot / 'host/lib/rtl',
        phase.ctx.buildroot / 'share/lib/rtl'
    ]
    # BUILD_ASYNC makes flx_async.hpp export (rather than import) its symbols.
    macros = ['BUILD_ASYNC']
    # Libraries this one depends on; each result exposes .static and .shared.
    libs = [
        call('buildsystem.flx_pthread.build_runtime', phase),
        call('buildsystem.flx_gc.build_runtime', phase),
    ]

    return Record(
        static=buildsystem.build_cxx_static_lib(phase, dst, srcs,
            includes=includes,
            macros=macros,
            libs=[lib.static for lib in libs]),
        shared=buildsystem.build_cxx_shared_lib(phase, dst, srcs,
            includes=includes,
            macros=macros,
            libs=[lib.shared for lib in libs]))