#include "stdafx.h" #include "JobManager.h" namespace Jobs { JobManager::JobManager(const uint32_t Workers, uint32_t DefaultWorkerId) : _IsActive(true), _OwnerThreadId(std::this_thread::get_id()) { for (uint32_t i = 0; i < Workers; i++) { _Workers.EmplaceBack(new JobWorker(this, DefaultWorkerId)); _Threads.Emplace([this, i]() { _Workers[i]->Run(); }).Start(); } } JobManager::~JobManager() { this->HaltAllJobs(); } Job* JobManager::ScheduleJob(std::function Task, std::function OnFinished) { if (!this->_IsActive) return nullptr; auto NewJob = new Job(Task, OnFinished, true); _JobPool.Enqueue(NewJob); if (!this->_IsActive) { delete NewJob; return nullptr; } return NewJob; } void JobManager::ScheduleJobNoWait(std::function Task, std::function OnFinished) { if (!this->_IsActive) return; auto NewJob = new Job(Task, OnFinished, false); _JobPool.Enqueue(NewJob); if (!this->_IsActive) { delete NewJob; } } void JobManager::WaitForSingleJob(Job* Job) { while (!Job->IsFinished()) std::this_thread::yield(); delete Job; } void JobManager::WaitForAllJobs(Job** Jobs, const uint32_t Count) { bool AllFinished = false; while (!AllFinished) { AllFinished = true; for (uint32_t i = 0; i < Count; i++) { if (Jobs[i] != nullptr && !Jobs[i]->IsFinished()) { AllFinished = false; break; } } if (AllFinished) break; std::this_thread::yield(); } for (uint32_t i = 0; i < Count; i++) delete Jobs[i]; } void JobManager::WaitForAllJobs(List& Jobs) { this->WaitForAllJobs(&Jobs[0], Jobs.Count()); } void JobManager::WaitAll(std::function ProgressCallback, std::function CancelCallback) { // Must be called by owner thread if (std::this_thread::get_id() != _OwnerThreadId) throw std::exception("Must be called by thread that owns JobManager!"); // We can store this here, just for dependency tracking... uint32_t StorageLeft = this->_JobPool.Count(); std::atomic LastProgress = 0; // Flags brought to stack for easy read information... auto isPoolEmpty = this->_JobPool.IsEmpty(); auto isWorkersIdle = this->AreWorkersIdle(); while (!isPoolEmpty || !isWorkersIdle) { std::this_thread::yield(); isPoolEmpty = this->_JobPool.IsEmpty(); isWorkersIdle = this->AreWorkersIdle(); if (ProgressCallback != nullptr) { auto TempStorage = this->_JobPool.Count(); if (TempStorage > StorageLeft) { // We got more jobs in the process of processing other jobs // so the progress is shifted StorageLeft = TempStorage; if (LastProgress != 0) { LastProgress = 0; ProgressCallback(0); } } else { // We can convert our progress range here, then abs the value int32_t ProgressRange = (TempStorage - StorageLeft); uint32_t Progress = std::abs((int32_t)(((float)ProgressRange / (float)StorageLeft) * 100.0f)); if (LastProgress != Progress) { LastProgress = Progress; ProgressCallback(Progress); } } } // Break out of the loop if the setup was canceled if (!this->_IsActive) break; // Check if a cancel callback was provided, and if cancel is confirmed if (CancelCallback != nullptr && CancelCallback()) { this->Cancel(); break; } } } void JobManager::HaltAllJobs() { // Must be called by owner thread if (std::this_thread::get_id() != _OwnerThreadId) throw std::exception("Must be called by thread that owns JobManager!"); // Prevents the scheduler from scheduling more jobs this->_IsActive = false; // Prevent execution of threads... for (auto& Worker : _Workers) Worker->Stop(); // Wait for the last jobs to comlete, then destroy the workers for (auto& Thread : _Threads) Thread.Join(); for (auto& Worker : _Workers) delete Worker; // Iterate, and delete all jobs being non-waited, and mark all jobs awaited as finished. Job* Item = nullptr; while (_JobPool.Dequeue(Item)) { if (!Item->IsAwaiter()) { // It's not being awaited on so we must free it here... delete Item; } else { // Forceable mark as finished, so the awaiter elsewhere can stop and clean up. Item->IsFinished(); } } // Make sure the buffers are clear for subsequent calls _Workers.Clear(); _Threads.Clear(); } void JobManager::Cancel() { this->_IsActive = false; for (auto& Worker : _Workers) Worker->Stop(); } bool JobManager::AreWorkersIdle() { for (auto& Worker : _Workers) { if (Worker->IsExecuting()) return false; } return true; } Job* JobWorker::GetJob() { Job* Item = nullptr; auto Result = _Manager->_JobPool.Dequeue(Item); return (Result) ? Item : nullptr; } }