subDesTagesMitExtraKaese пре 4 година
родитељ
комит
a70a3d22f8

+ 3 - 5
lib/mlfpga/include/commFPGA.hpp

@@ -29,9 +29,9 @@
 
 #define UDP_LEN (1500-28-448) // size of sent UDP packets in bytes
 #define UDP_MTU (1500) // size of recv UDP buffer in bytes
-#define JOB_COUNT (1024 * 4) // max size of jobList
+#define JOB_COUNT (1024 * 4 * 10) // max size of jobList
 
-#define MAX_JOB_LEN (256*256) // max word count of job
+#define MAX_JOB_LEN (256*256*16) // max word count of job
 
 //#define DEBUG_JOB_RESP
 //#define DEBUG_JOB_SEND
@@ -84,7 +84,7 @@ class commFPGA {
 
   private:
     //tx buffer for buffered send function
-    uint32_t sendBuffer[MAX_JOB_LEN];
+    uint32_t *sendBuffer;
     int_least32_t sendBufferReadIndex = 0;
     int_least32_t sendBufferAvailable = 0;
     std::mutex sendLock;
@@ -92,8 +92,6 @@ class commFPGA {
     //list of pending responses
     std::unordered_map<uint32_t,std::shared_ptr<Job>> jobList;
     std::mutex jobLock;
-
-    //listener for a single FPGA
     
     sockaddr_storage addrDest = {};
     

+ 10 - 6
lib/mlfpga/src/commFPGA.cpp

@@ -151,6 +151,8 @@ commFPGA::commFPGA(const char *host, uint _port, bool bindSelf) {
   port = _port;
   strcpy(ip, host);
 
+  sendBuffer = new uint32_t[MAX_JOB_LEN];
+
   int err = 0;
 
   struct addrinfo hints, *res;
@@ -191,8 +193,8 @@ commFPGA::commFPGA(const char *host, uint _port, bool bindSelf) {
 
 }
 commFPGA::~commFPGA() {
-  //printf("%15s deleting job queue...\n", ip);
   running = false;
+  delete sendBuffer;
 }
 
 int commFPGA::sendRaw(uint8_t *buf, uint bufLen) {
@@ -235,7 +237,9 @@ int commFPGA::assignJob(JobContainer &job) {
   if(free < job->getWordCount())
     return -1;
   {
-    std::lock_guard<std::mutex> slk(sendLock);
+    std::unique_lock<std::mutex> slk(sendLock);
+    if(!slk.owns_lock())
+      return -1;
 
     free = MAX_JOB_LEN - sendBufferAvailable;
     if(free < job->getWordCount())
@@ -281,12 +285,12 @@ int commFPGA::unassignJob(JobContainer &job) {
 int commFPGA::sendFromBuffer() {
   std::lock_guard<std::mutex> lk(sendLock);
   int_least32_t avail = sendBufferAvailable + sendBufferReadIndex > MAX_JOB_LEN ? MAX_JOB_LEN - sendBufferReadIndex : sendBufferAvailable;
-  
+
   if(avail == 0)
     return -1;
 
-  if(avail*4 > UDP_LEN)
-    avail = UDP_LEN / 4;
+  if(avail > UDP_LEN/4)
+    avail = UDP_LEN/4;
 
   int rc = sendRaw((uint8_t*)&sendBuffer[sendBufferReadIndex], avail * 4);
 
@@ -300,7 +304,7 @@ int commFPGA::sendFromBuffer() {
   sendBufferReadIndex = (avail + sendBufferReadIndex) % MAX_JOB_LEN;
   sendBufferAvailable -= avail;
 
-  return rc;
+  return sendBufferAvailable;
 }
 
 size_t commFPGA::jobCount() {

+ 2 - 2
lib/mlfpga/src/connectionManager.cpp

@@ -44,9 +44,9 @@ void ConnectionManager::sendThread() {
   while(running) {
     Clock::time_point start = Clock::now();
     for(std::vector<std::unique_ptr<commFPGA>>::iterator it=fpgas.begin(); it!=fpgas.end(); it++) {
-      it->get()->sendFromBuffer();
+      auto fpga = it->get();
+      fpga->sendFromBuffer();
     }
-    //printf("%8d %8d\n", fpgas[0].sendBufferWriteIndex, fpgas[0].sendBufferReadIndex);
     auto elapsed = Clock::now() - start;
     if(elapsed < sendDelay)
       std::this_thread::sleep_for(sendDelay - elapsed);

+ 48 - 43
lib/mlfpga/src/worker.cpp

@@ -29,59 +29,64 @@ int Worker::threadMain() {
       commFPGA *fpga;
       
       for(size_t i=0; i<currentJobList->getJobCount(); i++) {
-        auto job = currentJobList->getJob(i);
-        switch(job->getState()) {
-          case JobState::initialized:
-            throw("worker can't send job that is not ready");
-            break;
-          case JobState::ready:
-            fpga = findAvailableFPGA();
-            if(fpga == NULL) {
-            break;
-            }
-            if(fpga->assignJob(job) >= 0) {
-              job->setSent();
-              //printf("job %08X: assigned\n", job->getJobId());
-            }
-            break;
-          case JobState::sent:
-            if(now - job->getSent() > jobTimeout) {
-              fpga = (commFPGA*)job->getAssignedFPGA();
-              if(fpga != NULL) {
-                if(fpga->unassignJob(job) < 0)
-                  break;
-                //printf("job %08X: unassigned\n", job->getJobId());
+        {
+          auto job = currentJobList->getJob(i);
+          switch(job->getState()) {
+            case JobState::initialized:
+              throw("worker can't send job that is not ready");
+              break;
+            case JobState::ready:
+              fpga = findAvailableFPGA();
+              if(fpga == NULL) {
+                goto fullQueue;
+                break;
               }
-              if(job->getSendCounter() < retryCount) {
-                job->setState(JobState::ready);
-                fpga = findAvailableFPGA();
-                if(fpga == NULL) {
-                  break;
+              if(fpga->assignJob(job) >= 0) {
+                job->setSent();
+                //printf("job %08X: assigned\n", job->getJobId());
+              }
+              break;
+            case JobState::sent:
+              if(now - job->getSent() > jobTimeout) {
+                fpga = (commFPGA*)job->getAssignedFPGA();
+                if(fpga != NULL) {
+                  if(fpga->unassignJob(job) < 0)
+                    break;
+                  //printf("job %08X: unassigned\n", job->getJobId());
                 }
-                if(fpga->assignJob(job) >= 0) {
-                  job->setSent();
-                  //printf("job %08X: reassigned\n", job->getJobId());
+                if(job->getSendCounter() < retryCount) {
+                  job->setState(JobState::ready);
+                  fpga = findAvailableFPGA();
+                  if(fpga == NULL) {
+                    goto fullQueue;
+                    break;
+                  }
+                  if(fpga->assignJob(job) >= 0) {
+                    job->setSent();
+                    //printf("job %08X: reassigned\n", job->getJobId());
+                  }
+                } else {
+                  job->setState(JobState::failed);
+                  job->setReceived(false);
                 }
-              } else {
-                job->setState(JobState::failed);
-                job->setReceived(false);
               }
-            }
-            break;
-          case JobState::receiving:
+              break;
+            case JobState::receiving:
 
-            break;
-          case JobState::finished:
-            remainingJobs--;
-            break;
-          case JobState::failed:
-            remainingJobs--;
-            break;
+              break;
+            case JobState::finished:
+              remainingJobs--;
+              break;
+            case JobState::failed:
+              remainingJobs--;
+              break;
+          }
         }
       }
       if(remainingJobs <= 0) {
         break;
       }
+      fullQueue:
       currentJobList->waitOne(jobTimeout);
     }
   }

+ 3 - 3
tests/main.cpp

@@ -7,9 +7,9 @@ size_t s=0, f=0, r=0;
 std::mutex statsLk;
 
 void work() {
-    auto worker = connectionManager.createWorker(Module::dummyBigModule, 10000);
+    auto worker = connectionManager.createWorker(Module::dummyBigModule, 1000);
 
-    worker->setJobTimeout(milliseconds(100));
+    worker->setJobTimeout(milliseconds(50));
     worker->setRetryCount(10);
     worker->setDoneCallback([worker](){
         auto jobs = worker->getJobList();
@@ -53,7 +53,7 @@ int main(void)
     connectionManager.addFPGA("192.168.1.32", 1234);
     connectionManager.addFPGA("192.168.1.32", 1234);
 
-    //connectionManager.setSendDelay(microseconds(0));
+    connectionManager.setSendDelay(microseconds(1));
 
     connectionManager.start();