12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976 |
- #!/home/nkotelnikova/PycharmProjects/moc/venv/bin/python
- #
- # Very simple serial terminal
- #
- # This file is part of pySerial. https://github.com/pyserial/pyserial
- # (C)2002-2015 Chris Liechti <cliechti@gmx.net>
- #
- # SPDX-License-Identifier: BSD-3-Clause
- import codecs
- import os
- import sys
- import threading
- import serial
- from serial.tools.list_ports import comports
- from serial.tools import hexlify_codec
- # pylint: disable=wrong-import-order,wrong-import-position
def _find_hexlify_codec(codec_name):
    """Codec search function that only knows the custom 'hexlify' codec."""
    if codec_name == 'hexlify':
        return hexlify_codec.getregentry()
    return None


# make the 'hexlify' codec available to codecs.lookup() and friends
codecs.register(_find_hexlify_codec)

# Python 2/3 compatibility: provide the Python 2 names on Python 3
try:
    raw_input
except NameError:
    # pylint: disable=redefined-builtin,invalid-name
    raw_input = input   # in python3 it's "raw"
    unichr = chr
def key_description(character):
    """Return a human readable label for a single key.

    Characters with a code point below 32 are shown as ``Ctrl+<char>``, where
    ``<char>`` is the character 64 code points higher (e.g. 0x14 -> Ctrl+T).
    Any other character is returned as its ``repr()``.
    """
    code_point = ord(character)
    is_control = code_point < 32
    return 'Ctrl+{:c}'.format(ord('@') + code_point) if is_control else repr(character)
- # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class ConsoleBase(object):
    """Platform independent part of the console abstraction.

    Provides text and byte output on stdout. ``setup``, ``cleanup``,
    ``getkey`` and ``cancel`` are no-op hooks here; the platform specific
    subclasses override the ones they need.

    Used as a context manager, the console is put back to its normal mode
    inside the ``with`` body (``cleanup`` on entry) and switched to single
    key mode again afterwards (``setup`` on exit).
    """

    def __init__(self):
        # Python 3 exposes the binary stream as ``sys.stdout.buffer``; on
        # Python 2 ``sys.stdout`` itself is used for bytes.
        on_python3 = sys.version_info >= (3, 0)
        self.byte_output = sys.stdout.buffer if on_python3 else sys.stdout
        self.output = sys.stdout

    def setup(self):
        """Switch the console to single character input without echo."""

    def cleanup(self):
        """Put the console back to its default settings."""

    def getkey(self):
        """Return one key read from the console (always None in the base class)."""
        return None

    def write_bytes(self, byte_string):
        """Write already encoded bytes and flush immediately."""
        stream = self.byte_output
        stream.write(byte_string)
        stream.flush()

    def write(self, text):
        """Write a text string and flush immediately."""
        stream = self.output
        stream.write(text)
        stream.flush()

    def cancel(self):
        """Abort a pending getkey() call."""

    # - - - - - - - - - - - - - - - - - - - - - - - -
    # context manager:
    # temporarily switch the terminal to normal mode (e.g. for user input)

    def __enter__(self):
        self.cleanup()
        return self

    def __exit__(self, *args, **kwargs):
        self.setup()
if os.name == 'nt':  # noqa
    import msvcrt
    import ctypes

    class Out(object):
        """file-like wrapper that uses os.write"""

        def __init__(self, fd):
            self.fd = fd

        def flush(self):
            # os.write is unbuffered, nothing to flush
            pass

        def write(self, s):
            os.write(self.fd, s)

    class Console(ConsoleBase):
        """Windows console: UTF-8 code pages for output, msvcrt for key input."""

        def __init__(self):
            super(Console, self).__init__()
            # remember the active code pages so they can be restored later
            self._saved_ocp = ctypes.windll.kernel32.GetConsoleOutputCP()
            self._saved_icp = ctypes.windll.kernel32.GetConsoleCP()
            ctypes.windll.kernel32.SetConsoleOutputCP(65001)
            ctypes.windll.kernel32.SetConsoleCP(65001)
            self.output = codecs.getwriter('UTF-8')(Out(sys.stdout.fileno()), 'replace')
            # the change of the code page is not propagated to Python, manually fix it
            sys.stderr = codecs.getwriter('UTF-8')(Out(sys.stderr.fileno()), 'replace')
            sys.stdout = self.output
            self.output.encoding = 'UTF-8'  # needed for input

        def __del__(self):
            ctypes.windll.kernel32.SetConsoleOutputCP(self._saved_ocp)
            ctypes.windll.kernel32.SetConsoleCP(self._saved_icp)

        def getkey(self):
            """Read one key; ENTER (CR) is returned as LF, special keys are skipped."""
            while True:
                z = msvcrt.getwch()
                if z == unichr(13):
                    return unichr(10)
                elif z in (unichr(0), unichr(0xe0)):
                    # Function/arrow keys arrive as a two character sequence:
                    # a prefix of 0x00 or 0xE0 followed by the key code.
                    # Drop the key code too and wait for the next real key.
                    msvcrt.getwch()
                else:
                    return z

        def cancel(self):
            # CancelIo, CancelSynchronousIo do not seem to work when using
            # getwch, so instead, send a key to the window with the console
            hwnd = ctypes.windll.kernel32.GetConsoleWindow()
            # 0x100 is WM_KEYDOWN, 0x0d is the RETURN virtual key code
            ctypes.windll.user32.PostMessageA(hwnd, 0x100, 0x0d, 0)

elif os.name == 'posix':
    import atexit
    import termios
    import fcntl

    class Console(ConsoleBase):
        """POSIX console: uses termios to read single keys from stdin."""

        def __init__(self):
            super(Console, self).__init__()
            self.fd = sys.stdin.fileno()
            # terminal attributes to restore in cleanup()
            self.old = termios.tcgetattr(self.fd)
            # make sure the terminal is restored when the interpreter exits
            atexit.register(self.cleanup)
            if sys.version_info < (3, 0):
                self.enc_stdin = codecs.getreader(sys.stdin.encoding)(sys.stdin)
            else:
                self.enc_stdin = sys.stdin

        def setup(self):
            """Disable line buffering, echo and signal keys; read blocks for 1 char."""
            new = termios.tcgetattr(self.fd)
            # index 3 holds the local mode flags
            new[3] = new[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG
            new[6][termios.VMIN] = 1
            new[6][termios.VTIME] = 0
            termios.tcsetattr(self.fd, termios.TCSANOW, new)

        def getkey(self):
            """Read one character from stdin; DEL (0x7f) is mapped to backspace."""
            c = self.enc_stdin.read(1)
            if c == unichr(0x7f):
                c = unichr(8)    # map the BS key (which yields DEL) to backspace
            return c

        def cancel(self):
            # push a NUL byte into the terminal input queue so that a blocking
            # read in getkey() returns
            fcntl.ioctl(self.fd, termios.TIOCSTI, b'\0')

        def cleanup(self):
            """Restore the terminal attributes saved in __init__."""
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)

else:
    raise NotImplementedError(
        'Sorry no implementation for your platform ({}) available.'.format(sys.platform))
- # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class Transform(object):
    """do-nothing: forward all data unchanged"""

    # Base class of all text filters: each hook gets a text string and
    # returns the (possibly modified) text. The base class is the identity.

    def rx(self, text):
        """Filter text that was received from the serial port."""
        return text

    def tx(self, text):
        """Filter text that is about to be sent to the serial port."""
        return text

    def echo(self, text):
        """Filter the local console echo of text that is being sent."""
        return text
class CRLF(Transform):
    """ENTER sends CR+LF"""

    def tx(self, text):
        # every LF in outgoing text becomes the two character sequence CR LF
        return '\r\n'.join(text.split('\n'))
class CR(Transform):
    """ENTER sends CR"""

    def rx(self, text):
        # received CR is shown as a newline on the console
        return '\n'.join(text.split('\r'))

    def tx(self, text):
        # outgoing newline is sent as a bare CR
        return '\r'.join(text.split('\n'))
class LF(Transform):
    """ENTER sends LF"""

    # No overrides: the console already delivers ENTER as LF, so the
    # inherited pass-through behaviour is used in both directions.
class NoTerminal(Transform):
    """remove typical terminal control codes from input"""

    # Translation table for str.translate(): control codes below 32, except
    # CR, LF, BS and TAB, map to the code point 0x2400 + code.
    REPLACEMENT_MAP = {
        code: 0x2400 + code
        for code in range(32)
        if unichr(code) not in '\r\n\b\t'
    }
    REPLACEMENT_MAP[0x7F] = 0x2421  # DEL
    REPLACEMENT_MAP[0x9B] = 0x2425  # CSI

    def rx(self, text):
        """Replace the mapped control codes in received text."""
        table = self.REPLACEMENT_MAP
        return text.translate(table)

    # the local echo is filtered the same way as received data
    echo = rx
class NoControls(NoTerminal):
    """Remove all control codes, incl. CR+LF"""

    # Same scheme as the parent class, but without exceptions: all 32 control
    # codes are replaced, and the space character gets its own mapping too.
    REPLACEMENT_MAP = {code: 0x2400 + code for code in range(32)}
    REPLACEMENT_MAP[0x20] = 0x2423  # visual space
    REPLACEMENT_MAP[0x7F] = 0x2421  # DEL
    REPLACEMENT_MAP[0x9B] = 0x2425  # CSI
class Printable(Transform):
    """Show decimal code for all non-ASCII characters and replace most control codes"""

    @staticmethod
    def _visible(char):
        """Return the console representation of one received character."""
        # printable ASCII and the common whitespace controls pass unchanged
        if ' ' <= char < '\x7f' or char in '\r\n\b\t':
            return char
        # remaining control codes are shifted to code point 0x2400 + code
        if char < ' ':
            return unichr(0x2400 + ord(char))
        # everything else: its decimal code, each digit shifted to
        # 0x2080 + digit, followed by a space as separator
        digits = ''.join(
            unichr(0x2080 + ord(digit) - 48) for digit in '{:d}'.format(ord(char)))
        return digits + ' '

    def rx(self, text):
        """Convert received text character by character."""
        return ''.join(self._visible(char) for char in text)

    # the local echo is filtered the same way as received data
    echo = rx
class Colorize(Transform):
    """Apply different colors for received and echo"""

    def __init__(self):
        # XXX make it configurable, use colorama?
        # escape sequences that are put in front of the text
        self.input_color, self.echo_color = '\x1b[37m', '\x1b[31m'

    def rx(self, text):
        """Prefix received text with the input color sequence."""
        prefix = self.input_color
        return prefix + text

    def echo(self, text):
        """Prefix locally echoed text with the echo color sequence."""
        prefix = self.echo_color
        return prefix + text
class DebugIO(Transform):
    """Print what is sent and received"""

    def rx(self, text):
        """Log the repr of received text to stderr; the text passes unchanged."""
        message = ' [RX:{}] '.format(repr(text))
        sys.stderr.write(message)
        sys.stderr.flush()
        return text

    def tx(self, text):
        """Log the repr of outgoing text to stderr; the text passes unchanged."""
        message = ' [TX:{}] '.format(repr(text))
        sys.stderr.write(message)
        sys.stderr.flush()
        return text
- # other ideas:
- # - add date/time for each newline
- # - insert newline after: a) timeout b) packet end character
# end-of-line modes, selected by name (looked up with the lower case name)
EOL_TRANSFORMATIONS = dict(
    crlf=CRLF,
    cr=CR,
    lf=LF,
)

# text filters, selected by name
TRANSFORMATIONS = dict(
    direct=Transform,       # no transformation
    default=NoTerminal,
    nocontrol=NoControls,
    printable=Printable,
    colorize=Colorize,
    debug=DebugIO,
)
- # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def ask_for_port():
    """\
    List the available serial ports on stderr and prompt for a choice.

    The answer is either a 1-based index into the printed list or a port
    name. An index outside the list is rejected and the prompt is repeated;
    any input that is not an integer is returned unchanged as port name.
    """
    sys.stderr.write('\n--- Available ports:\n')
    candidates = []
    for number, (device, description, _hwid) in enumerate(sorted(comports()), 1):
        sys.stderr.write('--- {:2}: {:20} {!r}\n'.format(number, device, description))
        candidates.append(device)
    while True:
        answer = raw_input('--- Enter port index or full name: ')
        try:
            position = int(answer) - 1
        except ValueError:
            # not a number -> the user typed a port name
            return answer
        if 0 <= position < len(candidates):
            return candidates[position]
        sys.stderr.write('--- Invalid index!\n')
class Miniterm(object):
    """\
    Terminal application. Copy data from serial port to console and vice versa.
    Handle special keys from the console to show menu etc.

    Two daemon threads do the work: ``reader`` copies serial -> console and
    ``writer`` copies console -> serial. ``set_rx_encoding`` and
    ``set_tx_encoding`` must be called before ``start`` because the threads
    use the codec objects created there.
    """

    def __init__(self, serial_instance, echo=False, eol='crlf', filters=()):
        """\
        :param serial_instance: serial port object to talk to
        :param echo: when true, typed characters are also shown locally
        :param eol: key of EOL_TRANSFORMATIONS ('crlf', 'cr' or 'lf')
        :param filters: sequence of keys of TRANSFORMATIONS
        """
        self.console = Console()
        self.serial = serial_instance
        self.echo = echo
        self.raw = False
        self.input_encoding = 'UTF-8'
        self.output_encoding = 'UTF-8'
        self.eol = eol
        self.filters = filters
        self.update_transformations()
        # Both hotkeys are stored as one-character strings because they are
        # compared with the keys returned by console.getkey() and are passed
        # to key_description(), which calls ord() on them.
        self.exit_character = unichr(0x1d)  # GS/CTRL+]
        self.menu_character = unichr(0x14)  # Menu: CTRL+T
        self.alive = None
        self._reader_alive = None
        self.receiver_thread = None
        self.transmitter_thread = None
        self.rx_decoder = None
        # set_tx_encoding() stores an incremental *encoder* under this name
        self.tx_encoder = None
        # misnamed attribute of earlier versions, kept for compatibility only
        self.tx_decoder = None

    def _start_reader(self):
        """Start reader thread"""
        self._reader_alive = True
        # start serial->console thread
        self.receiver_thread = threading.Thread(target=self.reader, name='rx')
        self.receiver_thread.daemon = True
        self.receiver_thread.start()

    def _stop_reader(self):
        """Stop reader thread only, wait for clean exit of thread"""
        self._reader_alive = False
        # unblock a pending read, if the port implementation supports it
        if hasattr(self.serial, 'cancel_read'):
            self.serial.cancel_read()
        self.receiver_thread.join()

    def start(self):
        """start worker threads"""
        self.alive = True
        self._start_reader()
        # enter console->serial loop
        self.transmitter_thread = threading.Thread(target=self.writer, name='tx')
        self.transmitter_thread.daemon = True
        self.transmitter_thread.start()
        self.console.setup()

    def stop(self):
        """set flag to stop worker threads"""
        self.alive = False

    def join(self, transmit_only=False):
        """\
        wait for worker threads to terminate

        :param transmit_only: when true, only wait for the writer thread
        """
        self.transmitter_thread.join()
        if not transmit_only:
            if hasattr(self.serial, 'cancel_read'):
                self.serial.cancel_read()
            self.receiver_thread.join()

    def close(self):
        """close the serial port"""
        self.serial.close()

    def update_transformations(self):
        """take list of transformation classes and instantiate them for rx and tx"""
        transformations = [EOL_TRANSFORMATIONS[self.eol]] + [TRANSFORMATIONS[f]
                                                             for f in self.filters]
        self.tx_transformations = [t() for t in transformations]
        # received data passes the same filter instances in reverse order
        self.rx_transformations = list(reversed(self.tx_transformations))

    def set_rx_encoding(self, encoding, errors='replace'):
        """set encoding for received data"""
        self.input_encoding = encoding
        self.rx_decoder = codecs.getincrementaldecoder(encoding)(errors)

    def set_tx_encoding(self, encoding, errors='replace'):
        """set encoding for transmitted data"""
        self.output_encoding = encoding
        self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)

    def dump_port_settings(self):
        """Write current settings to sys.stderr"""
        sys.stderr.write("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(
            p=self.serial))
        sys.stderr.write('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format(
            ('active' if self.serial.rts else 'inactive'),
            ('active' if self.serial.dtr else 'inactive'),
            ('active' if self.serial.break_condition else 'inactive')))
        try:
            sys.stderr.write('--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format(
                ('active' if self.serial.cts else 'inactive'),
                ('active' if self.serial.dsr else 'inactive'),
                ('active' if self.serial.ri else 'inactive'),
                ('active' if self.serial.cd else 'inactive')))
        except serial.SerialException:
            # on RFC 2217 ports, it can happen if no modem state notification was
            # yet received. ignore this error.
            pass
        sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive'))
        sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive'))
        sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
        sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
        sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper()))
        sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))

    def reader(self):
        """loop and copy serial->console"""
        try:
            while self.alive and self._reader_alive:
                # read all that is there or wait for one byte
                data = self.serial.read(self.serial.in_waiting or 1)
                if data:
                    if self.raw:
                        self.console.write_bytes(data)
                    else:
                        text = self.rx_decoder.decode(data)
                        for transformation in self.rx_transformations:
                            text = transformation.rx(text)
                        self.console.write(text)
        except serial.SerialException:
            self.alive = False
            # wake up the writer thread that is blocked in getkey()
            self.console.cancel()
            raise       # XXX handle instead of re-raise?

    def writer(self):
        """\
        Loop and copy console->serial until self.exit_character character is
        found. When self.menu_character is found, interpret the next key
        locally.
        """
        menu_active = False
        try:
            while self.alive:
                try:
                    c = self.console.getkey()
                except KeyboardInterrupt:
                    # forward CTRL+C to the remote side instead of exiting
                    c = '\x03'
                if not self.alive:
                    break
                if menu_active:
                    self.handle_menu_key(c)
                    menu_active = False
                elif c == self.menu_character:
                    menu_active = True      # next char will be for menu
                elif c == self.exit_character:
                    self.stop()             # exit app
                    break
                else:
                    text = c
                    for transformation in self.tx_transformations:
                        text = transformation.tx(text)
                    self.serial.write(self.tx_encoder.encode(text))
                    if self.echo:
                        echo_text = c
                        for transformation in self.tx_transformations:
                            echo_text = transformation.echo(echo_text)
                        self.console.write(echo_text)
        except BaseException:
            # whatever ends this thread must also stop the reader loop;
            # the exception itself is passed on unchanged
            self.alive = False
            raise

    def handle_menu_key(self, c):
        """\
        Implement a simple menu / settings

        :param c: the key that was pressed after the menu character
        """
        if c == self.menu_character or c == self.exit_character:
            # Menu/exit character again -> send itself
            self.serial.write(self.tx_encoder.encode(c))
            if self.echo:
                self.console.write(c)
        elif c == '\x15':                       # CTRL+U -> upload file
            self.upload_file()
        elif c in '\x08hH?':                    # CTRL+H, h, H, ? -> Show help
            sys.stderr.write(self.get_help_text())
        elif c == '\x12':                       # CTRL+R -> Toggle RTS
            self.serial.rts = not self.serial.rts
            sys.stderr.write('--- RTS {} ---\n'.format('active' if self.serial.rts else 'inactive'))
        elif c == '\x04':                       # CTRL+D -> Toggle DTR
            self.serial.dtr = not self.serial.dtr
            sys.stderr.write('--- DTR {} ---\n'.format('active' if self.serial.dtr else 'inactive'))
        elif c == '\x02':                       # CTRL+B -> toggle BREAK condition
            self.serial.break_condition = not self.serial.break_condition
            sys.stderr.write('--- BREAK {} ---\n'.format('active' if self.serial.break_condition else 'inactive'))
        elif c == '\x05':                       # CTRL+E -> toggle local echo
            self.echo = not self.echo
            sys.stderr.write('--- local echo {} ---\n'.format('active' if self.echo else 'inactive'))
        elif c == '\x06':                       # CTRL+F -> edit filters
            self.change_filter()
        elif c == '\x0c':                       # CTRL+L -> EOL mode
            modes = list(EOL_TRANSFORMATIONS)   # keys
            eol = modes.index(self.eol) + 1
            if eol >= len(modes):
                eol = 0
            self.eol = modes[eol]
            sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper()))
            self.update_transformations()
        elif c == '\x01':                       # CTRL+A -> set encoding
            self.change_encoding()
        elif c == '\x09':                       # CTRL+I -> info
            self.dump_port_settings()
        elif c in 'pP':                         # P -> change port
            self.change_port()
        elif c in 'zZ':                         # Z -> suspend / open port temporarily
            # 's'/'S' is reserved for space parity (see below and the help
            # text); with suspend bound to it as well, space parity could
            # never be selected.
            self.suspend_port()
        elif c in 'bB':                         # B -> change baudrate
            self.change_baudrate()
        elif c == '8':                          # 8 -> change to 8 bits
            self.serial.bytesize = serial.EIGHTBITS
            self.dump_port_settings()
        elif c == '7':                          # 7 -> change to 7 bits
            self.serial.bytesize = serial.SEVENBITS
            self.dump_port_settings()
        elif c in 'eE':                         # E -> change to even parity
            self.serial.parity = serial.PARITY_EVEN
            self.dump_port_settings()
        elif c in 'oO':                         # O -> change to odd parity
            self.serial.parity = serial.PARITY_ODD
            self.dump_port_settings()
        elif c in 'mM':                         # M -> change to mark parity
            self.serial.parity = serial.PARITY_MARK
            self.dump_port_settings()
        elif c in 'sS':                         # S -> change to space parity
            self.serial.parity = serial.PARITY_SPACE
            self.dump_port_settings()
        elif c in 'nN':                         # N -> change to no parity
            self.serial.parity = serial.PARITY_NONE
            self.dump_port_settings()
        elif c == '1':                          # 1 -> change to 1 stop bits
            self.serial.stopbits = serial.STOPBITS_ONE
            self.dump_port_settings()
        elif c == '2':                          # 2 -> change to 2 stop bits
            self.serial.stopbits = serial.STOPBITS_TWO
            self.dump_port_settings()
        elif c == '3':                          # 3 -> change to 1.5 stop bits
            self.serial.stopbits = serial.STOPBITS_ONE_POINT_FIVE
            self.dump_port_settings()
        elif c in 'xX':                         # X -> change software flow control
            self.serial.xonxoff = (c == 'X')
            self.dump_port_settings()
        elif c in 'rR':                         # R -> change hardware flow control
            self.serial.rtscts = (c == 'R')
            self.dump_port_settings()
        else:
            sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c)))

    def upload_file(self):
        """Ask user for filename and send its contents"""
        sys.stderr.write('\n--- File to upload: ')
        sys.stderr.flush()
        with self.console:
            filename = sys.stdin.readline().rstrip('\r\n')
            if filename:
                try:
                    with open(filename, 'rb') as f:
                        sys.stderr.write('--- Sending file {} ---\n'.format(filename))
                        while True:
                            block = f.read(1024)
                            if not block:
                                break
                            self.serial.write(block)
                            # Wait for output buffer to drain.
                            self.serial.flush()
                            sys.stderr.write('.')   # Progress indicator.
                    sys.stderr.write('\n--- File {} sent ---\n'.format(filename))
                except IOError as e:
                    sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e))

    def change_filter(self):
        """change the i/o transformations"""
        sys.stderr.write('\n--- Available Filters:\n')
        sys.stderr.write('\n'.join(
            '---   {:<10} = {.__doc__}'.format(k, v)
            for k, v in sorted(TRANSFORMATIONS.items())))
        sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters)))
        with self.console:
            new_filters = sys.stdin.readline().lower().split()
        if new_filters:
            for f in new_filters:
                if f not in TRANSFORMATIONS:
                    sys.stderr.write('--- unknown filter: {}\n'.format(repr(f)))
                    break
            else:
                # only applied when every given name is known
                self.filters = new_filters
                self.update_transformations()
        sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))

    def change_encoding(self):
        """change encoding on the serial port"""
        sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding))
        with self.console:
            new_encoding = sys.stdin.readline().strip()
        if new_encoding:
            try:
                codecs.lookup(new_encoding)
            except LookupError:
                sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding))
            else:
                self.set_rx_encoding(new_encoding)
                self.set_tx_encoding(new_encoding)
        sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
        sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))

    def change_baudrate(self):
        """change the baudrate"""
        sys.stderr.write('\n--- Baudrate: ')
        sys.stderr.flush()
        with self.console:
            backup = self.serial.baudrate
            try:
                self.serial.baudrate = int(sys.stdin.readline().strip())
            except ValueError as e:
                sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e))
                self.serial.baudrate = backup
            else:
                self.dump_port_settings()

    def change_port(self):
        """Have a conversation with the user to change the serial port"""
        with self.console:
            try:
                port = ask_for_port()
            except KeyboardInterrupt:
                port = None
        if port and port != self.serial.port:
            # reader thread needs to be shut down
            self._stop_reader()
            # save settings
            settings = self.serial.getSettingsDict()
            # stays None when creating the port object itself fails
            new_serial = None
            try:
                new_serial = serial.serial_for_url(port, do_not_open=True)
                # restore settings and open
                new_serial.applySettingsDict(settings)
                new_serial.rts = self.serial.rts
                new_serial.dtr = self.serial.dtr
                new_serial.open()
                new_serial.break_condition = self.serial.break_condition
            except Exception as e:
                sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e))
                if new_serial is not None:
                    new_serial.close()
            else:
                self.serial.close()
                self.serial = new_serial
                sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port))
            # and restart the reader thread
            self._start_reader()

    def suspend_port(self):
        """\
        open port temporarily, allow reconnect, exit and port change to get
        out of the loop
        """
        # reader thread needs to be shut down
        self._stop_reader()
        self.serial.close()
        sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port))
        do_change_port = False
        while not self.serial.is_open:
            sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format(
                exit=key_description(self.exit_character)))
            k = self.console.getkey()
            if k == self.exit_character:
                self.stop()             # exit app
                break
            elif k in 'pP':
                do_change_port = True
                break
            try:
                self.serial.open()
            except Exception as e:
                sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e))
        if do_change_port:
            self.change_port()
        elif self.serial.is_open:
            # and restart the reader thread
            self._start_reader()
            sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port))
        # otherwise the user chose to quit: the port stays closed and there
        # is nothing to read from, so no reader is started

    def get_help_text(self):
        """return the help text"""
        # help text, starts with blank line!
        return """
--- pySerial ({version}) - miniterm - help
---
--- {exit:8} Exit program
--- {menu:8} Menu escape key, followed by:
--- Menu keys:
--- {menu:7} Send the menu character itself to remote
--- {exit:7} Send the exit character itself to remote
--- {info:7} Show info
--- {upload:7} Upload file (prompt will be shown)
--- {repr:7} encoding
--- {filter:7} edit filters
--- Toggles:
--- {rts:7} RTS {dtr:7} DTR {brk:7} BREAK
--- {echo:7} echo {eol:7} EOL
---
--- Port settings ({menu} followed by the following):
--- p change port
--- z suspend port (close it until a key is pressed)
--- 7 8 set data bits
--- N E O S M change parity (None, Even, Odd, Space, Mark)
--- 1 2 3 set stop bits (1, 2, 1.5)
--- b change baud rate
--- x X disable/enable software flow control
--- r R disable/enable hardware flow control
""".format(version=getattr(serial, 'VERSION', 'unknown version'),
           exit=key_description(self.exit_character),
           menu=key_description(self.menu_character),
           rts=key_description('\x12'),
           dtr=key_description('\x04'),
           brk=key_description('\x02'),
           echo=key_description('\x05'),
           info=key_description('\x09'),
           upload=key_description('\x15'),
           repr=key_description('\x01'),
           filter=key_description('\x06'),
           eol=key_description('\x0c'))
- # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
- # default args can be used to override when calling main() from an other script
- # e.g to create a miniterm-my-device.py
def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr=None):
    """\
    Command line tool, entry point

    Parses ``sys.argv``, opens the serial port (optionally asking the user
    for it) and runs a Miniterm session until the user quits.

    :param default_port: port that is used when none is given on the command line
    :param default_baudrate: baud rate that is used when none is given
    :param default_rts: initial RTS line state, None leaves the line untouched
    :param default_dtr: initial DTR line state, None leaves the line untouched
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Miniterm - A simple terminal program for the serial port.")

    parser.add_argument(
        "port",
        nargs='?',
        help="serial port name ('-' to show port list)",
        default=default_port)

    parser.add_argument(
        "baudrate",
        nargs='?',
        type=int,
        help="set baud rate, default: %(default)s",
        default=default_baudrate)

    group = parser.add_argument_group("port settings")

    group.add_argument(
        "--parity",
        choices=['N', 'E', 'O', 'S', 'M'],
        type=lambda c: c.upper(),
        help="set parity, one of {N E O S M}, default: N",
        default='N')

    group.add_argument(
        "--rtscts",
        action="store_true",
        help="enable RTS/CTS flow control (default off)",
        default=False)

    group.add_argument(
        "--xonxoff",
        action="store_true",
        help="enable software flow control (default off)",
        default=False)

    group.add_argument(
        "--rts",
        type=int,
        help="set initial RTS line state (possible values: 0, 1)",
        default=default_rts)

    group.add_argument(
        "--dtr",
        type=int,
        help="set initial DTR line state (possible values: 0, 1)",
        default=default_dtr)

    group.add_argument(
        "--ask",
        action="store_true",
        help="ask again for port when open fails",
        default=False)

    group = parser.add_argument_group("data handling")

    group.add_argument(
        "-e", "--echo",
        action="store_true",
        help="enable local echo (default off)",
        default=False)

    group.add_argument(
        "--encoding",
        dest="serial_port_encoding",
        metavar="CODEC",
        help="set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s",
        default='UTF-8')

    group.add_argument(
        "-f", "--filter",
        action="append",
        metavar="NAME",
        help="add text transformation",
        default=[])

    group.add_argument(
        "--eol",
        choices=['CR', 'LF', 'CRLF'],
        type=lambda c: c.upper(),
        help="end of line mode",
        default='CRLF')

    group.add_argument(
        "--raw",
        action="store_true",
        help="Do not apply any encodings/transformations",
        default=False)

    group = parser.add_argument_group("hotkeys")

    group.add_argument(
        "--exit-char",
        type=int,
        metavar='NUM',
        help="Unicode code of special character that is used to exit the application, default: %(default)s",
        default=0x1d)  # GS/CTRL+]

    group.add_argument(
        "--menu-char",
        type=int,
        metavar='NUM',
        help="Unicode code of special character that is used to control miniterm (menu), default: %(default)s",
        default=0x14)  # Menu: CTRL+T

    group = parser.add_argument_group("diagnostics")

    group.add_argument(
        "-q", "--quiet",
        action="store_true",
        help="suppress non-error messages",
        default=False)

    group.add_argument(
        "--develop",
        action="store_true",
        help="show Python traceback on error",
        default=False)

    args = parser.parse_args()

    if args.menu_char == args.exit_char:
        parser.error('--exit-char can not be the same as --menu-char')

    if args.filter:
        if 'help' in args.filter:
            sys.stderr.write('Available filters:\n')
            sys.stderr.write('\n'.join(
                '{:<10} = {.__doc__}'.format(k, v)
                for k, v in sorted(TRANSFORMATIONS.items())))
            sys.stderr.write('\n')
            sys.exit(1)
        filters = args.filter
    else:
        filters = ['default']

    # Reject unknown filter names here with a proper usage error; otherwise
    # the lookup in Miniterm.update_transformations() fails with a KeyError
    # only after the port has already been opened.
    unknown_filters = [f for f in filters if f not in TRANSFORMATIONS]
    if unknown_filters:
        parser.error('unknown filter(s): {} (use "-f help" for a list)'.format(
            ', '.join(repr(f) for f in unknown_filters)))

    while True:
        # no port given on command line -> ask user now
        if args.port is None or args.port == '-':
            try:
                args.port = ask_for_port()
            except KeyboardInterrupt:
                sys.stderr.write('\n')
                parser.error('user aborted and port is not given')
            else:
                if not args.port:
                    parser.error('port is not given')
        try:
            serial_instance = serial.serial_for_url(
                args.port,
                args.baudrate,
                parity=args.parity,
                rtscts=args.rtscts,
                xonxoff=args.xonxoff,
                do_not_open=True)

            if not hasattr(serial_instance, 'cancel_read'):
                # enable timeout for alive flag polling if cancel_read is not available
                serial_instance.timeout = 1

            # the line states are set before open() so they are applied
            # from the moment the port is opened
            if args.dtr is not None:
                if not args.quiet:
                    sys.stderr.write('--- forcing DTR {}\n'.format('active' if args.dtr else 'inactive'))
                serial_instance.dtr = args.dtr
            if args.rts is not None:
                if not args.quiet:
                    sys.stderr.write('--- forcing RTS {}\n'.format('active' if args.rts else 'inactive'))
                serial_instance.rts = args.rts

            serial_instance.open()
        except serial.SerialException as e:
            sys.stderr.write('could not open port {}: {}\n'.format(repr(args.port), e))
            if args.develop:
                raise
            if not args.ask:
                sys.exit(1)
            else:
                # '-' makes the next loop iteration ask the user again
                args.port = '-'
        else:
            break

    miniterm = Miniterm(
        serial_instance,
        echo=args.echo,
        eol=args.eol.lower(),
        filters=filters)
    miniterm.exit_character = unichr(args.exit_char)
    miniterm.menu_character = unichr(args.menu_char)
    miniterm.raw = args.raw
    miniterm.set_rx_encoding(args.serial_port_encoding)
    miniterm.set_tx_encoding(args.serial_port_encoding)

    if not args.quiet:
        sys.stderr.write('--- Miniterm on {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits} ---\n'.format(
            p=miniterm.serial))
        sys.stderr.write('--- Quit: {} | Menu: {} | Help: {} followed by {} ---\n'.format(
            key_description(miniterm.exit_character),
            key_description(miniterm.menu_character),
            key_description(miniterm.menu_character),
            key_description('\x08')))

    miniterm.start()
    try:
        # wait for the writer thread only; it ends when the user quits
        miniterm.join(True)
    except KeyboardInterrupt:
        pass
    if not args.quiet:
        sys.stderr.write("\n--- exit ---\n")
    miniterm.join()
    miniterm.close()
- # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# run the command line tool only when executed as a script, not on import
if __name__ == '__main__':
    main()
|