density
C++11 library for paged memory management, function queues, heterogeneous queues and lifo memory management
conc_function_queue.h
1 
2 // Copyright Giuseppe Campana (giu.campana@gmail.com) 2016-2018.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 #pragma once
8 #include <density/conc_heter_queue.h>
9 #include <density/detail/function_runtime_type.h>
10 
11 namespace density
12 {
13  template <
14  typename CALLABLE,
15  typename ALLOCATOR_TYPE = default_allocator,
18 
46 #ifndef DOXYGEN_DOC_GENERATION
47  template <
48  typename RET_VAL,
49  typename... PARAMS,
50  typename ALLOCATOR_TYPE,
51  function_type_erasure ERASURE>
52  class conc_function_queue<RET_VAL(PARAMS...), ALLOCATOR_TYPE, ERASURE>
53 #else
54  template <
55  typename CALLABLE,
56  typename ALLOCATOR_TYPE = default_allocator,
59 #endif
60  {
61  private:
63  detail::FunctionRuntimeType<ERASURE, RET_VAL(PARAMS...)>,
64  ALLOCATOR_TYPE>;
65  UnderlyingQueue m_queue;
66 
67  public:
/// Compile-time flag, always true. Presumably advertises that put operations may be executed concurrently - TODO confirm against conc_heter_queue.
69  static constexpr bool concurrent_puts = true;
70 
/// Compile-time flag, always true. Presumably advertises that consume operations may be executed concurrently - TODO confirm.
72  static constexpr bool concurrent_consumes = true;
73 
/// Compile-time flag, always true. Presumably advertises that puts and consumes may overlap with each other - TODO confirm.
76  static constexpr bool concurrent_put_consumes = true;
77 
/// Compile-time flag, always true. Presumably states that the queue is sequentially consistent - TODO confirm.
79  static constexpr bool is_seq_cst = true;
80 
/// Default constructor: compiler-generated and declared noexcept.
84  conc_function_queue() noexcept = default;
85 
/// Move constructor: compiler-generated and declared noexcept, so it moves the members of i_source one by one.
89  conc_function_queue(conc_function_queue && i_source) noexcept = default;
90 
/// Move assignment: compiler-generated and declared noexcept, so it move-assigns the members of i_source one by one.
94  conc_function_queue & operator=(conc_function_queue && i_source) noexcept = default;
95 
99  friend void swap(conc_function_queue & i_first, conc_function_queue & i_second) noexcept
100  {
101  using std::swap;
102  swap(i_first.m_queue, i_second.m_queue);
103  }
104 
107  {
108  auto erasure = ERASURE;
109  if (erasure == function_manual_clear)
110  {
112  }
113  }
114 
/// Alias of the put_transaction template of the underlying queue, instantiated with ELEMENT_COMPLETE_TYPE.
/// It is the type returned by start_emplace.
116  template <typename ELEMENT_COMPLETE_TYPE>
117  using put_transaction =
118  typename UnderlyingQueue::template put_transaction<ELEMENT_COMPLETE_TYPE>;
119 
121  template <typename ELEMENT_COMPLETE_TYPE>
123  typename UnderlyingQueue::template reentrant_put_transaction<ELEMENT_COMPLETE_TYPE>;
124 
/// Alias of the consume_operation type of the underlying queue; accepted by the caching overload of try_consume.
126  using consume_operation = typename UnderlyingQueue::consume_operation;
127 
/// Alias of the reentrant_consume_operation type of the underlying queue; accepted by the caching overload of try_reentrant_consume.
129  using reentrant_consume_operation = typename UnderlyingQueue::reentrant_consume_operation;
130 
138  template <typename ELEMENT_COMPLETE_TYPE> void push(ELEMENT_COMPLETE_TYPE && i_source)
139  {
140  m_queue.push(std::forward<ELEMENT_COMPLETE_TYPE>(i_source));
141  }
142 
150  template <typename ELEMENT_COMPLETE_TYPE, typename... CONSTRUCTION_PARAMS>
151  void emplace(CONSTRUCTION_PARAMS &&... i_construction_params)
152  {
153  m_queue.template emplace<ELEMENT_COMPLETE_TYPE>(
154  std::forward<CONSTRUCTION_PARAMS>(i_construction_params)...);
155  }
156 
164  template <typename ELEMENT_TYPE>
166  start_push(ELEMENT_TYPE && i_source)
167  {
168  return m_queue.start_push(std::forward<ELEMENT_TYPE>(i_source));
169  }
170 
171 
179  template <typename ELEMENT_TYPE, typename... CONSTRUCTION_PARAMS>
180  put_transaction<ELEMENT_TYPE> start_emplace(CONSTRUCTION_PARAMS &&... i_construction_params)
181  {
182  return m_queue.template start_emplace<ELEMENT_TYPE>(
183  std::forward<ELEMENT_TYPE>(i_construction_params)...);
184  }
185 
191  template <typename ELEMENT_COMPLETE_TYPE>
192  void reentrant_push(ELEMENT_COMPLETE_TYPE && i_source)
193  {
194  m_queue.reentrant_push(std::forward<ELEMENT_COMPLETE_TYPE>(i_source));
195  }
196 
204  template <typename ELEMENT_COMPLETE_TYPE, typename... CONSTRUCTION_PARAMS>
205  void reentrant_emplace(CONSTRUCTION_PARAMS &&... i_construction_params)
206  {
207  m_queue.template reentrant_emplace<ELEMENT_COMPLETE_TYPE>(
208  std::forward<CONSTRUCTION_PARAMS>(i_construction_params)...);
209  }
210 
218  template <typename ELEMENT_TYPE>
220  start_reentrant_push(ELEMENT_TYPE && i_source)
221  {
222  return m_queue.start_reentrant_push(std::forward<ELEMENT_TYPE>(i_source));
223  }
224 
225 
233  template <typename ELEMENT_TYPE, typename... CONSTRUCTION_PARAMS>
235  start_reentrant_emplace(CONSTRUCTION_PARAMS &&... i_construction_params)
236  {
237  return m_queue.template start_reentrant_emplace<ELEMENT_TYPE>(
238  std::forward<ELEMENT_TYPE>(i_construction_params)...);
239  }
240 
256  typename std::conditional<std::is_void<RET_VAL>::value, bool, optional<RET_VAL>>::type
257  try_consume(PARAMS... i_params)
258  {
259  return try_consume_impl(std::is_void<RET_VAL>(), std::forward<PARAMS>(i_params)...);
260  }
261 
279  typename std::conditional<std::is_void<RET_VAL>::value, bool, optional<RET_VAL>>::type
280  try_consume(consume_operation & i_consume, PARAMS... i_params)
281  {
282  return try_consume_impl_cached(
283  std::is_void<RET_VAL>(), i_consume, std::forward<PARAMS>(i_params)...);
284  }
285 
300  typename std::conditional<std::is_void<RET_VAL>::value, bool, optional<RET_VAL>>::type
301  try_reentrant_consume(PARAMS... i_params)
302  {
303  return try_reentrant_consume_impl(
304  std::is_void<RET_VAL>(), std::forward<PARAMS>(i_params)...);
305  }
306 
323  typename std::conditional<std::is_void<RET_VAL>::value, bool, optional<RET_VAL>>::type
324  try_reentrant_consume(reentrant_consume_operation & i_consume, PARAMS... i_params)
325  {
326  return try_reentrant_consume_impl_cached(
327  std::is_void<RET_VAL>(), i_consume, std::forward<PARAMS>(i_params)...);
328  }
329 
337  template <
338  function_type_erasure ERASURE_ = ERASURE,
339  typename std::enable_if<ERASURE_ != function_manual_clear>::type * = nullptr>
340  void clear() noexcept
341  {
342  auto erasure = ERASURE;
343  if (erasure == function_manual_clear)
344  {
345  DENSITY_ASSUME(false);
346  }
347  else
348  {
349  m_queue.clear();
350  }
351  }
352 
354  bool empty() noexcept { return m_queue.empty(); }
355 
356  private:
358  optional<RET_VAL> try_consume_impl(std::false_type, PARAMS... i_params)
359  {
360  if (auto cons = m_queue.try_start_consume())
361  {
362  auto && result = cons.complete_type().align_invoke_destroy(
363  cons.unaligned_element_ptr(), std::forward<PARAMS>(i_params)...);
364  cons.commit_nodestroy();
365  return optional<RET_VAL>(std::move(result));
366  }
367  else
368  {
369  return optional<RET_VAL>();
370  }
371  }
372 
374  bool try_consume_impl(std::true_type, PARAMS... i_params)
375  {
376  if (auto cons = m_queue.try_start_consume())
377  {
378  cons.complete_type().align_invoke_destroy(
379  cons.unaligned_element_ptr(), std::forward<PARAMS>(i_params)...);
380  cons.commit_nodestroy();
381  return true;
382  }
383  else
384  {
385  return false;
386  }
387  }
388 
390  optional<RET_VAL> try_consume_impl_cached(
391  std::false_type, consume_operation & i_consume, PARAMS... i_params)
392  {
393  if (m_queue.try_start_consume(i_consume))
394  {
395  auto && result = i_consume.complete_type().align_invoke_destroy(
396  i_consume.unaligned_element_ptr(), std::forward<PARAMS>(i_params)...);
397  i_consume.commit_nodestroy();
398  return optional<RET_VAL>(std::move(result));
399  }
400  else
401  {
402  return optional<RET_VAL>();
403  }
404  }
405 
407  bool
408  try_consume_impl_cached(std::true_type, consume_operation & i_consume, PARAMS... i_params)
409  {
410  if (m_queue.try_start_consume(i_consume))
411  {
412  i_consume.complete_type().align_invoke_destroy(
413  i_consume.unaligned_element_ptr(), std::forward<PARAMS>(i_params)...);
414  i_consume.commit_nodestroy();
415  return true;
416  }
417  else
418  {
419  return false;
420  }
421  }
422 
424  optional<RET_VAL> try_reentrant_consume_impl(std::false_type, PARAMS... i_params)
425  {
426  if (auto cons = m_queue.try_start_reentrant_consume())
427  {
428  auto && result = cons.complete_type().align_invoke_destroy(
429  cons.unaligned_element_ptr(), std::forward<PARAMS>(i_params)...);
430  cons.commit_nodestroy();
431  return optional<RET_VAL>(std::move(result));
432  }
433  else
434  {
435  return optional<RET_VAL>();
436  }
437  }
438 
440  bool try_reentrant_consume_impl(std::true_type, PARAMS... i_params)
441  {
442  if (auto cons = m_queue.try_start_reentrant_consume())
443  {
444  cons.complete_type().align_invoke_destroy(
445  cons.unaligned_element_ptr(), std::forward<PARAMS>(i_params)...);
446  cons.commit_nodestroy();
447  return true;
448  }
449  else
450  {
451  return false;
452  }
453  }
454 
456  optional<RET_VAL> try_reentrant_consume_impl_cached(
457  std::false_type, reentrant_consume_operation & i_consume, PARAMS... i_params)
458  {
459  if (m_queue.try_start_reentrant_consume(i_consume))
460  {
461  auto && result = i_consume.complete_type().align_invoke_destroy(
462  i_consume.unaligned_element_ptr(), std::forward<PARAMS>(i_params)...);
463  i_consume.commit_nodestroy();
464  return optional<RET_VAL>(std::move(result));
465  }
466  else
467  {
468  return optional<RET_VAL>();
469  }
470  }
471 
473  bool try_reentrant_consume_impl_cached(
474  std::true_type, reentrant_consume_operation & i_consume, PARAMS... i_params)
475  {
476  if (m_queue.try_start_reentrant_consume(i_consume))
477  {
478  i_consume.complete_type().align_invoke_destroy(
479  i_consume.unaligned_element_ptr(), std::forward<PARAMS>(i_params)...);
480  i_consume.commit_nodestroy();
481  return true;
482  }
483  else
484  {
485  return false;
486  }
487  }
488  };
489 
490 } // namespace density
basic_default_allocator< default_page_capacity > default_allocator
Definition: default_allocator.h:152
reentrant_put_transaction< ELEMENT_TYPE > start_reentrant_emplace(CONSTRUCTION_PARAMS &&...i_construction_params)
Definition: conc_function_queue.h:235
Definition: conc_function_queue.h:11
reentrant_put_transaction< typename std::decay< ELEMENT_TYPE >::type > start_reentrant_push(ELEMENT_TYPE &&i_source)
Definition: conc_heter_queue.h:1652
void reentrant_push(ELEMENT_TYPE &&i_source)
Definition: conc_heter_queue.h:1597
static constexpr bool concurrent_put_consumes
Definition: conc_function_queue.h:76
void emplace(CONSTRUCTION_PARAMS &&...i_construction_params)
Definition: conc_function_queue.h:151
put_transaction< typename std::decay< ELEMENT_TYPE >::type > start_push(ELEMENT_TYPE &&i_source)
Definition: conc_heter_queue.h:867
std::conditional< std::is_void< RET_VAL >::value, bool, optional< RET_VAL > >::type try_consume(PARAMS...i_params)
Definition: conc_function_queue.h:257
bool empty() noexcept
Definition: conc_function_queue.h:354
reentrant_consume_operation try_start_reentrant_consume() noexcept
Definition: conc_heter_queue.h:1760
typename UnderlyingQueue::reentrant_consume_operation reentrant_consume_operation
Definition: conc_function_queue.h:129
void reentrant_push(ELEMENT_COMPLETE_TYPE &&i_source)
Definition: conc_function_queue.h:192
void clear() noexcept
Definition: conc_function_queue.h:340
#define DENSITY_ASSERT(...)
Definition: density_config.h:19
Definition: conc_function_queue.h:17
static constexpr bool is_seq_cst
Definition: conc_function_queue.h:79
put_transaction< typename std::decay< ELEMENT_TYPE >::type > start_push(ELEMENT_TYPE &&i_source)
Definition: conc_function_queue.h:166
typename UnderlyingQueue::template put_transaction< ELEMENT_COMPLETE_TYPE > put_transaction
Definition: conc_function_queue.h:118
Definition: density_common.h:99
void reentrant_emplace(CONSTRUCTION_PARAMS &&...i_construction_params)
Definition: conc_function_queue.h:205
static constexpr bool concurrent_consumes
Definition: conc_function_queue.h:72
typename UnderlyingQueue::template reentrant_put_transaction< ELEMENT_COMPLETE_TYPE > reentrant_put_transaction
Definition: conc_function_queue.h:123
Definition: density_common.h:101
static constexpr bool concurrent_puts
Definition: conc_function_queue.h:69
typename UnderlyingQueue::consume_operation consume_operation
Definition: conc_function_queue.h:126
std::conditional< std::is_void< RET_VAL >::value, bool, optional< RET_VAL > >::type try_reentrant_consume(reentrant_consume_operation &i_consume, PARAMS...i_params)
Definition: conc_function_queue.h:324
std::conditional< std::is_void< RET_VAL >::value, bool, optional< RET_VAL > >::type try_consume(consume_operation &i_consume, PARAMS...i_params)
Definition: conc_function_queue.h:280
put_transaction< ELEMENT_TYPE > start_emplace(CONSTRUCTION_PARAMS &&...i_construction_params)
Definition: conc_function_queue.h:180
builtin_optional< TYPE > optional
Definition: density_config.h:218
std::conditional< std::is_void< RET_VAL >::value, bool, optional< RET_VAL > >::type try_reentrant_consume(PARAMS...i_params)
Definition: conc_function_queue.h:301
void push(ELEMENT_TYPE &&i_source)
Definition: conc_heter_queue.h:749
~conc_function_queue()
Definition: conc_function_queue.h:106
consume_operation try_start_consume() noexcept
Definition: conc_heter_queue.h:1029
function_type_erasure
Definition: density_common.h:97
reentrant_put_transaction< typename std::decay< ELEMENT_TYPE >::type > start_reentrant_push(ELEMENT_TYPE &&i_source)
Definition: conc_function_queue.h:220
#define DENSITY_ASSUME(bool_expr,...)
Definition: density_config.h:46
void clear() noexcept
Definition: conc_heter_queue.h:206
friend void swap(conc_function_queue &i_first, conc_function_queue &i_second) noexcept
Definition: conc_function_queue.h:99
bool empty() const noexcept
Definition: conc_heter_queue.h:193
void push(ELEMENT_COMPLETE_TYPE &&i_source)
Definition: conc_function_queue.h:138
conc_function_queue() noexcept=default