density
C++11 library for paged memory management, function queues, heterogeneous queues and lifo memory management
conc_function_queue< CALLABLE, ALLOCATOR_TYPE, ERASURE > Class Template Reference

#include <conc_function_queue.h>

Public Types

template<typename ELEMENT_COMPLETE_TYPE >
using put_transaction = typename UnderlyingQueue::template put_transaction< ELEMENT_COMPLETE_TYPE >
 
template<typename ELEMENT_COMPLETE_TYPE >
using reentrant_put_transaction = typename UnderlyingQueue::template reentrant_put_transaction< ELEMENT_COMPLETE_TYPE >
 
using consume_operation = typename UnderlyingQueue::consume_operation
 
using reentrant_consume_operation = typename UnderlyingQueue::reentrant_consume_operation
 

Public Member Functions

 conc_function_queue () noexcept=default
 
 conc_function_queue (conc_function_queue &&i_source) noexcept=default
 
conc_function_queue & operator= (conc_function_queue &&i_source) noexcept=default
 
 ~conc_function_queue ()
 
template<typename ELEMENT_COMPLETE_TYPE >
void push (ELEMENT_COMPLETE_TYPE &&i_source)
 
template<typename ELEMENT_COMPLETE_TYPE , typename... CONSTRUCTION_PARAMS>
void emplace (CONSTRUCTION_PARAMS &&...i_construction_params)
 
template<typename ELEMENT_TYPE >
put_transaction< typename std::decay< ELEMENT_TYPE >::type > start_push (ELEMENT_TYPE &&i_source)
 
template<typename ELEMENT_TYPE , typename... CONSTRUCTION_PARAMS>
put_transaction< ELEMENT_TYPE > start_emplace (CONSTRUCTION_PARAMS &&...i_construction_params)
 
template<typename ELEMENT_COMPLETE_TYPE >
void reentrant_push (ELEMENT_COMPLETE_TYPE &&i_source)
 
template<typename ELEMENT_COMPLETE_TYPE , typename... CONSTRUCTION_PARAMS>
void reentrant_emplace (CONSTRUCTION_PARAMS &&...i_construction_params)
 
template<typename ELEMENT_TYPE >
reentrant_put_transaction< typename std::decay< ELEMENT_TYPE >::type > start_reentrant_push (ELEMENT_TYPE &&i_source)
 
template<typename ELEMENT_TYPE , typename... CONSTRUCTION_PARAMS>
reentrant_put_transaction< ELEMENT_TYPE > start_reentrant_emplace (CONSTRUCTION_PARAMS &&...i_construction_params)
 
std::conditional< std::is_void< RET_VAL >::value, bool, optional< RET_VAL > >::type try_consume (PARAMS...i_params)
 
std::conditional< std::is_void< RET_VAL >::value, bool, optional< RET_VAL > >::type try_consume (consume_operation &i_consume, PARAMS...i_params)
 
std::conditional< std::is_void< RET_VAL >::value, bool, optional< RET_VAL > >::type try_reentrant_consume (PARAMS...i_params)
 
std::conditional< std::is_void< RET_VAL >::value, bool, optional< RET_VAL > >::type try_reentrant_consume (reentrant_consume_operation &i_consume, PARAMS...i_params)
 
template<function_type_erasure ERASURE_ = ERASURE, typename std::enable_if< ERASURE_!=function_manual_clear >::type * = nullptr>
void clear () noexcept
 
bool empty () noexcept
 

Static Public Attributes

static constexpr bool concurrent_puts = true
 
static constexpr bool concurrent_consumes = true
 
static constexpr bool concurrent_put_consumes = true
 
static constexpr bool is_seq_cst = true
 

Friends

void swap (conc_function_queue &i_first, conc_function_queue &i_second) noexcept
 

Detailed Description

template<typename CALLABLE, typename ALLOCATOR_TYPE = default_allocator, function_type_erasure ERASURE = function_standard_erasure>
class density::conc_function_queue< CALLABLE, ALLOCATOR_TYPE, ERASURE >

Thread-safe heterogeneous FIFO pseudo-container specialized to hold callable objects. conc_function_queue is an adaptor for conc_heter_queue.

Template Parameters
CALLABLE: Signature required of the callable objects. Must be in the form RET_VAL (PARAMS...)
ALLOCATOR_TYPE: Allocator type to be used. This type must satisfy the requirements of both UntypedAllocator and PagedAllocator. The default is density::default_allocator.
ERASURE: Type erasure to use for the callable objects. Must be a member of density::function_type_erasure.

conc_function_queue is basically a function_queue protected from data races by a mutex. Non-reentrant operations (puts or consumes) lock the mutex only once. Reentrant operations lock the mutex once when starting, and a second time to commit or cancel. Between these two locks the queue is not locked, so it can be used by the same thread or by other threads.
reentrant_put_transaction::commit, reentrant_put_transaction::cancel, reentrant_consume_operation::commit and reentrant_consume_operation::cancel are all noexcept functions that lock a mutex in a controlled context (no reentrancy, no callbacks, no possibility of deadlocks). Locking the mutex should never throw in this context, but if for some reason it does, the runtime will call std::terminate.
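
The locking scheme described above can be sketched with standard components only. The class below is a hypothetical stand-in (SketchQueue, its std::deque storage and its m_committed counter are assumptions made for illustration), not the implementation used by density. It assumes that a non-reentrant consume keeps the mutex locked while the callable runs, while a reentrant consume locks once to start and once more to commit.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in, NOT density's implementation: a std::deque guarded by a std::mutex.
class SketchQueue
{
  public:
    void push(std::function<void()> i_func)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_functions.push_back(std::move(i_func));
    }

    // Non-reentrant consume: the mutex is locked once and held during the invocation,
    // so the callable must not access this queue.
    bool try_consume()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_functions.empty())
            return false;
        m_functions.front()();
        m_functions.pop_front();
        return true;
    }

    // Reentrant consume: one lock to start, a second lock to commit.
    bool try_reentrant_consume()
    {
        std::function<void()> func;
        {
            std::lock_guard<std::mutex> lock(m_mutex); // first lock: start
            if (m_functions.empty())
                return false;
            func = std::move(m_functions.front());
            m_functions.pop_front();
        }
        func(); // the queue is unlocked here: the callable may push to it
        {
            std::lock_guard<std::mutex> lock(m_mutex); // second lock: commit
            ++m_committed;
        }
        return true;
    }

  private:
    std::mutex                        m_mutex;
    std::deque<std::function<void()>> m_functions;
    int                               m_committed = 0; // bookkeeping for the sketch only
};

// The first callable pushes a second one while it is being consumed.
inline int demo_reentrancy()
{
    SketchQueue queue;
    int         invocations = 0;
    queue.push([&queue, &invocations] {
        ++invocations;
        queue.push([&invocations] { ++invocations; });
    });
    while (queue.try_reentrant_consume())
        ;
    return invocations;
}
```

Calling demo_reentrancy() consumes both callables. Replacing try_reentrant_consume with try_consume in the loop would lock the same std::mutex twice from one thread, which is undefined behavior; this mirrors the restriction documented for the non-reentrant functions.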

If ERASURE == function_manual_clear, conc_function_queue is not able to destroy the callable objects without invoking them. This produces a performance benefit, but it comes at a cost: in particular, the function clear is disabled at compile time.


Implementation note: If ERASURE == function_manual_clear, a runtime type associated with a value is actually a pointer to a function that invokes and destroys the callable object. In this case the size of a value is a pair of pointers, plus the capture (if any).
If ERASURE == function_standard_erasure the runtime type has an additional pointer to a function that destroys the callable object without invoking it.
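
The two runtime-type layouts can be pictured as small structs of function pointers. The following is a minimal sketch with hypothetical names (ManualClearType, StandardType, Counter and the helper templates are all assumptions), fixed to the signature int() for brevity; it only illustrates the idea in the note and is not the layout used by density.

```cpp
#include <cassert>
#include <new>

// Hypothetical names, sketch only.
// One function pointer: the callable can only be invoked and destroyed together.
struct ManualClearType
{
    int (*m_invoke_destroy)(void * i_object);
};

// One more function pointer: the callable can also be destroyed without invoking it.
struct StandardType
{
    int (*m_invoke_destroy)(void * i_object);
    void (*m_destroy)(void * i_object);
};

template <typename CALLABLE> int invoke_destroy_impl(void * i_object)
{
    CALLABLE * const callable = static_cast<CALLABLE *>(i_object);
    int const        result   = (*callable)();
    callable->~CALLABLE();
    return result;
}

template <typename CALLABLE> void destroy_impl(void * i_object)
{
    static_cast<CALLABLE *>(i_object)->~CALLABLE();
}

template <typename CALLABLE> StandardType make_standard_type()
{
    return StandardType{&invoke_destroy_impl<CALLABLE>, &destroy_impl<CALLABLE>};
}

// Test callable that counts invocations and destructions.
struct Counter
{
    static int s_invoked;
    static int s_destroyed;
    int        operator()()
    {
        ++s_invoked;
        return 42;
    }
    ~Counter() { ++s_destroyed; }
};
int Counter::s_invoked   = 0;
int Counter::s_destroyed = 0;

inline int demo_erasure()
{
    alignas(Counter) unsigned char storage[sizeof(Counter)];
    StandardType const             type = make_standard_type<Counter>();

    new (storage) Counter();
    int const result = type.m_invoke_destroy(storage); // what a consume would do

    new (storage) Counter();
    type.m_destroy(storage); // what a clear would do: no invocation
    return result;
}
```

With only m_invoke_destroy available, as in ManualClearType, there is no way to dispose of a stored callable without running it, which is why clearing requires the extra pointer.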


Thread safeness: Puts and consumes can be executed concurrently.
Exception safeness: Any function of conc_function_queue is noexcept or provides the strong exception guarantee.

Member Typedef Documentation

using put_transaction = typename UnderlyingQueue::template put_transaction<ELEMENT_COMPLETE_TYPE>
using reentrant_put_transaction = typename UnderlyingQueue::template reentrant_put_transaction<ELEMENT_COMPLETE_TYPE>
using consume_operation = typename UnderlyingQueue::consume_operation
using reentrant_consume_operation = typename UnderlyingQueue::reentrant_consume_operation

Constructor & Destructor Documentation

conc_function_queue ( )
default noexcept

Default constructor.

conc_function_queue<int(float, double), default_allocator, ERASURE> queue;
assert(queue.empty());
conc_function_queue ( conc_function_queue< CALLABLE, ALLOCATOR_TYPE, ERASURE > &&  i_source)
default noexcept

Move constructor.

conc_function_queue<int(), default_allocator, ERASURE> queue;
queue.push([] { return 6; });
auto queue_1(std::move(queue));
assert(queue.empty());
auto result = queue_1.try_consume();
assert(result && *result == 6);
~conc_function_queue ( )
inline

Destructor

Member Function Documentation

conc_function_queue& operator= ( conc_function_queue< CALLABLE, ALLOCATOR_TYPE, ERASURE > &&  i_source)
default noexcept

Move assignment.

conc_function_queue<int(), default_allocator, ERASURE> queue, queue_1;
queue.push([] { return 6; });
queue_1 = std::move(queue);
auto result = queue_1.try_consume();
assert(result && *result == 6);
void push ( ELEMENT_COMPLETE_TYPE &&  i_source)
inline

Adds at the end of the queue a callable object.

See conc_heter_queue::push for a detailed description.

conc_function_queue<void(), default_allocator, ERASURE> queue;
queue.push([] { std::cout << "Hello"; });
queue.push([] { std::cout << " world"; });
queue.push([] { std::cout << "!!!"; });
queue.push([] { std::cout << std::endl; });
while (queue.try_consume())
    ;

double last_val = 1.;
auto func = [&last_val] { return last_val /= 2.; };
conc_function_queue<double(), default_allocator, ERASURE> queue;
for (int i = 0; i < 10; i++)
    queue.push(func);
while (auto const return_value = queue.try_consume())
    std::cout << *return_value << std::endl;

#if !defined(_MSC_VER) || !defined(_M_X64)
/* The size of a type must always be a multiple of its alignment, but with
   Microsoft's compiler, on 64-bit targets, pointers to data members are
   4 bytes in size and are aligned to 8 bytes.
   Test code:
       using T = int Struct::*;
       std::cout << sizeof(T) << std::endl;
       std::cout << alignof(T) << std::endl;
*/
struct Struct
{
    int func_1() { return 1; }
    int func_2() { return 2; }
    int var_1 = 3;
    int var_2 = 4;
};
conc_function_queue<int(Struct *)> queue;
queue.push(&Struct::func_1);
queue.push(&Struct::func_2);
queue.push(&Struct::var_1);
queue.push(&Struct::var_2);
Struct struct_instance;
int sum = 0;
while (auto const return_value = queue.try_consume(&struct_instance))
    sum += *return_value;
assert(sum == 10);
#endif

void emplace ( CONSTRUCTION_PARAMS &&...  i_construction_params)
inline

Adds at the end of the queue a callable object of type ELEMENT_COMPLETE_TYPE, in-place-constructing it from a perfect forwarded parameter pack.
Note: the template argument ELEMENT_COMPLETE_TYPE can't be deduced from the parameters, so it must be explicitly specified.

See conc_heter_queue::emplace for a detailed description.
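
In-place construction from a perfectly forwarded parameter pack can be illustrated without the queue. The helper below (emplace_at, a hypothetical name, as is Unmovable) constructs an object directly in caller-provided storage with placement new. It is only a sketch of the mechanism, under the assumption that the storage is suitably sized and aligned; it is not how density allocates its elements.

```cpp
#include <cassert>
#include <new>
#include <utility>

// A type that can be neither copied nor moved: it can only be constructed in place.
struct Unmovable
{
    int const m_value;
    explicit Unmovable(int i_value) : m_value(i_value) {}
    Unmovable(Unmovable const &) = delete;
    Unmovable & operator=(Unmovable const &) = delete;
};

// Hypothetical helper. ELEMENT_TYPE can't be deduced from the arguments,
// so the caller must specify it explicitly, as with emplace.
template <typename ELEMENT_TYPE, typename... CONSTRUCTION_PARAMS>
ELEMENT_TYPE * emplace_at(void * i_storage, CONSTRUCTION_PARAMS &&... i_construction_params)
{
    return new (i_storage)
      ELEMENT_TYPE(std::forward<CONSTRUCTION_PARAMS>(i_construction_params)...);
}

inline int demo_emplace()
{
    alignas(Unmovable) unsigned char storage[sizeof(Unmovable)];
    Unmovable * const element = emplace_at<Unmovable>(storage, 7);
    int const         value   = element->m_value;
    element->~Unmovable(); // objects created with placement new are destroyed manually
    return value;
}
```

Because the object is built where it will live, no copy or move constructor is ever required, which is what makes emplace usable with types like the one in the example that follows.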

/* This local struct is unmovable and uncopyable, so emplace is the only
   option to add it to the queue. Note that operator () returns an int,
   but we add it to a void() function queue. This is ok, as we are just
   discarding the return value. */
struct Func
{
    int const m_value;
    Func(int i_value) : m_value(i_value) {}
    Func(const Func &) = delete;
    Func & operator=(const Func &) = delete;
    int operator()() const
    {
        std::cout << m_value << std::endl;
        return m_value;
    }
};
conc_function_queue<void(), default_allocator, ERASURE> queue;
queue.template emplace<Func>(7);
bool const invoked = queue.try_consume();
assert(invoked);

put_transaction<typename std::decay<ELEMENT_TYPE>::type> start_push ( ELEMENT_TYPE &&  i_source)
inline

Begins a transaction that appends an element of type ELEMENT_TYPE, copy-constructing or move-constructing it from the source.

See conc_heter_queue::start_push for a detailed description.
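
The commit-or-discard behavior of a put transaction can be sketched with a toy container. Everything below (SketchLog, PutTransaction, the std::vector storage) is hypothetical and chosen for illustration; unlike a real put_transaction it does not allocate the element inside the container up front. It only shows that an element becomes observable on commit and vanishes if the transaction is abandoned.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical container, used only to illustrate the transaction pattern.
class SketchLog
{
  public:
    class PutTransaction
    {
      public:
        PutTransaction(SketchLog & i_target, std::string i_element)
            : m_target(&i_target), m_element(std::move(i_element))
        {
        }
        PutTransaction(PutTransaction &&)      = default;
        PutTransaction(PutTransaction const &) = delete;
        PutTransaction & operator=(PutTransaction const &) = delete;

        // The element can still be modified before the commit.
        std::string & element() { return m_element; }

        // Makes the element observable in the target; the transaction becomes empty.
        void commit()
        {
            if (m_target != nullptr)
            {
                m_target->m_elements.push_back(std::move(m_element));
                m_target = nullptr;
            }
        }
        // No user-defined destructor: an uncommitted element is simply dropped.

      private:
        SketchLog * m_target;
        std::string m_element;
    };

    PutTransaction start_push(std::string i_element)
    {
        return PutTransaction(*this, std::move(i_element));
    }

    std::vector<std::string> const & elements() const { return m_elements; }

  private:
    std::vector<std::string> m_elements;
};

inline std::vector<std::string> demo_transaction()
{
    SketchLog sketch_log;
    {
        auto discarded = sketch_log.start_push("dropped");
        // no commit: leaving the scope discards the element
    }
    {
        auto transaction = sketch_log.start_push("kept");
        transaction.element() += "!";
        transaction.commit();
    }
    return sketch_log.elements();
}
```

Only the committed element survives, so demo_transaction() returns a single string. If an exception were thrown between start_push and commit, stack unwinding would have the same effect as the first scope.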

Examples

struct Func
{
    const char * m_string_1;
    const char * m_string_2;
    void operator()()
    {
        std::cout << m_string_1 << std::endl;
        std::cout << m_string_2 << std::endl;
    }
};
conc_function_queue<void(), default_allocator, ERASURE> queue;
auto transaction = queue.start_push(Func{});
// in case of exception here, since the transaction is not committed, it is discarded with no observable effects
transaction.element().m_string_1 = transaction.raw_allocate_copy("Hello world");
transaction.element().m_string_2 =
    transaction.raw_allocate_copy("\t(I'm so happy)!!");
transaction.commit();
bool const invoked = queue.try_consume();
assert(invoked);

put_transaction<ELEMENT_TYPE> start_emplace ( CONSTRUCTION_PARAMS &&...  i_construction_params)
inline

Begins a transaction that appends an element of a type ELEMENT_TYPE, in-place-constructing it from a perfect forwarded parameter pack.

See conc_heter_queue::start_emplace for a detailed description.

Examples

struct Func
{
    const char * m_string_1;
    const char * m_string_2;
    void operator()()
    {
        std::cout << m_string_1 << std::endl;
        std::cout << m_string_2 << std::endl;
    }
};
conc_function_queue<void(), default_allocator, ERASURE> queue;
auto transaction = queue.template start_emplace<Func>();
// in case of exception here, since the transaction is not committed, it is discarded with no observable effects
transaction.element().m_string_1 = transaction.raw_allocate_copy("Hello world");
transaction.element().m_string_2 =
    transaction.raw_allocate_copy("\t(I'm so happy)!!");
transaction.commit();
bool const invoked = queue.try_consume();
assert(invoked);

void reentrant_push ( ELEMENT_COMPLETE_TYPE &&  i_source)
inline

Adds at the end of the queue a callable object.

See conc_heter_queue::reentrant_push for a detailed description.

conc_function_queue<void(), default_allocator, ERASURE> queue;
queue.reentrant_push([] { std::cout << "Hello"; });
queue.reentrant_push([] { std::cout << " world"; });
queue.reentrant_push([] { std::cout << "!!!"; });
queue.reentrant_push([] { std::cout << std::endl; });
while (queue.try_reentrant_consume())
    ;

void reentrant_emplace ( CONSTRUCTION_PARAMS &&...  i_construction_params)
inline

Adds at the end of the queue a callable object of type ELEMENT_COMPLETE_TYPE, in-place-constructing it from a perfect forwarded parameter pack.
Note: the template argument ELEMENT_COMPLETE_TYPE can't be deduced from the parameters, so it must be explicitly specified.

See conc_heter_queue::reentrant_emplace for a detailed description.

/* This local struct is unmovable and uncopyable, so emplace is the only
   option to add it to the queue. Note that operator () returns an int,
   but we add it to a void() function queue. This is ok, as we are just
   discarding the return value. */
struct Func
{
    int const m_value;
    Func(int i_value) : m_value(i_value) {}
    Func(const Func &) = delete;
    Func & operator=(const Func &) = delete;
    int operator()() const
    {
        std::cout << m_value << std::endl;
        return m_value;
    }
};
conc_function_queue<void(), default_allocator, ERASURE> queue;
queue.template reentrant_emplace<Func>(7);
bool const invoked = queue.try_reentrant_consume();
assert(invoked);

reentrant_put_transaction<typename std::decay<ELEMENT_TYPE>::type> start_reentrant_push ( ELEMENT_TYPE &&  i_source)
inline

Begins a transaction that appends an element of type ELEMENT_TYPE, copy-constructing or move-constructing it from the source.

See conc_heter_queue::start_reentrant_push for a detailed description.

Examples

struct Func
{
    const char * m_string_1;
    const char * m_string_2;
    void operator()()
    {
        std::cout << m_string_1 << std::endl;
        std::cout << m_string_2 << std::endl;
    }
};
conc_function_queue<void(), default_allocator, ERASURE> queue;
auto transaction = queue.start_reentrant_push(Func{});
// in case of exception here, since the transaction is not committed, it is discarded with no observable effects
transaction.element().m_string_1 = transaction.raw_allocate_copy("Hello world");
transaction.element().m_string_2 =
    transaction.raw_allocate_copy("\t(I'm so happy)!!");
transaction.commit();
// now transaction is empty
bool const invoked = queue.try_reentrant_consume();
assert(invoked);

reentrant_put_transaction<ELEMENT_TYPE> start_reentrant_emplace ( CONSTRUCTION_PARAMS &&...  i_construction_params)
inline

Begins a transaction that appends an element of a type ELEMENT_TYPE, in-place-constructing it from a perfect forwarded parameter pack.

See conc_heter_queue::start_reentrant_emplace for a detailed description.

Examples

struct Func
{
    const char * m_string_1;
    const char * m_string_2;
    void operator()()
    {
        std::cout << m_string_1 << std::endl;
        std::cout << m_string_2 << std::endl;
    }
};
conc_function_queue<void(), default_allocator, ERASURE> queue;
auto transaction = queue.template start_reentrant_emplace<Func>();
// in case of exception here, since the transaction is not committed, it is discarded with no observable effects
transaction.element().m_string_1 = transaction.raw_allocate_copy("Hello world");
transaction.element().m_string_2 =
    transaction.raw_allocate_copy("\t(I'm so happy)!!");
transaction.commit();
bool const invoked = queue.try_reentrant_consume();
assert(invoked);

std::conditional<std::is_void<RET_VAL>::value, bool, optional<RET_VAL> >::type try_consume ( PARAMS...  i_params)
inline

If the queue is not empty, invokes the first function object of the queue and then deletes it from the queue. Otherwise no operation is performed.

Parameters
i_params: parameters to be forwarded to the function object
Returns
If RET_VAL is void, the return value is a boolean indicating whether a callable object was consumed. Otherwise the return value is an optional that contains the value returned by the callable object, or an empty optional in case the queue was empty.

This function is not reentrant: if the callable object accesses in any way this queue, the behavior is undefined. Use conc_function_queue::try_reentrant_consume if you are not sure about what the callable object may do.

Throws: unspecified
Exception guarantee: strong (in case of exception the function has no observable effects).
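
The shape of the return type can be reproduced in isolation. The sketch below is an assumption-laden illustration: it uses C++17 std::optional and if constexpr in place of the library's own optional, and the names consume_result_t and try_invoke are hypothetical. An empty std::function stands in for an empty queue.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <type_traits>

// bool when RET_VAL is void, otherwise an optional holding the returned value.
template <typename RET_VAL>
using consume_result_t = typename std::
  conditional<std::is_void<RET_VAL>::value, bool, std::optional<RET_VAL>>::type;

static_assert(std::is_same<consume_result_t<void>, bool>::value, "");
static_assert(std::is_same<consume_result_t<int>, std::optional<int>>::value, "");

// Hypothetical helper: an empty i_func plays the role of an empty queue.
template <typename RET_VAL, typename... PARAMS>
consume_result_t<RET_VAL>
  try_invoke(std::function<RET_VAL(PARAMS...)> const & i_func, PARAMS... i_params)
{
    if (!i_func)
        return consume_result_t<RET_VAL>(); // false, or an empty optional
    if constexpr (std::is_void<RET_VAL>::value)
    {
        i_func(i_params...);
        return true;
    }
    else
    {
        return std::optional<RET_VAL>(i_func(i_params...));
    }
}

inline bool demo_return_types()
{
    std::function<int(int)> const twice = [](int i_value) { return i_value * 2; };
    std::function<int(int)> const none; // empty: nothing to invoke
    std::function<void()> const   nothing = [] {};

    auto const with_value = try_invoke(twice, 21); // optional<int> holding 42
    auto const no_value   = try_invoke(none, 21);  // empty optional<int>
    bool const invoked    = try_invoke(nothing);   // plain bool
    return with_value && *with_value == 42 && !no_value && invoked;
}
```

In both cases the result converts to true exactly when something was invoked, which is why the loops of the form while (auto const return_value = queue.try_consume(...)) in the examples work for any signature.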

conc_function_queue<
    int(std::vector<std::string> & vect),
    default_allocator,
    ERASURE>
  queue;
queue.push([](std::vector<std::string> & vect) {
    vect.push_back("Hello");
    return 2;
});
queue.push([](std::vector<std::string> & vect) {
    vect.push_back(" world!");
    return 3;
});
std::vector<std::string> strings;
int sum = 0;
while (auto const return_value = queue.try_consume(strings))
{
    sum += *return_value;
}
assert(sum == 5);
for (auto const & str : strings)
    std::cout << str;
std::cout << std::endl;

std::conditional<std::is_void<RET_VAL>::value, bool, optional<RET_VAL> >::type try_consume ( consume_operation & i_consume,
PARAMS... i_params
)
inline

If the queue is not empty, invokes the first function object of the queue and then deletes it from the queue. Otherwise no operation is performed. The consume operation is performed using the provided consume_operation object.

Parameters
i_consume: object to use for the consume operation
i_params: parameters to be forwarded to the function object
Returns
If RET_VAL is void, the return value is a boolean indicating whether a callable object was consumed. Otherwise the return value is an optional that contains the value returned by the callable object, or an empty optional in case the queue was empty.

This function is not reentrant: if the callable object accesses in any way this queue, the behavior is undefined. Use conc_function_queue::try_reentrant_consume if you are not sure about what the callable object may do.

Throws: unspecified
Exception guarantee: strong (in case of exception the function has no observable effects).

using Queue = conc_function_queue<
    int(std::vector<std::string> & vect),
    default_allocator,
    ERASURE>;
Queue queue;
queue.push([](std::vector<std::string> & vect) {
    vect.push_back("Hello");
    return 2;
});
queue.push([](std::vector<std::string> & vect) {
    vect.push_back(" world!");
    return 3;
});
std::vector<std::string> strings;
// providing a cached consume_operation gives better performance
typename Queue::consume_operation consume;
int sum = 0;
while (auto const return_value = queue.try_consume(consume, strings))
{
    sum += *return_value;
}
assert(sum == 5);
for (auto const & str : strings)
    std::cout << str;
std::cout << std::endl;

std::conditional<std::is_void<RET_VAL>::value, bool, optional<RET_VAL> >::type try_reentrant_consume ( PARAMS...  i_params)
inline

If the queue is not empty, invokes the first function object of the queue and then deletes it from the queue. Otherwise no operation is performed.

Parameters
i_params: parameters to be forwarded to the function object
Returns
If RET_VAL is void, the return value is a boolean indicating whether a callable object was consumed. Otherwise the return value is an optional that contains the value returned by the callable object, or an empty optional in case the queue was empty.

This function is reentrant: the callable object can access in any way this queue.

Throws: unspecified
Exception guarantee: strong (in case of exception the function has no observable effects).

conc_function_queue<void(), default_allocator, ERASURE> queue;
auto func1 = [&queue] {
    std::cout << (queue.empty() ? "The queue is empty" : "The queue is not empty")
              << std::endl;
};
auto func2 = [&queue, func1] { queue.push(func1); };
queue.push(func1);
queue.push(func2);
/* The callable objects we are going to invoke will access the queue, so we
   must use a reentrant consume. Note: during the invoke of the last function
   the queue is empty to any observer. */
while (queue.try_reentrant_consume())
    ;
// Output:
// The queue is not empty
// The queue is empty

std::conditional<std::is_void<RET_VAL>::value, bool, optional<RET_VAL> >::type try_reentrant_consume ( reentrant_consume_operation & i_consume,
PARAMS... i_params
)
inline

If the queue is not empty, invokes the first function object of the queue and then deletes it from the queue. Otherwise no operation is performed. The consume operation is performed using the provided reentrant_consume_operation object.

Parameters
i_consume: object to use for the consume operation
i_params: parameters to be forwarded to the function object
Returns
If RET_VAL is void, the return value is a boolean indicating whether a callable object was consumed. Otherwise the return value is an optional that contains the value returned by the callable object, or an empty optional in case the queue was empty.

This function is reentrant: the callable object can access in any way this queue.

Throws: unspecified
Exception guarantee: strong (in case of exception the function has no observable effects).

conc_function_queue<void(), default_allocator, ERASURE> queue;
auto func1 = [&queue] {
    std::cout << (queue.empty() ? "The queue is empty" : "The queue is not empty")
              << std::endl;
};
auto func2 = [&queue, func1] { queue.push(func1); };
queue.push(func1);
queue.push(func2);
// providing a cached consume_operation gives much better performance
typename decltype(queue)::reentrant_consume_operation consume;
/* The callable objects we are going to invoke will access the queue, so we
   must use a reentrant consume. Note: during the invoke of the last function
   the queue is empty to any observer. */
while (queue.try_reentrant_consume(consume))
    ;
// Output:
// The queue is not empty
// The queue is empty

void clear ( )
inline noexcept

Deletes all the callable objects in the queue. This function is disabled at compile-time if ERASURE is function_manual_clear.


Effects on iterators: all the iterators are invalidated
Throws: nothing
Complexity: linear.

conc_function_queue<int(), default_allocator, function_standard_erasure> queue;
queue.push([] { return 6; });
queue.clear();
assert(queue.empty());
bool empty ( )
inline noexcept

Returns whether this container is empty.

Friends And Related Function Documentation

void swap ( conc_function_queue< CALLABLE, ALLOCATOR_TYPE, ERASURE > &  i_first,
conc_function_queue< CALLABLE, ALLOCATOR_TYPE, ERASURE > &  i_second 
)
friend

Swaps two function queues.

conc_function_queue<int(), default_allocator, ERASURE> queue, queue_1;
queue.push([] { return 6; });
std::swap(queue, queue_1);
assert(queue.empty());
auto result = queue_1.try_consume();
assert(result && *result == 6);

Member Data Documentation

constexpr bool concurrent_puts = true
static

Whether multiple threads can do put operations on the same queue without any further synchronization.

constexpr bool concurrent_consumes = true
static

Whether multiple threads can do consume operations on the same queue without any further synchronization.

constexpr bool concurrent_put_consumes = true
static

Whether puts and consumes can be done concurrently without any further synchronization. In any case unsynchronized concurrency is constrained by concurrent_puts and concurrent_consumes.

constexpr bool is_seq_cst = true
static

Whether this queue is sequentially consistent.
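
Generic code can inspect these constants at compile time to decide whether a queue type may be shared between threads without extra locking. The sketch below uses two hypothetical queue types (SketchConcQueue and SketchLocalQueue) that merely expose the same four static members; the helper is_fully_concurrent is also an assumption and not part of the library.

```cpp
#include <cassert>

// Hypothetical types that only mimic the static members documented above.
struct SketchConcQueue
{
    static constexpr bool concurrent_puts         = true;
    static constexpr bool concurrent_consumes     = true;
    static constexpr bool concurrent_put_consumes = true;
    static constexpr bool is_seq_cst              = true;
};

struct SketchLocalQueue
{
    static constexpr bool concurrent_puts         = false;
    static constexpr bool concurrent_consumes     = false;
    static constexpr bool concurrent_put_consumes = false;
    static constexpr bool is_seq_cst              = true;
};

// True if any number of producers and consumers can use QUEUE
// without further synchronization.
template <typename QUEUE> constexpr bool is_fully_concurrent()
{
    return QUEUE::concurrent_puts && QUEUE::concurrent_consumes &&
           QUEUE::concurrent_put_consumes;
}

// A thread pool could refuse, at compile time, a queue that needs external locking.
static_assert(is_fully_concurrent<SketchConcQueue>(), "queue must be fully concurrent");
static_assert(!is_fully_concurrent<SketchLocalQueue>(), "expected a single-thread queue");
```

Since all four members are true for conc_function_queue, a check of this kind would accept it for any mix of producer and consumer threads.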


The documentation for this class was generated from the following file: conc_function_queue.h