Package: src/packages/rtl-boundqueue.fdoc

Bound Queue

key file
pthread_bound_queue.hpp share/lib/rtl/pthread_bound_queue.hpp
pthread_bound_queue.cpp share/src/pthread/pthread_bound_queue.cpp
flx_bound_queue.fpc $PWD/src/config/flx_bound_queue.fpc

Bound Queue

//[pthread_bound_queue.hpp]
#ifndef __FLX_PTHREAD_BOUND_QUEUE_H__
#define __FLX_PTHREAD_BOUND_QUEUE_H__
#include "flx_pthread_config.hpp"
#include "flx_gc.hpp"
#include <thread>
#include <mutex>
#include <condition_variable>

// interface for a consumer/producer queue. threads requesting a resource
// that isn't there block until one is available. push/pop re-entrant

namespace flx { namespace pthread {

// ********************************************************
/// Thread safe bounded queue.
///
/// The queue can be locked by setting bound=0.
/// In this state it can only be unlocked by setting a non-zero bound.
///
/// If the bound is set to 1 (the default),
/// then the queue is always either empty or full.
/// An empty queue blocks readers until a writer sends some data.
/// A full queue blocks writers, until a reader reads the data.
/// Note that when the queue is empty a writer can write data
/// and continues without waiting for the data to be read.
// ********************************************************

class PTHREAD_EXTERN bound_queue_t :public world_stop_notifier_t {
  thread_control_base_t *tc;
  ::std::condition_variable_any size_changed;
  ::std::mutex member_lock;
  size_t bound;
  void notify_world_stop() override;
  void wait();
  void wait_no_world_stop_check(); // used by async system
public:
  void *lame_opaque; // has to be public for the scanner to find it
  bound_queue_t(thread_control_base_t *tc_, size_t);
  ~bound_queue_t();
  void enqueue(void*);
  void enqueue_no_world_stop_check(void*); // used by async system
  void* dequeue();
  void* maybe_dequeue();
  void resize(size_t);
  void wait_until_empty();
  size_t len();
};

PTHREAD_EXTERN ::flx::gc::generic::scanner_t bound_queue_scanner;

}} // namespace pthread, flx
#endif
//[pthread_bound_queue.cpp]
#include "pthread_bound_queue.hpp"
#include <queue>        // stl to the bloated rescue
#include <stdio.h>      // debugging in scanner

using namespace std;

namespace flx { namespace pthread {
typedef deque<void*> void_queue;

#define ELTQ ((void_queue*)lame_opaque)

void bound_queue_t::notify_world_stop()
{
  size_changed.notify_all();
}

bound_queue_t::bound_queue_t(thread_control_base_t *tc_, size_t n) : bound(n), tc(tc_)
{
//fprintf(stderr, "Creating bound queue %p, thread_control base=%p\n", this,tc);
  lame_opaque = new void_queue;
  tc->register_world_stop_notifier(this);
}

// Much care is needed deleting a queue.
// A safe method is possible .. but not provided here
bound_queue_t::~bound_queue_t()
{
//fprintf(stderr,"Deleting bound queue %p\n",this);
  tc->unregister_world_stop_notifier(this);
  delete ELTQ;
}

void bound_queue_t::wait() {
//fprintf(stderr, "Bound queue waiting.. %p\n", this);
  member_lock.unlock();
//fprintf(stderr, "Unocked mutex, now doing a tc yield q=%p, tc=%p\n", this,tc);
  tc->yield();
//fprintf(stderr, "tc yield done, relocking mutex q=%p\n", this);
  member_lock.lock();
//fprintf(stderr, "locked mutex again, waiting on possible size change in queue %p\n",this);
  size_changed.wait_for(member_lock, ::std::chrono::duration<int>(1)); // 1second
//fprintf(stderr, "possible size change in queue detected %p\n", this);
}

void bound_queue_t::wait_no_world_stop_check() {
  size_changed.wait_for(member_lock, ::std::chrono::duration<int>(1)); // 1second
}


// get the number of element in the queue
// (NOT the bound!)
size_t bound_queue_t::len() {
  ::std::unique_lock< ::std::mutex>   l(member_lock);
  return ELTQ->size();
}

void bound_queue_t::wait_until_empty() {
  ::std::unique_lock< ::std::mutex>   l(member_lock);
  while(!ELTQ->empty()) wait();
}

void
bound_queue_t::enqueue(void* elt)
{
  ::std::unique_lock< ::std::mutex>   l(member_lock);
  while(ELTQ->size() >= bound) wait(); // guard against spurious wakeups!
  ELTQ->push_back(elt);
  size_changed.notify_all(); // cannot return an error
}

void
bound_queue_t::enqueue_no_world_stop_check(void* elt)
{
  ::std::unique_lock< ::std::mutex>   l(member_lock);
  while(ELTQ->size() >= bound) wait_no_world_stop_check(); // guard against spurious wakeups!
  ELTQ->push_back(elt);
  size_changed.notify_all(); // cannot return an error
}


void*
bound_queue_t::dequeue()
{
//fprintf(stderr, "Trying to dequeue from bound queue\n");
  ::std::unique_lock< ::std::mutex>   l(member_lock);
  while(ELTQ->empty())  wait(); // guard against spurious wakeups!
  void *elt = ELTQ->front();
  ELTQ->pop_front();
  size_changed.notify_all();
  return elt;
}

void*
bound_queue_t::maybe_dequeue()
{
  ::std::unique_lock< ::std::mutex>   l(member_lock);
  void *elt = NULL;
  if (ELTQ->size() > 0)
  {
    elt = ELTQ->front();
    ELTQ->pop_front();
    size_changed.notify_all();
  }
  return elt;
}


void
bound_queue_t::resize(size_t n)
{
  ::std::unique_lock< ::std::mutex>   l(member_lock);
  bound = n;
  // get things rolling again
  size_changed.notify_all();
}

using namespace flx;;
using namespace gc;
using namespace generic;

void *bound_queue_scanner(
  collector_t *collector,
  gc_shape_t *shape, void *pp,
  size_t dyncount,
  int reclimit
)
{
  // input is a pointer to a pointer to a bound queue object
  void *p = *(void**)pp;
  bound_queue_t *bq = (bound_queue_t*)p;
  void_queue *pq = (void_queue*) bq->lame_opaque;
  printf("Scanning bound queue %p->%p\n", pp, p);

  ::std::deque<void*>::const_iterator stl_end = pq->end();
  for(
    ::std::deque<void*>::const_iterator iter= pq->begin();
    iter < stl_end;
    ++iter
  ) {
    void *value = *iter;
    printf("bound_queue scanning p=%p\n",value);
    collector->register_pointer(value,reclimit);
  }
  return 0;
}


}}
//[flx_bound_queue.fpc]
Name: Pthread Bound Queue
Requires: flx_pthread flx_gc
includes: '"pthread_bound_queue.hpp"'