Package: src/packages/rtl-lfbag.fdoc

Lock Free Bag

key file
pthread_lf_bag.hpp share/lib/rtl/pthread_lf_bag.hpp
pthread_lf_bag.cpp share/src/pthread/pthread_lf_bag.cpp
pthread_lf_bag.flx share/lib/std/pthread/pthread_lf_bag.flx

Lock Free Bag

A lock free thread safe bag for holding non-null pointers.

//[pthread_lf_bag.hpp]
#ifndef __FLX_PTHREAD_LF_BAG_H__
#define __FLX_PTHREAD_LF_BAG_H__

#include "flx_pthread_config.hpp"
#include <stdint.h>
#include <atomic>
#include "pthread_thread_control_base.hpp"

namespace flx { namespace pthread {

struct PTHREAD_EXTERN pthread_lf_bag {
  ::std::atomic <void *> * volatile a;
  size_t n;
  thread_control_base_t *tc;

  // for statistics
  size_t throughput;

  // these indices are for optimisation purposes ONLY
  // the head points at the next element to dequeue or a bit earlier
  ::std::atomic<size_t> head;

  // we can't use unsigned type because the value may go negative
  // if dequeue operations decrement the counter before the enqueue
  // that pushed the data does.
  ::std::atomic<int32_t> used;

  pthread_lf_bag (thread_control_base_t *tc_, size_t n_);

  // the destructor is not safe!
  // to make it safe one needs to be sure the queue is empty
  // AND that no more values will be enqueued.
  // This is very hard to do. Using a smart ptr for the bag
  // ensures there will be no more enqueue operations started
  // but not that one is not in progress. The queue may appear
  // empty during the progress of such final enqueue operations.
  // there is no safe way to ensure the queue will remain empty.
  ~pthread_lf_bag();

  void enqueue(void *d);
  void *dequeue ();
};

}} // namespaces
#endif
//[pthread_lf_bag.cpp]
// simple very efficient lock free bag
#include <atomic>
#include <chrono>
#include <algorithm>
#include <thread>
#include <stdlib.h>
#include "pthread_lf_bag.hpp"
#include <assert.h>
#include <pthread_thread.hpp>

using namespace flx::pthread;

// 10 ms max sleep, that's 10,000,000 nanoseconds
#define MAXSLEEP (size_t)10000000

static void sleep(thread_control_base_t *tc, size_t ns)
{
fprintf(stderr, "pthread_lf_bag: sleep: thread %p calling std::this_thread::yield()",::flx::pthread::mythrid());
  assert(tc);
  tc->yield();
  //::std::this_thread::sleep_for(::std::chrono::nanoseconds(ns));
  ::std::this_thread::yield();
}

#define NQFENCE ::std::memory_order_seq_cst
#define DQFENCE ::std::memory_order_seq_cst


  pthread_lf_bag::pthread_lf_bag (thread_control_base_t *tc_, size_t n_) :
    n (n_), tc(tc_), head(0), used(0),
    throughput(0),
    a((::std::atomic<void*>*)calloc (n_ , sizeof (void*)))
  {}

  // the destructor is not safe!
  // to make it safe one needs to be sure the queue is empty
  // AND that no more values will be enqueued.
  // This is very hard to do. Using a smart ptr for the bag
  // ensures there will be no more enqueue operations started
  // but not that one is not in progress. The queue may appear
  // empty during the progress of such final enqueue operations.
  // there is no safe way to ensure the queue will remain empty.
  pthread_lf_bag::~pthread_lf_bag() { }

  void pthread_lf_bag::enqueue(void *d)
  {
wait:
    size_t stime = 1;
    while (used.load(::std::memory_order_seq_cst) == n) sleep(tc,stime);
    size_t i = (head + used) % n;
    while
    (
      (d = ::std::atomic_exchange_explicit(a + i, d,
        NQFENCE))
    )
    {
      if (used.load(::std::memory_order_seq_cst) == n) goto wait; // lost the race
      i = (i + 1) % n;
      if (i == head) sleep(tc,stime);
    }
    ++used;
  }

  void *pthread_lf_bag::dequeue ()
  {
wait:
    size_t stime = 1;
    while (used.load(::std::memory_order_seq_cst) == 0) sleep(tc,stime );

    size_t i = head.load(::std::memory_order_seq_cst);
    void *d = nullptr;
    while
    (
      !(d = ::std::atomic_exchange_explicit(a + i, d,
        DQFENCE))
    )
    {
      if (used.load(::std::memory_order_seq_cst) == 0) goto wait; // lost the race
      i = (i + 1) % n;
      if (i == head) sleep(tc,stime);
    }
    head.store (i,::std::memory_order_seq_cst);
    --used;
    ++throughput;
    return d;
  }
//[pthread_lf_bag.flx]
class LockFreeBag
{
  type lf_bag = "::std::shared_ptr<::flx::pthread::pthread_lf_bag>"
    requires
      header '#include "pthread_lf_bag.hpp"',
      package "pthread",
      Cxx11_headers::memory
  ;
  // note: unmanaged container at the moment!!
  ctor lf_bag : size = """
     ::std::shared_ptr<::flx::pthread::pthread_lf_bag>
     (new ::flx::pthread::pthread_lf_bag(PTF gcp->collector->get_thread_control(),$1))
  """;
  proc enqueue : lf_bag * address = "$1->enqueue ($2);";
  gen dequeue : lf_bag -> address = "$1->dequeue ()";
  gen len : lf_bag -> size = "$1->n";
  gen used : lf_bag -> size = "$1->used.load()";
}