uvgVPCCenc 1.2.0
uvgVPCCenc is an open-source real-time V-PCC encoder library written in C++ from scratch.
Loading...
Searching...
No Matches
threadqueue.hpp
Go to the documentation of this file.
1/*****************************************************************************
2 * This file is part of uvgVPCCenc V-PCC encoder.
3 *
4 * Copyright (c) 2024-present, Tampere University, ITU/ISO/IEC, project contributors
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without modification,
8 * are permitted (subject to the limitations in the disclaimer below) provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice, this
11 * list of conditions and the following disclaimer.
12 *
13 * * Redistributions in binary form must reproduce the above copyright notice, this
14 * list of conditions and the following disclaimer in the documentation and/or
15 * other materials provided with the distribution.
16 *
17 * * Neither the name of Tampere University, ITU/ISO/IEC nor the names of its
18 * contributors may be used to endorse or promote products derived from
19 * this software without specific prior written permission.
20 *
21 * NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY THIS LICENSE.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
23 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
24 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
25 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
26 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
27 * INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION HOWEVER CAUSED AND ON
29 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * INCLUDING NEGLIGENCE OR OTHERWISE ARISING IN ANY WAY OUT OF THE USE OF THIS
31 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 ****************************************************************************/
33
35
36#pragma once
37
38#include <atomic>
39#include <cassert>
40#include <condition_variable>
41#include <deque>
42#include <functional>
43#include <memory>
44#include <mutex>
45#include <thread>
46#include <vector>
47
48namespace uvgutils {
49
76
77class ThreadQueue;
78
// Job: a named unit of work wrapping a `void()` callable.
//
// The class derives from std::enable_shared_from_this and holds a completion
// flag, an atomic dependency counter, an atomic priority and a list of
// shared_ptr<Job> "reverse dependencies". All non-inline member functions are
// only declared here; their definitions are not part of this listing.
//
// NOTE(review): this listing is incomplete. The listing numbers jump over
// source lines 85-87, 94-96, 104 and 110-111, so the remaining constructor
// initialisers and some members (getState(), func_, state_ according to the
// symbol index) are not visible below. Do not treat the visible initialiser
// lists as complete.
// NOTE(review): std::string is used throughout this class, but <string> is not
// among the headers included at the top of the file; it currently relies on a
// transitive include.
79class Job : public std::enable_shared_from_this<Job> {
80 public:
    // Type of the callable a job runs: no arguments, no return value.
81 using JobFunction = std::function<void()>;
    // Builds a job from a ready-made callable. Of the initialisers visible
    // here, name_ and func_ are copy-initialised from the by-value parameters
    // and completed_ starts as false.
    // NOTE(review): `name` and `func` are taken by value and then copied again;
    // std::move would avoid the second copy.
82 Job(std::string name, std::size_t priority, JobFunction func)
83 : name_(name),
84 func_(func),
88 completed_(false) {}
89 // Variadic template constructor
    // Binds `func` to `args` with std::bind and stores the resulting callable
    // in func_. std::bind keeps decayed copies of the bound arguments unless
    // the caller wraps them in std::ref/std::cref, so referenced objects are
    // copied into the job by default.
90 template <typename Func, typename... Args>
91 Job(std::string name, std::size_t priority, Func&& func, Args&&... args)
92 : name_(name),
93 func_(std::bind(std::forward<Func>(func), std::forward<Args>(args)...)),
97 completed_(false) {}
    // The following five members are declared only; see threadqueue.cpp for
    // their behaviour.
98 void execute() const;
    // presumably registers `dependency` as a job that must finish before this
    // one may run — confirm against the definition.
99 void addDependency(const std::shared_ptr<Job>& dependency);
100 bool isReady() const;
101 void wait();
102 void complete();
    // Returns a copy of the job name.
103 std::string getName() const { return name_; }
    // Plain assignment to state_; this inline body takes no lock.
    // NOTE(review): confirm callers hold mtx_ (or otherwise serialise access)
    // if state_ is read from other threads.
105 void setState(threadqueue_job_state state) { state_ = state; }
106
    // Declared mutable, so it can be locked inside const member functions.
107 mutable std::mutex mtx_;
    // presumably the jobs that depend on this one — TODO confirm in
    // addDependency()/complete().
108 std::vector<std::shared_ptr<Job>> reverseDependencies_;
109 std::string name_;
    // presumably the number of dependencies still outstanding — TODO confirm.
112 std::atomic<int> dependencies_;
    // NOTE(review): public data member without the trailing underscore used by
    // every other member of this class.
113 std::atomic<std::size_t> priority;
114
115 private:
    // presumably used by wait()/complete() together with completed_ — confirm
    // in threadqueue.cpp.
116 std::condition_variable cv_;
    // Initialised to false by both visible constructors.
117 std::atomic<bool> completed_;
118};
119
// Body of class ThreadQueue (forward-declared at listing line 77).
// NOTE(review): the opening `class ThreadQueue {` line (listing line 120) is
// missing from this extract; the listing jumps from 119 to 121.
//
// The class owns a vector of std::thread, a mutex, two condition variables, an
// array of six job deques and an atomic stop flag. Every member function
// except the constructor is only declared here.
121 public:
    // Only initialises stop_ to false; no threads are created in this inline
    // body.
    // NOTE(review): the `;` after `{}` is a redundant empty declaration.
122 ThreadQueue() : stop_(false) {};
    // presumably starts `numThreads` worker threads — confirm in
    // threadqueue.cpp.
123 void initThreadQueue(int numThreads);
124 ~ThreadQueue();
125
    // Both take the job by const reference to a shared_ptr. The difference
    // between submitting and pushing is not visible in this header — see the
    // definitions.
126 void submitJob(const std::shared_ptr<Job>& job);
127 void pushJob(const std::shared_ptr<Job>& job);
128 void stop();
    // Static: callable without a ThreadQueue instance.
129 static void waitForJob(const std::shared_ptr<Job>& job);
130
131 private:
    // presumably the entry point run by each thread in threads_ — TODO confirm.
132 void workerThread();
133 std::mutex mtx_;
134 std::condition_variable jobAvailable_;
135 std::condition_variable jobDone_;
136 std::vector<std::thread> threads_;
    // Six independent deques of jobs.
    // NOTE(review): the meaning of the constant 6 is not stated here —
    // presumably one queue per priority level; confirm and consider naming it.
    // NOTE(review): std::array is used but <array> is not among the headers
    // included at the top of the file.
137 std::array<std::deque<std::shared_ptr<Job>>, 6> jobs_;
    // Set to false by the constructor.
138 std::atomic<bool> stop_;
139};
140
142
143} // namespace uvgutils
Definition threadqueue.hpp:79
Job(std::string name, std::size_t priority, Func &&func, Args &&... args)
Definition threadqueue.hpp:91
void complete()
Definition threadqueue.cpp:104
void setState(threadqueue_job_state state)
Definition threadqueue.hpp:105
std::vector< std::shared_ptr< Job > > reverseDependencies_
Definition threadqueue.hpp:108
void wait()
Definition threadqueue.cpp:96
bool isReady() const
Definition threadqueue.cpp:85
std::atomic< std::size_t > priority
Definition threadqueue.hpp:113
Job(std::string name, std::size_t priority, JobFunction func)
Definition threadqueue.hpp:82
std::mutex mtx_
Definition threadqueue.hpp:107
void addDependency(const std::shared_ptr< Job > &dependency)
Definition threadqueue.cpp:63
std::function< void()> JobFunction
Definition threadqueue.hpp:81
threadqueue_job_state state_
Definition threadqueue.hpp:111
void execute() const
Definition threadqueue.cpp:87
JobFunction func_
Definition threadqueue.hpp:110
std::string name_
Definition threadqueue.hpp:109
threadqueue_job_state getState() const
Definition threadqueue.hpp:104
std::string getName() const
Definition threadqueue.hpp:103
std::atomic< int > dependencies_
Definition threadqueue.hpp:112
Definition threadqueue.hpp:120
void stop()
Definition threadqueue.cpp:142
void pushJob(const std::shared_ptr< Job > &job)
Definition threadqueue.cpp:119
void submitJob(const std::shared_ptr< Job > &job)
Definition threadqueue.cpp:127
void initThreadQueue(int numThreads)
Definition threadqueue.cpp:111
ThreadQueue()
Definition threadqueue.hpp:122
~ThreadQueue()
Definition threadqueue.cpp:117
static void waitForJob(const std::shared_ptr< Job > &job)
Definition threadqueue.cpp:155
Definition jobManagement.hpp:72
Definition jobManagement.hpp:50
threadqueue_job_state
Definition threadqueue.hpp:50
@ THREADQUEUE_JOB_STATE_READY
Job is ready to run.
Definition threadqueue.hpp:64
@ THREADQUEUE_JOB_STATE_RUNNING
Job is running.
Definition threadqueue.hpp:69
@ THREADQUEUE_JOB_STATE_DONE
Job is completed.
Definition threadqueue.hpp:74
@ THREADQUEUE_JOB_STATE_PAUSED
Job has been submitted, but is not allowed to run yet.
Definition threadqueue.hpp:54
@ THREADQUEUE_JOB_STATE_WAITING
Job is waiting for dependencies.
Definition threadqueue.hpp:59
std::string jobStateToStr(threadqueue_job_state s)
Definition threadqueue.cpp:51