Package: src/packages/async.fdoc
Asynchronous I/O and thread scheduling¶
key | file |
---|---|
flx_async_world.hpp | share/lib/rtl/flx_async_world.hpp |
flx_async_world.cpp | share/src/rtl/flx_async_world.cpp |
flx_async.hpp | share/lib/rtl/flx_async.hpp |
flx_async.cpp | share/src/flx_async/flx_async.cpp |
flx_async.py | $PWD/buildsystem/flx_async.py |
unix_flx_async.fpc | $PWD/src/config/unix/flx_async.fpc |
win_flx_async.fpc | $PWD/src/config/win/flx_async.fpc |
The Asynchronous Support System¶
//[flx_async_world.hpp]
#ifndef __flx_async_world_H_
#define __flx_async_world_H_
#include "flx_gc.hpp"
#include "flx_collector.hpp"
#include "flx_sync.hpp"
namespace flx { namespace run {
// This class handles pthreads and asynchronous I/O
// It shares operations with sync_sched by interleaving
// based on state variables.
//
// NOTE: currently async_sched is NOT garbage collected.
// Hence, the synchronous scheduler it creates must
// be made a GC root.
// Asynchronous scheduler.
// Owns a synchronous scheduler (member ss) and, in prun(), services the
// requests that the synchronous scheduler delegates: spawning pthreads,
// spawning processes and dispatching general asynchronous requests.
struct RTL_EXTERN async_sched
{
// Kind of thread driving this scheduler. prun() switches on this value
// to decide what to do when the synchronous scheduler reports "blocked".
enum thread_kind_t {mainline,embedded,pthread,process};
thread_kind_t thread_kind;
// Printable name of a thread kind, used in debug trace output.
static char const *str(thread_kind_t);
// weak pointer
struct flx_world *world;
// When true, trace messages are written to stderr.
bool debug_driver;
// weak pointer
::flx::gc::generic::gc_profile_t *gcp;
// Strong pointer
// (this is the only member listed in the GC offset table for this type)
sync_sched *ss; // (d, gcp, active), (ft, request), (pc, fs)
// Arguments, in order: world, debug flag, GC profile, fibre list handed
// to the new synchronous scheduler, and the kind of thread.
async_sched(
flx_world *world_arg,
bool d,
::flx::gc::generic::gc_profile_t *g,
fthread_list *a, thread_kind_t
);
~async_sched();
// Master scheduling loop; see the definition for the return values.
int prun();
// Handlers for the service requests delegated by the synchronous scheduler.
void do_spawn_pthread();
void do_spawn_process();
// Shared implementation of the two spawn handlers: starts a new detached
// thread running a new async_sched over the given fibre list.
void spawn_impl(fthread_list*, thread_kind_t);
void do_general();
// Forwards to sync_sched::external_multi_swrite on ss.
void external_multi_swrite(::flx::rtl::schannel_t *, void *data);
private:
// Moves any fibres already available on the async ready queue onto the
// synchronous scheduler without blocking; true if at least one was moved.
bool nonblocking_schedule_queued_fthreads();
};
RTL_EXTERN extern ::flx::gc::generic::gc_shape_t async_sched_ptr_map;
}} // namespaces
#endif //__flx_async_world_H_
//[flx_async_world.cpp ]
#include "flx_world.hpp"
#include "flx_async_world.hpp"
#include "flx_sync.hpp"
#include <assert.h>
using namespace ::flx::rtl;
using namespace ::flx::pthread;
namespace flx { namespace run {
// ********************************************************
// SHAPE for async_sched
// ********************************************************
// Byte offsets of the pointer members of async_sched that the collector
// must follow. Only ss is listed; world and gcp are marked as weak
// pointers in the struct declaration and are not scanned.
static const std::size_t async_sched_offsets[1]={
offsetof(async_sched,ss)
};
// Pairs the offset count (1) with the offset table above.
static ::flx::gc::generic::offset_data_t const async_sched_offset_data = { 1, async_sched_offsets };
// GC shape descriptor for async_sched, passed to the placement new that
// allocates async_sched objects in spawn_impl().
// NOTE(review): the fields are initialised positionally; the meaning of
// the uncommented entries should be confirmed against gc_shape_t.
::flx::gc::generic::gc_shape_t async_sched_ptr_map = {
"flx::run::async_sched",
1,sizeof(async_sched),
0, // no finaliser,
0, // fcops
&async_sched_offset_data,
::flx::gc::generic::scan_by_offsets,
0,0, // no serialisation as yet
::flx::gc::generic::gc_flags_default,
0UL, 0UL
};
// ***************************************************
// Async_sched: Thread kind pretty printer
// ***************************************************
// Returns a printable name for the given thread kind, for use in debug
// trace messages.
//
// A value outside the enumeration yields "unknown": without the trailing
// return, control would flow off the end of this non-void function, which
// is undefined behaviour.
char const *async_sched::str(thread_kind_t k) {
  switch (k) {
    case mainline: return "mainline";
    case embedded: return "embedded";
    case pthread: return "pthread";
    case process: return "process";
  }
  return "unknown";
}
// ***************************************************
// Async_sched: CONSTRUCTOR
// ***************************************************
// Builds an asynchronous scheduler around a freshly allocated synchronous
// scheduler that runs the fibre list `a`.
//
//   world_arg  world this scheduler belongs to (not owned)
//   d          enables debug tracing to stderr
//   g          GC profile used for allocation (not owned)
//   a          fibre list handed to the new synchronous scheduler; its
//              thread_count and busy_count are each incremented
//   k          kind of thread that will drive prun()
//
// Members are initialised in declaration order, so debug_driver and gcp
// are already set when ss is constructed from them.
async_sched::async_sched(
  flx_world *world_arg,
  bool d,
  ::flx::gc::generic::gc_profile_t *g,
  fthread_list *a, thread_kind_t k
)
  : thread_kind(k)
  , world(world_arg)
  , debug_driver(d)
  , gcp(g)
  , ss(new(*gcp,sync_sched_ptr_map, false) sync_sched(debug_driver, gcp, a))
{
  // Account for this scheduler on the shared list.
  ++a->thread_count;
  ++a->busy_count;

  if (!debug_driver) return;
  fprintf(stderr, "prun %p: async scheduler, creating and rooting synchronous scheduler! threads=1,busy=1\n",(void*)mythrid());
}
// ***************************************************
// Async_sched: DESTRUCTOR
// ***************************************************
// Removes this scheduler from the thread count of its fibre list and,
// when tracing is enabled, reports the final counters.
//
// The queue length and async_count are converted to size_t and printed
// with %zu. Elsewhere in this file async_count is printed with %zu, so
// passing it to %d was a format/argument mismatch.
// Any exception is caught and reported so that it cannot leave the
// destructor.
async_sched::~async_sched() {
  try
  {
    --ss->active->thread_count;
    if (debug_driver)
    {
      fprintf(stderr, "prun %p: Terminating async scheduler, threads=%d\n",(void*)mythrid(), ss->active->thread_count.load());
      fprintf(stderr, "prun %p: async scheduler, fibre queue length %zu, async_cound=%zu\n",
        (void*)mythrid(), (size_t)ss->active->size(), (size_t)ss->active->async_count);
      fprintf(stderr, "prun %p: async scheduler returns!\n",(void*)mythrid());
    }
  }
  catch (...) { fprintf(stderr, "Unknown exception deleting async!\n"); }
}
// ***************************************************
// Async_sched: Thread procedure
// ***************************************************
static void prun_pthread_entry(void *data) {
async_sched *d = (async_sched*)data;
d->prun();
}
// ***************************************************
// Async_sched: SPAWNING
// ***************************************************
// SPAWNING A NEW FELIX PTHREAD
// CREATES ITS OWN PRIVATE ASYNC SCHEDULER
// CREATES ITS OWN PRIVATE SYNC SCHEDULER
// SHARES WORLD INCLUDING COLLECTOR
// REGISTERS IN THREAD_CONTROL
void async_sched::do_spawn_process()
{
// this is safe (at the moment) because, if the active list
// is already in use by other processes, we're just overwriting
// the lock pointer with its existing value. If the list isn't
// in use by other processes, the lock pointer is NULL,
// but this thread is the one running the current process,
// so it can't race with itself.
ss->active->lockneeded = true;
spawn_impl (ss->active,process);
}
// Spawns a new thread of kind `pthread`. Unlike do_spawn_process(), the
// new thread is given its own freshly allocated fibre list rather than
// sharing the current one.
void async_sched::do_spawn_pthread()
{
  spawn_impl(
    new(*gcp, ::flx::run::fthread_list_ptr_map,false) fthread_list(gcp),
    pthread);
}
// Common implementation of do_spawn_pthread() and do_spawn_process().
//
//   pactive  fibre list the new scheduler will run
//   k        thread kind recorded in the new scheduler
//
// Steps: take the fibre named in the pending spawn request, push it onto
// pactive, allocate a new async_sched over pactive, start a detached
// thread running that scheduler, and wait until the new thread signals
// that it has registered itself.
void async_sched::spawn_impl(fthread_list *pactive, thread_kind_t k) {
// The fibre to run comes from the service request currently held by the
// synchronous scheduler.
fthread_t *ftx = ss->request->svc_fthread_req.fthread;
if (debug_driver)
fprintf(stderr, "[prun %p: spawn_pthread] Spawn pthread %p\n", (void*)mythrid(), ftx);
{
// Guard the push with the list's spin lock (the guard is given
// pactive->lockneeded, set by do_spawn_process() for shared lists).
spinguard dummy(pactive->lockneeded, &(pactive->active_lock));
// SHOULD THIS BE HERE?? The async scheduler isn't created yet.
// maybe we should do this "properly" after it is (in the next statement!)
// NO NO! This is ALL BAD! Some OTHER thread might run this routine!
pactive->push_front(ftx);
}
// The new scheduler is passed to the thread entry point as an opaque pointer.
void *data = new (*gcp, async_sched_ptr_map, false) async_sched(world,debug_driver, gcp, pactive,k);
if (debug_driver)
fprintf(stderr, "[prun %p: spawn_pthread] Starting new pthread, thread counter= %zu\n",
(void*)mythrid(), gcp->collector->get_thread_control()->thread_count());
{
// We use a hard (not Felix aware) lock here
// because the Felix system is in an incoherent state
// between the OS thread spawn, and the thread's registration
::std::mutex spawner_lock;
::std::condition_variable_any spawner_cond;
bool spawner_flag = false;
// Hold the mutex before starting the thread so the flag cannot be set
// and signalled before this thread begins waiting.
::std::unique_lock< ::std::mutex> locktite(spawner_lock);
flx_detached_thread_t().init(prun_pthread_entry, data, gcp->collector->get_thread_control(),
&spawner_lock, &spawner_cond,
&spawner_flag
);
if (debug_driver)
fprintf(stderr,
"[prun: spawn_pthread] Thread %p waiting for spawned thread to register itself\n",
(void*)get_current_native_thread());
// Loop guards against spurious wakeups. The wait is given the mutex
// itself (held via locktite), not the unique_lock.
while (!spawner_flag)
spawner_cond.wait(spawner_lock);
if (debug_driver)
fprintf(stderr,
"[prun: spawn_pthread] Thread %p notes spawned thread has registered itself\n",
(void*)get_current_native_thread());
}
}
// ***************************************************
// Async_sched: ASYNC REQUEST DISPATCH
// ***************************************************
// Dispatches a general asynchronous service request.
//
// Creates the async hooker for the current fibre list on first use, bumps
// the pending count, hands the request and the requesting fibre to the
// hooker, then registers the fibre as a GC root and detaches it from the
// synchronous scheduler.
void async_sched::do_general()
{
  if (debug_driver)
  {
    fprintf(stderr, "[prun %p: svc_general] from fthread=%p\n", (void*)mythrid(),ss->ft);
    fprintf(stderr, "[prun %p: svc_general] async=%p, ptr_create_async_hooker=%p\n",
      (void*)mythrid(), ss->active-> async,
      world->c->ptr_create_async_hooker);
  }

  // Lazily create the async subsystem for this fibre list.
  if (!ss->active->async)
    ss->active->async = world->create_demux();
  ++ss->active->async_count;

  if (debug_driver)
    fprintf(stderr,
      "[prun: svc_general] Async system created: %p, count %zu\n",
      ss->active->async,ss->active->async_count);

  ::flx::async::flx_driver_request_base *dreq = ss->request->svc_general_req.pgeneral;
  if (debug_driver)
    fprintf(stderr, "[prun: svc_general] Request object %p\n", dreq);

  // requests are now ALWAYS considered asynchronous
  // even if the request handler reschedules them immediately
  ss->active->async->handle_request(dreq, ss->ft);

  if (debug_driver)
  {
    fprintf(stderr, "[prun: svc_general] Request object %p captured fthread %p \n", dreq, ss->ft);
    fprintf(stderr, "[prun: svc_general] Request object %p\n", dreq);
  }

  // The fibre is no longer reachable from the synchronous scheduler, so
  // root it before dropping the scheduler's reference.
  gcp->collector->add_root(ss->ft);
  ss->ft = 0;

  if (debug_driver)
    fprintf(stderr,"[prun: svc_general] request dispatched..\n");
}
// Delay helper used by prun() when a scheduler has nothing to do but other
// workers are still busy.
//
// Calls tc->yield() first (which performs a world stop check), then sleeps
// and finally yields the OS thread.
//
// NOTE: `ns` is currently ignored; the delay is a fixed one second
// ("temporarily", per the original author).
static void sleep(thread_control_base_t *tc, size_t ns)
{
  (void)ns; // not used yet, see note above
  assert(tc);
  tc->yield();
  ::std::this_thread::sleep_for(::std::chrono::seconds(1));
  ::std::this_thread::yield();
}
// ***************************************************
// Async_sched: MASTER SCHEDULER
// ***************************************************
// Master scheduling loop.
//
// Repeatedly runs the synchronous scheduler (ss->frun()) and reacts to the
// state it returns:
//
//   delegated  - a service request the synchronous scheduler cannot handle
//                itself: spawn a pthread, spawn a process, or dispatch a
//                general async request. An unknown request code aborts.
//   blocked    - no runnable fibres. What happens next depends on
//                thread_kind:
//                  mainline/pthread: if async jobs are pending, block in
//                    dequeue() for one fibre and resume; otherwise return 0
//                    when busy_count is zero, else delay and retry.
//                  process: poll with maybe_dequeue(); return 0 when
//                    nothing is pending and busy_count is zero, else delay
//                    and retry.
//                  embedded: poll once without blocking; if nothing was
//                    scheduled, return the number of pending async jobs.
//   anything else aborts.
//
// Returns 0 on normal completion, or the pending async count for an
// embedded scheduler.
//
// Fix relative to the previous version: the "delegated request" trace
// message passed three arguments (kind name, thread id, request code) to a
// format with only "%p" and "%d"; a "%s" has been added for the kind name.
int async_sched::prun() {
sync_run:
  // RUN SYNCHRONOUS SCHEDULER
  if (debug_driver)
    fprintf(stderr, "prun %s %p: sync_run\n", str(thread_kind),(void*)mythrid());
  if (debug_driver)
    fprintf(stderr, "prun %s %p: Before running: Sync state is %s\n", str(thread_kind),(void*)mythrid(),
      ss->get_fpc_desc());
  sync_sched::fstate_t fs = ss->frun();
  if (debug_driver)
    fprintf(stderr, "prun %s %p: After running: Sync state is %s/%s\n", str(thread_kind),(void*)mythrid(),
      ss->get_fstate_desc(fs), ss->get_fpc_desc());
  switch(fs)
  {
    // HANDLE DELEGATED SERVICE REQUESTS
    case sync_sched::delegated:
      if (debug_driver)
        fprintf(stderr, "sync_sched %s %p:delegated request %d\n", str(thread_kind),(void*)mythrid(), ss->request->svc_req);
      // Every branch below either jumps back to sync_run or aborts, so
      // control never falls through into the next case.
      switch (ss->request->svc_req)
      {
        case svc_spawn_pthread: do_spawn_pthread(); goto sync_run;
        case svc_spawn_process: do_spawn_process(); goto sync_run;
        case svc_general: do_general(); goto sync_run;
        default:
          fprintf(stderr,
            "prun: Unknown service request code 0x%x\n", ss->request->svc_req);
          abort();
      }
    // SCHEDULE ANY ASYNCHRONOUSLY QUEUED FTHREADS
    case sync_sched::blocked: // ran out of active threads - are there any in the async queue?
      // This scheduler is idle until it finds more work.
      --ss->active->busy_count;
      switch (thread_kind)
      {
        case mainline:
        case pthread:
          // gain exclusive control
          // NOTE(review): this spins while test_and_set() returns false,
          // i.e. it exits as soon as the flag is observed already set.
          // Confirm this is the intended sense of qisblocked.
          while(!ss->active->qisblocked.test_and_set());
          if (ss->active->async_count > 0)
          {
            if (debug_driver)
              fprintf(stderr, "prun: %s %p Async blocking\n", str(thread_kind), (void*)mythrid());
            ss->ft = ss->active->async->dequeue(); // get fibre
            gcp->collector->remove_root(ss->ft); // transfer ownership
            --ss->active->async_count; // accounting
            ss->active->qisblocked.clear(); // release control
            ++ss->active->busy_count;
            goto sync_run; // do work
          }
          if (ss->active->busy_count.load() == 0) {
            // no work to do, no jobs pending, and no workers to make work, so return
            // NOTE(review): qisblocked is not cleared on this path.
            if (debug_driver)
              fprintf(stderr, "prun: %s %p Async returning\n", str(thread_kind), (void*)mythrid());
            return 0;
          }
          else // some processes are busy, they might make work so delay and retry later
          {
            ss->active->qisblocked.clear(); // release control
            // DELAY
            if (debug_driver)
              fprintf(stderr, "prun: %s %p Async delaying thread_count=%d, busy_count=%d\n",
                str(thread_kind), (void*)mythrid(), ss->active->thread_count.load(),ss->active->busy_count.load());
            sleep(gcp->collector->get_thread_control(), 10.00); // nanoseconds
            ++ss->active->busy_count;
            goto sync_run;
          }
        case process:
          if (ss->active->qisblocked.test_and_set())
          {
            if (ss->active->async_count > 0)
            {
              if (debug_driver)
                fprintf(stderr, "prun: %s %p Async WAIT\n", str(thread_kind), (void*)mythrid());
              // Non-blocking: may return null if nothing has completed yet.
              auto ftp = ss->active->async->maybe_dequeue(); // get fibre
              if(ftp != nullptr) {
                ss->push_front(ftp);
                gcp->collector->remove_root(ftp); // transfer ownership
                --ss->active->async_count; // accounting
              }
              ss->active->qisblocked.clear(); // release control
              ++ss->active->busy_count;
              goto sync_run; // do work
            }
            if (ss->active->busy_count.load() == 0) {
              // no work to do, no jobs pending, and no workers to make work, so return
              if (debug_driver)
                fprintf(stderr, "prun: %s %p Async returning\n", str(thread_kind), (void*)mythrid());
              return 0;
            }
          }
          // DELAY
          sleep(gcp->collector->get_thread_control(), 10.00); // nanoseconds
          ++ss->active->busy_count;
          goto sync_run;
        case embedded:
          if (ss->active->qisblocked.test_and_set())
            if(nonblocking_schedule_queued_fthreads()) goto sync_run;
          // Hand control back to the embedding host, reporting how many
          // async jobs are still outstanding.
          return ss->active->async_count;
      }
    default:
      fprintf(stderr, "prun: Unknown frun return status 0x%4x\n", fs);
      abort();
  } // switch
}
// ***************************************************
// Async_sched: COMPLETED ASYNC RETRIEVAL
// ***************************************************
// this routine is used when there are no fthreads left on the
// sync scheduler list
//
// assuming async is enabled, it checks to see if there are
// pending async jobs. If so and the block flag is set,
// it blocks the pthread until at least one of the pending jobs completes.
// The routine returns true if some async jobs completed and were put on
// the sync scheduler active list.
// As it is, this routine cannot be called with block_flag set
// by multiple threads. First, critical sections are not protected.
// However even if they were, if two threads block with async->dequeue,
// then one might empty all the pending fibres out and return,
// leaving all the rest of the thread locked up.
// One solution is to simply poll to see if there's anything
// ready to fetch. If so fetch it, fine. If not, return, wait a bit,
// and try again. This introduces an uncomfortable lag though.
//
// Another solution is to have the first thread block,
// and then have the other threads suspend with a condition variable.
// They check:
// (a) there is no stuff on the active list
// (b) there is stuff on the async list
// (c) there is no thread already waiting on the async list
// If these conditions are met the thread goes to sleep and waits
// for a signal.
//
// Note if there is no thread waiting but (a) and (b) are met,
// the thread can dive in and become the waiter.
// Drains the async ready queue without blocking.
//
// Every fibre that maybe_dequeue() yields is un-rooted, pushed onto the
// front of the synchronous scheduler's list, and subtracted from the
// pending async count. Returns true if at least one fibre was moved.
// Does nothing (returns false) when there is no async subsystem or no
// async jobs are pending.
bool async_sched::nonblocking_schedule_queued_fthreads() {
  if (debug_driver) {
    fprintf(stderr,
      "prun %s %p: out of active synchronous threads, trying async, pending=%zu\n", str(thread_kind), (void*)mythrid(), ss->active->async_count);
  }

  if (!ss->active->async || !(ss->active->async_count > 0))
    return false;

  bool moved_any = false;
  for (fthread_t *ftp = ss->active->async->maybe_dequeue();
       ftp;
       ftp = ss->active->async->maybe_dequeue())
  {
    if (debug_driver)
      fprintf(stderr, "prun %p:ret mode: Async Retrieving fthread %p\n", (void*)mythrid(), ftp);
    gcp->collector->remove_root(ftp);
    ss->push_front(ftp);
    --ss->active->async_count;
    moved_any = true;
  }
  return moved_any;
}
// ***************************************************
// Async_sched: EXTERNAL MULTIWRITE HOOK
// ***************************************************
// External hook: passes the channel and data straight through to the
// synchronous scheduler's external_multi_swrite().
void async_sched::external_multi_swrite(::flx::rtl::schannel_t *chan, void *data)
{
  sync_sched *const sync = ss;
  sync->external_multi_swrite(chan, data);
}
}} // namespaces
The Asynchronous I/O interface.¶
The embedding system depends on the interface but not the implementation.
//[flx_async.hpp]
#ifndef __FLX_ASYNC_H__
#define __FLX_ASYNC_H__
#include "flx_rtl_config.hpp"
#include "flx_rtl.hpp"
#include "pthread_bound_queue.hpp"
#ifdef BUILD_ASYNC
#define ASYNC_EXTERN FLX_EXPORT
#else
#define ASYNC_EXTERN FLX_IMPORT
#endif
// GLOBAL NAMESPACE!
// Abstract interface to the asynchronous I/O subsystem, as seen by the
// scheduler. The scheduler submits requests with handle_request() and
// collects fibres that are ready to run again with dequeue() /
// maybe_dequeue().
class ASYNC_EXTERN async_hooker {
public:
// Returns the next ready fibre. The scheduler uses this as the blocking
// variant (see prun()).
virtual flx::rtl::fthread_t *dequeue()=0;
// Non-blocking variant: callers treat a null result as "nothing ready".
virtual flx::rtl::fthread_t *maybe_dequeue()=0;
// Starts the request `data` on behalf of fibre `ss`.
virtual void handle_request(::flx::async::flx_driver_request_base *data, flx::rtl::fthread_t *ss)=0;
virtual ~async_hooker();
};
// Function type of the factory that creates an async_hooker; matches the
// signature of create_async_hooker declared below.
using create_async_hooker_t =
  async_hooker *
  (
    ::flx::pthread::thread_control_base_t*,
    int n0,   // bound on resumable thread queue
    int n1,   // bound on general input job queue
    int m1,   // number of threads in job pool
    int n2,   // bound on async fileio job queue
    int m2    // number of threads doing async fileio
  );
// Factory for the async subsystem, exported with C linkage.
// Returns a newly allocated async_hooker.
extern "C" {
ASYNC_EXTERN async_hooker *
create_async_hooker
(
::flx::pthread::thread_control_base_t*,
int n0, // bound on resumable thread queue
int n1, // bound on general input job queue
int m1, // number of threads in job pool
int n2, // bound on async fileio job queue
int m2 // number of threads doing async fileio
);
}
namespace flx { namespace async {
// "Finished" notifier: abstract callback object whose signal() is invoked
// by flx_driver_request_base::notify_finished() when a request completes.
struct ASYNC_EXTERN finote_t
{
virtual void signal()=0;
virtual ~finote_t();
};
// Notifier that wakes a fibre: signal() enqueues the fibre `f` on the
// bound queue `q`. Neither pointer is released by this class.
class ASYNC_EXTERN wakeup_fthread_t : public finote_t
{
::flx::rtl::fthread_t *f;
::flx::pthread::bound_queue_t *q;
public:
// q_a: queue to enqueue onto; f_a: fibre to enqueue.
wakeup_fthread_t(::flx::pthread::bound_queue_t *q_a, ::flx::rtl::fthread_t *f_a);
void signal () { q->enqueue(f); }
};
// Base class for asynchronous driver requests.
// Derived classes implement start_async_op_impl(); completion is reported
// through the finote_t installed by start_async_op().
class ASYNC_EXTERN flx_driver_request_base {
// Notifier for the operation in flight; null when none is pending.
finote_t *fn;
// Starts the operation. Returns the finished flag: true means the
// operation has already completed (async may fail or immediately finish).
virtual bool start_async_op_impl() = 0;
public:
flx_driver_request_base();
virtual ~flx_driver_request_base(); // so destructors work
// Installs fn_a as the notifier and starts the operation; if the
// operation reports that it finished immediately, notify_finished() is
// called before returning.
void start_async_op(finote_t *fn_a);
// Signals the installed notifier and then deletes it.
void notify_finished();
};
}}
#endif
//[flx_async.cpp]
#include "flx_async.hpp"
#include "pthread_bound_queue.hpp"
#include "flx_rtl.hpp"
#include <cassert>
#include <stdio.h>
using namespace ::flx::rtl;
using namespace ::flx::pthread;
using namespace ::flx::async;
// Out-of-line definition of the virtual destructor declared in
// flx_async.hpp; the body is empty.
async_hooker::~async_hooker(){ }
namespace flx { namespace async {
// FINISHED NOTIFIER
// Out-of-line definition of the virtual destructor; the body is empty.
finote_t::~finote_t(){}
// DERIVED NOTIFIER WHICH DOES FTHREAD WAKEUP
// BY ENQUEUING THE FTHREAD INTO THE READY QUEUE
wakeup_fthread_t::wakeup_fthread_t(
::flx::pthread::bound_queue_t *q_a,
::flx::rtl::fthread_t *f_a)
: f(f_a), q(q_a) {}
// ASYNC HOOKER IMPLEMENTATION STAGE 1
// Introduces new virtual get_ready_queue().
// First implementation stage of async_hooker: implements handle_request()
// in terms of a ready queue that a further-derived class must supply via
// get_ready_queue().
class async_hooker_impl : public async_hooker {
public:
  // Queue onto which completed requests enqueue their fibre.
  virtual bound_queue_t *get_ready_queue()=0;
  ~async_hooker_impl() {}

  // Starts `pgeneral`, arranging for fibre `ss` to be enqueued on the
  // ready queue when the request finishes. The notifier allocated here is
  // deleted by flx_driver_request_base::notify_finished().
  void handle_request(flx::async::flx_driver_request_base *pgeneral,fthread_t *ss)
  {
    pgeneral->start_async_op(new wakeup_fthread_t(get_ready_queue(),ss));
  }
};
// ASYNC HOOKER IMPLEMENTATION STAGE 2
// Provides the ready queue and the dequeuing operations
// Second implementation stage: owns the ready queue and implements the
// two dequeue operations on top of it.
class proto_async : public async_hooker_impl
{
  bound_queue_t async_ready;

public:
  // Only tc and n0 (the ready-queue bound) are used; the remaining
  // arguments are accepted for signature compatibility and ignored here.
  proto_async(thread_control_base_t *tc, int n0, int n1, int m1, int n2, int m2)
    : async_ready(tc,n0)
  {
  }

  ~proto_async(){}

  bound_queue_t *get_ready_queue() { return &async_ready; }

  // Next fibre from the ready queue, via bound_queue_t::dequeue().
  fthread_t* dequeue()
  {
    return static_cast<fthread_t*>(async_ready.dequeue());
  }

  // Next fibre from the ready queue, via bound_queue_t::maybe_dequeue().
  fthread_t* maybe_dequeue()
  {
    return static_cast<fthread_t*>(async_ready.maybe_dequeue());
  }
};
// DRIVER REQUEST BASE
// THIS IS USED TO BUILD REQUESTS
// PROVIDES DEFAULT NOTIFY_FINISHED ROUTINE WHICH USES FINOTE SIGNAL
// START_ASYNC_OP JUST CALLS DERIVED CLASS START_ASYNC_OP_IMPL
// A new request has no notifier installed.
flx_driver_request_base::flx_driver_request_base()
  : fn(nullptr)
{
}

// Empty body; declared virtual in the class so derived destructors work.
flx_driver_request_base::~flx_driver_request_base()
{
}
// Installs fn_a as the completion notifier and starts the operation.
// No notifier may already be installed (asserted). If the derived class
// reports that the operation completed immediately, the notifier is fired
// right away via notify_finished(); otherwise the request stays pending.
void flx_driver_request_base::start_async_op(finote_t *fn_a)
{
  //fprintf(stderr,"start async op %p, set fn = %p\n",this,fn_a);
  assert(fn==0);
  fn = fn_a;

  if (!start_async_op_impl())
  {
    //fprintf(stderr,"Pending\n");
    return;
  }

  fprintf(stderr,"instant complete\n");
  notify_finished();
}
// Reports completion of the operation: fires the installed notifier and
// then deletes it. A notifier must be installed (asserted).
void flx_driver_request_base:: notify_finished()
{
//fprintf(stderr, "faio_req=%p, Notify finished %p\n", this,fn);
assert(fn!=0);
// Take a local copy and clear the member before signalling, so this
// object no longer refers to the notifier by the time signal() runs.
// NOTE(review): presumably because signalling can let the woken fibre
// reuse or destroy this request - confirm.
finote_t *fin = fn;
fn=0;
fin->signal();
delete fin;
//fprintf(stderr, "faio_req=%p, FINISHED\n",this);
}
}}
// Factory for the async subsystem (declared extern "C" in flx_async.hpp).
// Returns a heap-allocated proto_async built from the given arguments.
async_hooker *create_async_hooker(thread_control_base_t *tc, int n0,int n1,int m1,int n2,int m2) {
  async_hooker *const hooker = new ::flx::async::proto_async(tc,n0,n1,m1,n2,m2);
  return hooker;
}
Config¶
//[unix_flx_async.fpc]
Name: flx_async
Description: Async hook
provides_dlib: -lflx_async_dynamic
provides_slib: -lflx_async_static
includes: '"flx_async.hpp"'
Requires: flx_pthread flx_gc flx
macros: BUILD_ASYNC
library: flx_async
srcdir: src/flx_async
src: .*\.cpp
//[win_flx_async.fpc]
Name: flx_async
Description: Async hook
provides_dlib: /DEFAULTLIB:flx_async_dynamic
provides_slib: /DEFAULTLIB:flx_async_static
includes: '"flx_async.hpp"'
Requires: flx_pthread flx_gc flx
macros: BUILD_ASYNC
library: flx_async
srcdir: src/flx_async
src: .*\.cpp
#[flx_async.py]
import fbuild
from fbuild.functools import call
from fbuild.path import Path
from fbuild.record import Record
from fbuild.builders.file import copy
import buildsystem
# ------------------------------------------------------------------------------
def build_runtime(phase):
    """Build the flx_async runtime library for the given build phase.

    Compiles flx_async.cpp from the build root's share/src/flx_async
    directory with the BUILD_ASYNC macro defined, against the flx_pthread
    and flx_gc runtime libraries.

    Returns a Record with two fields, ``static`` and ``shared``, holding
    the results of building the static and shared variants respectively.
    """
    # Single source of truth for the source directory (previously computed
    # here but then spelled out a second time for srcs).
    path = Path(phase.ctx.buildroot/'share'/'src/flx_async')

    #buildsystem.copy_hpps_to_rtl(phase.ctx,
    #    path / 'flx_async.hpp',
    #)

    dst = 'host/lib/rtl/flx_async'
    srcs = [path / 'flx_async.cpp']
    includes = [
        phase.ctx.buildroot / 'host/lib/rtl',
        phase.ctx.buildroot / 'share/lib/rtl'
    ]
    macros = ['BUILD_ASYNC']
    # Runtime libraries this one links against.
    libs = [
        call('buildsystem.flx_pthread.build_runtime', phase),
        call('buildsystem.flx_gc.build_runtime', phase),
    ]

    return Record(
        static=buildsystem.build_cxx_static_lib(phase, dst, srcs,
            includes=includes,
            macros=macros,
            libs=[lib.static for lib in libs]),
        shared=buildsystem.build_cxx_shared_lib(phase, dst, srcs,
            includes=includes,
            macros=macros,
            libs=[lib.shared for lib in libs]))