Package: src/packages/rtl-lfbag.fdoc
Lock Free Bag¶
key | file |
---|---|
pthread_lf_bag.hpp | share/lib/rtl/pthread_lf_bag.hpp |
pthread_lf_bag.cpp | share/src/pthread/pthread_lf_bag.cpp |
pthread_lf_bag.flx | share/lib/std/pthread/pthread_lf_bag.flx |
Lock Free Bag¶
A lock-free, thread-safe bag for holding non-null pointers.
//[pthread_lf_bag.hpp]
#ifndef __FLX_PTHREAD_LF_BAG_H__
#define __FLX_PTHREAD_LF_BAG_H__
#include "flx_pthread_config.hpp"
#include <stdint.h>
#include <atomic>
#include "pthread_thread_control_base.hpp"
namespace flx { namespace pthread {
// A fixed-capacity bag of non-null pointers, stored in an array of atomic
// slots. A slot holding null is treated as empty, which is why null itself
// cannot be stored. Elements are not guaranteed to come out in any
// particular order.
struct PTHREAD_EXTERN pthread_lf_bag {
// the slot array: n entries, allocated zero-filled by the constructor
::std::atomic <void *> * volatile a;
// number of slots in the array, fixed at construction
size_t n;
// thread control object; its yield() is called while an operation waits
thread_control_base_t *tc;
// for statistics: incremented once per completed dequeue.
// This is a plain, non-atomic counter.
size_t throughput;
// these indices are for optimisation purposes ONLY
// the head points at the next element to dequeue or a bit earlier
::std::atomic<size_t> head;
// Count of stored elements, maintained by enqueue (++) and dequeue (--).
// we can't use unsigned type because the value may go negative
// if dequeue operations decrement the counter before the enqueue
// that pushed the data does.
::std::atomic<int32_t> used;
// Create a bag with n_ slots that yields through tc_ while waiting.
pthread_lf_bag (thread_control_base_t *tc_, size_t n_);
// the destructor is not safe!
// to make it safe one needs to be sure the queue is empty
// AND that no more values will be enqueued.
// This is very hard to do. Using a smart ptr for the bag
// ensures there will be no more enqueue operations started
// but not that one is not in progress. The queue may appear
// empty during the progress of such final enqueue operations.
// there is no safe way to ensure the queue will remain empty.
~pthread_lf_bag();
// Store the non-null pointer d; waits (by yielding) while the bag is full.
void enqueue(void *d);
// Remove and return some stored pointer; waits (by yielding) while the
// bag is empty. The result is never null.
void *dequeue ();
};
}} // namespaces
#endif
//[pthread_lf_bag.cpp]
// simple very efficient lock free bag
#include <atomic>
#include <chrono>
#include <algorithm>
#include <thread>
#include <stdlib.h>
#include "pthread_lf_bag.hpp"
#include <assert.h>
#include <pthread_thread.hpp>
using namespace flx::pthread;
// Upper bound on a single sleep interval, in nanoseconds:
// 10 ms max sleep, that's 10,000,000 nanoseconds.
// Not referenced by the code shown here, where the timed sleep is
// commented out in favour of yielding.
#define MAXSLEEP (size_t)10000000
// Back off while a bag operation cannot make progress.
//
// tc: thread control object, must be non-null (asserted). Its yield() is
//     called first, followed by the standard library yield.
// ns: requested sleep time in nanoseconds. Currently ignored: the timed
//     sleep is commented out and the function only yields.
static void sleep(thread_control_base_t *tc, size_t ns)
{
// the timed sleep below is disabled, so the interval is deliberately unused
(void)ns;
// check the precondition before doing anything else
assert(tc);
// Diagnostic trace. The trailing newline keeps successive messages from
// running together on one line of stderr.
fprintf(stderr, "pthread_lf_bag: sleep: thread %p calling std::this_thread::yield()\n",::flx::pthread::mythrid());
tc->yield();
//::std::this_thread::sleep_for(::std::chrono::nanoseconds(ns));
::std::this_thread::yield();
}
// Memory orderings passed to the atomic slot exchange:
// NQFENCE is used by enqueue, DQFENCE by dequeue.
// Both are currently sequentially consistent.
#define NQFENCE ::std::memory_order_seq_cst
#define DQFENCE ::std::memory_order_seq_cst
// Construct an empty bag.
//
// tc_: thread control object used to yield while an operation waits.
// n_:  number of slots, i.e. the maximum number of stored pointers.
//
// The slot array is allocated zero-filled so that every slot starts out
// null, which the bag treats as "empty".
pthread_lf_bag::pthread_lf_bag (thread_control_base_t *tc_, size_t n_) :
// Initialisers are written in member declaration order, which is the order
// in which they run regardless of how they are listed.
// The array is sized by the element type actually stored in it.
a(static_cast< ::std::atomic<void*>* >(calloc (n_ , sizeof (::std::atomic<void*>)))),
n (n_), tc(tc_),
throughput(0),
head(0), used(0)
{
// calloc may legitimately return null for a zero-sized request, so only a
// non-empty bag is required to have storage
assert(n_ == 0 || a != nullptr);
}
// The destructor is not safe!
// To make it safe one needs to be sure the queue is empty
// AND that no more values will be enqueued.
// This is very hard to do. Using a smart pointer for the bag
// ensures that no new enqueue operation will be started,
// but not that none is still in progress. The queue may appear
// empty while such a final enqueue operation is under way.
// There is no safe way to ensure the queue will remain empty.
//
// NOTE(review): the body is empty, so the slot array allocated with calloc
// in the constructor is never released here. This looks deliberate given
// the caveats above -- confirm before adding a free().
pthread_lf_bag::~pthread_lf_bag() { }
// Store the pointer d in the bag, yielding while the bag is full.
//
// d: the pointer to store. It must be non-null, because a null slot
//    is what marks a slot as empty.
//
// Method: pick a starting slot and atomically swap d into it. If the slot
// was empty the swap returns null and the value has been stored. Otherwise
// the swap returns the pointer that was displaced; that pointer becomes the
// value this call still has to store, and the search moves to the next slot.
void pthread_lf_bag::enqueue(void *d)
{
wait:
// passed to sleep(), which currently ignores it
size_t stime = 1;
// wait until the counter reports a free slot
while (used.load(::std::memory_order_seq_cst) == n) sleep(tc,stime);
// starting slot: only a hint, since head and used are read separately
size_t i = (head + used) % n;
// swap d into slot i; a non-null result means the slot was occupied and
// d now holds the displaced pointer
while
(
(d = ::std::atomic_exchange_explicit(a + i, d,
NQFENCE))
)
{
// the bag filled up meanwhile: go back and wait, still carrying the
// displaced pointer in d so it is not lost
if (used.load(::std::memory_order_seq_cst) == n) goto wait; // lost the race
// try the next slot, wrapping around at the end of the array
i = (i + 1) % n;
// back at the head slot: yield before continuing the scan
if (i == head) sleep(tc,stime);
}
// one more element is now stored
++used;
}
// Remove one pointer from the bag and return it, yielding while the bag
// is empty.
//
// Returns: a previously enqueued pointer; never null, because the scan
// only stops once a non-null slot value has been taken.
//
// Method: starting at the head hint, atomically swap null into each slot
// in turn until a swap hands back a non-null pointer.
void *pthread_lf_bag::dequeue ()
{
wait:
// passed to sleep(), which currently ignores it
size_t stime = 1;
// wait until the counter reports at least one stored element
while (used.load(::std::memory_order_seq_cst) == 0) sleep(tc,stime );
// start scanning at the head hint
size_t i = head.load(::std::memory_order_seq_cst);
void *d = nullptr;
// swap null into slot i; a null result means the slot was already empty,
// so d is still null for the next attempt
while
(
!(d = ::std::atomic_exchange_explicit(a + i, d,
DQFENCE))
)
{
// the bag emptied meanwhile: go back and wait for data
if (used.load(::std::memory_order_seq_cst) == 0) goto wait; // lost the race
// try the next slot, wrapping around at the end of the array
i = (i + 1) % n;
// back at the head slot: yield before continuing the scan
if (i == head) sleep(tc,stime);
}
// remember where this element was found as the hint for later dequeues
head.store (i,::std::memory_order_seq_cst);
// one fewer element is now stored
--used;
// NOTE(review): throughput is a plain size_t, so concurrent dequeues
// increment it without synchronisation -- confirm this is acceptable for
// a statistics-only counter.
++throughput;
return d;
}
//[pthread_lf_bag.flx]
// Felix binding for the C++ pthread_lf_bag.
class LockFreeBag
{
// The bag is represented as a C++ shared pointer to the bag object.
type lf_bag = "::std::shared_ptr<::flx::pthread::pthread_lf_bag>"
requires
header '#include "pthread_lf_bag.hpp"',
package "pthread",
Cxx11_headers::memory
;
// note: unmanaged container at the moment!!
// Construct a bag with the given number of slots. The thread control
// object is obtained from the collector via get_thread_control().
ctor lf_bag : size = """
::std::shared_ptr<::flx::pthread::pthread_lf_bag>
(new ::flx::pthread::pthread_lf_bag(PTF gcp->collector->get_thread_control(),$1))
""";
// Store an address in the bag. The bag holds non-null pointers only.
proc enqueue : lf_bag * address = "$1->enqueue ($2);";
// Remove and return one stored address.
gen dequeue : lf_bag -> address = "$1->dequeue ()";
// The number of slots in the bag (its n member).
gen len : lf_bag -> size = "$1->n";
// The current value of the bag's used counter.
// NOTE(review): the C++ counter is a signed 32-bit value that may be
// transiently negative, while the result type here is size -- confirm
// callers tolerate that.
gen used : lf_bag -> size = "$1->used.load()";
}