Kaynağa Gözat

added tests

subDesTagesMitExtraKaese 4 yıl önce
ebeveyn
işleme
ad4f8460e2

BIN
build/op_lib.so


+ 0 - 2
include/conv2D.hpp

@@ -44,8 +44,6 @@ namespace tf_lib {
       int sizeWithBorder = width + 2*border;
       int pixels = sizeWithBorder * sizeWithBorder;
 
-      void fpgaCall(const Tensor *input, const Tensor *kernel, Tensor *output, int sample, int channel, int filter);
-      void delayThread(DoneCallback done);
 
     //TF_DISALLOW_COPY_AND_ASSIGN(Conv2DOp);
   };

+ 2 - 4
lib/mlfpga/include/commFPGA.hpp

@@ -31,7 +31,7 @@
 #define UDP_MTU (1500) // size of recv UDP buffer in bytes
 #define JOB_COUNT (1024 * 4)
 
-//#define DEBUG_JOB_RESP
+#define DEBUG_JOB_RESP
 
 
 typedef std::chrono::high_resolution_clock Clock;
@@ -59,10 +59,9 @@ class commFPGA {
     //called by worker thread
     
     int assignJob(std::shared_ptr<Job> &job);
-    int fillBuffer(JobData *sendBuf);
     int unassignJob(std::shared_ptr<Job> &job);
 
-    uint_least32_t jobCount();
+    size_t jobCount();
     
     //called by send thread
     int sendRaw(uint8_t *buf, uint bufLen);
@@ -89,7 +88,6 @@ class commFPGA {
 
     //list of pending responses
     std::unordered_map<uint32_t,std::shared_ptr<Job>> jobList;
-    uint_least32_t jobsActive = 0;
     std::mutex jobLock;
 
     //listener for a single FPGA

+ 1 - 1
lib/mlfpga/include/connectionManager.hpp

@@ -36,7 +36,7 @@ class ConnectionManager {
     ConnectionManager();
     ~ConnectionManager();
 
-    void addFPGA(const char* ip, const uint port);
+    void addFPGA(const char* ip, const uint port, bool bindSelf=false);
 
     void start();
 

+ 14 - 3
lib/mlfpga/include/job.hpp

@@ -104,19 +104,30 @@ class Job : public JobData {
     JobState getState() const {return state;}
     void setState(JobState s) {state = s;}
 
-    void isComplete();
+    void setReady();
+    void setSent();
+    void setReceived(const bool success);
     void setDoneCallback(DoneCallback cb);
 
+    Clock::time_point getSent() const {return sent;}
+    Clock::time_point getReceived() const {return received;}
+
+    void* getAssignedFPGA() const {return assignedFPGA;}
+    void setAssignedFPGA(void *fpga) {assignedFPGA = fpga;}
+
+    size_t getSendCounter() const {return sendCounter;}
+
   private:
     //only payload and CRC of response
     WordBuffer recvBuf;
     DoneCallback doneCb = NULL;
 
     JobState state = JobState::initialized;
-    Clock::time_point created = Clock::now();
+    Clock::time_point sent;
     Clock::time_point received;
 
-    
+    void *assignedFPGA = NULL;
+    size_t sendCounter = 0;
 
 };
 

+ 3 - 1
lib/mlfpga/include/jobList.hpp

@@ -11,6 +11,7 @@ class JobList {
   public:
     JobList(Module mod, size_t numberOfJobs);
     void waitAll();
+    void waitOne(microseconds us);
     void finishJob();
 
     void setDoneCallback(DoneCallback cb);
@@ -19,7 +20,8 @@ class JobList {
 
     std::shared_ptr<Job>& getJob(size_t i);
 
-    std::shared_ptr<Job> getNextJob();
+    std::shared_ptr<Job>& getNextJob();
+    size_t getJobCount() const {return jobCount;}
   private:
     std::vector<std::shared_ptr<Job>> jobs;
     DoneCallback doneCb;

+ 3 - 1
lib/mlfpga/include/worker.hpp

@@ -14,7 +14,8 @@ class Worker {
     Worker(std::vector<std::unique_ptr<commFPGA>> *fpgas);
     ~Worker();
 
-    void start();
+    void startAsync();
+    void startSync();
     
     int assignJobList(std::shared_ptr<JobList> &jobList);
 
@@ -24,6 +25,7 @@ class Worker {
     std::vector<std::unique_ptr<commFPGA>> *fpgaVector;
 
     commFPGA* findAvailableFPGA();
+    void sendJob(std::shared_ptr<Job> &job);
     
 
     std::future<int> result;

Dosya farkı çok büyük olduğundan ihmal edildi
+ 0 - 30
lib/mlfpga/makefile


+ 43 - 33
lib/mlfpga/src/commFPGA.cpp

@@ -48,7 +48,7 @@ void commFPGA::recvUDP() {
 }
 
 int commFPGA::parseRaw(uint32_t *buf, size_t bufLen) {
-  jobLock.lock();
+  std::lock_guard<std::mutex> lk(jobLock);
 
   for(size_t i=0; i < bufLen; i++) {
     
@@ -57,6 +57,9 @@ int commFPGA::parseRaw(uint32_t *buf, size_t bufLen) {
         if(buf[i] == PREAMBLE) {
           recvState = RecvState::checkJobId;
         }
+        #ifdef DEBUG_JOB_RESP
+          else printf("wrong preamble %08X\n", buf[i]);
+        #endif
         break;
 
       case RecvState::checkJobId: 
@@ -64,6 +67,9 @@ int commFPGA::parseRaw(uint32_t *buf, size_t bufLen) {
         if(currentJob == jobList.end()) {
           i -= 1;
           recvState = RecvState::checkPreamble;
+          #ifdef DEBUG_JOB_RESP
+            printf("job %08X jobId not found\n", buf[i]);
+          #endif
         } else if(currentJob->second->getState() != JobState::sent) {
           #ifdef DEBUG_JOB_RESP
             printf("job %08X wasn't sent\n", buf[i]);
@@ -71,9 +77,7 @@ int commFPGA::parseRaw(uint32_t *buf, size_t bufLen) {
           i -= 1;
           recvState = RecvState::checkPreamble;
         } else {
-          #ifdef DEBUG_JOB_RESP
-            printf("job %08X jobId not found\n", buf[i]);
-          #endif
+          assert(currentJob->second->getAssignedFPGA() == this);
           recvState = RecvState::checkModuleId;
         }
         break;
@@ -87,7 +91,7 @@ int commFPGA::parseRaw(uint32_t *buf, size_t bufLen) {
           i = i - 2 < 0 ? -1 : i - 2;
           recvState = RecvState::checkPreamble;
           #ifdef DEBUG_JOB_RESP
-            printf("job %08X wrong moduleId %08X\n", *currentJobResp->jobId, word);
+            printf("job %08X wrong moduleId %08X\n", currentJob->second->getJobId(), buf[i]);
           #endif
         }
         break;
@@ -95,17 +99,16 @@ int commFPGA::parseRaw(uint32_t *buf, size_t bufLen) {
         currentJob->second->setResponsePayload(recvPayloadIndex++, buf[i]);
         if(recvPayloadIndex >= currentJob->second->getResponseBufferWordCount()) {
           if(currentJob->second->checkCRC()) {
-            currentJob->second->setState(JobState::finished);
-            currentJob->second->isComplete();
+            currentJob->second->setReceived(true);
             jobList.erase(currentJob->second->getJobId());
           } else {
             currentJob->second->setState(JobState::sent);
             #ifdef DEBUG_JOB_RESP
-              printf("job %08X wrong crc %08X, %4d, %4d\n", *currentJobResp->jobId, word, bufLen, i);
-              for(uint_least32_t k=0; k<currentJobResp->wordCount; k++) {
-                printf(" %4d %08X", k, currentJobResp->words[k]);
+              printf("job %08X wrong crc %08X, %4d, %4d\n", currentJob->second->getJobId(), buf[i], bufLen, i);
+              for(uint_least32_t k=0; k<currentJob->second->getWordCount(); k++) {
+                printf(" %4d %08X", k, currentJob->second->getWord(k));
               }
-              cout << endl;
+              std::cout << std::endl;
             #endif
           }
           recvState = RecvState::checkPreamble;
@@ -114,8 +117,6 @@ int commFPGA::parseRaw(uint32_t *buf, size_t bufLen) {
     }
   }
 
-  jobLock.unlock();
-
   return 0;
 }
 
@@ -195,16 +196,38 @@ int commFPGA::sendRaw(uint8_t *buf, uint bufLen) {
 }
 
 int commFPGA::assignJob(std::shared_ptr<Job> &job) {
-  jobLock.lock();
+
+  if(job->getAssignedFPGA() != NULL)
+    return -1;
+
+  std::lock_guard<std::mutex> lk(jobLock);
   if(jobList.size() >= JOB_COUNT)
     return -1;
   
+  uint_least32_t free = (sendBufferReadIndex - sendBufferWriteIndex) % MAX_JOB_LEN;
+  if(free < job->getWordCount() && free != 0)
+    return -1;
+
   jobList.insert(std::pair<uint32_t,std::shared_ptr<Job>>(job->getJobId(), job));
-  
-  jobsActive++;
-  jobLock.unlock();
+  job->setAssignedFPGA(this);
+
+  for(uint_least32_t i=0; i<job->getWordCount(); i++) {
+    sendBuffer[(sendBufferWriteIndex + i) % MAX_JOB_LEN] = __builtin_bswap32(job->getWord(i));
+    printf("%08X ", job->getWord(i));
+  }
+  printf("\n");
+  sendBufferWriteIndex = (sendBufferWriteIndex + job->getWordCount()) % MAX_JOB_LEN;
+
   return 0;
 }
+int commFPGA::unassignJob(std::shared_ptr<Job> &job) {
+  if(job->getAssignedFPGA() != this)
+    return -1;
+  
+  std::lock_guard<std::mutex> lk(jobLock);
+  job->setAssignedFPGA(NULL);
+  return jobList.erase(job->getJobId());
+}
 
 int commFPGA::sendFromBuffer() {
   uint_least32_t avail = (sendBufferWriteIndex - sendBufferReadIndex) % MAX_JOB_LEN;
@@ -226,7 +249,7 @@ int commFPGA::sendFromBuffer() {
 
   int rc = sendRaw((uint8_t*)&sendBuffer[sendBufferReadIndex], (readEnd - sendBufferReadIndex) * 4);
 
-  
+
   if(readEnd < MAX_JOB_LEN)
     sendBufferReadIndex = readEnd;
   else
@@ -235,19 +258,6 @@ int commFPGA::sendFromBuffer() {
   return rc;
 }
 
-int commFPGA::fillBuffer(JobData *jobData) {
-  uint_least32_t free = (sendBufferReadIndex - sendBufferWriteIndex) % MAX_JOB_LEN;
-  //printf("free %8d %8d %8d\n", free, sendBufferReadIndex, sendBufferWriteIndex);
-  if(free < jobData->getWordCount() && free != 0)
-    return -1;
-
-  for(uint_least32_t i=0; i<jobData->getWordCount(); i++) {
-    sendBuffer[(sendBufferWriteIndex + i) % MAX_JOB_LEN] = __builtin_bswap32(jobData->getWord(i));
-  }
-  sendBufferWriteIndex = (sendBufferWriteIndex + jobData->getWordCount()) % MAX_JOB_LEN;
-  return 0;
-}
-
-uint_least32_t commFPGA::jobCount() {
-  return jobsActive;
+size_t commFPGA::jobCount() {
+  return jobList.size();
 }

+ 8 - 3
lib/mlfpga/src/connectionManager.cpp

@@ -5,10 +5,9 @@ ConnectionManager::ConnectionManager() {
 }
 ConnectionManager::~ConnectionManager() {
   running = false;
-  sendResult.get();
 }
 
-void ConnectionManager::addFPGA(const char* ip, const uint port) {
+void ConnectionManager::addFPGA(const char* ip, const uint port, bool bindSelf) {
   fpgas.emplace_back(new commFPGA(ip, port));
   fpgas.back()->start();
 }
@@ -16,7 +15,13 @@ void ConnectionManager::addFPGA(const char* ip, const uint port) {
 int ConnectionManager::sendJobListAsync(std::shared_ptr<JobList> &jobList) {
   workers.emplace_back(new Worker(&fpgas));
   workers.back()->assignJobList(jobList);
-  workers.back()->start();
+  workers.back()->startAsync();
+  return 0;
+}
+int ConnectionManager::sendJobListSync(std::shared_ptr<JobList> &jobList) {
+  workers.emplace_back(new Worker(&fpgas));
+  workers.back()->assignJobList(jobList);
+  workers.back()->startSync();
   return 0;
 }
 

+ 13 - 1
lib/mlfpga/src/job.cpp

@@ -41,8 +41,20 @@ void Job::setDoneCallback(DoneCallback cb) {
   doneCb = cb;
 }
 
-void Job::isComplete() {
+void Job::setReady() {
+  calcCRC();
+  setState(JobState::ready);
+}
+
+void Job::setReceived(const bool success) {
+  setState(success ? JobState::finished : JobState::failed);
   received = Clock::now();
   if(doneCb)
     doneCb();
+}
+
+void Job::setSent() {
+  setState(JobState::sent);
+  sendCounter++;
+  sent = Clock::now();
 }

+ 8 - 9
lib/mlfpga/src/jobList.cpp

@@ -17,6 +17,11 @@ void JobList::waitAll() {
   jobListDone.wait(lk, [this]{return pendingJobCount <= 0;});
 }
 
+void JobList::waitOne(microseconds us) {
+  std::unique_lock<std::mutex> lk(pendingJobCount_m);
+  jobListDone.wait_for(lk, us);
+}
+
 void JobList::finishJob() {
   std::lock_guard<std::mutex> lk(pendingJobCount_m);
   pendingJobCount--;
@@ -27,15 +32,9 @@ std::shared_ptr<Job>& JobList::getJob(size_t i) {
   return jobs.at(i);
 }
 
-std::shared_ptr<Job> JobList::getNextJob() {
-  for(size_t i=0; i<jobCount; i++) {
-    size_t rotated_i = (i+nextJobIndex+1) % jobCount;
-    if(jobs.at(rotated_i)->getState() == JobState::ready) {
-      nextJobIndex = rotated_i;
-      return jobs.at(rotated_i);
-    }
-  }
-  return NULL;
+std::shared_ptr<Job>& JobList::getNextJob() {
+  nextJobIndex = (nextJobIndex+1) % jobCount;
+  return jobs.at(nextJobIndex);
 }
 
 void JobList::setDoneCallback(DoneCallback cb) {

+ 55 - 8
lib/mlfpga/src/worker.cpp

@@ -7,9 +7,12 @@ Worker::~Worker() {
   hasJobList.notify_all();
 }
 
-void Worker::start() {
+void Worker::startAsync() {
   result = std::async(std::launch::async, &Worker::threadMain, this);
 }
+void Worker::startSync() {
+  threadMain();
+}
 
 int Worker::assignJobList(std::shared_ptr<JobList> &jobList) {
   std::lock_guard<std::mutex> lk(currentJobList_m);
@@ -26,18 +29,62 @@ int Worker::threadMain() {
   if(currentJobList == NULL)
     return -1;
 
-  while(currentJobList->getPendingJobCount() > 0) {
-    std::shared_ptr<Job> job = currentJobList->getNextJob();
-    if(job == NULL) {
+  while(true) {
+    size_t remainingJobs = currentJobList->getJobCount();
+    Clock::time_point now = Clock::now(); 
+    commFPGA *fpga;
+    
+    for(size_t i=0; i<currentJobList->getJobCount(); i++) {
+      std::shared_ptr<Job> &job = currentJobList->getJob(i);
+      switch(job->getState()) {
+        case JobState::initialized:
+
+          break;
+        case JobState::ready:
+          sendJob(job);
+          break;
+        case JobState::sent:
+          if(std::chrono::duration_cast<microseconds>(now - job->getSent()).count() > 1000) {
+            fpga = (commFPGA*)job->getAssignedFPGA();
+            if(fpga != NULL) {
+              fpga->unassignJob(job);
+            }
+            if(job->getSendCounter() < 5) {
+              job->setState(JobState::ready);
+              sendJob(job);
+            } else {
+              job->setState(JobState::failed);
+              job->setReceived(false);
+            }
+          }
+          break;
+        case JobState::receiving:
+
+          break;
+        case JobState::finished:
+          remainingJobs--;
+          break;
+        case JobState::failed:
+          remainingJobs--;
+          break;
+      }
+    }
+    if(remainingJobs <= 0) {
       break;
     }
+    currentJobList->waitOne(microseconds(1000));
+  }
+  return 0;
+}
+
+void Worker::sendJob(std::shared_ptr<Job> &job) {
     commFPGA *fpga = findAvailableFPGA();
     if(fpga == NULL) {
-      continue;
+      return;
+    }
+    if(fpga->assignJob(job) >= 0) {
+      job->setSent();
     }
-    fpga->assignJob(job);
-  }
-  return 0;
 }
 
 commFPGA* Worker::findAvailableFPGA() {

+ 13 - 3
makefile

@@ -3,9 +3,10 @@ CXX=/usr/bin/g++
 SRC_DIR=src
 INC_DIR=include
 BUILD_DIR=build
+TESTS_DIR=tests
 FPGA_LIB_DIR=lib/mlfpga
 
-CFLAGS=-g -Wall -std=c++11
+CFLAGS=-g -Wall -pthread -std=c++11
 LFLAGS=-shared -Wl,--no-as-needed,-Map=$(BUILD_DIR)/project.map
 
 
@@ -18,7 +19,11 @@ OBJS=$(patsubst $(SRC_DIR)/%.cpp,$(BUILD_DIR)/%.o,$(SRCS))
 FPGA_LIB_SRCS=$(wildcard $(FPGA_LIB_DIR)/$(SRC_DIR)/*.cpp)
 FPGA_LIB_OBJS=$(patsubst $(FPGA_LIB_DIR)/$(SRC_DIR)/%.cpp,$(BUILD_DIR)/%.o,$(FPGA_LIB_SRCS))
 
+TESTS_SRCS=$(wildcard $(TESTS_DIR)/*.cpp)
+FPGA_LIB_TESTS_SRCS=$(wildcard $(FPGA_LIB_DIR)/$(TESTS_DIR)/*.cpp)
+
 EXECUTABLE=op_lib.so
+TEST_EXEC=test
 
 all: config $(BUILD_DIR)/$(EXECUTABLE)
 
@@ -34,7 +39,12 @@ $(OBJS): $(BUILD_DIR)/%.o : $(SRC_DIR)/%.cpp $(INC_DIR)/%.hpp
 $(FPGA_LIB_OBJS): $(BUILD_DIR)/%.o : $(FPGA_LIB_DIR)/$(SRC_DIR)/%.cpp $(FPGA_LIB_DIR)/$(INC_DIR)/%.hpp
 	$(CXX) $(CFLAGS) -fPIC -c -I$(FPGA_LIB_DIR)/$(INC_DIR) -o $@ $<
 
-tf_cflags:
+test: config $(BUILD_DIR)/$(EXECUTABLE) $(BUILD_DIR)/$(TEST_EXEC)
+
+$(BUILD_DIR)/$(TEST_EXEC): $(TESTS_SRCS) $(FPGA_LIB_OBJS)
+	$(CXX) $(CFLAGS) -I$(FPGA_LIB_DIR)/$(INC_DIR) -o $@ $^
+
+
 
 clean:
-	rm -f $(BUILD_DIR)/*.o $(BUILD_DIR)/$(EXECUTABLE)
+	rm -f $(BUILD_DIR)/*.o $(BUILD_DIR)/$(EXECUTABLE) $(BUILD_DIR)/$(TEST_EXEC)

+ 1 - 1
src/conv2D.cpp

@@ -54,7 +54,7 @@ namespace tf_lib {
     auto input_tensor = input.tensor<int32, 4>();
     auto output_tensor = output->tensor<int32, 4>();
 
-    std::shared_ptr<JobList> jobs(new JobList(Module::dummyModule, batchSize * channels * filters));
+    std::shared_ptr<JobList> jobs(new JobList(Module::conv2D_5x5_Module, batchSize * channels * filters));
 
     for(int sample=0; sample<batchSize; sample++) {
       for(int channel=0; channel<channels; channel++) {

+ 18 - 0
tests/echo.py

@@ -0,0 +1,18 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import socket
+
+sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+
+server_address = '0.0.0.0'
+server_port = 1234
+
+server = (server_address, server_port)
+sock.bind(server)
+print("Listening on " + server_address + ":" + str(server_port))
+
+while True:
+	payload, client_address = sock.recvfrom(1)
+	print("Echoing data back to " + str(client_address))
+	sent = sock.sendto(payload, client_address)

+ 27 - 0
tests/main.cpp

@@ -0,0 +1,27 @@
+#include <stdio.h>
+#include "connectionManager.hpp"
+
+ConnectionManager connectionManager;
+
+
+int main(void)
+{
+    puts("This is a shared library test...");
+
+
+    JobData x(4);
+    printf("%08X\n", x.getPreamble());
+    return 0;
+    
+    connectionManager.addFPGA("192.168.1.10", 1234, true);
+
+    connectionManager.start();
+
+    std::shared_ptr<JobList> jobs(new JobList(Module::dummyModule, 1));
+    for(size_t i=0; i<1; i++)
+        jobs->getJob(i)->setReady();
+
+    connectionManager.sendJobListSync(jobs);
+
+    return 0;
+}