Back out some parallel_for_each features
Now that the DWARF reader does not use parallel_for_each, we can remove some of the features that were added just for it: return values and task sizing. The thread_pool typed tasks feature could also be removed, but I haven't done so here. This one seemed less intrusive and perhaps more likely to be needed at some point.
This commit is contained in:
parent 667ed4b14d
commit afdd136635

2 changed files with 30 additions and 251 deletions
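For orientation before the diff: the net effect on the API is that parallel_for_each now returns void and no longer accepts a task-sizing callback. The following is a minimal standalone sketch of that simplified contract, using std::async in place of gdb's thread pool; the helper name, the fixed pool size, and the chunking details are illustrative assumptions, not gdb's code.

#include <algorithm>
#include <cstddef>
#include <future>
#include <vector>

/* Sketch of the post-commit contract: split [first, last) into roughly
   equal chunks, run CALLBACK on each (possibly on worker threads), and
   return nothing.  N is the minimum number of elements per thread and
   must be positive.  */
template<class RandomIt, class RangeFunction>
void
sketch_parallel_for_each (unsigned n, RandomIt first, RandomIt last,
                          RangeFunction callback)
{
  std::size_t n_elements = last - first;
  std::size_t n_threads = 4;            /* stand-in for the pool size */
  if (n_elements / n_threads < n)       /* honor the N-per-thread floor */
    n_threads = std::max (n_elements / n, (std::size_t) 1);

  std::vector<std::future<void>> futures;
  std::size_t elts_per_thread = n_elements / n_threads;
  for (std::size_t i = 1; i < n_threads; ++i)
    {
      RandomIt end = first + elts_per_thread;
      futures.push_back (std::async (std::launch::async,
                                     [=] () { callback (first, end); }));
      first = end;
    }
  callback (first, last);       /* the final chunk runs on this thread */
  for (auto &f : futures)
    f.get ();                   /* wait, propagating any exception */
}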
gdb/unittests/parallel-for-selftests.c
@@ -120,34 +120,6 @@ TEST (int n_threads)
 	    });
   SELF_CHECK (counter == 0);
-
-  auto task_size_max_ = [] (int iter)
-  {
-    return (size_t)SIZE_MAX;
-  };
-  auto task_size_max = gdb::make_function_view (task_size_max_);
-
-  counter = 0;
-  FOR_EACH (1, 0, NUMBER,
-	    [&] (int start, int end)
-	    {
-	      counter += end - start;
-	    }, task_size_max);
-  SELF_CHECK (counter == NUMBER);
-
-  auto task_size_one_ = [] (int iter)
-  {
-    return (size_t)1;
-  };
-  auto task_size_one = gdb::make_function_view (task_size_one_);
-
-  counter = 0;
-  FOR_EACH (1, 0, NUMBER,
-	    [&] (int start, int end)
-	    {
-	      counter += end - start;
-	    }, task_size_one);
-  SELF_CHECK (counter == NUMBER);
 
 #undef NUMBER
 
   /* Check that if there are fewer tasks than threads, then we won't
@@ -169,25 +141,6 @@ TEST (int n_threads)
 	{
 	  return entry != nullptr;
 	}));
-
-  /* The same but using the task size parameter.  */
-  intresults.clear ();
-  any_empty_tasks = false;
-  FOR_EACH (1, 0, 1,
-	    [&] (int start, int end)
-	    {
-	      if (start == end)
-		any_empty_tasks = true;
-	      return std::make_unique<int> (end - start);
-	    },
-	    task_size_one);
-  SELF_CHECK (!any_empty_tasks);
-  SELF_CHECK (std::all_of (intresults.begin (),
-			   intresults.end (),
-			   [] (const std::unique_ptr<int> &entry)
-			   {
-			     return entry != nullptr;
-			   }));
 }
 
 #endif /* FOR_EACH */

gdbsupport/parallel-for.h
@@ -28,104 +28,6 @@
 namespace gdb
 {
 
-namespace detail
-{
-
-/* This is a helper class that is used to accumulate results for
-   parallel_for.  There is a specialization for 'void', below.  */
-template<typename T>
-struct par_for_accumulator
-{
-public:
-
-  explicit par_for_accumulator (size_t n_threads)
-    : m_futures (n_threads)
-  {
-  }
-
-  /* The result type that is accumulated.  */
-  typedef std::vector<T> result_type;
-
-  /* Post the Ith task to a background thread, and store a future for
-     later.  */
-  void post (size_t i, std::function<T ()> task)
-  {
-    m_futures[i]
-      = gdb::thread_pool::g_thread_pool->post_task (std::move (task));
-  }
-
-  /* Invoke TASK in the current thread, then compute all the results
-     from all background tasks and put them into a result vector,
-     which is returned.  */
-  result_type finish (gdb::function_view<T ()> task)
-  {
-    result_type result (m_futures.size () + 1);
-
-    result.back () = task ();
-
-    for (size_t i = 0; i < m_futures.size (); ++i)
-      result[i] = m_futures[i].get ();
-
-    return result;
-  }
-
-  /* Resize the results to N.  */
-  void resize (size_t n)
-  {
-    m_futures.resize (n);
-  }
-
-private:
-
-  /* A vector of futures coming from the tasks run in the
-     background.  */
-  std::vector<gdb::future<T>> m_futures;
-};
-
-/* See the generic template.  */
-template<>
-struct par_for_accumulator<void>
-{
-public:
-
-  explicit par_for_accumulator (size_t n_threads)
-    : m_futures (n_threads)
-  {
-  }
-
-  /* This specialization does not compute results.  */
-  typedef void result_type;
-
-  void post (size_t i, std::function<void ()> task)
-  {
-    m_futures[i]
-      = gdb::thread_pool::g_thread_pool->post_task (std::move (task));
-  }
-
-  result_type finish (gdb::function_view<void ()> task)
-  {
-    task ();
-
-    for (auto &future : m_futures)
-      {
-	/* Use 'get' and not 'wait', to propagate any exception.  */
-	future.get ();
-      }
-  }
-
-  /* Resize the results to N.  */
-  void resize (size_t n)
-  {
-    m_futures.resize (n);
-  }
-
-private:
-
-  std::vector<gdb::future<void>> m_futures;
-};
-
-}
-
 /* A very simple "parallel for".  This splits the range of iterators
    into subranges, and then passes each subrange to the callback.  The
    work may or may not be done in separate threads.
@@ -136,23 +38,13 @@
 
    The parameter N says how batching ought to be done -- there will be
    at least N elements processed per thread.  Setting N to 0 is not
-   allowed.
-
-   If the function returns a non-void type, then a vector of the
-   results is returned.  The size of the resulting vector depends on
-   the number of threads that were used.  */
+   allowed.  */
 
 template<class RandomIt, class RangeFunction>
-typename gdb::detail::par_for_accumulator<
-    typename std::invoke_result<RangeFunction, RandomIt, RandomIt>::type
-  >::result_type
+void
 parallel_for_each (unsigned n, RandomIt first, RandomIt last,
-		   RangeFunction callback,
-		   gdb::function_view<size_t(RandomIt)> task_size = nullptr)
+		   RangeFunction callback)
 {
-  using result_type
-    = typename std::invoke_result<RangeFunction, RandomIt, RandomIt>::type;
-
   /* If enabled, print debug info about how the work is distributed across
      the threads.  */
   const bool parallel_for_each_debug = false;
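The return type deleted just above came from detail::par_for_accumulator, removed in the previous hunk. Reduced to its essentials, that helper posted each background chunk as a task, kept one future per chunk, ran the final chunk on the calling thread, and gathered every chunk's value into a single vector. A standalone sketch of the same pattern, with std::async standing in for gdb::thread_pool::post_task (the helper name is an illustrative assumption):

#include <cstddef>
#include <functional>
#include <future>
#include <utility>
#include <vector>

/* Sketch of what the deleted par_for_accumulator<T> provided: launch
   the background chunks, run the last chunk inline, then collect all
   results.  T must be default-constructible, as in the original.  */
template<typename T>
std::vector<T>
accumulate_chunks (std::vector<std::function<T ()>> background,
                   std::function<T ()> final_chunk)
{
  std::vector<std::future<T>> futures;
  for (auto &task : background)
    futures.push_back (std::async (std::launch::async, std::move (task)));

  std::vector<T> results (futures.size () + 1);
  results.back () = final_chunk ();     /* main thread does one chunk */
  for (std::size_t i = 0; i < futures.size (); ++i)
    results[i] = futures[i].get ();     /* 'get' also rethrows errors */
  return results;
}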
@@ -162,87 +54,37 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
   size_t n_elements = last - first;
   size_t elts_per_thread = 0;
   size_t elts_left_over = 0;
-  size_t total_size = 0;
-  size_t size_per_thread = 0;
-  size_t max_element_size = n_elements == 0 ? 1 : SIZE_MAX / n_elements;
 
   if (n_threads > 1)
     {
-      if (task_size != nullptr)
-	{
-	  gdb_assert (n == 1);
-	  for (RandomIt i = first; i != last; ++i)
-	    {
-	      size_t element_size = task_size (i);
-	      gdb_assert (element_size > 0);
-	      if (element_size > max_element_size)
-		/* We could start scaling here, but that doesn't seem to be
-		   worth the effort.  */
-		element_size = max_element_size;
-	      size_t prev_total_size = total_size;
-	      total_size += element_size;
-	      /* Check for overflow.  */
-	      gdb_assert (prev_total_size < total_size);
-	    }
-	  size_per_thread = total_size / n_threads;
-	}
-      else
-	{
-	  /* Require that there should be at least N elements in a
-	     thread.  */
-	  gdb_assert (n > 0);
-	  if (n_elements / n_threads < n)
-	    n_threads = std::max (n_elements / n, (size_t) 1);
-	  elts_per_thread = n_elements / n_threads;
-	  elts_left_over = n_elements % n_threads;
-	  /* n_elements == n_threads * elts_per_thread + elts_left_over. */
-	}
+      /* Require that there should be at least N elements in a
+	 thread.  */
+      gdb_assert (n > 0);
+      if (n_elements / n_threads < n)
+	n_threads = std::max (n_elements / n, (size_t) 1);
+      elts_per_thread = n_elements / n_threads;
+      elts_left_over = n_elements % n_threads;
+      /* n_elements == n_threads * elts_per_thread + elts_left_over. */
     }
 
   size_t count = n_threads == 0 ? 0 : n_threads - 1;
-  gdb::detail::par_for_accumulator<result_type> results (count);
+  std::vector<gdb::future<void>> results;
 
   if (parallel_for_each_debug)
     {
       debug_printf (_("Parallel for: n_elements: %zu\n"), n_elements);
-      if (task_size != nullptr)
-	{
-	  debug_printf (_("Parallel for: total_size: %zu\n"), total_size);
-	  debug_printf (_("Parallel for: size_per_thread: %zu\n"), size_per_thread);
-	}
-      else
-	{
-	  debug_printf (_("Parallel for: minimum elements per thread: %u\n"), n);
-	  debug_printf (_("Parallel for: elts_per_thread: %zu\n"), elts_per_thread);
-	}
+      debug_printf (_("Parallel for: minimum elements per thread: %u\n"), n);
+      debug_printf (_("Parallel for: elts_per_thread: %zu\n"), elts_per_thread);
     }
 
-  size_t remaining_size = total_size;
   for (int i = 0; i < count; ++i)
     {
      RandomIt end;
-      size_t chunk_size = 0;
-      if (task_size == nullptr)
-	{
-	  end = first + elts_per_thread;
-	  if (i < elts_left_over)
-	    /* Distribute the leftovers over the worker threads, to avoid having
-	       to handle all of them in a single thread.  */
-	    end++;
-	}
-      else
-	{
-	  RandomIt j;
-	  for (j = first; j < last && chunk_size < size_per_thread; ++j)
-	    {
-	      size_t element_size = task_size (j);
-	      if (element_size > max_element_size)
-		element_size = max_element_size;
-	      chunk_size += element_size;
-	    }
-	  end = j;
-	  remaining_size -= chunk_size;
-	}
+      end = first + elts_per_thread;
+      if (i < elts_left_over)
+	/* Distribute the leftovers over the worker threads, to avoid having
+	   to handle all of them in a single thread.  */
+	end++;
 
       /* This case means we don't have enough elements to really
	 distribute them.  Rather than ever submit a task that does
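The larger branch deleted in the hunk above is the task-sizing path being backed out: task_size assigned each element a weight, the weights were summed up front (with an overflow check), and each worker's chunk then grew until it held roughly total_size / n_threads of weight. A standalone sketch of that partitioning step over plain index weights; the function and variable names are illustrative, and n_chunks is assumed positive:

#include <cstddef>
#include <utility>
#include <vector>

/* Sketch of the removed task_size chunking: split indices [0, n) so
   each chunk carries roughly total / n_chunks of weight.  Returns
   [begin, end) index pairs; the last pair is the leftover handled by
   the calling thread in the real code.  */
std::vector<std::pair<std::size_t, std::size_t>>
weighted_chunks (const std::vector<std::size_t> &weights,
                 std::size_t n_chunks)
{
  std::size_t total = 0;
  for (std::size_t w : weights)
    total += w;                 /* the real code also checked overflow */
  std::size_t per_chunk = total / n_chunks;

  std::vector<std::pair<std::size_t, std::size_t>> chunks;
  std::size_t begin = 0;
  for (std::size_t c = 0; c + 1 < n_chunks && begin < weights.size (); ++c)
    {
      std::size_t end = begin, acc = 0;
      while (end < weights.size () && acc < per_chunk)
        acc += weights[end++];  /* grow chunk until it has its share */
      chunks.emplace_back (begin, end);
      begin = end;
    }
  chunks.emplace_back (begin, weights.size ());
  return chunks;
}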
@@ -257,7 +99,6 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
	     the result list here.  This avoids submitting empty tasks
	     to the thread pool.  */
	  count = i;
-	  results.resize (count);
	  break;
	}
 
@@ -265,12 +106,12 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
	{
	  debug_printf (_("Parallel for: elements on worker thread %i\t: %zu"),
			i, (size_t)(end - first));
-	  if (task_size != nullptr)
-	    debug_printf (_("\t(size: %zu)"), chunk_size);
	  debug_printf (_("\n"));
	}
-      results.post (i, [=] ()
-	{ return callback (first, end); });
+      results.push_back (gdb::thread_pool::g_thread_pool->post_task ([=] ()
+	{
+	  return callback (first, end);
+	}));
       first = end;
     }
 
@@ -278,8 +119,6 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
   if (parallel_for_each_debug)
     {
       debug_printf (_("Parallel for: elements on worker thread %i\t: 0"), i);
-      if (task_size != nullptr)
-	debug_printf (_("\t(size: 0)"));
       debug_printf (_("\n"));
     }
 
@@ -288,14 +127,12 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
     {
       debug_printf (_("Parallel for: elements on main thread\t\t: %zu"),
		     (size_t)(last - first));
-      if (task_size != nullptr)
-	debug_printf (_("\t(size: %zu)"), remaining_size);
       debug_printf (_("\n"));
     }
-  return results.finish ([=] ()
-    {
-      return callback (first, last);
-    });
+  callback (first, last);
+
+  for (auto &fut : results)
+    fut.get ();
 }
 
 /* A sequential drop-in replacement of parallel_for_each.  This can be useful
@@ -303,22 +140,11 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
    multi-threading in a fine-grained way.  */
 
 template<class RandomIt, class RangeFunction>
-typename gdb::detail::par_for_accumulator<
-    typename std::invoke_result<RangeFunction, RandomIt, RandomIt>::type
-  >::result_type
+void
 sequential_for_each (unsigned n, RandomIt first, RandomIt last,
-		     RangeFunction callback,
-		     gdb::function_view<size_t(RandomIt)> task_size = nullptr)
+		     RangeFunction callback)
 {
-  using result_type = typename std::invoke_result<RangeFunction, RandomIt, RandomIt>::type;
-
-  gdb::detail::par_for_accumulator<result_type> results (0);
-
-  /* Process all the remaining elements in the main thread.  */
-  return results.finish ([=] ()
-    {
-      return callback (first, last);
-    });
+  callback (first, last);
 }
 
 }
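One behavioral detail survives the rewrite: the deleted accumulator's comment said to use 'get' and not 'wait' so that exceptions propagate, and the new 'for (auto &fut : results) fut.get ()' epilogue keeps that property. A small self-contained demonstration of the difference between the two std::future calls:

#include <future>
#include <iostream>
#include <stdexcept>

int main ()
{
  std::future<void> f = std::async (std::launch::async, [] ()
    { throw std::runtime_error ("worker failed"); });

  f.wait ();         /* blocks until done, but does not rethrow */
  try
    {
      f.get ();      /* rethrows the stored exception in this thread */
    }
  catch (const std::runtime_error &e)
    {
      std::cout << "caught: " << e.what () << '\n';
    }
}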