Bläddra i källkod

Added worker removal

subDesTagesMitExtraKaese 4 år sedan
förälder
incheckning
a71dc70f38

+ 1 - 1
lib/mlfpga/include/commFPGA.hpp

@@ -31,7 +31,7 @@
 #define UDP_MTU (1500) // size of recv UDP buffer in bytes
 #define JOB_COUNT (1024 * 4 * 10) // max size of jobList
 
-#define MAX_JOB_LEN (256*256*16) // max word count of job
+#define MAX_JOB_LEN (256*256) // max word count of job
 
 //#define DEBUG_JOB_RESP
 //#define DEBUG_JOB_SEND

+ 2 - 0
lib/mlfpga/include/connectionManager.hpp

@@ -8,6 +8,7 @@
 #include <future>
 #include <mutex>
 #include <condition_variable>
+#include <algorithm>
 
 #include "commFPGA.hpp"
 #include "worker.hpp"
@@ -44,6 +45,7 @@ class ConnectionManager {
     Worker* createWorker(Module mod, size_t numberOfJobs = 1);
     Worker* getWorker(size_t i) const {return &(*workers.at(i));}
     size_t getWorkerCount() const {return workers.size();}
+    void removeFinishedWorkers();
 
     void setSendDelay(microseconds us) {sendDelay = us;}
 

+ 2 - 0
lib/mlfpga/include/worker.hpp

@@ -25,6 +25,8 @@ class Worker {
     void setDoneCallback(DoneCallback cb);
     void waitUntilDone();
 
+    bool isRunning() const {return running;}
+
   private:
     std::pair<std::mutex, std::shared_ptr<JobList>> jobList;
     std::vector<std::unique_ptr<commFPGA>> *fpgaVector;

+ 10 - 16
lib/mlfpga/src/commFPGA.cpp

@@ -230,27 +230,19 @@ int commFPGA::assignJob(JobContainer &job) {
   if(jobList.size() >= JOB_COUNT)
     return -1;
 
-  
+  std::lock_guard<std::mutex> slk(sendLock);
+
   uint_least32_t free = MAX_JOB_LEN - sendBufferAvailable;
-  uint_least32_t sendBufferWriteIndex;
 
   if(free < job->getWordCount())
     return -1;
-  {
-    std::unique_lock<std::mutex> slk(sendLock);
-    if(!slk.owns_lock())
-      return -1;
-
-    free = MAX_JOB_LEN - sendBufferAvailable;
-    if(free < job->getWordCount())
-      return -1;
 
-    jobList.insert(std::pair<uint32_t,std::shared_ptr<Job>>(job->getJobId(), job.sharedPtr()));
-    job->setAssignedFPGA(this);
+  jobList.insert(std::pair<uint32_t,std::shared_ptr<Job>>(job->getJobId(), job.sharedPtr()));
+  job->setAssignedFPGA(this);
 
-    sendBufferWriteIndex = sendBufferReadIndex + sendBufferAvailable;
-    sendBufferAvailable += job->getWordCount();
-  }
+  uint_least32_t sendBufferWriteIndex = sendBufferReadIndex + sendBufferAvailable;
+  sendBufferAvailable += job->getWordCount();
+  
   for(uint_least32_t i=0; i<job->getWordCount(); i++) {
     sendBuffer[(sendBufferWriteIndex + i) % MAX_JOB_LEN] = __builtin_bswap32(job->getWord(i));
   }
@@ -292,7 +284,9 @@ int commFPGA::sendFromBuffer() {
   if(avail > UDP_LEN/4)
     avail = UDP_LEN/4;
 
-  int rc = sendRaw((uint8_t*)&sendBuffer[sendBufferReadIndex], avail * 4);
+  sendRaw((uint8_t*)&sendBuffer[sendBufferReadIndex], avail * 4);
+
+  //printf("%8d %4d %8lu\n", sendBufferAvailable, avail, sendBufferReadIndex);
 
   #ifdef DEBUG_JOB_SEND
   printf("send ");

+ 12 - 0
lib/mlfpga/src/connectionManager.cpp

@@ -18,6 +18,18 @@ Worker* ConnectionManager::createWorker(Module mod, size_t numberOfJobs) {
   return w;
 }
 
+void ConnectionManager::removeFinishedWorkers() {
+  workers.erase(
+    std::remove_if(
+      workers.begin(),
+      workers.end(),
+      [&] (std::unique_ptr<Worker> const& p) {
+        return !p.get()->isRunning();
+      }),
+    workers.end()
+  );
+}
+
 void ConnectionManager::startFromTensorflow() {
   if(isRunning())
     return;

+ 10 - 2
lib/mlfpga/src/worker.cpp

@@ -22,6 +22,7 @@ JobListContainer Worker::getJobList() {
 int Worker::threadMain() {
   pthread_setname_np(pthread_self(), "mlfpga worker");
   {
+    size_t lastI = 0;
     auto currentJobList = getJobList();
     while(running) {
       size_t remainingJobs = currentJobList->getJobCount();
@@ -30,7 +31,8 @@ int Worker::threadMain() {
       
       for(size_t i=0; i<currentJobList->getJobCount(); i++) {
         {
-          auto job = currentJobList->getJob(i);
+          size_t currentI = (lastI + i) % currentJobList->getJobCount();
+          auto job = currentJobList->getJob(currentI);
           switch(job->getState()) {
             case JobState::initialized:
               throw("worker can't send job that is not ready");
@@ -38,6 +40,7 @@ int Worker::threadMain() {
             case JobState::ready:
               fpga = findAvailableFPGA();
               if(fpga == NULL) {
+                lastI = currentI;
                 goto fullQueue;
                 break;
               }
@@ -58,6 +61,7 @@ int Worker::threadMain() {
                   job->setState(JobState::ready);
                   fpga = findAvailableFPGA();
                   if(fpga == NULL) {
+                    lastI = currentI;
                     goto fullQueue;
                     break;
                   }
@@ -93,13 +97,15 @@ int Worker::threadMain() {
   
   if(doneCb != NULL)
     doneCb();
+
+  running = false;
   return 0;
 }
 
 commFPGA* Worker::findAvailableFPGA() {
   uint_least32_t minCnt = JOB_COUNT-1;
   commFPGA *fpga = NULL;
-  for(std::vector<std::unique_ptr<commFPGA>>::iterator it=fpgaVector->begin(); it!=fpgaVector->end(); it++) {
+  for(auto it=fpgaVector->begin(); it!=fpgaVector->end(); it++) {
     uint_least32_t cnt = it->get()->jobCount();
     if(cnt < minCnt) {
       minCnt = cnt;
@@ -114,5 +120,7 @@ void Worker::setDoneCallback(DoneCallback cb) {
 }
 
 void Worker::waitUntilDone() {
+  if(!running)
+    return;
   jobList.second->waitAll();
 }

+ 1 - 0
src/conv2D.cpp

@@ -88,6 +88,7 @@ namespace tf_lib {
         }
       }
       done();
+      connectionManager.removeFinishedWorkers();
     });
 
     worker->startAsync();

+ 1 - 0
src/dummyBigOp.cpp

@@ -43,6 +43,7 @@ namespace tf_lib {
         output_tensor(i) = job->getResponsePayload(i);
       }
       done();
+      connectionManager.removeFinishedWorkers();
     });
 
     worker->startAsync();

+ 1 - 0
src/dummyOp.cpp

@@ -45,6 +45,7 @@ namespace tf_lib {
         output_tensor(i) = job->getResponsePayload(i);
       }
       done();
+      connectionManager.removeFinishedWorkers();
     });
 
     worker->startAsync();

+ 11 - 6
tests/main.cpp

@@ -9,7 +9,7 @@ std::mutex statsLk;
 void work() {
     auto worker = connectionManager.createWorker(Module::dummyBigModule, 1000);
 
-    worker->setJobTimeout(milliseconds(50));
+    worker->setJobTimeout(milliseconds(1000));
     worker->setRetryCount(10);
     worker->setDoneCallback([worker](){
         auto jobs = worker->getJobList();
@@ -53,15 +53,20 @@ int main(void)
     connectionManager.addFPGA("192.168.1.32", 1234);
     connectionManager.addFPGA("192.168.1.32", 1234);
 
-    connectionManager.setSendDelay(microseconds(1));
+    connectionManager.setSendDelay(microseconds(50));
 
     connectionManager.start();
 
-    for(int i=0; i<8; i++)
-        work();
+    int workNum = 100;
     
-    for(size_t i=0; i<connectionManager.getWorkerCount(); i++) {
-        connectionManager.getWorker(i)->waitUntilDone();
+    while(workNum > 0 || connectionManager.getWorkerCount() > 0) {
+        connectionManager.removeFinishedWorkers();
+        while(workNum > 0 && connectionManager.getWorkerCount() < 8) {
+            workNum--;
+            work();
+        }
+        printf("work: %2d   worker: %2lu\n", workNum, connectionManager.getWorkerCount());
+        std::this_thread::sleep_for(milliseconds(300));
     }
 
     std::unique_lock<std::mutex> lk(statsLk);