Package: src/packages/faio.fdoc

Faio: Felix Async I/O support

key file
faio_drv.hpp share/src/faio/faio_drv.hpp
faio_posixio.hpp share/lib/rtl/faio_posixio.hpp
faio_posixio.cpp share/src/faio/faio_posixio.cpp
faio_winio.hpp share/lib/rtl/faio_winio.hpp
faio_winio.cpp share/src/faio/faio_winio.cpp
faio_timer.hpp share/lib/rtl/faio_timer.hpp
faio_timer.cpp share/src/faio/faio_timer.cpp
faio.py $PWD/buildsystem/faio.py
timer.fpc $PWD/src/config/timer.fpc
unix_faio.fpc $PWD/src/config/unix/faio.fpc
win_faio.fpc $PWD/src/config/win/faio.fpc
flx_faio_config.hpp share/lib/rtl/flx_faio_config.hpp

Faio Driver

//[faio_drv.hpp]
#ifndef __FLX_FAIO_DRV_H__
#define __FLX_FAIO_DRV_H__
#include <flx_faio_config.hpp>

#include "pthread_bound_queue.hpp"
#include "demux_timer_queue.hpp"
#include "demux_demuxer.hpp"

namespace flx { namespace faio {

// this may be needed but I've lost track of where
// we get SIGPIPE, SIG_IGN from ..

#if 0
void FAIO_EXTERN sigpipe_ignorer()
{
    void (*prev_handler)(int);  // solaris is FUN.
    prev_handler = signal(SIGPIPE, SIG_IGN);

    if(SIG_ERR == prev_handler)
    {
        fprintf(stderr, "failed to install SIGPIPE ignorer\n");
        throw -1;
    }
    else if(prev_handler != SIG_IGN && prev_handler != SIG_DFL)
    {
        fprintf(stderr,"warning: blew away prev SIGPIPE handler: %p\n",
            prev_handler);
    }
}
#endif

}}
#endif

Faio I/O

//[faio_posixio.hpp]
#ifndef __FLX_FAIO_POSIXIO_H__
#define __FLX_FAIO_POSIXIO_H__
#include <flx_faio_config.hpp>

#include "flx_async.hpp"

// we don't need to piggyback much data at all. for now just the demuxer,
// so that we can be woken up, and the buffer info (this replaces the
// felix "socket" thread type, which was ugly.

#include "demux_posix_demuxer.hpp"
#include "demux_timer_queue.hpp"

namespace flx { namespace faio {

class FAIO_EXTERN socketio_wakeup : public demux::socket_wakeup {
public:
  demux::sel_param   pb;     // in: what you want, out: what you get
  int       sio_flags;  // either one of PDEMUX_{READ|WRITE}A
  class socketio_request *request;

  virtual void wakeup(demux::posix_demuxer& demux);
};

// this can handle most unix style io, that is, read & write on sockets,
// files & pipes. NICE. the fact that the socket is now in here may mean
// I can get rid of the epoll hack
// Not sure if this can be used for file fds.
class FAIO_EXTERN socketio_request : public ::flx::async::flx_driver_request_base {
public:
    socketio_wakeup sv;
    demux::posix_demuxer *pd;
    socketio_request() {}       // Lord Felix demands it. Like STL.
    socketio_request(socketio_request const&);
    void operator = (socketio_request const&);

    socketio_request(demux::posix_demuxer *pd_a, int s, char* buf, long len, bool r);
    bool start_async_op_impl();
};

// client open
class FAIO_EXTERN connect_request
  : public ::flx::async::flx_driver_request_base, public demux::connect_control_block {
public:
  demux::posix_demuxer *pd;
  connect_request() {}      // flx linkage

  connect_request(demux::posix_demuxer *pd_a,const char* addr, int port);
  bool start_async_op_impl();
  virtual void wakeup(demux::posix_demuxer&);
};

// server open
class FAIO_EXTERN accept_request
  : public ::flx::async::flx_driver_request_base, public demux::accept_control_block {
public:
  // we sometimes know that there'll be several connections to accept.
  // this'll need a different wakeup - and a different interface between
  // event source & wakeups

  demux::posix_demuxer *pd;
  accept_request() {} // flx linkage

  // eeh, give that a better name
  accept_request(demux::posix_demuxer *pd_a, int listener) : pd(pd_a) { s = listener; }

  // from flx_driver_request_base
  bool start_async_op_impl();

  // from accept_control_block
  virtual void wakeup(demux::posix_demuxer& demux);
};

}}
#endif
//[faio_posixio.cpp]
#include <stdio.h>      // printf
#include "faio_posixio.hpp"
#include "demux_sockety.hpp"    // async_connect

#include <sys/types.h>  // getsockopt & co
#include <sys/socket.h>

#include <unistd.h>     // close
#include <string.h>     // strerror - probably not portable
#include <assert.h>

using namespace flx::demux;
namespace flx { namespace faio {

connect_request::connect_request(demux::posix_demuxer *pd_a,const char* addr, int port) :pd(pd_a) { addy = addr; p = port; s=-1; }

socketio_request::socketio_request(demux::posix_demuxer *pd_a, int s, char* buf, long len, bool read)
: pd(pd_a)
{
  //fprintf(stderr,"socketio_request %p making socketio_wakeup for socket %d\n",this,s);
  sv.s = s;
  sv.request = this;
  // demux supports reading AND writing. We don't. Yet.
  sv.sio_flags = ((read) ? PDEMUX_READ : PDEMUX_WRITE);

  sv.pb.buffer = buf;
  sv.pb.buffer_size = len;
  sv.pb.bytes_written = 0;        // really bytes_processed
}

socketio_request::socketio_request(socketio_request const &a) : pd(a.pd)
{
  //fprintf(stderr, "copying socketio_request to %p\n",this);
  sv = a.sv;
  sv.request = this;
}

// EXTREME HACKERY!
void socketio_request::operator=(socketio_request const &a)
{
  //fprintf(stderr, "assigning socketio_request to %p\n",this);

  flx_driver_request_base::operator=(a);
  sv = a.sv;
  sv.request = this;
  pd = a.pd;
}

bool
socketio_request::start_async_op_impl()
{
  //fprintf(stderr,"socketio_request: socket %d start async_op_impl %p\n",sv.s,this);
  // fprintf(stderr, "adding wakeup: len %i, done %i\n",
  //   sv.pb.buffer_size, sv.pb.bytes_written);

  if(sv.s == -1) {
    fprintf(stderr, "Attempt to start_async_op on socket -1\n");
    exit(1);
  }

  // wake thread if call failed
  bool failed = (pd->add_socket_wakeup(&sv, sv.sio_flags) == -1);
  if (failed)
    fprintf(stderr,"socketio_request FAILED %p, sock=%d, dir=%d\n",this, sv.s, sv.sio_flags);
  //else
  //  fprintf(stderr,"socketio_request OK %p\n",this);
  return failed;
}


void
socketio_wakeup::wakeup(posix_demuxer& demux)
{
  //fprintf(stderr, "Wakeup, socket = %d\n",s);
  // handle read/write, return true if not finished.
  // otherwise wakeup return false.
  bool  connection_closed;

  //fprintf(stderr, "making socketio_wakeup %p\n",this);
  //fprintf(stderr,"prehandle wakeup, this: %p, read: %i, len: %i, done %i\n",
  //  this, read, pb.buffer_size, pb.bytes_written);

  // NOTE: this code does not handle the possibility of both read AND
  // write being set. That would require thinking about the what
  // the connect_closed return value meant. In any case, we don't
  // do that stuff here yet.

  if(wakeup_flags & PDEMUX_ERROR)
  {
    connection_closed = true;
    //pb.bytes_written=0;
    fprintf(stderr,"posix faio wakeup PDEMUX_ERROR, connection closed = %d\n", connection_closed);
  }

  else if(wakeup_flags & PDEMUX_EOF)
  {
    connection_closed = true;
    fprintf(stderr,"posix faio wakeup PDEMUX_EOF, connection closed = %d\n", connection_closed);
    //pb.bytes_written=0;
  }

  else if(wakeup_flags & PDEMUX_READ)
  {
    // just check that our above assumption hasn't been violated.
    assert(wakeup_flags == PDEMUX_READ);
    //fprintf(stderr,"posix faio wakeup PDEMUX_READ, reading..\n");
    connection_closed = posix_demuxer::socket_recv(s, &pb);
    //fprintf(stderr,"posix faio wakeup PDEMUX_READ, connection closed = %d\n", connection_closed);
  }
  else
  {
    // never hurts to be paranoid.
    assert(wakeup_flags == PDEMUX_WRITE);
    //fprintf(stderr,"posix faio wakeup PDEMUX_WRITE, writing..\n");
    connection_closed = posix_demuxer::socket_send(s, &pb);
    //if(connection_closed)
    //  fprintf(stderr,"posix faio wakeup PDEMUX_WRITE, connection closed = %d\n", connection_closed);
  }

  // fprintf(stderr,"posthandle wakeup, this: %p, read: %i, len: %i, done %i\n",
  //  this, read, pb.buffer_size, pb.bytes_written);
  // fprintf(stderr,"wakeup of %p, closed = %i\n", this, connection_closed);

  // wake up: time to process some data
  if(connection_closed || pb.bytes_written == pb.buffer_size)
  {
    // fprintf(stderr,"schedding %p, drv: %p, f: %p\n", this, drv, f);
    // if the connection closed, this notify should tell the caller
    // not to keep trying to write, but it doesn't .. why not?
    // who called it anyhow?
    // I think the writing code ignores error returns ..
    request->notify_finished();
    return;
  }

  // fprintf(stderr,"not schedding %p\n", this);
  fprintf(stderr, "Incomplete request on %d, waiting for more I/O\n",s);
  if(demux.add_socket_wakeup(this, sio_flags) == -1)
  fprintf(stderr,"failed to re-add_socket_wakeup\n");
}

// asynchronous connect
bool
connect_request::start_async_op_impl()
{
  //fprintf(stderr,"connect_request %p: start async_op_impl\n",this);

  // call failed or finished (!), wake up thread as no wakeup coming
  if(start(*pd) == -1) {
    fprintf(stderr, "FAILED TO SPAWN CONNECT REQUEST\n");
    return true;
  }

  // NONONONONO! Referring to this's variables after a successful start
  // gives rise to a race condition, which is bad.
  //fprintf(stderr, "CONNECT REQUEST SPAWNED\n");
  return false;     // do not reschedule after a successful start

/*
  // I've not seen this yet, don't know why.
  if(0 == socket_err) fprintf(stderr, "WOW, instant CONNECT\n");

  // call didn't fail, could be pending or finished.
  // return socket_err != EINPROGRESS, the contrapositive, sort of
  return 0 == socket_err;   // no err => finished immediately
*/
}

void
connect_request::wakeup(posix_demuxer& demux)
{
  //fprintf(stderr, "connect_request::wakeup\n");

  // fprintf(stderr,"connect woke up\n");
  connect_control_block::wakeup(demux);

  // felix thread can pick out error itself.
  notify_finished();
}


// async accept
bool
accept_request::start_async_op_impl()
{
  //fprintf(stderr,"accept_request %p: start async_op_impl\n",this);
  bool failed = (start(*pd) == -1);      // accept_control_block function
  if(failed)
    fprintf(stderr, "FAILED TO SPAWN ACCEPT REQUEST\n");
  //else
  //  fprintf(stderr, "ACCEPT REQUEST SPAWNED\n");
  return failed;
}

void
accept_request::wakeup(posix_demuxer& demux)
{
  // does the leg work.
  accept_control_block::wakeup(demux);
  //'fprintf(stderr, "faio_posix::accept_request::wakeup\n");

  if(accepted == -1)
  {
    // I don't know if this is a good idea...
    fprintf(stderr, "accept request failed (%i), retrying...\n",
      socket_err);
    // didn't get it - go back to sleep
    if(start(demux) == -1)
      fprintf(stderr, "failed again... probably was a bad idea\n");
    return;
  }

  notify_finished();
}

}}
//[faio_winio.hpp]
#ifndef __FLX_FAIO_WINIO_H__
#define __FLX_FAIO_WINIO_H__
#include <flx_faio_config.hpp>

// visual studio is quite sensitve about how you do these includes.
// THIS is the way (WinSock2.h must include Windows.h).
#include <WinSock2.h>
#include <MSWSock.h>        // AcceptEx, TF_REUSE_SOCKET, etc

#include "flx_async.hpp"
#include "demux_overlapped.hpp"   // nicely wrapped async windows calls

namespace flx { namespace faio {

// interestingly, because in windows the async objects are associated
// with an IOCP before their use, we don't actually need a demuxer here
// at all. That's kind of nice. (actually iocp_associator uses it now)

// a flx driver request to the add socket s to the drivers iocp
// this is currently the only windows driver request that uses the demuxer.
class FAIO_EXTERN iocp_associator : public ::flx::async::flx_driver_request_base {
  SOCKET  s;
public:
  demux::iocp_demuxer *iod;
  // should have result & errcode
  iocp_associator() : iod(0) {} // shouldn't this also set s?
  iocp_associator(demux::iocp_demuxer *iod_a, SOCKET associatee)
  : s(associatee), iod(iod_a) {}

  bool start_async_op_impl();
};

// flx <-> c++ stuff for async io (well, it was)

// transition to new windows async control block
class FAIO_EXTERN waio_base : public ::flx::async::flx_driver_request_base {
protected:
  ::flx::async::finote_t *fn_a;
public:
  demux::iocp_demuxer *iod;
  bool  success;          // eh?

  waio_base() : iod(0), success(false) {}
  waio_base(demux::iocp_demuxer *iod_a) : iod(iod_a), success(false) {}

  // actually wakes up thread
  virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    LPOVERLAPPED olp, int err);
};


// listener socket must be already associated with an IOCP
// in doing an AcceptEx, it might succeed immediately - do you still
// get the IOCP wakeup?
class FAIO_EXTERN wasync_accept
  : public waio_base, public demux::acceptex_control_block
{
public:
  wasync_accept() {}  // felix linkage demands it

  wasync_accept(demux::iocp_demuxer *iod_a,SOCKET l, SOCKET a) : waio_base(iod_a) { listener = l; acceptor = a; }

  bool start_async_op_impl();

  virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    LPOVERLAPPED olp, int err);
};

class FAIO_EXTERN connect_ex
  : public waio_base, public demux::connectex_control_block
{
public:

  connect_ex() {}     // flx linkage

  connect_ex(demux::iocp_demuxer *iod_a,SOCKET soc, const char* addr, int port)
    : waio_base(iod_a) { s = soc; addy = addr; p = port; }

  bool start_async_op_impl();

  virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    LPOVERLAPPED olp, int err);
};

// TransmitFile here (requires file handle)
class FAIO_EXTERN wasync_transmit_file
  : public waio_base, public demux::transmitfile_control_block
{
public:
  wasync_transmit_file()
    : waio_base(0), transmitfile_control_block(INVALID_SOCKET, NULL) {}   // flx linkage

  wasync_transmit_file(demux::iocp_demuxer *iod_a,SOCKET dst)      // for reuse of socket
    : waio_base(iod_a), transmitfile_control_block(dst) {}

  wasync_transmit_file(demux::iocp_demuxer *iod_a,SOCKET dst, HANDLE src)  // actual transmitfile
    : waio_base(iod_a), transmitfile_control_block(dst, src) {}

  // from flx_request_base
  bool start_async_op_impl();

  virtual void iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
    LPOVERLAPPED olp, int err);
};

// handles both WSASend & WSARecv
class FAIO_EXTERN wsa_socketio
  : public waio_base, public demux::wsasocketio_control_block
{
public:
  wsa_socketio()
    : wsasocketio_control_block(INVALID_SOCKET, NULL, false) {}

  wsa_socketio(demux::iocp_demuxer *iod_a,SOCKET src, demux::sel_param* ppb, bool read)
    : waio_base(iod_a), wsasocketio_control_block(src, ppb, read) {}

  bool start_async_op_impl();

  virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    LPOVERLAPPED olp, int err);
};


}}
#endif  // __DWINIO__
//[faio_winio.cpp]
#include "faio_winio.hpp"
#include <stdio.h>      // printf

using namespace flx::demux;
namespace flx { namespace faio {

// way of adding sockets to the IOCP.
bool
iocp_associator::start_async_op_impl()
{
  //fprintf(stderr,"iocp_associator: start async_op_impl\n");

  // nasty: note how I'm making the user cookie constant (0).
  if(iod->associate_with_iocp((HANDLE)s, 0) != 0)
    fprintf(stderr,"associate request failed - get result here!\n");

  return true;      // wake caller
}

void
waio_base::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
  LPOVERLAPPED olp, int err)
{
  // fprintf(stderr,"general wakeup thing - rescheduling\n");
  //fprintf(stderr,"this: %p, q: %p, f: %p, err: %i\n", this, q, f, err);

  // this tells us when things went wrong (store it)
  if(NO_ERROR != err)
    fprintf(stderr,"catchall wakeup got error: %i (should store it)\n", err);

  success = (NO_ERROR == err);  // this works pretty well
  notify_finished();
}

bool
wasync_accept::start_async_op_impl()
{
  //fprintf(stderr,"wasync_accept: start async_op_impl\n");
  return start_overlapped();
}

void
wasync_accept::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
  LPOVERLAPPED olp, int err)
{
  waio_base::iocp_op_finished(nbytes, udat, olp, err);
}


bool
connect_ex::start_async_op_impl()
{
  //fprintf(stderr,"connect_ex: start async_op_impl\n");
  return start_overlapped();
}

void
connect_ex::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
  LPOVERLAPPED olp, int err)
{
  waio_base::iocp_op_finished(nbytes, udat, olp, err);
}


bool
wasync_transmit_file::start_async_op_impl()
{
  //fprintf(stderr,"wasync_transmit_file: start async_op_impl\n");
  return start_overlapped();
}

void
wasync_transmit_file::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
  LPOVERLAPPED olp, int err)
{
  waio_base::iocp_op_finished(nbytes, udat, olp, err);
}

bool
wsa_socketio::start_async_op_impl()
{
  //fprintf(stderr,"wsa_socketio: start async_op_impl\n");
  return start_overlapped();    // start overlapped op
}

// this could be factored into demux... or it might need
// to stay here... this is really a finished that isn't finished
// same goes for winfileio (I think)
void
wsa_socketio::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
  LPOVERLAPPED olp, int err)
{
  // fprintf(stderr,"wsa_socketio wakeup, nb: %li, err: %i\n", nbytes, err );
// Doing the handling myself - this can restart the op giving us
// a possible race condition... or not? It should be sync with this call.
  // wsasocketio_control_block::iocp_op_finished(nbytes, udat, olp, err);

  ppb->bytes_written += nbytes;

  // if we're not finished, we have to reinstall our request
  // zero bytes indicates shutdown/closure, right?
  // might be using this for WSASend. Instead of broken pipes on win32,
  // instead we get WSAECONNRESET (pretty sure) on write. On read?
  if(0 == nbytes || ppb->finished())
  {
    // this'll wake us up
    waio_base::iocp_op_finished(nbytes, udat, olp, err);
  }
  else
  {
    // go back around again
    // this returns a finished flag (bad idea). it can also fail.
    // I think it would be better to know that.
    if(start_overlapped())
      fprintf(stderr, "socketio restart finished! WHAT TO DO!?!\n");
  }
}

}}

Faio Timer

//[faio_timer.hpp]
#ifndef __FLX_FAIO_TIMER_H__
#define __FLX_FAIO_TIMER_H__
#include <flx_faio_config.hpp>

#include "demux_demuxer.hpp"        // sel_param, demuxer base
#include "flx_async.hpp"
#include "demux_timer_queue.hpp"

#include "flx_rtl.hpp"

namespace flx { namespace faio {


// sleeping
class FAIO_EXTERN sleep_request
  : public ::flx::async::flx_driver_request_base, public demux::sleep_task
{
  demux::timer_queue *sleepers;
  double      delta;
public:
  sleep_request() {}        // flx linkage

  sleep_request(demux::timer_queue *sleepers_a, double d) :
    sleepers(sleepers_a), delta(d)
  {}

  // from driver request
  bool start_async_op_impl();

  void fire();

};

}} // namespace faio, flx
#endif
//[faio_timer.cpp]
#include "faio_timer.hpp"

using namespace flx::demux;
namespace flx { namespace faio {
bool
sleep_request::start_async_op_impl()
{
  //fprintf(stderr,"Sleep: start async_op_impl %p\n",this);
  sleepers->add_sleep_request(this, delta);
  return false;   // no wakeup
}

void sleep_request::fire() {
  //fprintf (stderr,"FIRE req=%p\n",this);
  notify_finished();
}

}}
//[timer.fpc]
Name: Timer
Description: Real time clock services
Requires: faio
includes:  '"faio_timer.hpp"'
//[unix_faio.fpc]
Name: faio
Description: Asynchronous I/O support
provides_dlib: -lfaio_dynamic
provides_slib: -lfaio_static
includes: '"faio_posixio.hpp"'
Requires: flx_async flx_pthread demux flx flx_gc
library: faio
macros: BUILD_FAIO
srcdir: src/faio
src: faio_(timer|posixio)\.cpp
headers: faio_(drv|timer|posixio)\.hpp
//[win_faio.fpc]
Name: faio
Description: Asynchronous I/O support
provides_dlib: /DEFAULTLIB:faio_dynamic
provides_slib: /DEFAULTLIB:faio_static
includes: '"faio_winio.hpp"'
Requires: flx_async flx_pthread demux flx flx_gc
library: faio
macros: BUILD_FAIO
srcdir: src/faio
src: faio_(timer|winio)\.cpp
headers: faio_(drv|timer|winio)\.hpp
#[faio.py]
import fbuild
from fbuild.functools import call
from fbuild.path import Path
from fbuild.record import Record

import buildsystem

# ------------------------------------------------------------------------------

def build_runtime(phase):
    print('[fbuild] [faio]')
    path = Path(phase.ctx.buildroot/'share'/'src/faio')

    dst = 'host/lib/rtl/faio'
    srcs = [
        path / 'faio_timer.cpp',
    ]
    includes = [
        phase.ctx.buildroot / 'host/lib/rtl',
        phase.ctx.buildroot / 'share/lib/rtl'
    ]
    macros = ['BUILD_FAIO']
    libs=[
        call('buildsystem.flx_pthread.build_runtime', phase),
        call('buildsystem.flx_async.build_runtime', phase),
        call('buildsystem.demux.build_runtime', phase),
    ]

    if 'win32' in phase.platform:
        srcs.append(path / 'faio_winio.cpp')
        includes.append(Path('src', 'demux', 'win'))

    if 'posix' in phase.platform:
        srcs.append(path / 'faio_posixio.cpp')
        includes.append(Path('src', 'demux', 'posix'))

    return Record(
        static=buildsystem.build_cxx_static_lib(phase, dst, srcs,
            includes=includes,
            macros=macros,
            libs=[lib.static for lib in libs]),
        shared=buildsystem.build_cxx_shared_lib(phase, dst, srcs,
            includes=includes,
            macros=macros,
            libs=[lib.shared for lib in libs]))

def build_flx(phase):
    return
    #return buildsystem.copy_flxs_to_lib(phase.ctx,
    #    Path('src/faio/*.flx').glob())
//[flx_faio_config.hpp]
#ifndef __FLX_FAIO_CONFIG_H__
#define __FLX_FAIO_CONFIG_H__
#include "flx_rtl_config.hpp"
#ifdef BUILD_FAIO
#define FAIO_EXTERN FLX_EXPORT
#else
#define FAIO_EXTERN FLX_IMPORT
#endif
#endif