Browse Source

bug fixes

subDesTagesMitExtraKaese 4 years ago
parent
commit
be447afb96

+ 10 - 7
lib/mlfpga/include/commFPGA.hpp

@@ -29,10 +29,12 @@
 
 #define UDP_LEN (1500-28-448) // size of sent UDP packets in bytes
 #define UDP_MTU (1500) // size of recv UDP buffer in bytes
-#define JOB_COUNT (1024 * 4)
+#define JOB_COUNT (1024 * 4) // max size of jobList
 
-#define DEBUG_JOB_RESP
+#define MAX_JOB_LEN (256*256) // max word count of job
 
+//#define DEBUG_JOB_RESP
+//#define DEBUG_JOB_SEND
 
 typedef std::chrono::high_resolution_clock Clock;
 typedef std::chrono::milliseconds milliseconds;
@@ -58,8 +60,8 @@ class commFPGA {
 
     //called by worker thread
     
-    int assignJob(std::shared_ptr<Job> &job);
-    int unassignJob(std::shared_ptr<Job> &job);
+    int assignJob(JobContainer &job);
+    int unassignJob(JobContainer &job);
 
     size_t jobCount();
     
@@ -72,7 +74,7 @@ class commFPGA {
     void recvUDP();
     int parseRaw(uint32_t *buf, size_t bufLen);
     
-    std::unordered_map<uint32_t,std::shared_ptr<Job>>::iterator currentJob;
+    std::shared_ptr<Job> currentJob;
     RecvState recvState = RecvState::checkPreamble;
     size_t recvPayloadIndex = 0;
 
@@ -83,8 +85,9 @@ class commFPGA {
   private:
     //tx buffer for buffered send function
     uint32_t sendBuffer[MAX_JOB_LEN];
-    uint_least32_t sendBufferReadIndex = 0;
-    uint_least32_t sendBufferWriteIndex = 0;
+    int_least32_t sendBufferReadIndex = 0;
+    int_least32_t sendBufferAvailable = 0;
+    std::mutex sendLock;
 
     //list of pending responses
     std::unordered_map<uint32_t,std::shared_ptr<Job>> jobList;

+ 5 - 4
lib/mlfpga/include/connectionManager.hpp

@@ -40,11 +40,11 @@ class ConnectionManager {
 
     void start();
 
-    //send many Jobs and wait for all responses
-    int sendJobListSync(std::shared_ptr<JobList> &jobList);
+    Worker* createWorker(Module mod, size_t numberOfJobs);
+    Worker* getWorker(size_t i) const {return &(*workers.at(i));}
+    size_t getWorkerCount() const {return workers.size();}
 
-    //send many Jobs and call back
-    int sendJobListAsync(std::shared_ptr<JobList> &jobList);
+    void setSendDelay(microseconds us) {sendDelay = us;}
 
   private:
     std::vector<std::unique_ptr<commFPGA>> fpgas;
@@ -54,6 +54,7 @@ class ConnectionManager {
     std::future<void> sendResult;
 
     bool running = true;
+    microseconds sendDelay = microseconds(50);
 };
 
 #endif

+ 33 - 24
lib/mlfpga/include/job.hpp

@@ -23,8 +23,6 @@ typedef std::function<void()> DoneCallback;
 
 #define PREAMBLE (0xE1E4C312)
 
-#define MAX_JOB_LEN (256*256)
-
 enum class JobState {
   initialized,  //Job was created
   ready,        //Job is ready to be sent
@@ -44,12 +42,14 @@ class WordBuffer {
     uint32_t* getWordAddr() const {return words;}
 
     uint8_t getByte(size_t i) const {return bytes[i];}
-    uint8_t getWord(size_t i) const {return words[i];}
+    void setByte(size_t i, uint32_t v) const {bytes[i] = v;}
+    uint32_t getWord(size_t i) const {return words[i];}
+    void setWord(size_t i, uint32_t v) const {words[i] = v;}
 
     size_t getWordCount() const {return wordCount;}
     size_t getByteCount() const {return wordCount*4;}
 
-  protected:
+  private:
       size_t wordCount;
       union {
         uint8_t *bytes;
@@ -57,26 +57,25 @@ class WordBuffer {
       };
 };
 
-//data structure that is sent over the network
+//wrapper for data structure that is sent over the network
 class JobData : public WordBuffer {
   public:
     JobData(uint payloadLength);
 
-    uint32_t getPreamble() const {return words[0];}
-    void setPreamble(uint32_t v) const {words[0] = v;}
-
-    uint32_t getJobId() const {return words[1];}
-    void setJobId(uint32_t v) const {words[1] = v;}
+    uint32_t getPreamble() const {return getWord(0);}
+    void setPreamble(uint32_t v) const {setWord(0, v);}
 
-    uint32_t getModuleId() const {return words[2];}
-    void setModuleId(uint32_t v) const {words[2] = v;}
+    uint32_t getJobId() const {return getWord(1);}
+    void setJobId(uint32_t v) const {setWord(1, v);}
 
-    uint32_t getPayload(size_t i) const {return words[i+3];}
-    void setPayload(size_t i, uint32_t v) const {words[i+3] = v;}
+    uint32_t getModuleId() const {return getWord(2);}
+    void setModuleId(uint32_t v) const {setWord(2, v);}
 
-    uint32_t getCRC() const {return words[wordCount-1];}
-    void setCRC(uint32_t v) const {words[wordCount-1] = v;}
+    uint32_t getPayload(size_t i) const {return getWord(i+3);}
+    void setPayload(size_t i, uint32_t v) const {setWord(i+3, v);}
 
+    uint32_t getCRC() const {return getWord(getWordCount()-1);}
+    void setCRC(uint32_t v) const {setWord(getWordCount()-1, v);}
 };
 
 //entity to track a single Job
@@ -86,15 +85,8 @@ class Job : public JobData {
 
     uint32_t tag = 0;
 
-    //locks state
-    std::mutex stateMutex;
-    //locks sendBuf
-    std::mutex sendMutex;
-    //locks recvBuf, recvWordIndex
-    std::mutex recvMutex;
-
     uint32_t getResponsePayload(size_t i) const {return recvBuf.getWord(i);}
-    void setResponsePayload(size_t i, uint32_t v) const {recvBuf.getWordAddr()[i] = v;}
+    void setResponsePayload(size_t i, uint32_t v) const {recvBuf.setWord(i, v);}
     uint32_t* getResponseAddr() const {return recvBuf.getWordAddr();}
     size_t getResponseBufferWordCount() const {return recvBuf.getWordCount();}
 
@@ -117,7 +109,9 @@ class Job : public JobData {
 
     size_t getSendCounter() const {return sendCounter;}
 
+    std::mutex jobLock;
   private:
+    
     //only payload and CRC of response
     WordBuffer recvBuf;
     DoneCallback doneCb = NULL;
@@ -131,4 +125,19 @@ class Job : public JobData {
 
 };
 
+//thread safe Job container
+class JobContainer {
+  public:
+    JobContainer(std::shared_ptr<Job> &p) : job(p), lock(p->jobLock) {
+
+    };
+    Job * operator->()const { return job.get(); }
+    Job & operator*() const { return *job; }
+
+    std::shared_ptr<Job>& sharedPtr() {return job;}
+  private:
+    std::shared_ptr<Job> job;
+    std::unique_lock<std::mutex> lock;
+};
+
 #endif

+ 19 - 6
lib/mlfpga/include/jobList.hpp

@@ -14,17 +14,15 @@ class JobList {
     void waitOne(microseconds us);
     void finishJob();
 
-    void setDoneCallback(DoneCallback cb);
-
     size_t getPendingJobCount() const {return pendingJobCount;}
 
-    std::shared_ptr<Job>& getJob(size_t i);
-
-    std::shared_ptr<Job>& getNextJob();
+    JobContainer getJob(size_t i);
+    JobContainer getNextJob();
     size_t getJobCount() const {return jobCount;}
+    
+    std::mutex jobListLock;
   private:
     std::vector<std::shared_ptr<Job>> jobs;
-    DoneCallback doneCb;
 
     size_t jobCount;
     size_t pendingJobCount;
@@ -34,4 +32,19 @@ class JobList {
     size_t nextJobIndex = 0;
 };
 
+class JobListContainer {
+  public:
+    JobListContainer(std::pair<std::mutex, std::shared_ptr<JobList>> &p) : jobList(p.second), lock(p.first) {
+      
+    };
+
+    JobList * operator->()const { return jobList.get(); }
+    JobList & operator*() const { return *jobList; }
+
+    std::shared_ptr<JobList>& sharedPtr() {return jobList;}
+  private:
+    std::shared_ptr<JobList> jobList;
+    std::unique_lock<std::mutex> lock;
+};
+
 #endif

+ 14 - 8
lib/mlfpga/include/worker.hpp

@@ -11,28 +11,34 @@
 
 class Worker {
   public:
-    Worker(std::vector<std::unique_ptr<commFPGA>> *fpgas);
+    Worker(std::vector<std::unique_ptr<commFPGA>> *fpgas, Module mod, size_t numberOfJobs);
     ~Worker();
 
     void startAsync();
     void startSync();
     
-    int assignJobList(std::shared_ptr<JobList> &jobList);
+    JobListContainer getJobList();
+
+    void setJobTimeout(microseconds us) {jobTimeout = us;}
+    void setRetryCount(size_t n) {retryCount = n;}
+
+    void setDoneCallback(DoneCallback cb);
+    void waitUntilDone();
 
   private:
-    std::mutex currentJobList_m;
-    std::shared_ptr<JobList> currentJobList = NULL;
+    std::pair<std::mutex, std::shared_ptr<JobList>> jobList;
     std::vector<std::unique_ptr<commFPGA>> *fpgaVector;
 
     commFPGA* findAvailableFPGA();
-    void sendJob(std::shared_ptr<Job> &job);
     
-
     std::future<int> result;
     int threadMain();
 
-    std::condition_variable hasJobList;
-    void waitForJobList();
+    microseconds jobTimeout = microseconds(1000);
+    size_t retryCount = 10;
+
+    DoneCallback doneCb = NULL;
+    bool running = true;
 };
 
 #endif

+ 101 - 63
lib/mlfpga/src/commFPGA.cpp

@@ -1,6 +1,7 @@
 
 #include "../include/commFPGA.hpp"
 
+
 int resolvehelper(const char* hostname, int family, const char* service, sockaddr_storage* pAddr)
 {
     int result;
@@ -27,6 +28,7 @@ void commFPGA::start() {
 }
 
 void commFPGA::recvUDP() {
+  pthread_setname_np(pthread_self(), "mlfpga recv");
   while(running) {
     int result = 0;
 
@@ -35,7 +37,7 @@ void commFPGA::recvUDP() {
     uint slen = sizeof(addrDest);
     result = recvfrom(sock, (uint8_t*)buf, UDP_MTU/4, 0, (sockaddr*)&addrDest, &slen);
     if(result == -1)
-      return;
+      continue;
 
     result /= 4;
 
@@ -43,15 +45,28 @@ void commFPGA::recvUDP() {
       buf[i] = __builtin_bswap32(buf[i]);
     }
 
+  #ifdef DEBUG_JOB_RESP
+  printf("recv ");
+  for(int_least32_t i=0; i<result; i++) 
+    printf("%u: %08X    ", i, buf[i]);
+  printf(" %d\n", (int)recvState);
+  #endif
+
     parseRaw(buf, result);
   }
 }
 
 int commFPGA::parseRaw(uint32_t *buf, size_t bufLen) {
+  
+  std::unordered_map<uint32_t,std::shared_ptr<Job>>::iterator jobIt;
+  JobContainer *jobLocked = NULL;
+
   std::lock_guard<std::mutex> lk(jobLock);
 
-  for(size_t i=0; i < bufLen; i++) {
-    
+  if(currentJob != NULL)
+    jobLocked = new JobContainer(currentJob);
+
+  for(int_least32_t i=0; i < bufLen; i++) {
     switch(recvState) {
       case RecvState::checkPreamble:
         if(buf[i] == PREAMBLE) {
@@ -63,50 +78,60 @@ int commFPGA::parseRaw(uint32_t *buf, size_t bufLen) {
         break;
 
       case RecvState::checkJobId: 
-        currentJob = jobList.find(buf[i]);
-        if(currentJob == jobList.end()) {
-          i -= 1;
-          recvState = RecvState::checkPreamble;
+        jobIt = jobList.find(buf[i]);
+        if(jobIt == jobList.end()) {
           #ifdef DEBUG_JOB_RESP
-            printf("job %08X jobId not found\n", buf[i]);
-          #endif
-        } else if(currentJob->second->getState() != JobState::sent) {
-          #ifdef DEBUG_JOB_RESP
-            printf("job %08X wasn't sent\n", buf[i]);
+            printf("job %08X jobId not found, %u\n", buf[i], i);
           #endif
           i -= 1;
           recvState = RecvState::checkPreamble;
         } else {
-          assert(currentJob->second->getAssignedFPGA() == this);
+          currentJob = jobIt->second;
+          //delete old lock
+          if(jobLocked) delete jobLocked;
+          //acquire lock
+          jobLocked = new JobContainer(currentJob);
+          if((*jobLocked)->getState() != JobState::sent) {
+            #ifdef DEBUG_JOB_RESP
+              printf("job %08X wasn't sent\n", buf[i]);
+            #endif
+            i -= 1;
+            recvState = RecvState::checkPreamble;
+          } else {
+          assert((*jobLocked)->getAssignedFPGA() == this);
           recvState = RecvState::checkModuleId;
+          }
         }
         break;
       
       case RecvState::checkModuleId:
-        if(currentJob->second->getModuleId() == buf[i]) {
+        if((*jobLocked)->getModuleId() == buf[i]) {
           recvState = RecvState::writePayload;
           recvPayloadIndex = 0;
-          currentJob->second->setState(JobState::sent);
+          (*jobLocked)->setState(JobState::sent);
         } else {
-          i = i - 2 < 0 ? -1 : i - 2;
-          recvState = RecvState::checkPreamble;
           #ifdef DEBUG_JOB_RESP
-            printf("job %08X wrong moduleId %08X\n", currentJob->second->getJobId(), buf[i]);
+            printf("job %08X wrong moduleId %08X\n", (*jobLocked)->getJobId(), buf[i]);
           #endif
+          i = i - 2 < 0 ? -1 : i - 2;
+          recvState = RecvState::checkPreamble;
         }
         break;
       case RecvState::writePayload:
-        currentJob->second->setResponsePayload(recvPayloadIndex++, buf[i]);
-        if(recvPayloadIndex >= currentJob->second->getResponseBufferWordCount()) {
-          if(currentJob->second->checkCRC()) {
-            currentJob->second->setReceived(true);
-            jobList.erase(currentJob->second->getJobId());
+        (*jobLocked)->setResponsePayload(recvPayloadIndex++, buf[i]);
+        if(recvPayloadIndex >= (*jobLocked)->getResponseBufferWordCount()) {
+          if((*jobLocked)->checkCRC()) {
+            (*jobLocked)->setReceived(true);
+            #ifdef DEBUG_JOB_RESP
+              printf("job %08X: success!\n", (*jobLocked)->getJobId());
+            #endif
+            jobList.erase((*jobLocked)->getJobId());
           } else {
-            currentJob->second->setState(JobState::sent);
+            (*jobLocked)->setState(JobState::sent);
             #ifdef DEBUG_JOB_RESP
-              printf("job %08X wrong crc %08X, %4d, %4d\n", currentJob->second->getJobId(), buf[i], bufLen, i);
-              for(uint_least32_t k=0; k<currentJob->second->getWordCount(); k++) {
-                printf(" %4d %08X", k, currentJob->second->getWord(k));
+              printf("job %08X wrong crc %08X, %4lu, %4u\n", (*jobLocked)->getJobId(), buf[i], bufLen, i);
+              for(size_t k=0; k<(*jobLocked)->getWordCount(); k++) {
+                printf(" %4lu %08X", k, (*jobLocked)->getWord(k));
               }
               std::cout << std::endl;
             #endif
@@ -117,6 +142,8 @@ int commFPGA::parseRaw(uint32_t *buf, size_t bufLen) {
     }
   }
 
+  if(jobLocked) delete jobLocked;
+
   return 0;
 }
 
@@ -152,8 +179,8 @@ commFPGA::commFPGA(const char *host, uint _port, bool bindSelf) {
 
   //set recv buffer size
 
-  int rcvbufsize = MAX_JOB_LEN * 4 * 2;
-  setsockopt(sock,SOL_SOCKET,SO_RCVBUF,(char*)&rcvbufsize,sizeof(rcvbufsize));
+  //int rcvbufsize = MAX_JOB_LEN * 4 * 2;
+  //setsockopt(sock,SOL_SOCKET,SO_RCVBUF,(char*)&rcvbufsize,sizeof(rcvbufsize));
   
   //UDP client
   resolvehelper(host, AF_INET, std::to_string(port).c_str(), &addrDest);
@@ -165,8 +192,7 @@ commFPGA::commFPGA(const char *host, uint _port, bool bindSelf) {
 }
 commFPGA::~commFPGA() {
   //printf("%15s deleting job queue...\n", ip);
-  
-  
+  running = false;
 }
 
 int commFPGA::sendRaw(uint8_t *buf, uint bufLen) {
@@ -179,81 +205,93 @@ int commFPGA::sendRaw(uint8_t *buf, uint bufLen) {
     if(payloadLen > UDP_LEN/4*4)
       payloadLen = UDP_LEN/4*4;
 
-    //printf("sending %d bytes at offset %d\n", payloadLen, byteIndex);
-
     result = sendto(sock, &buf[byteIndex], payloadLen, 0, (sockaddr*)&addrDest, sizeof(addrDest));
     if(result == -1) {
       int err = errno;
       std::cout << "error sending packet " << err << std::endl;
       break;
     } 
-    //usleep(50);
-    //printf("%d bytes sent\n", result);
-    //mark n * 4 bytes as sent
     byteIndex += payloadLen;
   }
   return byteIndex;
 }
 
-int commFPGA::assignJob(std::shared_ptr<Job> &job) {
+int commFPGA::assignJob(JobContainer &job) {
 
   if(job->getAssignedFPGA() != NULL)
     return -1;
 
-  std::lock_guard<std::mutex> lk(jobLock);
+  std::unique_lock<std::mutex> lk(jobLock, std::try_to_lock);
+  if(!lk.owns_lock())
+    return -1;
+  
   if(jobList.size() >= JOB_COUNT)
     return -1;
+
+  std::lock_guard<std::mutex> slk(sendLock);
   
-  uint_least32_t free = (sendBufferReadIndex - sendBufferWriteIndex) % MAX_JOB_LEN;
-  if(free < job->getWordCount() && free != 0)
+  uint_least32_t free = MAX_JOB_LEN - sendBufferAvailable;
+  if(free < job->getWordCount())
     return -1;
 
-  jobList.insert(std::pair<uint32_t,std::shared_ptr<Job>>(job->getJobId(), job));
+  jobList.insert(std::pair<uint32_t,std::shared_ptr<Job>>(job->getJobId(), job.sharedPtr()));
   job->setAssignedFPGA(this);
 
+  uint_least32_t sendBufferWriteIndex = sendBufferReadIndex + sendBufferAvailable + 1;
+
   for(uint_least32_t i=0; i<job->getWordCount(); i++) {
     sendBuffer[(sendBufferWriteIndex + i) % MAX_JOB_LEN] = __builtin_bswap32(job->getWord(i));
-    printf("%08X ", job->getWord(i));
   }
+  #ifdef DEBUG_JOB_SEND
+  printf("fill ");
+  for(uint_least32_t i=0; i<job->getWordCount(); i++) 
+    printf("%u: %08X    ", (sendBufferWriteIndex + i) % MAX_JOB_LEN, job->getWord(i));
   printf("\n");
-  sendBufferWriteIndex = (sendBufferWriteIndex + job->getWordCount()) % MAX_JOB_LEN;
+  #endif
+  sendBufferAvailable += job->getWordCount();
 
   return 0;
 }
-int commFPGA::unassignJob(std::shared_ptr<Job> &job) {
+int commFPGA::unassignJob(JobContainer &job) {
   if(job->getAssignedFPGA() != this)
     return -1;
-  
-  std::lock_guard<std::mutex> lk(jobLock);
+
+  std::unique_lock<std::mutex> lk(jobLock, std::try_to_lock);
+  if(!lk.owns_lock())
+    return -1;
+
+  if(job->getState() == JobState::receiving) {
+    currentJob = NULL;
+    job->setState(JobState::sent);
+    #ifdef DEBUG_JOB_RESP
+      printf("job %08X: unassigned during recv\n", job->getJobId());
+    #endif
+  }
   job->setAssignedFPGA(NULL);
   return jobList.erase(job->getJobId());
 }
 
 int commFPGA::sendFromBuffer() {
-  uint_least32_t avail = (sendBufferWriteIndex - sendBufferReadIndex) % MAX_JOB_LEN;
+  std::lock_guard<std::mutex> lk(sendLock);
+  int_least32_t avail = sendBufferAvailable + sendBufferReadIndex > MAX_JOB_LEN ? MAX_JOB_LEN - sendBufferReadIndex : sendBufferAvailable;
   
-  if(avail <= 0)
+  if(avail == 0)
     return -1;
-    
-  uint_least32_t readEnd;
 
   if(avail*4 > UDP_LEN)
-    readEnd = sendBufferReadIndex + UDP_LEN / 4;
-  else
-    readEnd = sendBufferReadIndex + avail;
-
-  if(readEnd >= MAX_JOB_LEN)
-    readEnd = MAX_JOB_LEN;
+    avail = UDP_LEN / 4;
 
-  //printf("avail %5d read %5d write %5d len %5d\n", avail, sendBufferReadIndex, sendBufferWriteIndex, (readEnd - sendBufferReadIndex));
-
-  int rc = sendRaw((uint8_t*)&sendBuffer[sendBufferReadIndex], (readEnd - sendBufferReadIndex) * 4);
+  int rc = sendRaw((uint8_t*)&sendBuffer[sendBufferReadIndex], avail * 4);
 
+  #ifdef DEBUG_JOB_SEND
+  printf("send ");
+  for(size_t i=0; i<avail; i++)
+    printf("%lu: %08X    ", sendBufferReadIndex+i,  __builtin_bswap32(sendBuffer[sendBufferReadIndex+i]));
+  printf("\n");
+  #endif
 
-  if(readEnd < MAX_JOB_LEN)
-    sendBufferReadIndex = readEnd;
-  else
-    sendBufferReadIndex = 0;
+  sendBufferReadIndex = (avail + sendBufferReadIndex) % MAX_JOB_LEN;
+  sendBufferAvailable -= avail;
 
   return rc;
 }

+ 8 - 14
lib/mlfpga/src/connectionManager.cpp

@@ -12,17 +12,10 @@ void ConnectionManager::addFPGA(const char* ip, const uint port, bool bindSelf)
   fpgas.back()->start();
 }
 
-int ConnectionManager::sendJobListAsync(std::shared_ptr<JobList> &jobList) {
-  workers.emplace_back(new Worker(&fpgas));
-  workers.back()->assignJobList(jobList);
-  workers.back()->startAsync();
-  return 0;
-}
-int ConnectionManager::sendJobListSync(std::shared_ptr<JobList> &jobList) {
-  workers.emplace_back(new Worker(&fpgas));
-  workers.back()->assignJobList(jobList);
-  workers.back()->startSync();
-  return 0;
+Worker* ConnectionManager::createWorker(Module mod, size_t numberOfJobs) {
+  Worker *w = new Worker(&fpgas, mod, numberOfJobs);
+  workers.emplace_back(w);
+  return w;
 }
 
 void ConnectionManager::start() {
@@ -30,14 +23,15 @@ void ConnectionManager::start() {
 }
 
 void ConnectionManager::sendThread() {
+  pthread_setname_np(pthread_self(), "mlfpga send");
   while(running) {
     Clock::time_point start = Clock::now();
     for(std::vector<std::unique_ptr<commFPGA>>::iterator it=fpgas.begin(); it!=fpgas.end(); it++) {
       it->get()->sendFromBuffer();
     }
     //printf("%8d %8d\n", fpgas[0].sendBufferWriteIndex, fpgas[0].sendBufferReadIndex);
-    uint us = std::chrono::duration_cast<microseconds>(Clock::now() - start).count();
-    if(us < 50)
-      usleep(50 - us);
+    auto elapsed = Clock::now() - start;
+    if(elapsed < sendDelay)
+      std::this_thread::sleep_for(sendDelay - elapsed);
   }
 }

+ 3 - 3
lib/mlfpga/src/job.cpp

@@ -30,11 +30,11 @@ void Job::calcCRC() {
 
 //checks CRC of recvBuf
 bool Job::checkCRC() {
-  uint32_t sum = getPreamble() + getJobId() + getModuleId();
-  for(uint_least32_t i=1; i<recvBuf.getWordCount()-1; i++) {
+  uint32_t sum = getJobId() + getModuleId();
+  for(uint_least32_t i=0; i<recvBuf.getWordCount(); i++) {
     sum += recvBuf.getWord(i);
   }
-  return recvBuf.getWord(recvBuf.getWordCount()-1) == -sum;
+  return sum == 0;
 }
 
 void Job::setDoneCallback(DoneCallback cb) {

+ 4 - 8
lib/mlfpga/src/jobList.cpp

@@ -28,15 +28,11 @@ void JobList::finishJob() {
   jobListDone.notify_all();
 }
 
-std::shared_ptr<Job>& JobList::getJob(size_t i) {
-  return jobs.at(i);
+JobContainer JobList::getJob(size_t i) {
+  return JobContainer(jobs.at(i));
 }
 
-std::shared_ptr<Job>& JobList::getNextJob() {
+JobContainer JobList::getNextJob() {
   nextJobIndex = (nextJobIndex+1) % jobCount;
-  return jobs.at(nextJobIndex);
-}
-
-void JobList::setDoneCallback(DoneCallback cb) {
-  doneCb = cb;
+  return JobContainer(jobs.at(nextJobIndex));
 }

+ 74 - 62
lib/mlfpga/src/worker.cpp

@@ -1,10 +1,11 @@
 #include "worker.hpp"
 
-Worker::Worker(std::vector<std::unique_ptr<commFPGA>> *fpgas) {
+Worker::Worker(std::vector<std::unique_ptr<commFPGA>> *fpgas, Module mod, size_t numberOfJobs) : 
+jobList(std::piecewise_construct, std::make_tuple(), std::make_tuple(new JobList(mod, numberOfJobs))) {
   fpgaVector = fpgas;
 }
 Worker::~Worker() {
-  hasJobList.notify_all();
+  running = false;
 }
 
 void Worker::startAsync() {
@@ -14,79 +15,82 @@ void Worker::startSync() {
   threadMain();
 }
 
-int Worker::assignJobList(std::shared_ptr<JobList> &jobList) {
-  std::lock_guard<std::mutex> lk(currentJobList_m);
-  if(currentJobList != NULL)
-    return -1;
-  
-  currentJobList = jobList;
-  hasJobList.notify_all();
-
-  return 0;
+JobListContainer Worker::getJobList() {
+  return JobListContainer(jobList);
 }
 
 int Worker::threadMain() {
-  if(currentJobList == NULL)
-    return -1;
-
-  while(true) {
-    size_t remainingJobs = currentJobList->getJobCount();
-    Clock::time_point now = Clock::now(); 
-    commFPGA *fpga;
-    
-    for(size_t i=0; i<currentJobList->getJobCount(); i++) {
-      std::shared_ptr<Job> &job = currentJobList->getJob(i);
-      switch(job->getState()) {
-        case JobState::initialized:
+  pthread_setname_np(pthread_self(), "mlfpga worker");
+  {
+    auto currentJobList = getJobList();
+    while(running) {
+      size_t remainingJobs = currentJobList->getJobCount();
+      Clock::time_point now = Clock::now(); 
+      commFPGA *fpga;
+      
+      for(size_t i=0; i<currentJobList->getJobCount(); i++) {
+        auto job = currentJobList->getJob(i);
+        switch(job->getState()) {
+          case JobState::initialized:
 
-          break;
-        case JobState::ready:
-          sendJob(job);
-          break;
-        case JobState::sent:
-          if(std::chrono::duration_cast<microseconds>(now - job->getSent()).count() > 1000) {
-            fpga = (commFPGA*)job->getAssignedFPGA();
-            if(fpga != NULL) {
-              fpga->unassignJob(job);
+            break;
+          case JobState::ready:
+            fpga = findAvailableFPGA();
+            if(fpga == NULL) {
+            break;
             }
-            if(job->getSendCounter() < 5) {
-              job->setState(JobState::ready);
-              sendJob(job);
-            } else {
-              job->setState(JobState::failed);
-              job->setReceived(false);
+            if(fpga->assignJob(job) >= 0) {
+              job->setSent();
+              //printf("job %08X: assigned\n", job->getJobId());
             }
-          }
-          break;
-        case JobState::receiving:
+            break;
+          case JobState::sent:
+            if(now - job->getSent() > jobTimeout) {
+              fpga = (commFPGA*)job->getAssignedFPGA();
+              if(fpga != NULL) {
+                if(fpga->unassignJob(job) < 0)
+                  break;
+                //printf("job %08X: unassigned\n", job->getJobId());
+              }
+              if(job->getSendCounter() < retryCount) {
+                job->setState(JobState::ready);
+                fpga = findAvailableFPGA();
+                if(fpga == NULL) {
+                  break;
+                }
+                if(fpga->assignJob(job) >= 0) {
+                  job->setSent();
+                  //printf("job %08X: reassigned\n", job->getJobId());
+                }
+              } else {
+                job->setState(JobState::failed);
+                job->setReceived(false);
+              }
+            }
+            break;
+          case JobState::receiving:
 
-          break;
-        case JobState::finished:
-          remainingJobs--;
-          break;
-        case JobState::failed:
-          remainingJobs--;
-          break;
+            break;
+          case JobState::finished:
+            remainingJobs--;
+            break;
+          case JobState::failed:
+            remainingJobs--;
+            break;
+        }
       }
+      if(remainingJobs <= 0) {
+        break;
+      }
+      currentJobList->waitOne(jobTimeout);
     }
-    if(remainingJobs <= 0) {
-      break;
-    }
-    currentJobList->waitOne(microseconds(1000));
   }
+  
+  if(doneCb != NULL)
+    doneCb();
   return 0;
 }
 
-void Worker::sendJob(std::shared_ptr<Job> &job) {
-    commFPGA *fpga = findAvailableFPGA();
-    if(fpga == NULL) {
-      return;
-    }
-    if(fpga->assignJob(job) >= 0) {
-      job->setSent();
-    }
-}
-
 commFPGA* Worker::findAvailableFPGA() {
   uint_least32_t minCnt = JOB_COUNT-1;
   commFPGA *fpga = NULL;
@@ -98,4 +102,12 @@ commFPGA* Worker::findAvailableFPGA() {
     }
   }
   return fpga;
+}
+
+void Worker::setDoneCallback(DoneCallback cb) {
+  doneCb = cb;
+}
+
+void Worker::waitUntilDone() {
+  jobList.second->waitAll();
 }

+ 15 - 11
src/conv2D.cpp

@@ -54,26 +54,30 @@ namespace tf_lib {
     auto input_tensor = input.tensor<int32, 4>();
     auto output_tensor = output->tensor<int32, 4>();
 
-    std::shared_ptr<JobList> jobs(new JobList(Module::conv2D_5x5_Module, batchSize * channels * filters));
-
-    for(int sample=0; sample<batchSize; sample++) {
-      for(int channel=0; channel<channels; channel++) {
-        for(int filter=0; filter<filters; filter++) {
-          std::shared_ptr<Job> &job = jobs->getJob(sample * channels * filters + channel * filters + filter);
-          for(int x=0; x<outputSize; x++) {
-            for(int y=0; y<outputSize; y++) {
-              job->setPayload(x*outputSize + y, input_tensor(sample, x, y, channel));
+    auto worker = connectionManager.createWorker(Module::conv2D_5x5_Module, batchSize * channels * filters);
+    {
+      auto jobs = worker->getJobList();
+
+      for(int sample=0; sample<batchSize; sample++) {
+        for(int channel=0; channel<channels; channel++) {
+          for(int filter=0; filter<filters; filter++) {
+            auto job = jobs->getJob(sample * channels * filters + channel * filters + filter);
+            for(int x=0; x<outputSize; x++) {
+              for(int y=0; y<outputSize; y++) {
+                job->setPayload(x*outputSize + y, input_tensor(sample, x, y, channel));
+              }
             }
           }
         }
       }
     }
-    jobs->setDoneCallback([output_tensor, &jobs, done]{
+    worker->setDoneCallback([output_tensor, worker, done]{
+      auto jobs = worker->getJobList();
       output_tensor(0) = jobs->getJob(0)->getResponsePayload(0);
       done();
     });
 
-    connectionManager.sendJobListAsync(jobs);
+    worker->startAsync();
 
   }
 

+ 8 - 5
src/dummyOp.cpp

@@ -27,14 +27,17 @@ namespace tf_lib {
     auto input_tensor = input.tensor<int32, 1>();
     auto output_tensor = output->tensor<int32, 1>();
 
-    std::shared_ptr<JobList> jobs(new JobList(Module::dummyModule, 1));
-    jobs->getJob(0)->setPayload(0, input_tensor(0));
-
-    jobs->setDoneCallback([output_tensor, &jobs, done]{
+    auto worker = connectionManager.createWorker(Module::dummyModule, 1);
+    {
+      auto jobs = worker->getJobList();
+      jobs->getJob(0)->setPayload(0, input_tensor(0));
+    }
+    worker->setDoneCallback([output_tensor, worker, done]{
+      auto jobs = worker->getJobList();
       output_tensor(0) = jobs->getJob(0)->getResponsePayload(0);
       done();
     });
 
-    connectionManager.sendJobListAsync(jobs);
+    worker->startSync();
   }
 }

+ 50 - 10
tests/main.cpp

@@ -3,25 +3,65 @@
 
 ConnectionManager connectionManager;
 
+size_t s=0, f=0, r=0;
+std::mutex statsLk;
+
+void work() {
+    auto worker = connectionManager.createWorker(Module::dummyModule, 1);
+
+    worker->setJobTimeout(microseconds(10000));
+    worker->setRetryCount(10);
+    worker->setDoneCallback([worker](){
+        auto jobs = worker->getJobList();
+        std::unique_lock<std::mutex> lk(statsLk);
+        for(size_t i=0; i<jobs->getJobCount(); i++) {
+            auto job = jobs->getJob(i);
+            if(job->getState() == JobState::finished) {
+                s++;
+            } else if(job->getState() == JobState::failed) {
+                f++;
+            } else {
+                printf("job %08X: invalid state %d\n", job->getJobId(), (int)job->getState());
+            }
+            r += job->getSendCounter() - 1;
+        }
+        
+    });
+    {
+        auto jobs = worker->getJobList();
+        for(size_t i=0; i<jobs->getJobCount(); i++) {
+            auto job = jobs->getJob(i);
+            static int num=0;
+            job->setPayload(0, num);
+            job->setPayload(1, num);
+            job->setPayload(2, num);
+            job->setPayload(3, num++);
+            job->setReady();
+        }
+    }
+
+    worker->startAsync();
+}
 
 int main(void)
 {
     puts("This is a shared library test...");
 
-
-    JobData x(4);
-    printf("%08X\n", x.getPreamble());
-    return 0;
     
-    connectionManager.addFPGA("192.168.1.10", 1234, true);
+    connectionManager.addFPGA("192.168.1.32", 1234);
 
-    connectionManager.start();
+    //connectionManager.setSendDelay(microseconds(0));
 
-    std::shared_ptr<JobList> jobs(new JobList(Module::dummyModule, 1));
-    for(size_t i=0; i<1; i++)
-        jobs->getJob(i)->setReady();
+    connectionManager.start();
 
-    connectionManager.sendJobListSync(jobs);
+    for(int i=0; i<7; i++)
+        work();
+    
+    for(size_t i=0; i<connectionManager.getWorkerCount(); i++) {
+        connectionManager.getWorker(i)->waitUntilDone();
+    }
 
+    std::unique_lock<std::mutex> lk(statsLk);
+    printf("failed: %lu, successful: %lu, retries: %lu\n", f, s, r);
     return 0;
 }