commFPGA.cpp 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. #include "../include/commFPGA.hpp"
  2. int resolvehelper(const char* hostname, int family, const char* service, sockaddr_storage* pAddr)
  3. {
  4. int result;
  5. addrinfo* result_list = NULL;
  6. addrinfo hints = {};
  7. hints.ai_family = family;
  8. hints.ai_socktype = SOCK_DGRAM; // without this flag, getaddrinfo will return 3x the number of addresses (one for each socket type).
  9. result = getaddrinfo(hostname, service, &hints, &result_list);
  10. if (result == 0)
  11. {
  12. //ASSERT(result_list->ai_addrlen <= sizeof(sockaddr_in));
  13. memcpy(pAddr, result_list->ai_addr, result_list->ai_addrlen);
  14. freeaddrinfo(result_list);
  15. }
  16. return result;
  17. }
  18. // commFPGA class members
  19. void commFPGA::start() {
  20. recvResult = std::async(std::launch::async, &commFPGA::recvUDP, this);
  21. }
  22. void commFPGA::recvUDP() {
  23. pthread_setname_np(pthread_self(), "mlfpga recv");
  24. while(running) {
  25. int result = 0;
  26. uint32_t buf[UDP_MTU/4+1];
  27. uint slen = sizeof(addrDest);
  28. result = recvfrom(sock, (uint8_t*)buf, UDP_MTU, 0, (sockaddr*)&addrDest, &slen);
  29. if(result == -1)
  30. continue;
  31. result /= 4;
  32. for(int_least32_t i=0; i < result; i++) {
  33. buf[i] = __builtin_bswap32(buf[i]);
  34. }
  35. #ifdef DEBUG_JOB_RESP
  36. printf("recv ");
  37. for(int_least32_t i=0; i<result; i++)
  38. printf("%u: %08X ", i, buf[i]);
  39. printf(" %d\n", (int)recvState);
  40. #endif
  41. parseRaw(buf, result);
  42. }
  43. }
  44. int commFPGA::parseRaw(uint32_t *buf, int_least32_t bufLen) {
  45. std::unordered_map<uint32_t,std::shared_ptr<Job>>::iterator jobIt;
  46. JobContainer *jobLocked = NULL;
  47. std::lock_guard<std::mutex> lk(jobLock);
  48. if(currentJob != NULL)
  49. jobLocked = new JobContainer(currentJob);
  50. for(int_least32_t i=0; i < bufLen; i++) {
  51. switch(recvState) {
  52. case RecvState::checkPreamble:
  53. if(buf[i] == PREAMBLE) {
  54. recvState = RecvState::checkJobId;
  55. }
  56. #ifdef DEBUG_JOB_RESP
  57. else printf("wrong preamble %08X\n", buf[i]);
  58. #endif
  59. break;
  60. case RecvState::checkJobId:
  61. jobIt = jobList.find(buf[i]);
  62. if(jobIt == jobList.end()) {
  63. #ifdef DEBUG_JOB_RESP
  64. printf("job %08X jobId not found, %u\n", buf[i], i);
  65. #endif
  66. i -= 1;
  67. recvState = RecvState::checkPreamble;
  68. } else {
  69. currentJob = jobIt->second;
  70. //delete old lock
  71. if(jobLocked) delete jobLocked;
  72. //aquire lock
  73. jobLocked = new JobContainer(currentJob);
  74. if((*jobLocked)->getState() != JobState::sent) {
  75. #ifdef DEBUG_JOB_RESP
  76. printf("job %08X wasn't sent\n", buf[i]);
  77. #endif
  78. i -= 1;
  79. recvState = RecvState::checkPreamble;
  80. } else {
  81. assert((*jobLocked)->getAssignedFPGA() == this);
  82. recvState = RecvState::checkModuleId;
  83. }
  84. }
  85. break;
  86. case RecvState::checkModuleId:
  87. if((*jobLocked)->getModuleId() == buf[i]) {
  88. recvState = RecvState::writePayload;
  89. recvPayloadIndex = 0;
  90. (*jobLocked)->setState(JobState::sent);
  91. } else {
  92. #ifdef DEBUG_JOB_RESP
  93. printf("job %08X wrong moduleId %08X\n", (*jobLocked)->getJobId(), buf[i]);
  94. #endif
  95. i = i - 2 < 0 ? -1 : i - 2;
  96. recvState = RecvState::checkPreamble;
  97. }
  98. break;
  99. case RecvState::writePayload:
  100. (*jobLocked)->setResponsePayload(recvPayloadIndex++, buf[i]);
  101. if(recvPayloadIndex >= (*jobLocked)->getResponseBufferWordCount()) {
  102. if((*jobLocked)->checkCRC()) {
  103. (*jobLocked)->setReceived(true);
  104. #ifdef DEBUG_JOB_RESP
  105. printf("job %08X: success!\n", (*jobLocked)->getJobId());
  106. #endif
  107. jobList.erase((*jobLocked)->getJobId());
  108. } else {
  109. (*jobLocked)->setState(JobState::sent);
  110. #ifdef DEBUG_JOB_RESP
  111. printf("job %08X wrong crc %08X, %4lu, %4u\n", (*jobLocked)->getJobId(), buf[i], bufLen, i);
  112. for(size_t k=0; k<(*jobLocked)->getWordCount(); k++) {
  113. printf(" %4lu %08X", k, (*jobLocked)->getWord(k));
  114. }
  115. std::cout << std::endl;
  116. #endif
  117. }
  118. recvState = RecvState::checkPreamble;
  119. }
  120. break;
  121. }
  122. }
  123. if(jobLocked) delete jobLocked;
  124. return 0;
  125. }
  126. commFPGA::commFPGA(const char *host, uint _port, bool bindSelf) {
  127. port = _port;
  128. strcpy(ip, host);
  129. sendBuffer = new uint32_t[MAX_JOB_LEN];
  130. int err = 0;
  131. struct addrinfo hints, *res;
  132. //UDP host
  133. memset(&hints, 0, sizeof hints);
  134. hints.ai_family = AF_INET; // use IPv4
  135. hints.ai_socktype = SOCK_DGRAM;
  136. hints.ai_flags = AI_PASSIVE; // fill in my IP for me
  137. getaddrinfo(NULL, std::to_string(port).c_str(), &hints, &res);
  138. sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  139. if(bindSelf)
  140. err = bind(sock, res->ai_addr, res->ai_addrlen);
  141. if(err != 0) {
  142. printf("%15s sock: %2d, err: %2d, port: %5d\n", ip, sock, err, port);
  143. exit(1);
  144. }
  145. //set recieve timeout
  146. struct timeval tv;
  147. tv.tv_sec = 0;
  148. tv.tv_usec = 100000;
  149. setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv);
  150. //set recv buffer size
  151. //int rcvbufsize = MAX_JOB_LEN * 4 * 2;
  152. //setsockopt(sock,SOL_SOCKET,SO_RCVBUF,(char*)&rcvbufsize,sizeof(rcvbufsize));
  153. //UDP client
  154. resolvehelper(host, AF_INET, std::to_string(port).c_str(), &addrDest);
  155. //send a packet to fpga to update its response port
  156. sendRaw((uint8_t*)"0000", 4);
  157. }
  158. commFPGA::~commFPGA() {
  159. running = false;
  160. delete sendBuffer;
  161. }
  162. int commFPGA::sendRaw(uint8_t *buf, uint bufLen) {
  163. int result = 0;
  164. uint_least32_t byteIndex = 0;
  165. while(byteIndex < bufLen) {
  166. uint payloadLen = bufLen - byteIndex;
  167. if(payloadLen > UDP_LEN/4*4)
  168. payloadLen = UDP_LEN/4*4;
  169. result = sendto(sock, &buf[byteIndex], payloadLen, 0, (sockaddr*)&addrDest, sizeof(addrDest));
  170. if(result == -1) {
  171. int err = errno;
  172. std::cout << "error sending packet " << err << std::endl;
  173. break;
  174. }
  175. byteIndex += payloadLen;
  176. }
  177. return byteIndex;
  178. }
  179. int commFPGA::assignJob(JobContainer &job) {
  180. if(job->getAssignedFPGA() != NULL)
  181. return -1;
  182. std::unique_lock<std::mutex> lk(jobLock, std::try_to_lock);
  183. if(!lk.owns_lock())
  184. return -2;
  185. if(jobList.size() >= JOB_COUNT)
  186. return -3;
  187. std::lock_guard<std::mutex> slk(sendLock);
  188. uint_least32_t free = MAX_JOB_LEN - sendBufferAvailable;
  189. if(free < job->getWordCount())
  190. return -4;
  191. jobList.insert(std::pair<uint32_t,std::shared_ptr<Job>>(job->getJobId(), job.sharedPtr()));
  192. job->setAssignedFPGA(this);
  193. uint_least32_t sendBufferWriteIndex = sendBufferReadIndex + sendBufferAvailable;
  194. sendBufferAvailable += job->getWordCount();
  195. for(uint_least32_t i=0; i<job->getWordCount(); i++) {
  196. sendBuffer[(sendBufferWriteIndex + i) % MAX_JOB_LEN] = __builtin_bswap32(job->getWord(i));
  197. }
  198. #ifdef DEBUG_JOB_SEND
  199. printf("fill ");
  200. for(uint_least32_t i=0; i<job->getWordCount(); i++)
  201. printf("%u: %08X ", (sendBufferWriteIndex + i) % MAX_JOB_LEN, job->getWord(i));
  202. printf("\n");
  203. #endif
  204. return job->getWordCount();
  205. }
  206. int commFPGA::unassignJob(JobContainer &job) {
  207. if(job->getAssignedFPGA() != this)
  208. return -1;
  209. std::unique_lock<std::mutex> lk(jobLock, std::try_to_lock);
  210. if(!lk.owns_lock())
  211. return -2;
  212. if(job->getState() == JobState::receiving) {
  213. currentJob = NULL;
  214. job->setState(JobState::sent);
  215. #ifdef DEBUG_JOB_RESP
  216. printf("job %08X: unassigned during recv\n", job->getJobId());
  217. #endif
  218. }
  219. job->setAssignedFPGA(NULL);
  220. return jobList.erase(job->getJobId());
  221. }
  222. int commFPGA::sendFromBuffer() {
  223. std::lock_guard<std::mutex> lk(sendLock);
  224. int_least32_t avail = sendBufferAvailable + sendBufferReadIndex > MAX_JOB_LEN ? MAX_JOB_LEN - sendBufferReadIndex : sendBufferAvailable;
  225. if(avail == 0)
  226. return -1;
  227. if(avail > UDP_LEN/4)
  228. avail = UDP_LEN/4;
  229. sendRaw((uint8_t*)&sendBuffer[sendBufferReadIndex], avail * 4);
  230. //printf("%8d %4d %8lu\n", sendBufferAvailable, avail, sendBufferReadIndex);
  231. #ifdef DEBUG_JOB_SEND
  232. printf("send ");
  233. for(size_t i=0; i<avail; i++)
  234. printf("%lu: %08X ", sendBufferReadIndex+i, __builtin_bswap32(sendBuffer[sendBufferReadIndex+i]));
  235. printf("\n");
  236. #endif
  237. sendBufferReadIndex = (avail + sendBufferReadIndex) % MAX_JOB_LEN;
  238. sendBufferAvailable -= avail;
  239. return sendBufferAvailable;
  240. }
  241. size_t commFPGA::jobCount() {
  242. return jobList.size();
  243. }