2022-05-21 19:58:09 +02:00

223 lines
4.8 KiB
C++

#include "stdafx.h"
#include "JobManager.h"
namespace Jobs
{
JobManager::JobManager(const uint32_t Workers, uint32_t DefaultWorkerId)
: _IsActive(true), _OwnerThreadId(std::this_thread::get_id())
{
for (uint32_t i = 0; i < Workers; i++)
{
_Workers.EmplaceBack(new JobWorker(this, DefaultWorkerId));
_Threads.Emplace([this, i]()
{
_Workers[i]->Run();
}).Start();
}
}
JobManager::~JobManager()
{
this->HaltAllJobs();
}
Job* JobManager::ScheduleJob(std::function<void()> Task, std::function<void()> OnFinished)
{
if (!this->_IsActive)
return nullptr;
auto NewJob = new Job(Task, OnFinished, true);
_JobPool.Enqueue(NewJob);
if (!this->_IsActive)
{
delete NewJob;
return nullptr;
}
return NewJob;
}
void JobManager::ScheduleJobNoWait(std::function<void()> Task, std::function<void()> OnFinished)
{
if (!this->_IsActive)
return;
auto NewJob = new Job(Task, OnFinished, false);
_JobPool.Enqueue(NewJob);
if (!this->_IsActive)
{
delete NewJob;
}
}
void JobManager::WaitForSingleJob(Job* Job)
{
while (!Job->IsFinished())
std::this_thread::yield();
delete Job;
}
void JobManager::WaitForAllJobs(Job** Jobs, const uint32_t Count)
{
bool AllFinished = false;
while (!AllFinished)
{
AllFinished = true;
for (uint32_t i = 0; i < Count; i++)
{
if (Jobs[i] != nullptr && !Jobs[i]->IsFinished())
{
AllFinished = false;
break;
}
}
if (AllFinished)
break;
std::this_thread::yield();
}
for (uint32_t i = 0; i < Count; i++)
delete Jobs[i];
}
void JobManager::WaitForAllJobs(List<Job*>& Jobs)
{
this->WaitForAllJobs(&Jobs[0], Jobs.Count());
}
void JobManager::WaitAll(std::function<void(uint32_t)> ProgressCallback, std::function<bool()> CancelCallback)
{
// Must be called by owner thread
if (std::this_thread::get_id() != _OwnerThreadId)
throw std::exception("Must be called by thread that owns JobManager!");
// We can store this here, just for dependency tracking...
uint32_t StorageLeft = this->_JobPool.Count();
std::atomic<uint32_t> LastProgress = 0;
// Flags brought to stack for easy read information...
auto isPoolEmpty = this->_JobPool.IsEmpty();
auto isWorkersIdle = this->AreWorkersIdle();
while (!isPoolEmpty || !isWorkersIdle)
{
std::this_thread::yield();
isPoolEmpty = this->_JobPool.IsEmpty();
isWorkersIdle = this->AreWorkersIdle();
if (ProgressCallback != nullptr)
{
auto TempStorage = this->_JobPool.Count();
if (TempStorage > StorageLeft)
{
// We got more jobs in the process of processing other jobs
// so the progress is shifted
StorageLeft = TempStorage;
if (LastProgress != 0)
{
LastProgress = 0;
ProgressCallback(0);
}
}
else
{
// We can convert our progress range here, then abs the value
int32_t ProgressRange = (TempStorage - StorageLeft);
uint32_t Progress = std::abs((int32_t)(((float)ProgressRange / (float)StorageLeft) * 100.0f));
if (LastProgress != Progress)
{
LastProgress = Progress;
ProgressCallback(Progress);
}
}
}
// Break out of the loop if the setup was canceled
if (!this->_IsActive)
break;
// Check if a cancel callback was provided, and if cancel is confirmed
if (CancelCallback != nullptr && CancelCallback())
{
this->Cancel();
break;
}
}
}
void JobManager::HaltAllJobs()
{
// Must be called by owner thread
if (std::this_thread::get_id() != _OwnerThreadId)
throw std::exception("Must be called by thread that owns JobManager!");
// Prevents the scheduler from scheduling more jobs
this->_IsActive = false;
// Prevent execution of threads...
for (auto& Worker : _Workers)
Worker->Stop();
// Wait for the last jobs to comlete, then destroy the workers
for (auto& Thread : _Threads)
Thread.Join();
for (auto& Worker : _Workers)
delete Worker;
// Iterate, and delete all jobs being non-waited, and mark all jobs awaited as finished.
Job* Item = nullptr;
while (_JobPool.Dequeue(Item))
{
if (!Item->IsAwaiter())
{
// It's not being awaited on so we must free it here...
delete Item;
}
else
{
// Forceable mark as finished, so the awaiter elsewhere can stop and clean up.
Item->IsFinished();
}
}
// Make sure the buffers are clear for subsequent calls
_Workers.Clear();
_Threads.Clear();
}
void JobManager::Cancel()
{
this->_IsActive = false;
for (auto& Worker : _Workers)
Worker->Stop();
}
bool JobManager::AreWorkersIdle()
{
for (auto& Worker : _Workers)
{
if (Worker->IsExecuting())
return false;
}
return true;
}
Job* JobWorker::GetJob()
{
Job* Item = nullptr;
auto Result = _Manager->_JobPool.Dequeue(Item);
return (Result) ? Item : nullptr;
}
}