worker.cpp 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. #include "worker.hpp"
  2. //#define DEBUG_WORKER
  3. Worker::Worker(std::vector<std::unique_ptr<commFPGA>> *fpgas, Module mod, size_t numberOfJobs) :
  4. jobList(std::piecewise_construct, std::make_tuple(), std::make_tuple(new JobList(mod, numberOfJobs))) {
  5. fpgaVector = fpgas;
  6. }
  7. Worker::~Worker() {
  8. running = false;
  9. }
  10. void Worker::startAsync() {
  11. result = std::async(std::launch::async, &Worker::threadMain, this);
  12. }
  13. void Worker::startSync() {
  14. threadMain();
  15. }
  16. JobListContainer Worker::getJobList() {
  17. return JobListContainer(jobList);
  18. }
  19. int Worker::threadMain() {
  20. pthread_setname_np(pthread_self(), "mlfpga worker");
  21. {
  22. size_t lastI = 0;
  23. auto currentJobList = getJobList();
  24. int rc;
  25. while(running) {
  26. size_t remainingJobs = currentJobList->getJobCount();
  27. Clock::time_point now = Clock::now();
  28. #ifdef DEBUG_WORKER
  29. Clock::time_point then;
  30. size_t sentBytes = 0;
  31. #endif
  32. commFPGA *fpga;
  33. for(size_t i=0; i<currentJobList->getJobCount(); i++) {
  34. {
  35. size_t currentI = (lastI + i) % currentJobList->getJobCount();
  36. auto job = currentJobList->getJob(currentI);
  37. switch(job->getState()) {
  38. case JobState::initialized:
  39. throw("worker can't send job that is not ready");
  40. break;
  41. case JobState::ready:
  42. fpga = findAvailableFPGA();
  43. if(fpga == NULL) {
  44. lastI = currentI;
  45. goto fullQueue;
  46. break;
  47. }
  48. rc = fpga->assignJob(job);
  49. //printf("rc: %4d i: %4lu\n", rc, i);
  50. if(rc >= 0) {
  51. job->setSent();
  52. #ifdef DEBUG_WORKER
  53. sentBytes += job->getByteCount();
  54. printf("job %08X: \x1b[32massigned\x1b[0m no.: %3lu\n", job->getJobId(), currentI);
  55. #endif
  56. } else if(rc == -4) {
  57. lastI = currentI;
  58. goto fullQueue;
  59. }
  60. break;
  61. case JobState::sent:
  62. if(now - job->getSent() > jobTimeout) {
  63. fpga = (commFPGA*)job->getAssignedFPGA();
  64. if(fpga != NULL) {
  65. if(fpga->unassignJob(job) < 0)
  66. break;
  67. #ifdef DEBUG_WORKER
  68. printf("job %08X: \x1b[31munassigned\x1b[0m no.: %3lu\n", job->getJobId(), currentI);
  69. #endif
  70. }
  71. if(job->getSendCounter() < retryCount) {
  72. job->setState(JobState::ready);
  73. fpga = findAvailableFPGA();
  74. if(fpga == NULL) {
  75. lastI = currentI;
  76. goto fullQueue;
  77. break;
  78. }
  79. if(fpga->assignJob(job) >= 0) {
  80. job->setSent();
  81. #ifdef DEBUG_WORKER
  82. sentBytes += job->getByteCount();
  83. printf("job %08X: \x1b[33mreassigned\x1b[0m no.: %3lu\n", job->getJobId(), currentI);
  84. #endif
  85. }
  86. } else {
  87. job->setState(JobState::failed);
  88. job->setReceived(false);
  89. }
  90. }
  91. break;
  92. case JobState::receiving:
  93. break;
  94. case JobState::finished:
  95. remainingJobs--;
  96. break;
  97. case JobState::failed:
  98. remainingJobs--;
  99. break;
  100. }
  101. }
  102. }
  103. if(remainingJobs <= 0) {
  104. break;
  105. }
  106. fullQueue:
  107. currentJobList->waitOne(jobTimeout);
  108. #ifdef DEBUG_WORKER
  109. then = Clock::now();
  110. printf("loop: %3ld ms sent: %5lu kB remaining: %lu\n", std::chrono::duration_cast<milliseconds>(then - now).count(), sentBytes/1024, remainingJobs);
  111. #endif
  112. }
  113. }
  114. if(doneCb != NULL)
  115. doneCb();
  116. running = false;
  117. return 0;
  118. }
  119. commFPGA* Worker::findAvailableFPGA() {
  120. uint_least32_t minCnt = JOB_COUNT-1;
  121. commFPGA *fpga = NULL;
  122. for(auto it=fpgaVector->begin(); it!=fpgaVector->end(); it++) {
  123. uint_least32_t cnt = it->get()->jobCount();
  124. if(cnt < minCnt) {
  125. minCnt = cnt;
  126. fpga = it->get();
  127. }
  128. }
  129. return fpga;
  130. }
  131. void Worker::setDoneCallback(DoneCallback cb) {
  132. doneCb = cb;
  133. }
  134. void Worker::waitUntilDone() {
  135. if(!running)
  136. return;
  137. jobList.second->waitAll();
  138. }