瀏覽代碼

added dummyBigTest

subDesTagesMitExtraKaese 4 年之前
父節點
當前提交
e69614cfbb

二進制
build/op_lib.so


+ 5 - 0
examples/identity.py

@@ -0,0 +1,5 @@
+import tensorflow as tf
+import tensorflow.keras as keras
+from tensorflow.keras import layers
+
+from tensorflow.keras.utils import plot_model, to_categorical

+ 36 - 0
include/dummyBigOp.hpp

@@ -0,0 +1,36 @@
+#ifndef DUMMY_BIG_OP_FPGA_H
+#define DUMMY_BIG_OP_FPGA_H
+
+#include "tensorflow/core/framework/op_kernel.h"
+#include "tensorflow/core/framework/function.h"
+#include <stdlib.h>
+
+#include <iostream>
+#include <string>
+#include <chrono>
+#include <thread>
+#include <future>
+#include <mutex>
+#include <condition_variable>
+
+#include "../lib/mlfpga/include/connectionManager.hpp"
+#include "../lib/mlfpga/include/modules.hpp"
+
+#include "entrypoint.hpp"
+
+namespace tf_lib {
+  using namespace tensorflow;
+  using namespace std::chrono;
+
+  /// Asynchronous TensorFlow kernel for the "big" dummy FPGA test module.
+  /// Only declarations live here; the constructor and ComputeAsync() are
+  /// defined out of line.
+  class DummyBigOp : public AsyncOpKernel {
+    private:
+      /// Number of int32 elements in the output vector.
+      const int dataLength{1024};
+      /// Counter starting at zero.
+      /// NOTE(review): not referenced by the visible kernel code - confirm it is still needed.
+      int tagCounter{0};
+
+    public:
+      /// Builds the kernel from TensorFlow's construction context.
+      explicit DummyBigOp(OpKernelConstruction* context);
+
+      /// Kernel entry point; `done` is the completion callback handed in by TensorFlow.
+      void ComputeAsync(OpKernelContext* context, DoneCallback done) override;
+  };
+}  // namespace tf_lib
+#endif

+ 0 - 1
include/dummyOp.hpp

@@ -28,7 +28,6 @@ namespace tf_lib {
       void ComputeAsync(OpKernelContext* context, DoneCallback done) override;
 
     private:
-      void fpgaCall(const Tensor *input, Tensor *output, DoneCallback done);
 
       const int dataLength = 4;
       int tagCounter = 0;

+ 1 - 0
include/entrypoint.hpp

@@ -10,6 +10,7 @@
 
 #include "conv2D.hpp"
 #include "dummyOp.hpp"
+#include "dummyBigOp.hpp"
 #include "../lib/mlfpga/include/connectionManager.hpp"
 
 namespace tf_lib {

+ 1 - 1
layers/__init__.py

@@ -1,2 +1,2 @@
 
-__all__ = ["conv2D"]
+__all__ = ["conv2D", "dummy"]

+ 6 - 3
lib/mlfpga/include/connectionManager.hpp

@@ -36,16 +36,19 @@ class ConnectionManager {
     ConnectionManager();
     ~ConnectionManager();
 
-    void addFPGA(const char* ip, const uint port, bool bindSelf=false);
+    void startFromTensorflow();
 
+    void addFPGA(const char* ip, const uint port, bool bindSelf=false);
     void start();
 
-    Worker* createWorker(Module mod, size_t numberOfJobs);
+    Worker* createWorker(Module mod, size_t numberOfJobs = 1);
     Worker* getWorker(size_t i) const {return &(*workers.at(i));}
     size_t getWorkerCount() const {return workers.size();}
 
     void setSendDelay(microseconds us) {sendDelay = us;}
 
+    /// True once start() has flagged the manager as running.
+    bool isRunning() const {
+      return running;
+    }
+
   private:
     std::vector<std::unique_ptr<commFPGA>> fpgas;
     std::vector<std::unique_ptr<Worker>> workers;
@@ -53,7 +56,7 @@ class ConnectionManager {
     void sendThread();
     std::future<void> sendResult;
 
-    bool running = true;
+    bool running = false;
     microseconds sendDelay = microseconds(50);
 };
 

+ 3 - 0
lib/mlfpga/include/job.hpp

@@ -74,6 +74,8 @@ class JobData : public WordBuffer {
     uint32_t getPayload(size_t i) const {return getWord(i+3);}
     void setPayload(size_t i, uint32_t v) const {setWord(i+3, v);}
 
+    /// Number of payload words: the total word count minus the three words
+    /// that precede the payload and the trailing CRC word.
+    /// Returns 0 instead of wrapping around to a huge unsigned value when the
+    /// buffer holds fewer than four words, since callers use this as a loop bound.
+    size_t getPayloadSize() const {
+      const size_t words = getWordCount();
+      return words > 4 ? words - 4 : 0;
+    }
+
     uint32_t getCRC() const {return getWord(getWordCount()-1);}
     void setCRC(uint32_t v) const {setWord(getWordCount()-1, v);}
 };
@@ -87,6 +89,7 @@ class Job : public JobData {
 
     uint32_t getResponsePayload(size_t i) const {return recvBuf.getWord(i);}
     void setResponsePayload(size_t i, uint32_t v) const {recvBuf.setWord(i, v);}
+    /// Number of response payload words: every word of the receive buffer
+    /// except the last one (presumably the CRC - confirm against the protocol).
+    /// Returns 0 instead of wrapping around when the receive buffer is empty.
+    size_t getResponsePayloadSize() const {
+      const size_t words = recvBuf.getWordCount();
+      return words > 0 ? words - 1 : 0;
+    }
     uint32_t* getResponseAddr() const {return recvBuf.getWordAddr();}
     size_t getResponseBufferWordCount() const {return recvBuf.getWordCount();}
 

+ 17 - 0
lib/mlfpga/src/connectionManager.cpp

@@ -18,7 +18,24 @@ Worker* ConnectionManager::createWorker(Module mod, size_t numberOfJobs) {
   return w;
 }
 
+// Brings the connection up on first use from a TensorFlow kernel.
+// Does nothing when the manager is already running.
+// NOTE(review): the isRunning() check and start() are not synchronized here -
+// confirm kernels cannot reach this function concurrently.
+void ConnectionManager::startFromTensorflow() {
+  if(!isRunning()) {
+    // Only the endpoint on localhost is registered. Board addresses kept for
+    // reference from the previous setup: 192.168.88.32 to 192.168.88.35, port 1234.
+    addFPGA("localhost", 1234);
+    start();
+
+    printf("fpga server started\n");
+  }
+}
+
 void ConnectionManager::start() {  // marks the manager as running and launches sendThread() asynchronously
+  running = true;  // set before the thread is launched; isRunning() returns true from here on
   sendResult = std::async(std::launch::async, &ConnectionManager::sendThread, this);  // the returned future is kept in sendResult
 }
 

+ 1 - 1
lib/mlfpga/src/worker.cpp

@@ -32,7 +32,7 @@ int Worker::threadMain() {
         auto job = currentJobList->getJob(i);
         switch(job->getState()) {
           case JobState::initialized:
-
+            throw("worker can't send job that is not ready");
             break;
           case JobState::ready:
             fpga = findAvailableFPGA();

+ 15 - 2
src/conv2D.cpp

@@ -14,6 +14,7 @@ namespace tf_lib {
   };
 
   void Conv2DOp::ComputeAsync(OpKernelContext* context, DoneCallback done) {
+    connectionManager.startFromTensorflow();
     // Input tensor is of the following dimensions:
     // [ batch, in_rows, in_cols, in_depth ]
     const Tensor& input = context->input(0);
@@ -67,13 +68,25 @@ namespace tf_lib {
                 job->setPayload(x*outputSize + y, input_tensor(sample, x, y, channel));
               }
             }
+            job->setReady();
           }
         }
       }
     }
-    worker->setDoneCallback([output_tensor, worker, done]{
+    worker->setDoneCallback([output_tensor, worker, done, batchSize, channels, filters, this]{
       auto jobs = worker->getJobList();
-      output_tensor(0) = jobs->getJob(0)->getResponsePayload(0);
+      for(int sample=0; sample<batchSize; sample++) {
+        for(int channel=0; channel<channels; channel++) {
+          for(int filter=0; filter<filters; filter++) {
+            auto job = jobs->getJob(sample * channels * filters + channel * filters + filter);
+            for(int x=0; x<outputSize; x++) {
+              for(int y=0; y<outputSize; y++) {
+                output_tensor(sample, x, y, channel) = job->getPayload(x*outputSize + y);
+              }
+            }
+          }
+        }
+      }
       done();
     });
 

+ 50 - 0
src/dummyBigOp.cpp

@@ -0,0 +1,50 @@
+
+#include "dummyBigOp.hpp"
+
+namespace tf_lib {
+  /// Constructs the kernel; the construction context is passed on to the
+  /// AsyncOpKernel base and no further setup is done here.
+  DummyBigOp::DummyBigOp(OpKernelConstruction* context) : AsyncOpKernel(context) {
+  }
+
+  /// Sends the rank-1 int32 input to the FPGA as the payload of a single
+  /// dummyBigModule job and copies the response payload into the output vector.
+  ///
+  /// @param context  Kernel context providing input 0 and receiving output 0.
+  /// @param done     Completion callback. It is invoked on a validation or
+  ///                 allocation failure, or from the worker's done callback
+  ///                 after the response has been copied.
+  void DummyBigOp::ComputeAsync(OpKernelContext* context, DoneCallback done) {
+    connectionManager.startFromTensorflow();
+
+    // The input is read as a flat vector of int32 words.
+    const Tensor& input = context->input(0);
+    // tensor<int32, 1>() CHECK-fails on any other rank, so report it as an op error instead.
+    OP_REQUIRES_ASYNC(context, TensorShapeUtils::IsVector(input.shape()),
+                      errors::InvalidArgument("MyDummyBig expects a rank-1 input, got shape ",
+                                              input.shape().DebugString()),
+                      done);
+
+    // The output always holds dataLength elements.
+    TensorShape output_shape;
+    const int32 dims[] = {dataLength};
+    OP_REQUIRES_OK_ASYNC(context, TensorShapeUtils::MakeShape(dims, 1, &output_shape), done);
+
+    // In an AsyncOpKernel the *_ASYNC macros are required: they call done() before
+    // returning, whereas plain OP_REQUIRES_OK returns without completing the op.
+    Tensor* output = nullptr;
+    OP_REQUIRES_OK_ASYNC(context, context->allocate_output(0, output_shape, &output), done);
+
+    auto input_tensor = input.tensor<int32, 1>();
+    auto output_tensor = output->tensor<int32, 1>();
+    const size_t inputLength = static_cast<size_t>(input.NumElements());
+    const size_t outputLength = static_cast<size_t>(dataLength);
+
+    auto worker = connectionManager.createWorker(Module::dummyBigModule);
+    worker->setJobTimeout(milliseconds(100));
+    worker->setRetryCount(10);
+    {
+      auto job = worker->getJobList()->getJob(0);
+
+      // Never read past the end of the input when it is shorter than the payload.
+      const size_t payloadSize = job->getPayloadSize();
+      const size_t wordCount = payloadSize < inputLength ? payloadSize : inputLength;
+      for(size_t i=0; i<wordCount; i++) {
+        job->setPayload(i, input_tensor(i));
+      }
+      // Mark the job ready once, after the payload is complete. This also covers an
+      // empty payload, which previously left the job in its initial state.
+      job->setReady();
+    }
+    // outputLength is captured by value so the callback does not depend on `this`.
+    worker->setDoneCallback([output_tensor, worker, done, outputLength]{
+      auto job = worker->getJobList()->getJob(0);
+      // Never write past the end of the output when the response is longer.
+      const size_t responseSize = job->getResponsePayloadSize();
+      const size_t wordCount = responseSize < outputLength ? responseSize : outputLength;
+      for(size_t i=0; i<wordCount; i++) {
+        output_tensor(i) = job->getResponsePayload(i);
+      }
+      done();
+    });
+
+    worker->startAsync();
+  }
+}

+ 15 - 6
src/dummyOp.cpp

@@ -7,6 +7,7 @@ namespace tf_lib {
   };
 
   void DummyOp::ComputeAsync(OpKernelContext* context, DoneCallback done) {
+    connectionManager.startFromTensorflow();
     // Input tensor is of the following dimensions:
     // [ batch, in_rows, in_cols, in_depth ]
     const Tensor& input = context->input(0);
@@ -27,17 +28,25 @@ namespace tf_lib {
     auto input_tensor = input.tensor<int32, 1>();
     auto output_tensor = output->tensor<int32, 1>();
 
-    auto worker = connectionManager.createWorker(Module::dummyModule, 1);
+    auto worker = connectionManager.createWorker(Module::dummyModule);
+    worker->setJobTimeout(milliseconds(100));
+    worker->setRetryCount(10);
     {
-      auto jobs = worker->getJobList();
-      jobs->getJob(0)->setPayload(0, input_tensor(0));
+      auto job = worker->getJobList()->getJob(0);
+
+      for(size_t i=0; i<job->getPayloadSize(); i++) {
+        job->setPayload(i, input_tensor(i));
+        job->setReady();
+      }
     }
     worker->setDoneCallback([output_tensor, worker, done]{
-      auto jobs = worker->getJobList();
-      output_tensor(0) = jobs->getJob(0)->getResponsePayload(0);
+      auto job = worker->getJobList()->getJob(0);
+      for(size_t i=0; i<job->getResponsePayloadSize(); i++) {
+        output_tensor(i) = job->getResponsePayload(i);
+      }
       done();
     });
 
-    worker->startSync();
+    worker->startAsync();
   }
 }

+ 12 - 14
src/entrypoint.cpp

@@ -129,23 +129,21 @@ namespace tf_lib {
 
   REGISTER_KERNEL_BUILDER(Name("MyDummy").Device(DEVICE_CPU), DummyOp);
 
+  // MyDummyBig: one int32 input, one int32 output.
+  // The shape function rejects inputs that are not vectors during shape inference,
+  // because the kernel reads its input as a rank-1 tensor; otherwise it forwards
+  // the input shape to the output.
+  // NOTE(review): the kernel allocates a fixed-length output - confirm that forwarding
+  // the input shape is intended for inputs of a different length.
+  REGISTER_OP("MyDummyBig")
+    .Input("input: int32")
+    .Output("output: int32")
+    .SetShapeFn([](::tensorflow::shape_inference::InferenceContext* c) {
+      ::tensorflow::shape_inference::ShapeHandle input;
+      Status rankStatus = c->WithRank(c->input(0), 1, &input);
+      if(!rankStatus.ok())
+        return rankStatus;
+      c->set_output(0, input);
+      return Status::OK();
+    });
+
+  REGISTER_KERNEL_BUILDER(Name("MyDummyBig").Device(DEVICE_CPU), DummyBigOp);
+
   ConnectionManager connectionManager;
 
   void __attribute__ ((constructor)) init(void) {
-    printf("starting fpga server\n");
-      
-    connectionManager.addFPGA("localhost", 1234);
-    connectionManager.addFPGA("localhost", 1234);
-    connectionManager.addFPGA("localhost", 1234);
-    connectionManager.addFPGA("localhost", 1234);
-    connectionManager.addFPGA("localhost", 1234);
-
-    /*
-    connectionManager.addFPGA("192.168.88.32", 1234);
-    connectionManager.addFPGA("192.168.88.33", 1234);
-    connectionManager.addFPGA("192.168.88.34", 1234);
-    connectionManager.addFPGA("192.168.88.35", 1234);
-    */
+    printf("fpga library loaded\n");  // constructor-attribute hook: only logs; FPGA registration now happens in startFromTensorflow()
   }
 
 }

+ 6 - 3
tests/main.cpp

@@ -7,9 +7,9 @@ size_t s=0, f=0, r=0;
 std::mutex statsLk;
 
 void work() {
-    auto worker = connectionManager.createWorker(Module::dummyModule, 1);
+    auto worker = connectionManager.createWorker(Module::dummyBigModule, 10000);
 
-    worker->setJobTimeout(microseconds(10000));
+    worker->setJobTimeout(milliseconds(100));
     worker->setRetryCount(10);
     worker->setDoneCallback([worker](){
         auto jobs = worker->getJobList();
@@ -48,13 +48,16 @@ int main(void)
     puts("This is a shared library test...");
 
     
+    connectionManager.addFPGA("192.168.1.32", 1234);
+    connectionManager.addFPGA("192.168.1.32", 1234);
+    connectionManager.addFPGA("192.168.1.32", 1234);
     connectionManager.addFPGA("192.168.1.32", 1234);
 
     //connectionManager.setSendDelay(microseconds(0));
 
     connectionManager.start();
 
-    for(int i=0; i<7; i++)
+    for(int i=0; i<8; i++)
         work();
     
     for(size_t i=0; i<connectionManager.getWorkerCount(); i++) {

+ 21 - 1
tests/op_test.py

@@ -11,10 +11,30 @@ from hostLib import load_op
 
 class FPGALibTest(tf.test.TestCase):
   def testDummyOp(self):
-    input = [0,1,2,3]
+    input = [1337, 42, 2**31-1, -1]
     with self.session():
       result = load_op.op_lib.MyDummy(input=input)
       self.assertAllEqual(result, input)
 
+  def testDummyOp100(self):
+    with self.session():
+      for i in range(100):
+        input = [i+1, 42, 2**31-1, -1]
+        result = load_op.op_lib.MyDummy(input=input)
+        self.assertAllEqual(result, input)
+
+  def testDummyOpBig(self):
+    input = [i+1 for i in range(1024)]
+    with self.session():
+      result = load_op.op_lib.MyDummyBig(input=input)
+      self.assertAllEqual(result, input)
+
+  def testDummyOpBig100(self):
+    with self.session():
+      for i in range(100):
+        input = [i*100+k+1 for k in range(1024)]
+        result = load_op.op_lib.MyDummyBig(input=input)
+        self.assertAllEqual(result, input)
+
 if __name__ == "__main__":
   tf.test.main()