summaryrefslogtreecommitdiffstats
path: root/src/core/threading.cc
blob: a1ae30534086d2c685e1c55356c017efa487044b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include "core/pch.hh"

#include "core/threading.hh"

#include "core/io/cmdline.hh"

#include "core/math/constexpr.hh"

// Default text for the "threads" command-line argument; it is kept as a
// string because it is handed to cmdline::get() as the fallback value
constexpr static std::string_view DEFAULT_POOL_SIZE_ARG = "4";

// Worker pool; allocated in threading::init() and deleted in threading::shutdown()
static BS::light_thread_pool* thread_pool;
// Tasks passed to threading::detail::submit_new() that have not yet been
// deleted by threading::update() or threading::shutdown()
static std::deque<Task*> task_deque;

// Entry point handed to the thread pool for a single task (see
// threading::detail::submit_new). Marks the task as PROCESSING, runs
// its process() method and then promotes it to COMPLETED, unless the
// status was changed to something else in the meantime.
static void task_process(Task* task)
{
    task->set_status(task_status::PROCESSING);
    task->process();

    if(task->get_status() != task_status::PROCESSING) {
        // Somebody replaced the status while the task was
        // running (for instance with CANCELLED); leave it be
        return;
    }

    task->set_status(task_status::COMPLETED);
}

// Returns the status currently stored in this task
task_status Task::get_status(void) const
{
    return this->m_status;
}

// Overwrites the status stored in this task with the given value
void Task::set_status(task_status status)
{
    this->m_status = status;
}

// Creates the global thread pool and empties the task list.
//
// The pool size is taken from the "threads" command-line argument:
//  - "max" selects the number of concurrent hardware threads reported
//    by the implementation, provided that number is known (non-zero);
//  - a numeric value is used, but never less than one and, when the
//    hardware concurrency is known, never more than that;
//  - a value that cannot be parsed falls back to four threads, which
//    is then subject to exactly the same limits as a parsed value.
void threading::init(void)
{
    constexpr unsigned int fallback_pool_size = 4U;

    auto argument = cmdline::get("threads", DEFAULT_POOL_SIZE_ARG);
    auto num_concurrent_threads = std::thread::hardware_concurrency();
    unsigned int thread_pool_size = fallback_pool_size;

    if(num_concurrent_threads && 0 == argument.compare("max")) {
        // Use the maximum available number of concurrent
        // hardware threads provided by the implementation
        thread_pool_size = num_concurrent_threads;
    }
    else {
        // std::from_chars leaves its output untouched on failure, so
        // parse into a scratch variable and only accept it on success
        unsigned int parsed_size = 0U;
        auto result = std::from_chars(argument.data(), argument.data() + argument.size(), parsed_size);

        if(result.ec == std::errc()) {
            thread_pool_size = parsed_size;
        }

        if(num_concurrent_threads) {
            // The limits apply to the fallback as well, so an invalid
            // argument cannot yield more threads than a valid one would
            thread_pool_size = glm::clamp<unsigned int>(thread_pool_size, 1U, num_concurrent_threads);
        }
        else {
            // Hardware concurrency is unknown; only enforce the lower bound
            thread_pool_size = glm::max<unsigned int>(thread_pool_size, 1U);
        }
    }

    spdlog::info("threading: using {} threads for pooling tasks", thread_pool_size);

    thread_pool = new BS::light_thread_pool(thread_pool_size);

    task_deque.clear();
}

// Tears the threading subsystem down: flags every unfinished task as
// CANCELLED, drains the pool, then deletes all remaining tasks and the
// pool itself. finalize() is not called on any task here.
void threading::shutdown(void)
{
    for(auto task : task_deque) {
        auto status = task->get_status();

        // Only touch tasks that have not reached a final state yet; the
        // two checks must be joined with AND, otherwise the condition is
        // always true and COMPLETED tasks get overwritten with CANCELLED
        if((status != task_status::CANCELLED) && (status != task_status::COMPLETED)) {
            task->set_status(task_status::CANCELLED);
        }
    }

    // Per the BS::thread_pool documentation, purge() drops tasks that are
    // still waiting in the queue and wait() blocks until running ones end
    thread_pool->purge();
    thread_pool->wait();

    for(auto task : task_deque)
        delete task;
    task_deque.clear();

    delete thread_pool;

    // Do not leave a dangling pointer behind
    thread_pool = nullptr;
}

void threading::update(void)
{
    auto task_iter = task_deque.cbegin();

    while(task_iter != task_deque.cend()) {
        auto task_ptr = *task_iter;
        auto status = task_ptr->get_status();

        if(status == task_status::CANCELLED) {
            delete task_ptr;
            task_iter = task_deque.erase(task_iter);
            continue;
        }

        if(status == task_status::COMPLETED) {
            task_ptr->finalize();
            delete task_ptr;
            task_iter = task_deque.erase(task_iter);
            continue;
        }

        task_iter = std::next(task_iter);
    }
}

// Marks the task as ENQUEUED, submits it to the thread pool so that
// task_process() runs it, and appends it to the task list so that
// threading::update() and threading::shutdown() can reap it later.
void threading::detail::submit_new(Task* task)
{
    task->set_status(task_status::ENQUEUED);

    // The object returned by submit_task is intentionally discarded;
    // completion is tracked through the task status instead
    auto job = [task]() {
        task_process(task);
    };
    static_cast<void>(thread_pool->submit_task(job));

    task_deque.push_back(task);
}