src: remove threadpool and use taskflow instead

This commit is contained in:
0xFFFC0000 2025-02-08 16:20:02 +00:00
parent 90359e31fd
commit 31b43ec80a
14 changed files with 79 additions and 522 deletions

3
.gitmodules vendored
View File

@ -14,3 +14,6 @@
[submodule "external/gtest"]
path = external/gtest
url = https://github.com/google/googletest.git
[submodule "external/taskflow"]
path = external/taskflow
url = https://github.com/taskflow/taskflow.git

1
external/taskflow vendored Submodule

@ -0,0 +1 @@
Subproject commit 130f7952469c01eef8d7b635710bf9a8043f3172

View File

@ -42,7 +42,6 @@ set(common_sources
perf_timer.cpp
pruning.cpp
spawn.cpp
threadpool.cpp
updates.cpp
aligned.c
timings.cc

View File

@ -36,5 +36,4 @@ namespace tools
struct login;
class password_container;
class t_http_connection;
class threadpool;
}

View File

@ -35,12 +35,14 @@
#include <set>
#include <stdlib.h>
#include "include_base_utils.h"
#include "common/threadpool.h"
#include "crypto/crypto.h"
#include <boost/thread/mutex.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/optional.hpp>
#include <boost/utility/string_ref.hpp>
#include <boost/thread/thread.hpp>
#include <taskflow/taskflow/taskflow.hpp>
using namespace epee;
#undef MONERO_DEFAULT_LOG_CATEGORY
@ -487,17 +489,17 @@ bool load_txt_records_from_dns(std::vector<std::string> &good_records, const std
// send all requests in parallel
std::deque<bool> avail(dns_urls.size(), false), valid(dns_urls.size(), false);
tools::threadpool& tpool = tools::threadpool::getInstanceForIO();
tools::threadpool::waiter waiter(tpool);
tf::Executor executor(std::thread::hardware_concurrency());
tf::Taskflow taskflow;
for (size_t n = 0; n < dns_urls.size(); ++n)
{
tpool.submit(&waiter,[n, dns_urls, &records, &avail, &valid](){
executor.run(taskflow, [n, dns_urls, &records, &avail, &valid](){
const auto res = tools::DNSResolver::instance().get_txt_record(dns_urls[n], avail[n], valid[n]);
for (const auto &s: res)
records[n].insert(s);
});
}
waiter.wait();
executor.wait_for_all();
size_t cur_index = first_index;
do

View File

@ -1,180 +0,0 @@
// Copyright (c) 2017-2024, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "misc_log_ex.h"
#include "common/threadpool.h"
#include "cryptonote_config.h"
#include "common/util.h"
static __thread int depth = 0;
static __thread bool is_leaf = false;
namespace tools
{
threadpool::threadpool(unsigned int max_threads) : running(true), active(0) {
create(max_threads);
}
threadpool::~threadpool() {
destroy();
}
void threadpool::destroy() {
try
{
const boost::unique_lock<boost::mutex> lock(mutex);
running = false;
has_work.notify_all();
}
catch (...)
{
// if the lock throws, we're just do it without a lock and hope,
// since the alternative is terminate
running = false;
has_work.notify_all();
}
for (size_t i = 0; i<threads.size(); i++) {
try { threads[i].join(); }
catch (...) { /* ignore */ }
}
threads.clear();
}
void threadpool::recycle() {
destroy();
create(max);
}
void threadpool::create(unsigned int max_threads) {
const boost::unique_lock<boost::mutex> lock(mutex);
boost::thread::attributes attrs;
attrs.set_stack_size(THREAD_STACK_SIZE);
max = max_threads ? max_threads : tools::get_max_concurrency();
size_t i = max ? max - 1 : 0;
running = true;
while(i--) {
threads.push_back(boost::thread(attrs, boost::bind(&threadpool::run, this, false)));
}
}
void threadpool::submit(waiter *obj, std::function<void()> f, bool leaf) {
CHECK_AND_ASSERT_THROW_MES(!is_leaf, "A leaf routine is using a thread pool");
boost::unique_lock<boost::mutex> lock(mutex);
if (!leaf && ((active == max && !queue.empty()) || depth > 0)) {
// if all available threads are already running
// and there's work waiting, just run in current thread
lock.unlock();
++depth;
is_leaf = leaf;
f();
--depth;
is_leaf = false;
} else {
if (obj)
obj->inc();
if (leaf)
queue.push_front({obj, f, leaf});
else
queue.push_back({obj, f, leaf});
has_work.notify_one();
}
}
unsigned int threadpool::get_max_concurrency() const {
return max;
}
threadpool::waiter::~waiter()
{
try
{
boost::unique_lock<boost::mutex> lock(mt);
if (num)
MERROR("wait should have been called before waiter dtor - waiting now");
}
catch (...) { /* ignore */ }
try
{
wait();
}
catch (const std::exception &e)
{
/* ignored */
}
}
bool threadpool::waiter::wait() {
pool.run(true);
boost::unique_lock<boost::mutex> lock(mt);
while(num)
cv.wait(lock);
return !error();
}
void threadpool::waiter::inc() {
const boost::unique_lock<boost::mutex> lock(mt);
num++;
}
void threadpool::waiter::dec() {
const boost::unique_lock<boost::mutex> lock(mt);
num--;
if (!num)
cv.notify_all();
}
void threadpool::run(bool flush) {
boost::unique_lock<boost::mutex> lock(mutex);
while (running) {
entry e;
while(queue.empty() && running)
{
if (flush)
return;
has_work.wait(lock);
}
if (!running) break;
active++;
e = std::move(queue.front());
queue.pop_front();
lock.unlock();
++depth;
is_leaf = e.leaf;
try { e.f(); }
catch (const std::exception &ex) { e.wo->set_error(); try { MERROR("Exception in threadpool job: " << ex.what()); } catch (...) {} }
--depth;
is_leaf = false;
if (e.wo)
e.wo->dec();
lock.lock();
active--;
}
}
}

View File

@ -1,107 +0,0 @@
// Copyright (c) 2017-2024, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>
#include <vector>
#include <stdexcept>
namespace tools
{
//! A global thread pool
class threadpool
{
public:
static threadpool& getInstanceForCompute() {
static threadpool instance;
return instance;
}
static threadpool& getInstanceForIO() {
static threadpool instance(8);
return instance;
}
static threadpool *getNewForUnitTests(unsigned max_threads = 0) {
return new threadpool(max_threads);
}
// The waiter lets the caller know when all of its
// tasks are completed.
class waiter {
boost::mutex mt;
boost::condition_variable cv;
threadpool &pool;
int num;
bool error_flag;
public:
void inc();
void dec();
bool wait(); //! Wait for a set of tasks to finish, returns false iff any error
void set_error() noexcept { error_flag = true; }
bool error() const noexcept { return error_flag; }
waiter(threadpool &pool) : pool(pool), num(0), error_flag(false) {}
~waiter();
};
// Submit a task to the pool. The waiter pointer may be
// NULL if the caller doesn't care to wait for the
// task to finish.
void submit(waiter *waiter, std::function<void()> f, bool leaf = false);
// destroy and recreate threads
void recycle();
unsigned int get_max_concurrency() const;
~threadpool();
private:
threadpool(unsigned int max_threads = 0);
void destroy();
void create(unsigned int max_threads);
typedef struct entry {
waiter *wo;
std::function<void()> f;
bool leaf;
} entry;
std::deque<entry> queue;
boost::condition_variable has_work;
boost::mutex mutex;
std::vector<boost::thread> threads;
unsigned int active;
unsigned int max;
bool running;
void run(bool flush = false);
};
}

View File

@ -34,6 +34,7 @@
#include <boost/filesystem.hpp>
#include <boost/range/adaptor/reversed.hpp>
#include <boost/format.hpp>
#include <taskflow/taskflow/taskflow.hpp>
#include "include_base_utils.h"
#include "cryptonote_basic/cryptonote_basic_impl.h"
@ -48,7 +49,6 @@
#include "profile_tools.h"
#include "file_io_utils.h"
#include "int-util.h"
#include "common/threadpool.h"
#include "warnings.h"
#include "crypto/hash.h"
#include "cryptonote_core.h"
@ -3384,9 +3384,9 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc,
std::vector < uint64_t > results;
results.resize(tx.vin.size(), 0);
tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool);
int threads = tpool.get_max_concurrency();
int threads = std::thread::hardware_concurrency();
tf::Executor executor(threads);
tf::Taskflow taskflow;
uint64_t max_used_block_height = 0;
if (!pmax_used_block_height)
@ -3433,7 +3433,7 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc,
{
// ND: Speedup
// 1. Thread ring signature verification if possible.
tpool.submit(&waiter, boost::bind(&Blockchain::check_ring_signature, this, std::cref(tx_prefix_hash), std::cref(in_to_key.k_image), std::cref(pubkeys[sig_index]), std::cref(tx.signatures[sig_index]), std::ref(results[sig_index])), true);
executor.run(taskflow, boost::bind(&Blockchain::check_ring_signature, this, std::cref(tx_prefix_hash), std::cref(in_to_key.k_image), std::cref(pubkeys[sig_index]), std::cref(tx.signatures[sig_index]), std::ref(results[sig_index])));
}
else
{
@ -3455,8 +3455,7 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc,
sig_index++;
}
if (tx.version == 1 && threads > 1)
if (!waiter.wait())
return false;
executor.wait_for_all();
// enforce min output age
if (hf_version >= HF_VERSION_ENFORCE_MIN_AGE)
@ -4925,8 +4924,8 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete
return true;
bool blocks_exist = false;
tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
unsigned threads = tpool.get_max_concurrency();
unsigned threads = std::thread::hardware_concurrency();
tf::Executor executor(threads);
blocks.resize(blocks_entry.size());
if (1)
@ -4988,7 +4987,7 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete
{
m_blocks_longhash_table.clear();
uint64_t thread_height = height;
tools::threadpool::waiter waiter(tpool);
tf::Taskflow taskflow;
m_prepare_height = height;
m_prepare_nblocks = blocks_entry.size();
m_prepare_blocks = &blocks;
@ -4999,12 +4998,11 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete
++nblocks;
if (nblocks == 0)
break;
tpool.submit(&waiter, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, epee::span<const block>(&blocks[thread_height - height], nblocks), std::ref(maps[i])), true);
executor.run(taskflow, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, epee::span<const block>(&blocks[thread_height - height], nblocks), std::ref(maps[i])));
thread_height += nblocks;
}
if (!waiter.wait())
return false;
executor.wait_for_all();
m_prepare_height = 0;
if (m_cancel)
@ -5132,21 +5130,20 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::vector<block_complete
}
// gather all the output keys
threads = tpool.get_max_concurrency();
threads = std::thread::hardware_concurrency();
if (!m_db->can_thread_bulk_indices())
threads = 1;
if (threads > 1 && amounts.size() > 1)
{
tools::threadpool::waiter waiter(tpool);
tf::Taskflow taskflow;
for (size_t i = 0; i < amounts.size(); i++)
{
uint64_t amount = amounts[i];
tpool.submit(&waiter, boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount])), true);
executor.run(taskflow, boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount])));
}
if (!waiter.wait())
return false;
executor.wait_for_all();
}
else
{

View File

@ -39,7 +39,6 @@ using namespace epee;
#include "common/util.h"
#include "common/updates.h"
#include "common/download.h"
#include "common/threadpool.h"
#include "common/command_line.h"
#include "cryptonote_basic/events.h"
#include "warnings.h"
@ -58,6 +57,7 @@ using namespace epee;
#include "version.h"
#include <boost/filesystem.hpp>
#include <taskflow/taskflow/taskflow.hpp>
#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "cn"
@ -1009,11 +1009,11 @@ namespace cryptonote
CRITICAL_REGION_LOCAL(m_incoming_tx_lock);
tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool);
tf::Executor executor(std::thread::hardware_concurrency());
tf::Taskflow taskflow;
epee::span<tx_blob_entry>::const_iterator it = tx_blobs.begin();
for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
tpool.submit(&waiter, [&, i, it] {
executor.run(taskflow, [&, i, it] {
try
{
results[i].res = handle_incoming_tx_pre(*it, tvc[i], results[i].tx, results[i].hash);
@ -1026,8 +1026,7 @@ namespace cryptonote
}
});
}
if (!waiter.wait())
return false;
executor.wait_for_all();
it = tx_blobs.begin();
std::vector<bool> already_have(tx_blobs.size(), false);
for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
@ -1045,7 +1044,7 @@ namespace cryptonote
}
else
{
tpool.submit(&waiter, [&, i, it] {
executor.run(taskflow, [&, i, it] {
try
{
results[i].res = handle_incoming_tx_post(*it, tvc[i], results[i].tx, results[i].hash);
@ -1059,8 +1058,7 @@ namespace cryptonote
});
}
}
if (!waiter.wait())
return false;
executor.wait_for_all();
std::vector<tx_verification_batch_info> tx_info;
tx_info.reserve(tx_blobs.size());

View File

@ -31,13 +31,13 @@
#include "misc_log_ex.h"
#include "misc_language.h"
#include "common/perf_timer.h"
#include "common/threadpool.h"
#include "common/util.h"
#include "rctSigs.h"
#include "bulletproofs.h"
#include "bulletproofs_plus.h"
#include "cryptonote_basic/cryptonote_format_utils.h"
#include "cryptonote_config.h"
#include <taskflow/taskflow/taskflow.hpp>
using namespace crypto;
using namespace std;
@ -1333,14 +1333,13 @@ namespace rct {
try
{
if (semantics) {
tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool);
tf::Executor executor(std::thread::hardware_concurrency());
tf::Taskflow taskflow;
std::deque<bool> results(rv.outPk.size(), false);
DP("range proofs verified?");
for (size_t i = 0; i < rv.outPk.size(); i++)
tpool.submit(&waiter, [&, i] { results[i] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); });
if (!waiter.wait())
return false;
executor.run(taskflow, [&, i] { results[i] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); });
executor.wait_for_all();
for (size_t i = 0; i < results.size(); ++i) {
if (!results[i]) {
@ -1383,8 +1382,8 @@ namespace rct {
{
PERF_TIMER(verRctSemanticsSimple);
tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool);
tf::Executor executor(std::thread::hardware_concurrency());
tf::Taskflow taskflow;
std::deque<bool> results;
std::vector<const Bulletproof*> bp_proofs;
std::vector<const BulletproofPlus*> bpp_proofs;
@ -1468,27 +1467,24 @@ namespace rct {
else
{
for (size_t i = 0; i < rv.p.rangeSigs.size(); i++)
tpool.submit(&waiter, [&, i, offset] { results[i+offset] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); });
executor.run(taskflow, [&, i, offset] { results[i+offset] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); });
offset += rv.p.rangeSigs.size();
}
}
if (!bpp_proofs.empty() && !verBulletproofPlus(bpp_proofs))
{
LOG_PRINT_L1("Aggregate range proof verified failed");
if (!waiter.wait())
return false;
executor.wait_for_all();
return false;
}
if (!bp_proofs.empty() && !verBulletproof(bp_proofs))
{
LOG_PRINT_L1("Aggregate range proof verified failed");
if (!waiter.wait())
return false;
executor.wait_for_all();
return false;
}
if (!waiter.wait())
return false;
executor.wait_for_all();
for (size_t i = 0; i < results.size(); ++i) {
if (!results[i]) {
LOG_PRINT_L1("Range proof verified failed for proof " << i);
@ -1536,8 +1532,8 @@ namespace rct {
const size_t threads = std::max(rv.outPk.size(), rv.mixRing.size());
std::deque<bool> results(threads);
tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool);
tf::Executor executor(std::thread::hardware_concurrency());
tf::Taskflow taskflow;
const keyV &pseudoOuts = bulletproof || bulletproof_plus ? rv.p.pseudoOuts : rv.pseudoOuts;
@ -1546,15 +1542,14 @@ namespace rct {
results.clear();
results.resize(rv.mixRing.size());
for (size_t i = 0 ; i < rv.mixRing.size() ; i++) {
tpool.submit(&waiter, [&, i] {
executor.run(taskflow, [&, i] {
if (is_rct_clsag(rv.type))
results[i] = verRctCLSAGSimple(message, rv.p.CLSAGs[i], rv.mixRing[i], pseudoOuts[i]);
else
results[i] = verRctMGSimple(message, rv.p.MGs[i], rv.mixRing[i], pseudoOuts[i]);
});
}
if (!waiter.wait())
return false;
executor.wait_for_all();
for (size_t i = 0; i < results.size(); ++i) {
if (!results[i]) {

View File

@ -46,6 +46,7 @@
#include <boost/range/adaptor/transformed.hpp>
#include <boost/preprocessor/stringize.hpp>
#include <openssl/evp.h>
#include <taskflow/taskflow/taskflow.hpp>
#include "include_base_utils.h"
using namespace epee;
@ -65,7 +66,6 @@ using namespace epee;
#include "multisig/multisig_kex_msg.h"
#include "multisig/multisig_tx_builder_ringct.h"
#include "common/command_line.h"
#include "common/threadpool.h"
#include "int-util.h"
#include "profile_tools.h"
#include "crypto/crypto.h"
@ -3197,8 +3197,8 @@ void wallet2::process_parsed_blocks(const uint64_t start_height, const std::vect
THROW_WALLET_EXCEPTION_IF(blocks.size() != parsed_blocks.size(), error::wallet_internal_error, "size mismatch");
THROW_WALLET_EXCEPTION_IF(!m_blockchain.is_in_bounds(start_height), error::out_of_hashchain_bounds_error);
tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool);
tf::Executor executor(std::thread::hardware_concurrency());
tf::Taskflow taskflow;
size_t num_txes = 0;
std::vector<tx_cache_data> tx_cache_data;
@ -3230,16 +3230,16 @@ void wallet2::process_parsed_blocks(const uint64_t start_height, const std::vect
continue;
}
if (m_refresh_type != RefreshNoCoinbase)
tpool.submit(&waiter, [&, i, txidx](){ cache_tx_data(parsed_blocks[i].block.miner_tx, get_transaction_hash(parsed_blocks[i].block.miner_tx), tx_cache_data[txidx]); });
executor.run(taskflow, [&, i, txidx](){ cache_tx_data(parsed_blocks[i].block.miner_tx, get_transaction_hash(parsed_blocks[i].block.miner_tx), tx_cache_data[txidx]); });
++txidx;
for (size_t idx = 0; idx < parsed_blocks[i].txes.size(); ++idx)
{
tpool.submit(&waiter, [&, i, idx, txidx](){ cache_tx_data(parsed_blocks[i].txes[idx], parsed_blocks[i].block.tx_hashes[idx], tx_cache_data[txidx]); });
executor.run(taskflow, [&, i, idx, txidx](){ cache_tx_data(parsed_blocks[i].txes[idx], parsed_blocks[i].block.tx_hashes[idx], tx_cache_data[txidx]); });
++txidx;
}
}
THROW_WALLET_EXCEPTION_IF(txidx != num_txes, error::wallet_internal_error, "txidx does not match tx_cache_data size");
THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool");
executor.wait_for_all();
hw::device &hwdev = m_account.get_device();
hw::reset_mode rst(hwdev);
@ -3259,15 +3259,15 @@ void wallet2::process_parsed_blocks(const uint64_t start_height, const std::vect
{
if (tx_cache_data[i].empty())
continue;
tpool.submit(&waiter, [&gender, &tx_cache_data, i]() {
executor.run(taskflow, [&gender, &tx_cache_data, i]() {
auto &slot = tx_cache_data[i];
for (auto &iod: slot.primary)
gender(iod);
for (auto &iod: slot.additional)
gender(iod);
}, true);
});
}
THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool");
executor.wait_for_all();
auto geniod = [&](const cryptonote::transaction &tx, size_t n_vouts, size_t txidx) {
for (size_t k = 0; k < n_vouts; ++k)
@ -3317,7 +3317,7 @@ void wallet2::process_parsed_blocks(const uint64_t start_height, const std::vect
if (parsed_blocks[i].block.major_version >= hf_version_view_tags)
geniods.push_back(geniod_params{ tx, n_vouts, txidx });
else
tpool.submit(&waiter, [&, n_vouts, txidx](){ geniod(tx, n_vouts, txidx); }, true);
executor.run(taskflow, [&, n_vouts, txidx](){ geniod(tx, n_vouts, txidx); });
}
++txidx;
for (size_t j = 0; j < parsed_blocks[i].txes.size(); ++j)
@ -3326,7 +3326,7 @@ void wallet2::process_parsed_blocks(const uint64_t start_height, const std::vect
if (parsed_blocks[i].block.major_version >= hf_version_view_tags)
geniods.push_back(geniod_params{ parsed_blocks[i].txes[j], parsed_blocks[i].txes[j].vout.size(), txidx });
else
tpool.submit(&waiter, [&, i, j, txidx](){ geniod(parsed_blocks[i].txes[j], parsed_blocks[i].txes[j].vout.size(), txidx); }, true);
executor.run(taskflow, [&, i, j, txidx](){ geniod(parsed_blocks[i].txes[j], parsed_blocks[i].txes[j].vout.size(), txidx); });
++txidx;
}
}
@ -3345,19 +3345,19 @@ void wallet2::process_parsed_blocks(const uint64_t start_height, const std::vect
{
size_t batch_end = std::min(batch_start + GENIOD_BATCH_SIZE, geniods.size());
THROW_WALLET_EXCEPTION_IF(batch_end < batch_start, error::wallet_internal_error, "Thread batch end overflow");
tpool.submit(&waiter, [&geniods, &geniod, batch_start, batch_end]() {
executor.run(taskflow, [&geniods, &geniod, batch_start, batch_end]() {
for (size_t i = batch_start; i < batch_end; ++i)
{
const geniod_params &gp = geniods[i];
geniod(gp.tx, gp.n_outs, gp.txidx);
}
}, true);
});
num_batch_txes += batch_end - batch_start;
batch_start = batch_end;
}
THROW_WALLET_EXCEPTION_IF(num_batch_txes != geniods.size(), error::wallet_internal_error, "txes batched for thread pool did not reach expected value");
}
THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool");
executor.wait_for_all();
hwdev.set_mode(hw::device::NONE);
@ -3460,15 +3460,15 @@ void wallet2::pull_and_parse_next_blocks(bool first, bool try_incremental, uint6
pull_blocks(first, try_incremental, start_height, blocks_start_height, short_chain_history, blocks, o_indices, current_height, process_pool_txs);
THROW_WALLET_EXCEPTION_IF(blocks.size() != o_indices.size(), error::wallet_internal_error, "Mismatched sizes of blocks and o_indices");
tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool);
tf::Executor executor(std::thread::hardware_concurrency());
tf::Taskflow taskflow;
parsed_blocks.resize(blocks.size());
for (size_t i = 0; i < blocks.size(); ++i)
{
tpool.submit(&waiter, boost::bind(&wallet2::parse_block_round, this, std::cref(blocks[i].block),
std::ref(parsed_blocks[i].block), std::ref(parsed_blocks[i].hash), std::ref(parsed_blocks[i].error)), true);
executor.run(taskflow, boost::bind(&wallet2::parse_block_round, this, std::cref(blocks[i].block),
std::ref(parsed_blocks[i].block), std::ref(parsed_blocks[i].hash), std::ref(parsed_blocks[i].error)));
}
THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool");
executor.wait_for_all();
for (size_t i = 0; i < blocks.size(); ++i)
{
if (parsed_blocks[i].error)
@ -3502,16 +3502,16 @@ void wallet2::pull_and_parse_next_blocks(bool first, bool try_incremental, uint6
parsed_blocks[i].txes.resize(blocks[i].txs.size());
for (size_t j = 0; j < blocks[i].txs.size(); ++j)
{
tpool.submit(&waiter, [&, i, j](){
executor.run(taskflow, [&, i, j](){
if (!parse_and_validate_tx_base_from_blob(blocks[i].txs[j].blob, parsed_blocks[i].txes[j]))
{
boost::unique_lock<boost::mutex> lock(error_lock);
error = true;
}
}, true);
});
}
}
THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool");
executor.wait_for_all();
last = !blocks.empty() && cryptonote::get_block_height(parsed_blocks.back().block) + 1 == current_height;
}
catch(...)
@ -4019,8 +4019,8 @@ void wallet2::refresh(bool trusted_daemon, uint64_t start_height, uint64_t & blo
size_t try_count = 0;
crypto::hash last_tx_hash_id = m_transfers.size() ? m_transfers.back().m_txid : null_hash;
std::list<crypto::hash> short_chain_history;
tools::threadpool& tpool = tools::threadpool::getInstanceForCompute();
tools::threadpool::waiter waiter(tpool);
tf::Executor executor(std::thread::hardware_concurrency());
tf::Taskflow taskflow;
uint64_t blocks_start_height;
std::vector<cryptonote::block_complete_entry> blocks;
std::vector<parsed_block> parsed_blocks;
@ -4092,7 +4092,7 @@ void wallet2::refresh(bool trusted_daemon, uint64_t start_height, uint64_t & blo
break;
}
if (!last)
tpool.submit(&waiter, [&]{pull_and_parse_next_blocks(first, try_incremental, start_height, next_blocks_start_height, short_chain_history, blocks, parsed_blocks, next_blocks, next_parsed_blocks, process_pool_txs, last, error, exception);});
executor.run(taskflow, [&]{pull_and_parse_next_blocks(first, try_incremental, start_height, next_blocks_start_height, short_chain_history, blocks, parsed_blocks, next_blocks, next_parsed_blocks, process_pool_txs, last, error, exception);});
if (!first)
{
@ -4131,7 +4131,7 @@ void wallet2::refresh(bool trusted_daemon, uint64_t start_height, uint64_t & blo
}
blocks_fetched += added_blocks;
}
THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool");
executor.wait_for_all();
// handle error from async fetching thread
if (error)
@ -4165,28 +4165,28 @@ void wallet2::refresh(bool trusted_daemon, uint64_t start_height, uint64_t & blo
catch (const tools::error::password_needed&)
{
blocks_fetched += added_blocks;
THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool");
executor.wait_for_all();
throw;
}
catch (const error::deprecated_rpc_access&)
{
THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool");
executor.wait_for_all();
throw;
}
catch (const error::reorg_depth_error&)
{
THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool");
executor.wait_for_all();
throw;
}
catch (const error::incorrect_fork_version&)
{
THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool");
executor.wait_for_all();
throw;
}
catch (const std::exception&)
{
blocks_fetched += added_blocks;
THROW_WALLET_EXCEPTION_IF(!waiter.wait(), error::wallet_internal_error, "Exception in thread pool");
executor.wait_for_all();
if(try_count < 3)
{
LOG_PRINT_L1("Another try pull_blocks (try_count=" << try_count << ")...");

View File

@ -46,7 +46,6 @@
#include "include_base_utils.h"
#include "chaingen_serialization.h"
#include "common/command_line.h"
#include "common/threadpool.h"
#include "cryptonote_basic/account_boost_serialization.h"
#include "cryptonote_basic/cryptonote_basic.h"
@ -808,7 +807,6 @@ inline bool do_replay_events_get_core(std::vector<test_event_entry>& events, cry
t_test_class validator;
bool ret = replay_events_through_core<t_test_class>(c, events, validator);
tools::threadpool::getInstanceForCompute().recycle();
// c.deinit();
return ret;
}

View File

@ -84,7 +84,6 @@ set(unit_tests_sources
test_tx_utils.cpp
test_peerlist.cpp
test_protocol_pack.cpp
threadpool.cpp
tx_proof.cpp
hardfork.cpp
unbound.cpp

View File

@ -1,147 +0,0 @@
// Copyright (c) 2018-2024, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <atomic>
#include "gtest/gtest.h"
#include "misc_language.h"
#include "common/threadpool.h"
TEST(threadpool, wait_nothing)
{
std::shared_ptr<tools::threadpool> tpool(tools::threadpool::getNewForUnitTests());
tools::threadpool::waiter waiter(*tpool);;
waiter.wait();
}
TEST(threadpool, wait_waits)
{
std::shared_ptr<tools::threadpool> tpool(tools::threadpool::getNewForUnitTests());
tools::threadpool::waiter waiter(*tpool);
std::atomic<bool> b(false);
tpool->submit(&waiter, [&b](){ epee::misc_utils::sleep_no_w(1000); b = true; });
ASSERT_FALSE(b);
waiter.wait();
ASSERT_TRUE(b);
}
TEST(threadpool, one_thread)
{
std::shared_ptr<tools::threadpool> tpool(tools::threadpool::getNewForUnitTests(1));
tools::threadpool::waiter waiter(*tpool);
std::atomic<unsigned int> counter(0);
for (size_t n = 0; n < 4096; ++n)
{
tpool->submit(&waiter, [&counter](){++counter;});
}
waiter.wait();
ASSERT_EQ(counter, 4096);
}
TEST(threadpool, many_threads)
{
std::shared_ptr<tools::threadpool> tpool(tools::threadpool::getNewForUnitTests(256));
tools::threadpool::waiter waiter(*tpool);
std::atomic<unsigned int> counter(0);
for (size_t n = 0; n < 4096; ++n)
{
tpool->submit(&waiter, [&counter](){++counter;});
}
waiter.wait();
ASSERT_EQ(counter, 4096);
}
static uint64_t fibonacci(std::shared_ptr<tools::threadpool> tpool, uint64_t n)
{
if (n <= 1)
return n;
uint64_t f1, f2;
tools::threadpool::waiter waiter(*tpool);
tpool->submit(&waiter, [&tpool, &f1, n](){ f1 = fibonacci(tpool, n-1); });
tpool->submit(&waiter, [&tpool, &f2, n](){ f2 = fibonacci(tpool, n-2); });
waiter.wait();
return f1 + f2;
}
TEST(threadpool, reentrency)
{
std::shared_ptr<tools::threadpool> tpool(tools::threadpool::getNewForUnitTests(4));
tools::threadpool::waiter waiter(*tpool);
uint64_t f = fibonacci(tpool, 13);
waiter.wait();
ASSERT_EQ(f, 233);
}
TEST(threadpool, reentrancy)
{
std::shared_ptr<tools::threadpool> tpool(tools::threadpool::getNewForUnitTests(4));
tools::threadpool::waiter waiter(*tpool);
uint64_t f = fibonacci(tpool, 13);
waiter.wait();
ASSERT_EQ(f, 233);
}
TEST(threadpool, leaf_throws)
{
std::shared_ptr<tools::threadpool> tpool(tools::threadpool::getNewForUnitTests());
tools::threadpool::waiter waiter(*tpool);
bool thrown = false, executed = false;
tpool->submit(&waiter, [&](){
try { tpool->submit(&waiter, [&](){ executed = true; }); }
catch(const std::exception &e) { thrown = true; }
}, true);
waiter.wait();
ASSERT_TRUE(thrown);
ASSERT_FALSE(executed);
}
TEST(threadpool, leaf_reentrancy)
{
std::shared_ptr<tools::threadpool> tpool(tools::threadpool::getNewForUnitTests(4));
tools::threadpool::waiter waiter(*tpool);
std::atomic<int> counter(0);
for (int i = 0; i < 1000; ++i)
{
tpool->submit(&waiter, [&](){
tools::threadpool::waiter waiter(*tpool);
for (int j = 0; j < 500; ++j)
{
tpool->submit(&waiter, [&](){ ++counter; }, true);
}
waiter.wait();
});
}
waiter.wait();
ASSERT_EQ(counter, 500000);
}