Package: src/packages/rtl-pthread-impl.fdoc

Pthread implementation

key file
pthread_thread.hpp share/lib/rtl/pthread_thread.hpp
pthread_posix_thread.cpp share/src/pthread/pthread_posix_thread.cpp
pthread_win_thread.cpp share/src/pthread/pthread_win_thread.cpp

Pthreads

The pthread_thread.hpp defines pthreads and the system pthread control class. The control is implemented separately.

Felix pthreads are tightly integrated with the garbage collector via the pthread control class. This is necessary because the collector must stop all the pthreads before it can reliably sweep the thread stacks for roots.

Thread control also ensure Felix programs do not terminate until all managed pthreads have completed.

The Felix system uses detached threads. We provide joinable threads here too, but Felix programmers should use detached threads and pchannels for synchronisation.

//[pthread_thread.hpp]
#ifndef __FLX_PTHREAD_THREAD_H__
#define __FLX_PTHREAD_THREAD_H__
#include "flx_pthread_config.hpp"

#if FLX_WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif

// auto pthread, because I forget how to deallocate them nicely
// could init in the constructor, but ultimately you don't want the thread
// barging in before you've finished doing other stuff
// Addendum (20051128): doing stdio in turns out to be not very safe.
// I don't know if printf et al are supposed to be thread safe (most impls
// seem to try to be) but I sometimes get deadlocks in ppc64 os x 10.4.2
// with 4.0.1 when printfing to stdout. Nasty.

#include "pthread_thread_control_base.hpp"

#include <utility>
#include <map>
#include <vector>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>

namespace flx { namespace pthread {

// ********************************************************
/// Posix Threads. This class simply wraps the creation
/// and joining of threads. It is not safe.
// ********************************************************

#ifdef _WIN32
typedef HANDLE flx_native_thread_t;
#else
typedef pthread_t flx_native_thread_t;
#endif

flx_native_thread_t PTHREAD_EXTERN get_current_native_thread();


// FELIX THREAD IDENTIFIER (native thread id cast to uintptr_t)
uintptr_t PTHREAD_EXTERN mythrid();

// THREAD REGISTRY: a map from the FELIX THREAD ID to thread data
typedef std::map<uintptr_t, thread_data_t, std::less<uintptr_t> > thread_registry_t;

// ********************************************************
// MASTER THREAD CONTROL OBJECT
// PROVIDES WORLD STOP SERVICES for the GARBAGE COLLECTOR
//
// A singleton unmanaged object owned by the flx_world object
// ********************************************************

class PTHREAD_EXTERN thread_control_t : public virtual thread_control_base_t
{
    thread_control_t (thread_control_t const &) = delete; // uncopyable
    void operator=(thread_control_t const&) = delete; // uncopyable

    // THREAD CONTROL VARIABLES
    bool do_world_stop;     // flag to say we want to stop the world
    size_t thread_counter;  // total Felix threads (not counting demux etc)
    size_t active_counter;  // total active Felix threads (not suspended for world stop yet_

    ::std::condition_variable_any stop_guard;
    ::std::mutex stop_mutex;

    // WORLD STOP NOTIFICATION REGISTRY
    // .. a set of condition variables that a thread could be waiting on
    // .. the registry is used to wake them up so they notice the
    // world stop flag
    world_stop_notifier_t **world_stop_notifier_array;
    size_t world_stop_notifier_array_length;

    // THREAD REGISTRY
    thread_registry_t threads;


    // PRIVATE METHODS
    void unsafe_stop_check();
    void unsafe_suspend();
    void unsafe_resume();
    void world_stop_notify();

public:
    bool debug;

    // PUBLIC METHODS
    bool get_debug()const override;
    thread_control_t (bool);
    size_t thread_count() override;
    size_t active_count();
    void add_thread(void*) override;
    void remove_thread() override;
    bool world_stop() override;
    void join_all()  override;
    void world_start() override;
    void yield() override;
    void suspend() override;
    void resume() override;
    void register_world_stop_notifier(world_stop_notifier_t *) override;
    void unregister_world_stop_notifier(world_stop_notifier_t *) override;
    memory_ranges_t *get_block_list() override; // called owns result and should delete it
};

struct tstart_t
{
  void (*sr)(void*);
  void *cd;
  thread_control_base_t *tc;
  ::std::mutex *spawner_lock;
  ::std::condition_variable_any *spawner_cond;
  bool *spawner_flag;

  tstart_t(void (*s)(void*),void* c,thread_control_base_t *t, ::std::mutex *sl, ::std::condition_variable_any *sc, bool *sf)
    : sr(s), cd(c), tc(t), spawner_lock(sl), spawner_cond(sc), spawner_flag(sf)
  {}
};

// a class for threads that can't be joined. upon exit all their resources
// are freed. they just evaporate. probably the best type of thread.
class PTHREAD_EXTERN flx_detached_thread_t {
  flx_native_thread_t thr;        ///< the thread
  flx_detached_thread_t(flx_detached_thread_t const&); // uncopyable
  void operator=(flx_detached_thread_t const&); // uncopyable
public:
  flx_detached_thread_t();
  ~flx_detached_thread_t();
  int init(void (*start)(void*), void* udat, thread_control_base_t*, ::std::mutex *, ::std::condition_variable_any *, bool*);
};

// rf: joinable threads. is it an error to not join joinable threads?
class PTHREAD_EXTERN flx_thread_t {
  flx_native_thread_t thr;        ///< the thread
  flx_thread_t(flx_thread_t const&); // uncopyable
  void operator=(flx_thread_t const&); // uncopyable
public:
  flx_thread_t();
  ~flx_thread_t();
  int init(void (*start)(void*), void* udat, thread_control_base_t*);
  void join();
};

/// RAII wrapper for thread class
class PTHREAD_EXTERN flx_thread_wrapper_t {
  flx_thread_t thread;
  flx_thread_wrapper_t(flx_thread_wrapper_t const&); // uncopyable
  void operator=(flx_thread_wrapper_t const&); // uncopyable
public:
  ~flx_thread_wrapper_t();
  flx_thread_wrapper_t(void (*start)(void*), void* udat, thread_control_base_t *tc);
};

}}
#endif
//[pthread_posix_thread.cpp]
#include "pthread_thread.hpp"
#if FLX_POSIX
#include <stdio.h>
#include <string.h>  // strerror
#include <cstdlib>
#include <setjmp.h>
#include <functional> // less
#include <assert.h>

namespace flx { namespace pthread {

flx_native_thread_t get_current_native_thread() { return pthread_self(); }
uintptr_t mythrid() { return (uintptr_t)pthread_self(); }

static void *get_stack_pointer() {
  void *x;
  void *y = (void*)&x;
  return y;
}

extern "C" void *flx_pthread_start_wrapper(void *e)
{
  void *stack_base = get_stack_pointer();
  tstart_t *ehd = (tstart_t*)e;
  thread_control_base_t *tc = ehd -> tc;
  if(tc == 0)
  {
    fprintf(stderr, "ERROR: flx_pthread_start_wrapper got NULL thread control object\n");
    assert(tc);
  }
  bool debug = tc->get_debug();
  if(debug)
    fprintf(stderr,"Spawned Thread %p start stack base = %p, tc=%p\n",
       (void*)mythrid(),stack_base, tc);
  if(debug)
      fprintf(stderr,"Thread registering itself\n");
  tc->add_thread(stack_base);
  if(debug)
    fprintf(stderr,"Registered: Spawned Thread %p stack base = %p\n",
      (void*)mythrid(),stack_base, tc);


  void (*sr)(void*)=ehd->sr; // client function
  void *cd = ehd->cd;        // client data
  if(debug)
    fprintf(stderr,"ehd->spawner_lock = %p\n",ehd->spawner_lock);

  if(ehd->spawner_lock)
  {
    ::std::unique_lock< ::std::mutex> dummy(*ehd->spawner_lock);
    if (debug)
      fprintf(stderr,"Thread %p acquired mutex\n", (void*)mythrid());
    if (debug)
      fprintf(stderr,"Thread %p notifying spawner it has registered itself\n", (void*)mythrid());
    *ehd->spawner_flag=true;
    ehd->spawner_cond->notify_all();
    if (debug)
      fprintf(stderr,"Thread %p releasing mutex\n", (void*)mythrid());
  }
  delete ehd;
  if (debug)
    fprintf(stderr,"Thread %p yielding\n", (void*)mythrid());
  tc->yield();
  try {
    if (debug)
      fprintf(stderr,"Thread %p running client code\n", (void*)mythrid());
    (*sr)(cd);
  }
  catch (...) {
    fprintf(stderr,"Uncaught exception in thread\n");
    ::std::exit(1);
  }
  if (debug)
    fprintf(stderr,"Thread %p unregistering\n", (void*)mythrid());
  tc->remove_thread();
  return NULL;
}


extern "C" void *nonflx_pthread_start_wrapper(void *e)
{
  void *stack_base = get_stack_pointer();
  tstart_t *ehd = (tstart_t*)e;
  void (*sr)(void*)=ehd->sr; // client function
  void *cd = ehd->cd;        // client data

  if(ehd->spawner_lock)
  {
    ::std::unique_lock< ::std::mutex> dummy(*ehd->spawner_lock);
    *ehd->spawner_flag=true;
    ehd->spawner_cond->notify_all();
  }
  delete ehd;
  try {
    (*sr)(cd);
  }
  catch (...) {
    fprintf(stderr,"Uncaught exception in thread\n");
    ::std::exit(1);
  }
  return NULL;
}


// ---- detached threads ----------

flx_detached_thread_t::flx_detached_thread_t(flx_detached_thread_t const&){} // uncopyable
void flx_detached_thread_t::operator=(flx_detached_thread_t const&){} // uncopyable

int
flx_detached_thread_t::init(void (*start)(void*), void* udat, thread_control_base_t *tc,
  ::std::mutex * m, ::std::condition_variable_any *c,bool *flag)
{
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  pthread_attr_setstacksize(&attr, 1048576ul * 100ul ); // 100Meg .. should get from control/env of something
  int res = pthread_create(&thr, &attr, flx_pthread_start_wrapper,
    new tstart_t(start, udat, tc, m,c,flag));
  if(res)
  {
     fprintf(stderr, "WARNING: flx_detached_thread_t: pthread_create failed: %s\n",
       strerror(res));
  }
  pthread_attr_destroy(&attr);
  return res;
}

flx_detached_thread_t::~flx_detached_thread_t() { }
flx_detached_thread_t::flx_detached_thread_t() { }

// ---- joinable threads ----------
flx_thread_t::flx_thread_t(flx_thread_t const&){} // uncopyable
void flx_thread_t::operator=(flx_thread_t const&){} // uncopyable

int
flx_thread_t::init(void (*start)(void*), void* udat, thread_control_base_t*tc)
{
  int res = pthread_create(&thr, NULL, nonflx_pthread_start_wrapper,
    new tstart_t(start, udat, tc,NULL,NULL,NULL));
  if(res)
  {
     fprintf(stderr, "WARNING: flx_thread_t: pthread_create failed: %s\n",
       strerror(res));
  }
  return res;
}

void flx_thread_t::join() {
  int res = pthread_join(thr, NULL);
  if(res)
  {
     fprintf(stderr, "flx_thread_t: FATAL: pthread_join failed: %s\n",
       strerror(res));
#ifdef exit
     // Someone wants to replace exit with their own thing ...
     exit(1);
#else
     std::exit(1);
#endif
  }
}

flx_thread_t::~flx_thread_t() { }
flx_thread_t::flx_thread_t() { }

// ---- joinable thread wrapper ----------

flx_thread_wrapper_t::flx_thread_wrapper_t(flx_thread_wrapper_t const&){} // uncopyable
void flx_thread_wrapper_t::operator=(flx_thread_wrapper_t const&){} // uncopyable

flx_thread_wrapper_t::flx_thread_wrapper_t(void (*start)(void*), void* udat, thread_control_base_t*tc)
{
  int res = thread.init(start,udat,tc);
  {
    if(res)
    {
       fprintf(stderr, "FATAL: flx_thread_wapper_t: flx_thread_t.init failed: %s\n",
         strerror(res));
#ifdef exit
     // Someone wants to replace exit with their own thing ...
     exit(1);
#else
     std::exit(1);
#endif
    }
  }
}

flx_thread_wrapper_t::~flx_thread_wrapper_t() { thread.join(); }
}}

#endif
//[pthread_win_thread.cpp]
#include "pthread_thread.hpp"
#if FLX_WIN32
#include <stdio.h>
#include <cstdlib>
#include <assert.h>

namespace flx { namespace pthread {

flx_native_thread_t get_current_native_thread() { return GetCurrentThread(); }
uintptr_t mythrid() { return (uintptr_t)GetCurrentThreadId(); }

static void *get_stack_pointer() {
  void *x;
  void *y = (void*)&x;
  return y;
}

DWORD WINAPI flx_pthread_start_wrapper(LPVOID e)
{
  void *stack_base = get_stack_pointer();
  tstart_t *ehd = (tstart_t*)e;
  thread_control_base_t *tc = ehd -> tc;
  if(tc == 0)
  {
    fprintf(stderr, "ERROR: flx_pthread_start_wrapper got NULL thread control object\n");
    assert(tc);
  }
  bool debug = tc->get_debug();
  if(debug)
    fprintf(stderr,"Spawned Thread %p start stack base = %p, tc=%p\n",
       (void*)mythrid(),stack_base, tc);
  if(debug)
    fprintf(stderr,"Spawned Thread %p start stack base = %p, tc=%p\n",(void*)mythrid(),stack_base, tc);
  if(tc->get_debug())
    fprintf(stderr,"Thread registering itself\n");
  tc->add_thread(stack_base);
  if(debug)
    fprintf(stderr,"Registered: Spawned Thread %p stack base = %p\n",
      (void*)mythrid(),stack_base, tc);
  void (*sr)(void*)=ehd->sr;
  void *cd = ehd->cd;
  if(debug)
    fprintf(stderr,"ehd->spawner_lock = %p\n",ehd->spawner_lock);

  if(ehd->spawner_lock)
  {
    ::std::unique_lock< ::std::mutex> dummy(*ehd->spawner_lock);
    if (debug)
      fprintf(stderr,"Thread %p acquired mutex\n", (void*)mythrid());
    if (debug)
      fprintf(stderr,"Thread %p notifying spawner it has registered itself\n", (void*)mythrid());
    *ehd->spawner_flag=true;
    ehd->spawner_cond->notify_all();
    if (debug)
      fprintf(stderr,"Thread %p releasing mutex\n", (void*)mythrid());
  }
  delete ehd;
  if (debug)
    fprintf(stderr,"Thread %p yielding\n", (void*)mythrid());
  tc->yield();
  try {
    if (debug)
      fprintf(stderr,"Thread %p running client code\n", (void*)mythrid());
    (*sr)(cd);
  }
  catch (...) {
    fprintf(stderr,"Uncaught exception in thread\n");
    ::std::exit(1);
  }
  if (debug)
    fprintf(stderr,"Thread %p unregistering\n", (void*)mythrid());
  tc->remove_thread();
  return 0;
}

DWORD WINAPI nonflx_pthread_start_wrapper(LPVOID e)
{
  void *stack_base = get_stack_pointer();
  tstart_t *ehd = (tstart_t*)e;
  void (*sr)(void*)=ehd->sr;
  void *cd = ehd->cd;
  if(ehd->spawner_lock)
  {
    ::std::unique_lock< ::std::mutex> dummy(*ehd->spawner_lock);
    *ehd->spawner_flag=true;
    ehd->spawner_cond->notify_all();
  }
  delete ehd;
  try {
    (*sr)(cd);
  }
  catch (...) {
    fprintf(stderr,"Uncaught exception in thread\n");
    ::std::exit(1);
  }
  return 0;
}


// ---- detached threads ----------

flx_detached_thread_t::flx_detached_thread_t(flx_detached_thread_t const&){} // uncopyable
void flx_detached_thread_t::operator=(flx_detached_thread_t const&){} // uncopyable

// returns -1 on failure with error in GetLastError, 0 if all good.
int
flx_detached_thread_t::init(void (*start)(void*), void *lParam, thread_control_base_t *tc,
  ::std::mutex * m, ::std::condition_variable_any *c,bool *flag)
{
  DWORD thread_id = 0;
  thr = (HANDLE)CreateThread(NULL, 1048576ul * 100ul, // 100Meg .. should use control/env
    (LPTHREAD_START_ROUTINE)flx_pthread_start_wrapper,
    new tstart_t(start,lParam, tc, m, c, flag), 0,
    &thread_id
  );

  if(!thr)
  {
    DWORD err = GetLastError();
    fprintf(stderr, "flx_detached_thread_t: CreateThread failed: %i\n", err);
    return err;
  }
  return 0;
}

flx_detached_thread_t::~flx_detached_thread_t() { CloseHandle(thr); }
flx_detached_thread_t::flx_detached_thread_t() { }

// ---- joinable threads ----------
flx_thread_t::flx_thread_t(flx_thread_t const&){} // uncopyable
void flx_thread_t::operator=(flx_thread_t const&){} // uncopyable


flx_thread_t::flx_thread_t() { }
flx_thread_t::~flx_thread_t() { }

// this should be idempotent
void
flx_thread_t::join()
{
  // Let's try and wait for the thread to finish, however first I have to
  // tell it to finish up.

  DWORD  wait_res = WaitForSingleObject(thr, INFINITE);

  // will this give me my return status? how do I get that?
  if(WAIT_FAILED == wait_res)
  {
    fprintf(stderr,"WARNING: thread wait failed (%li)\n", GetLastError());
  }

  // I've already tried waiting on the  thread's #include <stdlib> exit
  if(!CloseHandle(thr))
  {
    fprintf(stderr,"FATAL: failed to delete thread (%li)\n", GetLastError());
    std::exit(1);
  }
}

// returns -1 on failure with error in GetLastError, 0 if all good.
int
flx_thread_t::init(void (*fn)(void*), void *lParam, thread_control_base_t *tc)
{
  DWORD thread_id = 0;
  thr= (HANDLE)CreateThread(NULL, 0,
    (LPTHREAD_START_ROUTINE)nonflx_pthread_start_wrapper,
    new tstart_t(fn,lParam, tc,NULL,NULL,NULL), 0,
    &thread_id
  );

  if(!thr)
  {
    DWORD err = GetLastError();
    fprintf(stderr, "WARNING: flx_thread_t: CreateThread failed: %i\n", err);
    return err;
  }

  return 0;
}

// ---- joinable thread wrapper ----------
flx_thread_wrapper_t::flx_thread_wrapper_t(void (*f)(void*), void *lParam, thread_control_base_t*tc)
{
  int res = thread.init(f,lParam,tc);
  if(res)
  {
    fprintf(stderr,"flx_thread_wrapper_t: FATAL: flx_thread_t.init failed\n");
    std::exit(1);
  }
}
flx_thread_wrapper_t::~flx_thread_wrapper_t() { thread.join(); }

}}

#endif