serialcli.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. #! python
  2. #
  3. # Backend for .NET/Mono (IronPython), .NET >= 2
  4. #
  5. # This file is part of pySerial. https://github.com/pyserial/pyserial
  6. # (C) 2008-2015 Chris Liechti <cliechti@gmx.net>
  7. #
  8. # SPDX-License-Identifier: BSD-3-Clause
  9. import System
  10. import System.IO.Ports
  11. from serial.serialutil import *
  12. # must invoke function with byte array, make a helper to convert strings
  13. # to byte arrays
  14. sab = System.Array[System.Byte]
  15. def as_byte_array(string):
  16. return sab([ord(x) for x in string]) # XXX will require adaption when run with a 3.x compatible IronPython
  17. class Serial(SerialBase):
  18. """Serial port implementation for .NET/Mono."""
  19. BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
  20. 9600, 19200, 38400, 57600, 115200)
  21. def open(self):
  22. """\
  23. Open port with current settings. This may throw a SerialException
  24. if the port cannot be opened.
  25. """
  26. if self._port is None:
  27. raise SerialException("Port must be configured before it can be used.")
  28. if self.is_open:
  29. raise SerialException("Port is already open.")
  30. try:
  31. self._port_handle = System.IO.Ports.SerialPort(self.portstr)
  32. except Exception as msg:
  33. self._port_handle = None
  34. raise SerialException("could not open port %s: %s" % (self.portstr, msg))
  35. # if RTS and/or DTR are not set before open, they default to True
  36. if self._rts_state is None:
  37. self._rts_state = True
  38. if self._dtr_state is None:
  39. self._dtr_state = True
  40. self._reconfigure_port()
  41. self._port_handle.Open()
  42. self.is_open = True
  43. if not self._dsrdtr:
  44. self._update_dtr_state()
  45. if not self._rtscts:
  46. self._update_rts_state()
  47. self.reset_input_buffer()
  48. def _reconfigure_port(self):
  49. """Set communication parameters on opened port."""
  50. if not self._port_handle:
  51. raise SerialException("Can only operate on a valid port handle")
  52. #~ self._port_handle.ReceivedBytesThreshold = 1
  53. if self._timeout is None:
  54. self._port_handle.ReadTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
  55. else:
  56. self._port_handle.ReadTimeout = int(self._timeout * 1000)
  57. # if self._timeout != 0 and self._interCharTimeout is not None:
  58. # timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:]
  59. if self._write_timeout is None:
  60. self._port_handle.WriteTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
  61. else:
  62. self._port_handle.WriteTimeout = int(self._write_timeout * 1000)
  63. # Setup the connection info.
  64. try:
  65. self._port_handle.BaudRate = self._baudrate
  66. except IOError as e:
  67. # catch errors from illegal baudrate settings
  68. raise ValueError(str(e))
  69. if self._bytesize == FIVEBITS:
  70. self._port_handle.DataBits = 5
  71. elif self._bytesize == SIXBITS:
  72. self._port_handle.DataBits = 6
  73. elif self._bytesize == SEVENBITS:
  74. self._port_handle.DataBits = 7
  75. elif self._bytesize == EIGHTBITS:
  76. self._port_handle.DataBits = 8
  77. else:
  78. raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
  79. if self._parity == PARITY_NONE:
  80. self._port_handle.Parity = getattr(System.IO.Ports.Parity, 'None') # reserved keyword in Py3k
  81. elif self._parity == PARITY_EVEN:
  82. self._port_handle.Parity = System.IO.Ports.Parity.Even
  83. elif self._parity == PARITY_ODD:
  84. self._port_handle.Parity = System.IO.Ports.Parity.Odd
  85. elif self._parity == PARITY_MARK:
  86. self._port_handle.Parity = System.IO.Ports.Parity.Mark
  87. elif self._parity == PARITY_SPACE:
  88. self._port_handle.Parity = System.IO.Ports.Parity.Space
  89. else:
  90. raise ValueError("Unsupported parity mode: %r" % self._parity)
  91. if self._stopbits == STOPBITS_ONE:
  92. self._port_handle.StopBits = System.IO.Ports.StopBits.One
  93. elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
  94. self._port_handle.StopBits = System.IO.Ports.StopBits.OnePointFive
  95. elif self._stopbits == STOPBITS_TWO:
  96. self._port_handle.StopBits = System.IO.Ports.StopBits.Two
  97. else:
  98. raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
  99. if self._rtscts and self._xonxoff:
  100. self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSendXOnXOff
  101. elif self._rtscts:
  102. self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSend
  103. elif self._xonxoff:
  104. self._port_handle.Handshake = System.IO.Ports.Handshake.XOnXOff
  105. else:
  106. self._port_handle.Handshake = getattr(System.IO.Ports.Handshake, 'None') # reserved keyword in Py3k
  107. #~ def __del__(self):
  108. #~ self.close()
  109. def close(self):
  110. """Close port"""
  111. if self.is_open:
  112. if self._port_handle:
  113. try:
  114. self._port_handle.Close()
  115. except System.IO.Ports.InvalidOperationException:
  116. # ignore errors. can happen for unplugged USB serial devices
  117. pass
  118. self._port_handle = None
  119. self.is_open = False
  120. # - - - - - - - - - - - - - - - - - - - - - - - -
  121. @property
  122. def in_waiting(self):
  123. """Return the number of characters currently in the input buffer."""
  124. if not self.is_open:
  125. raise portNotOpenError
  126. return self._port_handle.BytesToRead
  127. def read(self, size=1):
  128. """\
  129. Read size bytes from the serial port. If a timeout is set it may
  130. return less characters as requested. With no timeout it will block
  131. until the requested number of bytes is read.
  132. """
  133. if not self.is_open:
  134. raise portNotOpenError
  135. # must use single byte reads as this is the only way to read
  136. # without applying encodings
  137. data = bytearray()
  138. while size:
  139. try:
  140. data.append(self._port_handle.ReadByte())
  141. except System.TimeoutException:
  142. break
  143. else:
  144. size -= 1
  145. return bytes(data)
  146. def write(self, data):
  147. """Output the given string over the serial port."""
  148. if not self.is_open:
  149. raise portNotOpenError
  150. #~ if not isinstance(data, (bytes, bytearray)):
  151. #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
  152. try:
  153. # must call overloaded method with byte array argument
  154. # as this is the only one not applying encodings
  155. self._port_handle.Write(as_byte_array(data), 0, len(data))
  156. except System.TimeoutException:
  157. raise writeTimeoutError
  158. return len(data)
  159. def reset_input_buffer(self):
  160. """Clear input buffer, discarding all that is in the buffer."""
  161. if not self.is_open:
  162. raise portNotOpenError
  163. self._port_handle.DiscardInBuffer()
  164. def reset_output_buffer(self):
  165. """\
  166. Clear output buffer, aborting the current output and
  167. discarding all that is in the buffer.
  168. """
  169. if not self.is_open:
  170. raise portNotOpenError
  171. self._port_handle.DiscardOutBuffer()
  172. def _update_break_state(self):
  173. """
  174. Set break: Controls TXD. When active, to transmitting is possible.
  175. """
  176. if not self.is_open:
  177. raise portNotOpenError
  178. self._port_handle.BreakState = bool(self._break_state)
  179. def _update_rts_state(self):
  180. """Set terminal status line: Request To Send"""
  181. if not self.is_open:
  182. raise portNotOpenError
  183. self._port_handle.RtsEnable = bool(self._rts_state)
  184. def _update_dtr_state(self):
  185. """Set terminal status line: Data Terminal Ready"""
  186. if not self.is_open:
  187. raise portNotOpenError
  188. self._port_handle.DtrEnable = bool(self._dtr_state)
  189. @property
  190. def cts(self):
  191. """Read terminal status line: Clear To Send"""
  192. if not self.is_open:
  193. raise portNotOpenError
  194. return self._port_handle.CtsHolding
  195. @property
  196. def dsr(self):
  197. """Read terminal status line: Data Set Ready"""
  198. if not self.is_open:
  199. raise portNotOpenError
  200. return self._port_handle.DsrHolding
  201. @property
  202. def ri(self):
  203. """Read terminal status line: Ring Indicator"""
  204. if not self.is_open:
  205. raise portNotOpenError
  206. #~ return self._port_handle.XXX
  207. return False # XXX an error would be better
  208. @property
  209. def cd(self):
  210. """Read terminal status line: Carrier Detect"""
  211. if not self.is_open:
  212. raise portNotOpenError
  213. return self._port_handle.CDHolding
  214. # - - platform specific - - - -
  215. # none