Ver código fonte

conv2d bug fixes

subDesTagesMitExtraKaese 4 anos atrás
pai
commit
b642639eb9

+ 71 - 0
examples/screengrab.py

@@ -0,0 +1,71 @@
+#!/usr/local/bin/python
+# -*- coding: utf-8 -*-
+# Demo: draws random ellipses onto a screenshot and displays the Conv2DFPGA layer output.
+import numpy as np
+import cv2
+from mss import mss
+from PIL import Image
+import threading
+import time
+from random import randint
+
+import tensorflow as tf
+from tensorflow.keras import layers, models
+
+import sys
+sys.path.append('..')  # lets the import below resolve hostLib from the parent directory
+from hostLib.layers.conv2D import Conv2D as Conv2DFPGA
+
+bounding_box = {'top': 0, 'left': 0, 'width': 2560, 'height': 1440}  # screen region handed to sct.grab()
+width, height = 228, 228  # size the screenshot is resized to; also the model input size
+
+sct = mss()
+stop = 0  # read by the loop's exit condition; set to True when the loop exits
+
+a = layers.Input(dtype=tf.int32, shape=(width, height, 3))
+z = Conv2DFPGA(1)(a)
+model = models.Model(inputs=a, outputs=z)
+
+
+#model.compile(loss=tf.keras.losses.categorical_crossentropy,
+#              optimizer=tf.keras.optimizers.Adadelta(),
+#              metrics=['accuracy'])
+
+sct_img = sct.grab(bounding_box)  # grabbed once; the loop below keeps drawing onto this frame
+np_img = np.array(sct_img)
+resized_image = cv2.resize(np_img, (width, height))
+resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGRA2BGR)  # BGRA -> BGR (3 channels)
+while True:
+  cv2.ellipse(
+    resized_image,
+    (randint(0,width), randint(0,height)),  # centre
+    (randint(0,width//2), randint(0,height//2)),  # axes; // keeps bounds int (randint rejects floats on Python 3.12+)
+    randint(0,360),  # rotation angle
+    0,
+    360,
+    [randint(0,255), randint(0,255), randint(0,255)],  # randint is inclusive, so 255 is the largest 8-bit value
+    10
+  )
+  img32 = tf.cast(resized_image, tf.int32)  # matches the int32 dtype of the model input
+  #img32 = np.expand_dims(img32, axis=2)
+
+  cv2.imshow('screen', resized_image)
+  x,y,w,h = cv2.getWindowImageRect('screen')  # x,y anchor the result windows placed below
+  batch = np.expand_dims(img32, axis=0)
+  batch = tf.tile(batch, [2,1,1,1])  # repeat the frame twice along the batch axis
+
+
+  predictions = model.predict(batch)
+
+
+  pred8 = tf.cast(predictions / 256, tf.uint8)  # NOTE(review): presumably rescales the output into 8-bit range - confirm
+  for i in range(pred8.shape[0]):
+    name = 'conv_{}'.format(i)
+    cv2.imshow(name, pred8.numpy()[i])
+    cv2.moveWindow(name, x+((i+1)*300)%1500, y+int((i+1)/5)*300)  # 300 px grid that wraps after 1500 px
+
+
+  if (cv2.waitKey(1) & 0xFF) == ord('x') or stop:  # press 'x' to quit
+    cv2.destroyAllWindows()
+    stop = True
+    break

+ 2 - 1
include/conv2D.hpp

@@ -35,7 +35,7 @@ namespace tf_lib {
       int instance = -1;
       int delay = 1000;
 
-      int outputSize = 28;
+      
       int tagCounter = 0;
 
       int width = 224;
@@ -43,6 +43,7 @@ namespace tf_lib {
       int border = kernel/2;
       int sizeWithBorder = width + 2*border;
       int pixels = sizeWithBorder * sizeWithBorder;
+      int outputSize = sizeWithBorder;
 
 
     //TF_DISALLOW_COPY_AND_ASSIGN(Conv2DOp);

+ 1 - 1
lib/mlfpga/include/commFPGA.hpp

@@ -31,7 +31,7 @@
 #define UDP_MTU (1500) // size of recv UDP buffer in bytes
 #define JOB_COUNT (1024 * 4 * 10) // max size of jobList
 
-#define MAX_JOB_LEN (256*256) // max word count of job
+#define MAX_JOB_LEN (256*256*2) // max word count of job
 
 //#define DEBUG_JOB_RESP
 //#define DEBUG_JOB_SEND

+ 1 - 1
lib/mlfpga/include/modules.hpp

@@ -9,7 +9,7 @@
   MOD_DEF( conv2D_2x11_Module, 0x9323eb24, "2D Konvolution 2x11", 224*224, 224*224 ),   \
   MOD_DEF( neuronModule, 0x03b30000, "Neuron", 21, 1 ), \
   MOD_DEF( dummyBigModule, 0x2cb31e7c, "Dummy 1024", 1024, 1024), \
-  MOD_DEF( conv2D_5x5_Module, 0x4cd2e19c, "2D Konvolution 5x5", 224*224, 224*224)
+  MOD_DEF( conv2D_5x5_Module, 0x4cd2e19c, "2D Konvolution 5x5", 228*228, 228*228)
 
 #define MOD_DEF( identifier, id, name, sendLen, recvLen )  identifier
 enum Module { MODS_DEF };

+ 6 - 6
lib/mlfpga/src/commFPGA.cpp

@@ -225,17 +225,17 @@ int commFPGA::assignJob(JobContainer &job) {
 
   std::unique_lock<std::mutex> lk(jobLock, std::try_to_lock);
   if(!lk.owns_lock())
-    return -1;
+    return -2;
   
   if(jobList.size() >= JOB_COUNT)
-    return -1;
-
+    return -3;
+  
   std::lock_guard<std::mutex> slk(sendLock);
 
   uint_least32_t free = MAX_JOB_LEN - sendBufferAvailable;
 
   if(free < job->getWordCount())
-    return -1;
+    return -4;
 
   jobList.insert(std::pair<uint32_t,std::shared_ptr<Job>>(job->getJobId(), job.sharedPtr()));
   job->setAssignedFPGA(this);
@@ -253,7 +253,7 @@ int commFPGA::assignJob(JobContainer &job) {
   printf("\n");
   #endif
 
-  return 0;
+  return job->getWordCount();
 }
 int commFPGA::unassignJob(JobContainer &job) {
   if(job->getAssignedFPGA() != this)
@@ -261,7 +261,7 @@ int commFPGA::unassignJob(JobContainer &job) {
 
   std::unique_lock<std::mutex> lk(jobLock, std::try_to_lock);
   if(!lk.owns_lock())
-    return -1;
+    return -2;
 
   if(job->getState() == JobState::receiving) {
     currentJob = NULL;

+ 7 - 9
lib/mlfpga/src/connectionManager.cpp

@@ -34,16 +34,13 @@ void ConnectionManager::startFromTensorflow() {
   if(isRunning())
     return;
 
-  addFPGA("localhost", 1234);
+  addFPGA("192.168.1.33", 1234);
+  addFPGA("192.168.1.34", 1234);
+  addFPGA("192.168.1.35", 1234);
+
   start();
 
   printf("fpga server started\n");
-  /*
-  cm.addFPGA("192.168.88.32", 1234);
-  cm.addFPGA("192.168.88.33", 1234);
-  cm.addFPGA("192.168.88.34", 1234);
-  cm.addFPGA("192.168.88.35", 1234);
-  */
 }
 
 void ConnectionManager::start() {
@@ -53,13 +50,14 @@ void ConnectionManager::start() {
 
 void ConnectionManager::sendThread() {
   pthread_setname_np(pthread_self(), "mlfpga send");
+  std::chrono::steady_clock::time_point start;
   while(running) {
-    Clock::time_point start = Clock::now();
+    start = std::chrono::steady_clock::now();
     for(std::vector<std::unique_ptr<commFPGA>>::iterator it=fpgas.begin(); it!=fpgas.end(); it++) {
       auto fpga = it->get();
       fpga->sendFromBuffer();
     }
-    auto elapsed = Clock::now() - start;
+    auto elapsed =std::chrono::steady_clock::now() - start;
     if(elapsed < sendDelay)
       std::this_thread::sleep_for(sendDelay - elapsed);
   }

+ 29 - 5
lib/mlfpga/src/worker.cpp

@@ -1,5 +1,7 @@
 #include "worker.hpp"
 
+//#define DEBUG_WORKER
+
 Worker::Worker(std::vector<std::unique_ptr<commFPGA>> *fpgas, Module mod, size_t numberOfJobs) : 
 jobList(std::piecewise_construct, std::make_tuple(), std::make_tuple(new JobList(mod, numberOfJobs))) {
   fpgaVector = fpgas;
@@ -24,9 +26,14 @@ int Worker::threadMain() {
   {
     size_t lastI = 0;
     auto currentJobList = getJobList();
+    int rc;
     while(running) {
       size_t remainingJobs = currentJobList->getJobCount();
-      Clock::time_point now = Clock::now(); 
+      Clock::time_point now = Clock::now();
+      #ifdef DEBUG_WORKER
+        Clock::time_point then;
+        size_t sentBytes = 0;
+      #endif
       commFPGA *fpga;
       
       for(size_t i=0; i<currentJobList->getJobCount(); i++) {
@@ -44,9 +51,17 @@ int Worker::threadMain() {
                 goto fullQueue;
                 break;
               }
-              if(fpga->assignJob(job) >= 0) {
+              rc = fpga->assignJob(job);
+              //printf("rc: %4d i: %4lu\n", rc, i);
+              if(rc >= 0) {
                 job->setSent();
-                //printf("job %08X: assigned\n", job->getJobId());
+                #ifdef DEBUG_WORKER
+                  sentBytes += job->getByteCount();
+                  printf("job %08X: \x1b[32massigned\x1b[0m   no.: %3lu\n", job->getJobId(), currentI);
+                #endif
+              } else if(rc == -4) {
+                lastI = currentI;
+                goto fullQueue;
               }
               break;
             case JobState::sent:
@@ -55,7 +70,9 @@ int Worker::threadMain() {
                 if(fpga != NULL) {
                   if(fpga->unassignJob(job) < 0)
                     break;
-                  //printf("job %08X: unassigned\n", job->getJobId());
+                  #ifdef DEBUG_WORKER
+                    printf("job %08X: \x1b[31munassigned\x1b[0m no.: %3lu\n", job->getJobId(), currentI);
+                  #endif
                 }
                 if(job->getSendCounter() < retryCount) {
                   job->setState(JobState::ready);
@@ -67,7 +84,10 @@ int Worker::threadMain() {
                   }
                   if(fpga->assignJob(job) >= 0) {
                     job->setSent();
-                    //printf("job %08X: reassigned\n", job->getJobId());
+                    #ifdef DEBUG_WORKER
+                      sentBytes += job->getByteCount();
+                      printf("job %08X: \x1b[33mreassigned\x1b[0m no.: %3lu\n", job->getJobId(), currentI);
+                    #endif
                   }
                 } else {
                   job->setState(JobState::failed);
@@ -92,6 +112,10 @@ int Worker::threadMain() {
       }
       fullQueue:
       currentJobList->waitOne(jobTimeout);
+      #ifdef DEBUG_WORKER
+        then = Clock::now();
+        printf("loop: %3ld ms sent: %5lu kB remaining: %lu\n", std::chrono::duration_cast<milliseconds>(then - now).count(), sentBytes/1024, remainingJobs);
+      #endif
     }
   }
   

+ 3 - 2
src/conv2D.cpp

@@ -57,6 +57,8 @@ namespace tf_lib {
 
     auto worker = connectionManager.createWorker(Module::conv2D_5x5_Module, batchSize * channels * filters);
     {
+      worker->setJobTimeout(milliseconds(300));
+      worker->setRetryCount(10);
       auto jobs = worker->getJobList();
 
       for(int sample=0; sample<batchSize; sample++) {
@@ -81,7 +83,7 @@ namespace tf_lib {
             auto job = jobs->getJob(sample * channels * filters + channel * filters + filter);
             for(int x=0; x<outputSize; x++) {
               for(int y=0; y<outputSize; y++) {
-                output_tensor(sample, x, y, channel) = job->getPayload(x*outputSize + y);
+                output_tensor(sample, x, y, channel) = job->getResponsePayload(x*outputSize + y);
               }
             }
           }
@@ -90,7 +92,6 @@ namespace tf_lib {
       done();
       connectionManager.removeFinishedWorkers();
     });
-
     worker->startAsync();
 
   }

+ 1 - 1
src/entrypoint.cpp

@@ -143,7 +143,7 @@ namespace tf_lib {
   ConnectionManager connectionManager;
 
   void __attribute__ ((constructor)) init(void) {
-    printf("fpga llibrary loaded\n");
+    printf("fpga library loaded\n");
   }
 
 }

+ 11 - 11
tests/main.cpp

@@ -3,11 +3,13 @@
 
 ConnectionManager connectionManager;
 
+Module mod = Module::dummyBigModule;
+
 size_t s=0, f=0, r=0;
 std::mutex statsLk;
 
 void work() {
-    auto worker = connectionManager.createWorker(Module::dummyBigModule, 1000);
+    auto worker = connectionManager.createWorker(mod, 1000);
 
     worker->setJobTimeout(milliseconds(1000));
     worker->setRetryCount(10);
@@ -48,28 +50,26 @@ int main(void)
     puts("This is a shared library test...");
 
     
-    connectionManager.addFPGA("192.168.1.32", 1234);
-    connectionManager.addFPGA("192.168.1.32", 1234);
-    connectionManager.addFPGA("192.168.1.32", 1234);
-    connectionManager.addFPGA("192.168.1.32", 1234);
+    connectionManager.addFPGA("192.168.1.33", 1234);
+    connectionManager.addFPGA("192.168.1.34", 1234);
+    connectionManager.addFPGA("192.168.1.35", 1234);
 
     connectionManager.setSendDelay(microseconds(50));
 
     connectionManager.start();
 
-    int workNum = 100;
+    int workNum = 10000;
+    int n=1;
     
     while(workNum > 0 || connectionManager.getWorkerCount() > 0) {
+        std::this_thread::sleep_for(milliseconds(300));
         connectionManager.removeFinishedWorkers();
         while(workNum > 0 && connectionManager.getWorkerCount() < 8) {
             workNum--;
             work();
         }
-        printf("work: %2d   worker: %2lu\n", workNum, connectionManager.getWorkerCount());
-        std::this_thread::sleep_for(milliseconds(300));
+        std::unique_lock<std::mutex> lk(statsLk);
+        printf("work: %2d   worker: %2lu failed: %12lu, successful: %12lu, retries: %12lu  %4lu MBit/s\n", workNum, connectionManager.getWorkerCount(), f, s, r, s*(moduleSendPayloadLength[mod]+4)*4*10*8/1024/1024/3/(n++));
     }
-
-    std::unique_lock<std::mutex> lk(statsLk);
-    printf("failed: %lu, successful: %lu, retries: %lu\n", f, s, r);
     return 0;
 }