#!/usr/bin/env python3 # -*- coding: utf-8 -*- ########################################################################################################################################## # Python-File Information # ########################################################################################################################################## # # File: Xcom_Test_Program.py # Author: Tobias Müller # Date: 2017.08.09 # ########################################################################################################################################## # Requirements # ########################################################################################################################################## import sys import signal import socket import os import time import ptvsd as debug import serial as UART import pigpio as GPIO from Xcom_API import Xcom_API ########################################################################################################################################## # Class Definition & Description # ########################################################################################################################################## # This program demonstrate the functionality of the Xcom_API-class. To use this program a Raspberry Pi with a Raspicomm extender module # and a RS232-bridge called Xcom-232i is required. ########################################################################################################################################## # Program Information # ########################################################################################################################################## PROG_NAME = 'Xcom_Test_Program' PROG_VERSION = 'v1.0' ########################################################################################################################################## # Debug server-Settings # ########################################################################################################################################## debug.enable_attach(secret='Debug') ########################################################################################################################################## # GPIO-Server-Setting # ########################################################################################################################################## # LED-Ports LED1 = 18 LED2 = 27 # Detect GPIO-HOST Hostname = socket.gethostname() # GPIO-State-Variable GPIO_ENABLE = False # GPIO-ERROR_Variable GPIO_ERR = False ########################################################################################################################################## # COM-Port-Setting # ########################################################################################################################################## # COM-Parameter-Variable COM_PARA = {'Port' : '/dev/ttyAMA0', 'Baudrate' : 115200, 'Bytesize' : UART.EIGHTBITS, 'Parity' : UART.PARITY_EVEN, 'Stopbits' : UART.STOPBITS_ONE, 'Timeout' : 2, 'XON_XOFF' : False, 'RTS_CTS' : False, 'DSR_DTR' : False} # COM-State-Variable COM_ENABLE = False # COM-ERROR-Variable COM_ERR = False COM_TIME_ERR = False ########################################################################################################################################## # Global Variable # ########################################################################################################################################## object_set = False port = '' baudrate = '' crc = False source = '' destination = '' ########################################################################################################################################## # Functions # ########################################################################################################################################## # exit Programm def exit(): sys.exit(0) #print header def header(): # Use Global-Variable global port global baudrate global object_set global PROG_NAME global PROG_VERSION global COM_ENABLE global GPIO_ENABLE global destination global crc global source # clear terminal os.system('cls' if os.name == 'nt' else 'clear') #print Header print('#' *120) print('#' *120) print('##',' ' *114,'##') print('## Welcome to {} version: {}. This program demonstrate the functionality of the {}-class. ##'.format(PROG_NAME, PROG_VERSION[1:],Xcom_API.get_prog_name())) print('##',' ' *114,'##') print('## COM-Port: {}{}destination-addr.: {}{}Version Xcom_API: {} ##'.format(port,' '*(26-len(port)),destination,' '*(24-len(destination)),Xcom_API.get_prog_version()[1:])) print('## Baudrate: {}{}source-addr.: {}{}Object of Xcom_API set: {}{} ##'.format(baudrate,' '*(26-len(baudrate)),source,' '*(29-len(source)),object_set,' '*(object_set))) print('## COM enable: {}{}CRC-Check enable: {}{}GPIO enable: {}{} ##'.format(COM_ENABLE,' '*(19+COM_ENABLE),crc,' '*(20+crc),GPIO_ENABLE,' '*(GPIO_ENABLE))) print('##',' ' *114,'##') print('#' *120) print('#' *120,'\n') # setup def setup(): # Use Global-Variable global COM_PARA global port global baudrate global destination global crc global source # get COM-Port if len(sys.argv) >= 2: port = sys.argv[1] COM_PARA['Port'] = port header() else: while True: Buffer = input('Do you want to use COM-Port: /dev/ttyAMA0? (yes/no): ') if Buffer == 'yes' or Buffer == 'y': port = COM_PARA['Port'] header() break elif Buffer == 'no' or Buffer == 'n': port = input('Enter a COM-Port: ') COM_PARA['Port'] = port header() break else: print('What do you mean?\n') # get baudrate if len(sys.argv) >= 3 and (sys.argv[2] == '115200' or sys.argv[2] == '38400'): baudrate = sys.argv[2] COM_PARA['Baudrate'] = int(baudrate) header() else: while True: Buffer = input('Do you want to use a Baudrate of: 115200? (yes/no): ') if Buffer == 'yes' or Buffer == 'y': baudrate = str(COM_PARA['Baudrate']) header() break elif Buffer == 'no' or Buffer == 'n': while True: baudrate = input('Enter a Baudrate (115200 or 38400): ') if baudrate == '115200' or baudrate == '38400': COM_PARA['Baudrate'] = int(baudrate) header() break else: print('baudrate: {}, is not supported.'.format(baudrate)) break else: print('What do you mean?\n') # get destination-address if len(sys.argv) >= 4: destination = sys.argv[3] header() else: while True: Buffer = input('Do you want to use the destination-address: 101? (yes/no): ') if Buffer == 'yes' or Buffer == 'y': destination = str(101) header() break elif Buffer == 'no' or Buffer == 'n': while True: destination = input('Enter a destination-address: ') try: if isinstance(int(destination),int): header() break except ValueError: print('destination-address: {}, is not an integer.'.format(destination)) break else: print('What do you mean?\n') # get source-address if len(sys.argv) >= 5: source = sys.argv[4] header() else: while True: Buffer = input('Do you want to use the source-address: 1? (yes/no): ') if Buffer == 'yes' or Buffer == 'y': source = str(1) header() break elif Buffer == 'no' or Buffer == 'n': while True: source = input('Enter a source-address: ') try: if isinstance(int(source),int): header() break except ValueError: print('source-address: {}, is not an integer.'.format(source)) break else: print('What do you mean?\n') # get crc check state if len(sys.argv) == 6 and (sys.argv[5] == 'False' or sys.argv[5] == 'True' or sys.argv[5] == '1' or sys.argv[5] == '0'): if sys.argv[5] == 'True' or sys.argv[5] == '1': crc = True else: crc = False header() else: while True: Buffer = input('Do you want to activate CRC-Check? (yes/no): ') if Buffer == 'yes' or Buffer == 'y': crc = True header() break elif Buffer == 'no' or Buffer == 'n': crc = False header() break else: print('What do you mean?\n') # open GPIOs def open_gpios(LED1,LED2,Hostname): # Use Global-Variable global GPIO_ERR global GPIO_ENABLE # Try Function try: # Enable GPIO-Server LED_Handle = GPIO.pi(Hostname) # GPIO-Mode LED_Handle.set_mode(LED1, GPIO.OUTPUT) LED_Handle.set_mode(LED2, GPIO.OUTPUT) # GPIO-Start-State LED_Handle.write(LED1, GPIO.LOW) LED_Handle.write(LED2, GPIO.LOW) GPIO_ENABLE = True # print completed task print('GPIOs on \"{}\" are configured.'.format(Hostname)) # return LED-Handle return LED_Handle # Handle Errors except AttributeError: print('There is no connection to the \"{}\" possible.\nEnsure that the Deamon-Host-Process\"pigpiod\" has started with root privileges on the Raspberry Pi to connect to the GPIOs.'.format(Hostname)) GPIO_ERR = True exit() # close GPIOs def close_gpios(LED_Handle,Hostname,LED1,LED2): # Use Global-Variable global GPIO_ERR global GPIO_ENABLE # Try Function try: # Reset GPIOs LED_Handle.clear_bank_1(1< {}'.format(Buffer[len(Buffer)-1-i],Buffer2[i])) print(' {} -> {}'.format('0','Bit reserved (no information)')) print(' {} -> {}'.format('0','Bit reserved (no information)')) input('\nPress \'Enter\' to exit function.') header() # xtender_comm_ext def xtender_comm_ext(OBJ_Handle, LED_Handle, COM_Handle, LED2): # Description while True: print('This function uses the methods:\n') print('\'get_write_frame_ext(self, object_type, object_id, property_id, property_data, data_format)\'\n') print('and\n') print('\'get_read_frame_ext(self, object_type, object_id, property_id)\'\n') print('from the Xcom_API-Class to generate a byte-frame for a read or write instruction. You can decide, whether you use the') print('generated byte-frame to send it over a serial-connection or not. If not, it will display the generated byte-frame and') print('close the function\n') print('If you will send the byte-frame, it will decode the answer of the xtender-system with the:\n') print('\'get_data_from_frame_ext(self, bytearray_of_frame, data_format)\'\n') print('method. If the answer from the xtender-system has an error, the error-message will be displayed with the:\n') print('\'get_text_from_error_id(self, error_id)\'\n') print('method, otherwise it display the answer of the received frame. With the:\n') print('\'get_bin_from_frame_flags(self, bytearray_of_frame)\'\n') print('and\n') print('\'get_text_from_frame_flags(self, bytearray_of_frame)\'\n') print('method the state of the frame-flags will be displayed.\n\n\n') Buffer = input('Do you want to continue with this function (yes/no): ') if Buffer == 'yes' or Buffer == 'y': header() break elif Buffer == 'no' or Buffer =='n': header() return else: header() print('What do you mean?\n') # Ask for task and generate Byte_Frame while True: Buffer = input('Type in which frame you want to generate (read/write): ') if Buffer == 'read' or Buffer == 'r': try: header() print('Known object_type:') print(' info\n parameter\n message\n datalog_field\n datalog_transfer\n') Buffer = input('Type in an object_type: ') if Buffer == 'info': object_type = Xcom_API._object_type_info elif Buffer == 'parameter': object_type = Xcom_API._object_type_parameter elif Buffer == 'message': object_type = Xcom_API._object_type_message elif Buffer == 'datalog_field': object_type = Xcom_API._object_type_datalog_field elif Buffer == 'datalog_transfer': object_type = Xcom_API._object_type_datalog_transfer else: raise ValueError header() object_id = input('Type in an object_id: ') header() print('Known property_id:') print(' value\n string\n value_qsp\n min_qsp\n max_qsp\n level_qsp\n') Buffer = input('Type in a property_id: ') if Buffer == 'value': property_id = Xcom_API._property_id_value elif Buffer == 'string': property_id = Xcom_API._property_id_string elif Buffer == 'value_qsp': property_id = Xcom_API._property_id_value_qsp elif Buffer == 'min_qsp': property_id = Xcom_API._property_id_min_qsp elif Buffer == 'max_qsp': property_id = Xcom_API._property_id_max_qsp elif Buffer == 'level_qsp': property_id = Xcom_API._property_id_level_qsp else: raise ValueError header() print('Known data_format:') print(' bool\n format\n short_int\n enum\n short_enum\n long_enum\n error\n int32\n float\n byte\n') Buffer = input('Type in a property_id: ') if Buffer == 'bool': data_format = Xcom_API._format_bool elif Buffer == 'format': data_format = Xcom_API._format_format elif Buffer == 'short_int': data_format = Xcom_API._format_short_int elif Buffer == 'enum': data_format = Xcom_API._format_enum elif Buffer == 'short_enum': data_format = Xcom_API._format_short_enum elif Buffer == 'long_enum': data_format = Xcom_API._format_long_enum elif Buffer == 'error': data_format = Xcom_API._format_error elif Buffer == 'int32': data_format = Xcom_API._format_int32 elif Buffer == 'float': data_format = Xcom_API._format_float elif Buffer == 'byte': data_format = Xcom_API._format_byte else: raise ValueError object_id = int(object_id) Buffer2 = OBJ_Handle.get_read_frame_ext(object_type, object_id, property_id) header() break except: while True: Buffer = input('\nobject_type, object_id, property_id or data_format not supported.\nDo you want to try it again (yes/no): ') if Buffer == 'yes' or Buffer == 'y': header() break elif Buffer == 'no' or Buffer =='n': header() return else: print('\nWhat do you mean?') elif Buffer == 'write' or Buffer =='w': try: header() print('Known object_type:') print(' info\n parameter\n message\n datalog_field\n datalog_transfer\n') Buffer = input('Type in an object_type: ') if Buffer == 'info': object_type = Xcom_API._object_type_info elif Buffer == 'parameter': object_type = Xcom_API._object_type_parameter elif Buffer == 'message': object_type = Xcom_API._object_type_message elif Buffer == 'datalog_field': object_type = Xcom_API._object_type_datalog_field elif Buffer == 'datalog_transfer': object_type = Xcom_API._object_type_datalog_transfer else: raise ValueError header() object_id = input('Type in an object_id: ') header() print('Known property_id:') print(' value\n string\n value_qsp\n min_qsp\n max_qsp\n level_qsp\n') Buffer = input('Type in a property_id: ') if Buffer == 'value': property_id = Xcom_API._property_id_value elif Buffer == 'string': property_id = Xcom_API._property_id_string elif Buffer == 'value_qsp': property_id = Xcom_API._property_id_value_qsp elif Buffer == 'min_qsp': property_id = Xcom_API._property_id_min_qsp elif Buffer == 'max_qsp': property_id = Xcom_API._property_id_max_qsp elif Buffer == 'level_qsp': property_id = Xcom_API._property_id_level_qsp else: raise ValueError header() property_data = input('Type in a property_data: ') header() print('Known data_format:') print(' bool\n format\n short_int\n enum\n short_enum\n long_enum\n error\n int32\n float\n byte\n') Buffer = input('Type in a property_id: ') if Buffer == 'bool': data_format = Xcom_API._format_bool elif Buffer == 'format': data_format = Xcom_API._format_format elif Buffer == 'short_int': data_format = Xcom_API._format_short_int elif Buffer == 'enum': data_format = Xcom_API._format_enum elif Buffer == 'short_enum': data_format = Xcom_API._format_short_enum elif Buffer == 'long_enum': data_format = Xcom_API._format_long_enum elif Buffer == 'error': data_format = Xcom_API._format_error elif Buffer == 'int32': data_format = Xcom_API._format_int32 elif Buffer == 'float': data_format = Xcom_API._format_float elif Buffer == 'long_enum': data_format = Xcom_API._format_byte else: raise ValueError try: property_data = int(property_data) except: property_data = float(property_data) object_id = int(object_id) Buffer2 = OBJ_Handle.get_write_frame_ext(object_type, object_id, property_id, property_data, data_format) header() break except: while True: Buffer = input('\nobject_type, object_id, property_id, property_data or data_format not supported.\nDo you want to try it again (yes/no): ') if Buffer == 'yes' or Buffer == 'y': header() break elif Buffer == 'no' or Buffer =='n': header() return else: print('\nWhat do you mean?') else: header() print('What do you mean?\n') # Ask for send byte-frame while True: Buffer = input('Do you want to send the Byte-frame over the serial connection (yes/no): ') if Buffer == 'yes' or Buffer == 'y': header() break elif Buffer == 'no' or Buffer =='n': print('\ngenerated byte-frame:') print('\n'.join(hex(i) for i in Buffer2)) input('\nPress \'Enter\' to exit function.') header() return else: header() print('\nWhat do you mean?\n') # send Byteframe com_wite(LED_Handle,LED2,COM_Handle,Buffer2) # read from COM-Port Buffer2 = com_read(LED_Handle,LED2,COM_Handle) # continue with progress input('\nPress \'Enter\' to continue.') header() # get data from frame print value try: Buffer = OBJ_Handle.get_data_from_frame_ext(Buffer2,data_format) if not Buffer[0]: print('Answer from xtender:',Buffer[1],'\n') else: print('An error with error-number:',hex(Buffer[1]),'occure.\n') print('Errortext:',OBJ_Handle.get_text_from_error_id(Buffer[1])) except: print('Failure with frame.') input('\nPress \'Enter\' to exit function.') return # print frame_flag informations Buffer = OBJ_Handle.get_bin_from_frame_flags(Buffer2) print('\nFrame-flags binary:',Buffer) print('Frame-flags binary expression as text is:') Buffer2 = OBJ_Handle.get_text_from_frame_flags(Buffer2) for i in range(0,len(Buffer2)): print(' {} -> {}'.format(Buffer[len(Buffer)-1-i],Buffer2[i])) print(' {} -> {}'.format('0','Bit reserved (no information)')) print(' {} -> {}'.format('0','Bit reserved (no information)')) input('\nPress \'Enter\' to exit function.') header() ########################################################################################################################################## # Main-Program # ########################################################################################################################################## # Try Main-Function try: # print Header header() # run setup if to check arguments an create an object of the class Xcom_API setup() # Configure GPIOs and COM-Port and object of class LED_Handle = open_gpios(LED1,LED2,Hostname) header() COM_Handle = open_com_port(LED_Handle,LED1,COM_PARA) header() OBJ_Handle = create_obj_of_class(destination,source,crc) header() # main while loop: while True: # print Task print('Xcom_API-Information-Methods:') print(' get_source_address()\n get_destination_address()\n is_crc_check_active()\n get_object_counter()\n get_prog_name()\n get_prog_version()\n') print('Test-Functions:') print(' xtender_comm()\n xtender_comm_ext()\n') print('If you want to quit this program type: \'quit\'.\n\n') # get answer Buffer = input('Please type in a method you want to use: ') #Control Buffer and start Function if Buffer == 'quit' or Buffer == 'q': del OBJ_Handle break elif Buffer == 'get_source_address()': header() print('This method returns an integer-value of the source-address of an object.\n') print('Example-Python-Code:') print('>>> OBJ_Handle = Xcom_API(source = {})'.format(OBJ_Handle.get_source_address())) print('>>> Source-Address = OBJ_Handle.get_source_address()') print('>>> Source-Address') print(' {}\n'.format(OBJ_Handle.get_source_address())) input('Press \'Enter\' to continue.') header() elif Buffer == 'get_destination_address()': header() print('This method returns an integer-value of the destination-address of an object.\n') print('Example-Python-Code:') print('>>> OBJ_Handle = Xcom_API(destination = {})'.format(OBJ_Handle.get_destination_address())) print('>>> Destination-Address = OBJ_Handle.get_destination_address()') print('>>> Destination-Address') print(' {}\n'.format(OBJ_Handle.get_destination_address())) input('Press \'Enter\' to continue.') header() elif Buffer == 'is_crc_check_active()': header() print('This method returns a boolean-value of the active-state of the CRC-Check of an object.\n') print('Example-Python-Code:') print('>>> OBJ_Handle = Xcom_API(crc = {})'.format(int(OBJ_Handle.is_crc_check_active()))) print('>>> CRC-Check = OBJ_Handle.is_crc_check_active()') print('>>> CRC-Check') print(' {}\n'.format(OBJ_Handle.is_crc_check_active())) input('Press \'Enter\' to continue.') header() elif Buffer == 'get_object_counter()': header() print('This method returns a integer-value of the counter of active-objects of this class.\n') print('Example-Python-Code:') print('>>> OBJ_Handle = Xcom_API()') print('>>> Object-Counter = OBJ_Handle.get_object_counter()') print('>>> Object-Counter') print(' {}\n'.format(OBJ_Handle.get_object_counter())) print('Or without to use an object of the Xcom_API-Class:') print('>>> Object-Counter = Xcom_API.get_object_counter()') print('>>> Object-Counter') print(' {}\n'.format(Xcom_API.get_object_counter())) input('Press \'Enter\' to continue.') header() elif Buffer == 'get_prog_name()': header() print('This method returns a string of the of the program-name of this class.\n') print('Example-Python-Code:') print('>>> OBJ_Handle = Xcom_API()') print('>>> Program-Name = OBJ_Handle.get_prog_name()') print('>>> Program-Name') print(' {}\n'.format(OBJ_Handle.get_prog_name())) print('Or without to use an object of the Xcom_API-Class:') print('>>> Program-Name = Xcom_API.get_prog_name()') print('>>> Program-Name') print(' {}\n'.format(Xcom_API.get_prog_name())) input('Press \'Enter\' to continue.') header() elif Buffer == 'get_prog_version()': header() print('This method returns a string of the of the program-version of this class.\n') print('Example-Python-Code:') print('>>> OBJ_Handle = Xcom_API()') print('>>> Program-Version = OBJ_Handle.get_prog_version()') print('>>> Program-Version') print(' {}\n'.format(OBJ_Handle.get_prog_version())) print('Or without to use an object of the Xcom_API-Class:') print('>>> Program-Version = Xcom_API.get_prog_version()') print('>>> Program-Version') print(' {}\n'.format(Xcom_API.get_prog_version())) input('Press \'Enter\' to continue.') header() elif Buffer == 'xtender_comm()': header() xtender_comm(OBJ_Handle, LED_Handle, COM_Handle, LED2) header() elif Buffer == 'xtender_comm_ext()': header() xtender_comm_ext(OBJ_Handle, LED_Handle, COM_Handle, LED2) header() else: header() print('What do you mean?\n') # exit program exit() # Handle Errors except KeyboardInterrupt: if signal.SIGINT: print('\n\nProgram closed with Strg+c.') # Exit program safely. finally: print('\nExit Program.') if (('COM_Handle' in locals() and COM_Handle and COM_ENABLE and not COM_ERR) or ('LED_Handle' in locals() and LED_Handle and GPIO_ENABLE and not GPIO_ERR)): print('\nCleanup work:') if 'COM_Handle' in locals() and COM_Handle and COM_ENABLE and not COM_ERR: close_com_port(COM_Handle,LED_Handle,LED1) if 'LED_Handle' in locals() and LED_Handle and GPIO_ENABLE and not GPIO_ERR: close_gpios(LED_Handle,Hostname,LED1,LED2)