worker.cpp 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. #include "worker.hpp"
  2. Worker::Worker(std::vector<std::unique_ptr<commFPGA>> *fpgas) {
  3. fpgaVector = fpgas;
  4. }
  5. Worker::~Worker() {
  6. hasJobList.notify_all();
  7. }
  8. void Worker::startAsync() {
  9. result = std::async(std::launch::async, &Worker::threadMain, this);
  10. }
  11. void Worker::startSync() {
  12. threadMain();
  13. }
  14. int Worker::assignJobList(std::shared_ptr<JobList> &jobList) {
  15. std::lock_guard<std::mutex> lk(currentJobList_m);
  16. if(currentJobList != NULL)
  17. return -1;
  18. currentJobList = jobList;
  19. hasJobList.notify_all();
  20. return 0;
  21. }
  22. int Worker::threadMain() {
  23. if(currentJobList == NULL)
  24. return -1;
  25. while(true) {
  26. size_t remainingJobs = currentJobList->getJobCount();
  27. Clock::time_point now = Clock::now();
  28. commFPGA *fpga;
  29. for(size_t i=0; i<currentJobList->getJobCount(); i++) {
  30. std::shared_ptr<Job> &job = currentJobList->getJob(i);
  31. switch(job->getState()) {
  32. case JobState::initialized:
  33. break;
  34. case JobState::ready:
  35. sendJob(job);
  36. break;
  37. case JobState::sent:
  38. if(std::chrono::duration_cast<microseconds>(now - job->getSent()).count() > 1000) {
  39. fpga = (commFPGA*)job->getAssignedFPGA();
  40. if(fpga != NULL) {
  41. fpga->unassignJob(job);
  42. }
  43. if(job->getSendCounter() < 5) {
  44. job->setState(JobState::ready);
  45. sendJob(job);
  46. } else {
  47. job->setState(JobState::failed);
  48. job->setReceived(false);
  49. }
  50. }
  51. break;
  52. case JobState::receiving:
  53. break;
  54. case JobState::finished:
  55. remainingJobs--;
  56. break;
  57. case JobState::failed:
  58. remainingJobs--;
  59. break;
  60. }
  61. }
  62. if(remainingJobs <= 0) {
  63. break;
  64. }
  65. currentJobList->waitOne(microseconds(1000));
  66. }
  67. return 0;
  68. }
  69. void Worker::sendJob(std::shared_ptr<Job> &job) {
  70. commFPGA *fpga = findAvailableFPGA();
  71. if(fpga == NULL) {
  72. return;
  73. }
  74. if(fpga->assignJob(job) >= 0) {
  75. job->setSent();
  76. }
  77. }
  78. commFPGA* Worker::findAvailableFPGA() {
  79. uint_least32_t minCnt = JOB_COUNT-1;
  80. commFPGA *fpga = NULL;
  81. for(std::vector<std::unique_ptr<commFPGA>>::iterator it=fpgaVector->begin(); it!=fpgaVector->end(); it++) {
  82. uint_least32_t cnt = it->get()->jobCount();
  83. if(cnt < minCnt) {
  84. minCnt = cnt;
  85. fpga = it->get();
  86. }
  87. }
  88. return fpga;
  89. }