Package: src/packages/sync.fdoc

Synchronous Scheduler

Key files

flx_sync.hpp share/lib/rtl/flx_sync.hpp
flx_sync.cpp share/src/rtl/flx_sync.cpp

Synchronous Support System

The sync_sched class encapsulates the core Felix synchronous scheduling mechanism and services synchronous service calls.

The scheduler method frun runs fthread_t fibres from the scheduler queue, active, servicing the synchronous service calls the fibres make, until it is unable to proceed.

It then suspends and returns a code indicating one of two conditions: either the scheduler is blocked, because there are no more active fibres on the queue to resume, or it has received a non-synchronous service request it cannot satisfy, in which case it returns delegated, indicating that it is delegating responsibility for satisfying the request to its caller.

The variable request contains the service call which the scheduler is delegating.
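
For example, a host driver might embed the scheduler like this (a minimal
sketch: drive and handle_async are hypothetical names, not part of the RTL;
real drivers detach the current fibre when servicing asynchronous requests):

  void handle_async(flx::run::sync_sched &); // hypothetical request servicing

  // Drive the scheduler until no fibres remain.
  void drive(flx::run::sync_sched &ss) {
    for (;;) {
      switch (ss.frun()) {
        case flx::run::sync_sched::blocked:
          return;              // no current fibre, queue empty: done
        case flx::run::sync_sched::delegated:
          handle_async(ss);    // satisfy ss.request, e.g. by detaching
          break;               // ss.ft until async completion, then retry
      }
    }
  }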

The scheduler itself is a finite state machine with three states: it is ready to resume the current fibre, it is ready to get the next fibre from the queue, or it is blocked because the current fibre has gone and the queue is empty.
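
In terms of the code below (compare get_fpc_desc and the frun dispatch
loop), the three states amount to:

  // illustrative restatement only, not a separate API:
  //   ft != nullptr                  -> resume current: request = ft->run()
  //   ft == nullptr, queue non-empty -> fetch next: ft = active->pop_front()
  //   ft == nullptr, queue empty     -> suspend: return blocked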

Synchronous reads and writes can suspend or activate fibres. The special external_multi_swrite routine provides a way to populate the scheduler queue from outside, by pushing the fibres waiting on a synchronous channel onto the active queue.
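
For instance, a host might broadcast a datum to every fibre waiting to read
on a channel while the scheduler is suspended (a minimal sketch;
broadcast_wakeup, chan and datum are illustrative names):

  // Every fibre waiting to read on chan receives datum and is moved
  // onto the scheduler's active queue; frun then resumes them.
  void broadcast_wakeup(flx::run::sync_sched &ss,
    flx::rtl::schannel_t *chan, void *datum)
  {
    ss.external_multi_swrite(chan, datum);
    ss.frun();
  }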

//[flx_sync.hpp]

#ifndef __FLX_SYNC_H__
#define __FLX_SYNC_H__

#include "flx_gc.hpp"
#include "flx_rtl.hpp"
#include <list>
#include <atomic>
#include "flx_async.hpp"
#include "pthread_thread.hpp"

namespace flx { namespace run {

// *************************************
// fthread_list has grown to include the async control object
// and its ready list
//
// this object contains the data shared by multiple pthreads
// pooled to run coroutines concurrently
// *************************************

struct RTL_EXTERN fthread_list {
  ::flx::gc::generic::gc_profile_t *gcp;
  fthread_list(fthread_list const&) = delete;
  fthread_list& operator=(fthread_list const&) = delete;
public:
  // INVARIANT fthread_first==nullptr equiv fthread_last==nullptr
  ::flx::rtl::fthread_t *fthread_first; // has to be public for shape spec
  ::flx::rtl::fthread_t *fthread_last; // WEAK

  ::std::atomic_flag qisblocked;

  // FIXME: these shouldn't be std::atomic, because the C++ implementation
  // might wrap them in a mutex. Instead they should only be used inside
  // our spinlock
  ::std::atomic<int> thread_count; // n threads sharing list
  ::std::atomic<int> busy_count; // n threads actually working

//  ::std::mutex active_lock;
//  ::std::mutex *pactive_lock;

  bool lockneeded;
  ::std::atomic_flag active_lock;

  size_t async_count; // pending async jobs
  async_hooker* async; // async dispatch and ready list object

  fthread_list (::flx::gc::generic::gc_profile_t *gcp);
  ~fthread_list ();

  void push_back(::flx::rtl::fthread_t *);
  void push_front(::flx::rtl::fthread_t *);
  ::flx::rtl::fthread_t *pop_front();

  // DIAGNOSTICS ONLY
  size_t size() const;
  ::flx::rtl::fthread_t *front()const;
};
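// Usage note (illustrative): fthread_list is an intrusive singly linked
// list, chaining fibres through fthread_t::next; push_back/pop_front give
// FIFO scheduling, while push_front lets channel I/O run a newly woken
// fibre ahead of the queue.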
RTL_EXTERN extern ::flx::gc::generic::gc_shape_t fthread_list_ptr_map;


// This class handles synchronous channel I/O and fthreads
struct RTL_EXTERN sync_sched {
  sync_sched () = delete;
  sync_sched (sync_sched const&) = delete;
  sync_sched &operator=(sync_sched const&) = delete;

  bool debug_driver;

  // the garbage collector and general control object
  ::flx::gc::generic::collector_t *collector;

  // scheduler queue
  fthread_list *active;

  // temporary for currently running fibre
  ::flx::rtl::fthread_t *ft;

  // variable to hold service request
  ::flx::rtl::svc_req_t *request;

  // type for the state of the scheduler
  // when it suspends by returning.
  enum fstate_t { blocked, delegated };

  // debugging helper to get a description of
  // the suspended scheduler state
  static char const * get_fstate_desc(fstate_t);

  // debugging helper to get a description of
  // the running scheduler state
  char const * get_fpc_desc();

  sync_sched (
    bool debug_driver_,
    ::flx::gc::generic::gc_profile_t *gcp_,
    fthread_list *active_
  );

private:
  // helper routines.
  void impl_push_front(::flx::rtl::fthread_t*);

public:
  void push_front(::flx::rtl::fthread_t*);
  fstate_t frun();

  // a special routine to allow a multiwrite to be performed
  // from outside the scheduler whilst it is suspended.
  void external_multi_swrite(::flx::rtl::schannel_t*, void*);
protected:
  // handlers for synchronous service calls.
  void do_yield();
  void do_spawn_fthread();
  void do_schedule_fthread();
  void do_sread();
  void do_swrite();
  void do_multi_swrite();
  void do_kill();
  void show_state();
};

RTL_EXTERN extern ::flx::gc::generic::gc_shape_t sync_sched_ptr_map;


}}

#endif
//[flx_sync.cpp]

#include <stdio.h>
#include <cstddef> // offsetof, std::size_t

#include "flx_sync.hpp"

using namespace flx::rtl;

namespace flx { namespace run {

// ********************************************************
// SHAPE for sync_sched
// ********************************************************

static const std::size_t sync_sched_offsets[2]={
    offsetof(sync_sched,active),
    offsetof(sync_sched,ft)
};

static ::flx::gc::generic::offset_data_t const sync_sched_offset_data = { 2, sync_sched_offsets };

::flx::gc::generic::gc_shape_t sync_sched_ptr_map = {
  "rtl::sync_sched",
  1,sizeof(sync_sched),
  0, // no finaliser,
  0, // fcops
  &sync_sched_offset_data,
  ::flx::gc::generic::scan_by_offsets,
  0,0, // no serialisation as yet
  ::flx::gc::generic::gc_flags_default,
  0UL, 0UL
};



// ***************************************************
// fthread_list
// ***************************************************
fthread_list::fthread_list(::flx::gc::generic::gc_profile_t *gcp_) :
  // initialisers listed in declaration order
  gcp(gcp_),
  fthread_first(nullptr),
  fthread_last(nullptr),
  thread_count(1),
  busy_count(0),
  //pactive_lock(nullptr),
  lockneeded(false),
  active_lock(),
  async_count(0),
  async(nullptr)
{
  qisblocked.clear();
  active_lock.clear();
}
fthread_list::~fthread_list () {
  fprintf(stderr,"[fthread_list: destructor] Pthread %p delete async queue\n",(void*)::flx::pthread::mythrid());
  delete async;
}


fthread_t *fthread_list::front() const {
  return fthread_first;
}

fthread_t *fthread_list::pop_front() {
  auto tmp = fthread_first;
  if (!tmp) return nullptr; // queue empty

  // point at next
  fthread_first = tmp->next;
  // if next is null, null out last pointer
  if(!fthread_first) fthread_last = nullptr;

  tmp->next = nullptr; // for GC, null out link
  return tmp;
}

// INVARIANT fthread_first==nullptr equiv fthread_last==nullptr
// PRECONDITION: p != nullptr
void fthread_list::push_front(fthread_t *p) {
  p->next = fthread_first;
  fthread_first = p;
  if (!fthread_last) fthread_last = p;
}

// INVARIANT fthread_first==nullptr equiv fthread_last=nullptr
// PRECONDITION: p != nullptr
void fthread_list::push_back(fthread_t *p) {
  if(!fthread_last) fthread_first=fthread_last=p;
  else {
    fthread_last->next = p;
    fthread_last = p;
  }
}

size_t fthread_list::size() const {
  size_t count = 0;
  for (auto it = fthread_first; it; it = it->next) ++count;
  return count;
}

// ********************************************************
// SHAPE for fthread_list
// ********************************************************

static const std::size_t fthread_list_offsets[1]={
    offsetof(fthread_list,fthread_first) // fthread_last is weak
};

static ::flx::gc::generic::offset_data_t const fthread_list_offset_data = { 1, fthread_list_offsets };

::flx::gc::generic::gc_shape_t fthread_list_ptr_map = {
  "rtl::fthread_list",
  1,sizeof(fthread_list),
  0, // no finaliser,
  0, // fcops
  &fthread_list_offset_data,
  ::flx::gc::generic::scan_by_offsets,
  0,0, // no serialisation as yet
  ::flx::gc::generic::gc_flags_default,
  0UL, 0UL
};


// ***************************************************
// sync_sched
// ***************************************************
char const *sync_sched::get_fstate_desc(fstate_t fs)
{
  switch(fs)
  {
    case blocked: return "blocked";
    case delegated: return "delegated";
    default: return "Illegal fstate_t";
  }
}

char const *sync_sched::get_fpc_desc()
{
  if (ft)
    return "Next request pos";
  else
  {
    if (active->size() > 0) return "Next fthread pos";
    else return "Out of active threads";
  }
}


sync_sched::sync_sched (
  bool debug_driver_,
  ::flx::gc::generic::gc_profile_t *gcp_,
  fthread_list *active_
) :
  debug_driver(debug_driver_),
  collector(gcp_->collector),
  active(active_),
  ft(nullptr)
{}


void sync_sched::show_state () {
  if (debug_driver)
    fprintf(stderr, "CUR[%p] ACT[%p]\n", ft,
      active->size() ? active->front() : NULL);
}

// used by async to activate fthread in ready (async complete) queue
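// NB: spinguard comes from the RTL headers included above; judging by its
// use here, it acquires active_lock only when lockneeded is set, so a
// scheduler known to be single threaded pays no locking cost (an inference
// from usage, not a documented guarantee).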
void sync_sched::push_front(fthread_t *f) {
  spinguard dummy(active->lockneeded,&(active->active_lock));
  impl_push_front(f);
}
void sync_sched::impl_push_front(fthread_t *f)
  {
    if(ft) active->push_front(ft);
    ft = f;
  }

void sync_sched::do_yield()
    {
      if(debug_driver)
         fprintf(stderr,"[sync: svc_yield] yield");

      spinguard dummy(active->lockneeded,&(active->active_lock));
      active->push_back(ft);
      ft = active->pop_front();
    }

void sync_sched::do_spawn_fthread()
    {
      spinguard dummy(active->lockneeded,&(active->active_lock));
      fthread_t *ftx = request->svc_fthread_req.fthread;
      if(debug_driver)
        fprintf(stderr,"[sync: svc_spawn_fthread] Spawn fthread %p\n",ftx);
      impl_push_front(ftx);
    }

void sync_sched::do_schedule_fthread()
    {
      spinguard dummy(active->lockneeded,&(active->active_lock));
      fthread_t *ftx = request->svc_fthread_req.fthread;
      if(debug_driver)
        fprintf(stderr,"[sync: svc_schedule_fthread] Schedule fthread %p\n",ftx);
      active->push_back(ftx);
    }

// FIXME: HANDLE NULL. Read & Write variable addresses can be NULL
// if the data type is unit
void sync_sched::do_sread()
    {
      spinguard dummy(active->lockneeded,&(active->active_lock));
      svc_sio_req_t pr = request->svc_sio_req;
      schannel_t *chan = pr.chan;
      if(debug_driver)
        fprintf(stderr,"[sync: svc_read] Fibre %p Request to read on channel %p\n",ft,chan);
      if(chan==NULL) goto svc_read_none;
    svc_read_next:
      {
        fthread_t *writer= chan->pop_writer();
        if(writer == 0) goto svc_read_none;       // no writers
        if(writer->cc == 0) goto svc_read_next;   // killed
        svc_sio_req_t pw = writer->get_svc()->svc_sio_req;
        if (pr.data && pw.data) {
          if(debug_driver)
            fprintf(stderr,"[sync: svc_read] Writer @%p=%p, read into %p\n",
              pw.data,*pw.data, pr.data);
          *pr.data= *pw.data;
        }
        if(debug_driver)
          fprintf(stderr,"[sync: svc_read] current fibre %p FED, fibre %p UNBLOCKED\n",ft, writer);

        // WE are the reader, stay current, push writer
        // onto active list
        active->push_front(writer);
        show_state();
        return;
      }

    svc_read_none:
      if(debug_driver)
        fprintf(stderr,"[sync: svc_read] No writers on channel %p: fibre %p HUNGRY\n",chan,ft);
      chan->push_reader(ft);
      ft = active->pop_front();
      show_state();
      return;
    }

void sync_sched::do_swrite()
    {
      spinguard dummy(active->lockneeded,&(active->active_lock));
      svc_sio_req_t pw = request->svc_sio_req;
      schannel_t *chan = pw.chan;
      if(debug_driver)
         fprintf(stderr,"[sync: svc_write] Fibre %p Request to write on channel %p\n",ft,chan);
      if(chan==NULL)goto svc_write_none;
    svc_write_next:
      {
        fthread_t *reader= chan->pop_reader();
        if(reader == 0) goto svc_write_none;     // no readers
        if(reader->cc == 0) goto svc_write_next; // killed
        svc_sio_req_t pr = reader->get_svc()->svc_sio_req;
        if (pr.data && pw.data) {
          if(debug_driver)
            fprintf(stderr,"[sync: svc_write] Writer @%p=%p, read into %p\n",
              pw.data,*pw.data, pr.data);
          *pr.data= *pw.data;
        }
        if(debug_driver)
          fprintf(stderr,"[sync: svc_write] hungry fibre %p FED\n",reader);

        // WE are the writer, push us onto the active list
        // and make the reader on the channel current
        impl_push_front(reader);
        show_state();
        return;
      }
    svc_write_none:
      if(debug_driver)
        fprintf(stderr,"[sync: svc_write] No readers on channel %p: fibre %p BLOCKING\n",chan,ft);
      chan->push_writer(ft);
      ft = active->pop_front();
      show_state();
      return;
    }

// NOTE: not protected by the spinlock; intended to be called
// from outside whilst the scheduler is suspended
void sync_sched::external_multi_swrite (schannel_t *chan, void *data)
    {
      if(chan==NULL) return;
    svc_multi_write_next:
      fthread_t *reader= chan->pop_reader();
      if(reader == 0)  return;    // no readers left
      if(reader->cc == 0) goto svc_multi_write_next; // killed
      {
        svc_sio_req_t pr = reader->get_svc()->svc_sio_req;
        if(debug_driver)
           fprintf(stderr,"[sync: svc_multi_write] Write data %p, read into %p\n",
             data, pr.data);
        if(pr.data) *pr.data = data; // address may be NULL if the data type is unit
        impl_push_front(reader);
      }
      goto svc_multi_write_next;
    }

void sync_sched::do_multi_swrite()
    {
      spinguard dummy(active->lockneeded,&(active->active_lock));
      svc_sio_req_t pw = request->svc_sio_req;
      void *data = pw.data;
      schannel_t *chan = pw.chan;
      if(debug_driver)
        fprintf(stderr,"[sync: svc_multi_write] Request to write on channel %p\n",chan);
      external_multi_swrite (chan, data);
    }

void sync_sched::do_kill()
    {
      spinguard dummy(active->lockneeded,&(active->active_lock));
      fthread_t *ftx = request->svc_fthread_req.fthread;
      if(debug_driver)fprintf(stderr,"[sync: svc_kill] Request to kill fthread %p\n",ftx);
      ftx -> kill();
      return;
    }


// NOTE: the currently running fibre variable is owned
// by this sync scheduler and is not shared, so access to
// it does not require serialisation

sync_sched::fstate_t sync_sched::frun()
{
  if (debug_driver)
     fprintf(stderr,"[sync] frun: pthread %p, entry ft=%p, active size=%d\n",
        (void*)::flx::pthread::mythrid(), ft,(int)active->size());
dispatch:
  if (ft == 0) {
     spinguard dummy(active->lockneeded,&(active->active_lock));
     ft = active->pop_front();
     if (debug_driver)
       fprintf(stderr,"[sync] pthread %p fetching fthread %p\n",(void*)::flx::pthread::mythrid(),ft);
  }
  if (ft == 0) {
    return blocked;
  }
  request = ft->run();        // run fthread to get request
  if(request == 0)            // euthanasia request
  {
    spinguard dummy(active->lockneeded,&(active->active_lock));
    ft = 0;
    goto dispatch;
  }

  if (debug_driver)
    fprintf(stderr,"[flx_sync:sync_sched] dispatching service request %d\n", request->svc_req);
  switch(request->svc_req)
  {
    case svc_yield: do_yield(); goto dispatch;

    case svc_spawn_fthread : do_spawn_fthread(); goto dispatch;
    case svc_schedule_fthread: do_schedule_fthread(); goto dispatch;

    case svc_sread: do_sread(); goto dispatch;

    case svc_swrite: do_swrite(); goto dispatch;

    case svc_multi_swrite: do_multi_swrite(); goto dispatch;

    case svc_kill: do_kill(); goto dispatch;

    default:
      return delegated;
  }
}

}}