123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- ##########################################################################################################################################
- # Python-File Information #
- ##########################################################################################################################################
- #
- # File: Xcom_Test_Program.py
- # Author: Tobias Müller
- # Date: 2017.08.09
- #
- ##########################################################################################################################################
- # Requirements #
- ##########################################################################################################################################
- import sys
- import signal
- import socket
- import os
- import time
- import ptvsd as debug
- import serial as UART
- import pigpio as GPIO
- from Xcom_API import Xcom_API
- ##########################################################################################################################################
- # Class Definition & Description #
- ##########################################################################################################################################
- # This program demonstrates the functionality of the Xcom_API class. It continuously sends a request for a known object_id and displays the
- # answer from the Xtender system. To use this program, a Raspberry Pi with a Raspicomm extender module and an RS232 bridge called
- # Xcom-232i is required.
- ##########################################################################################################################################
- # Program Information #
- ##########################################################################################################################################
# Name and version tag of this program; both are shown in the start-up banner.
PROG_NAME, PROG_VERSION = 'Xcom_Test_Program', 'v1.0'
- ##########################################################################################################################################
- # Debug server-Settings #
- ##########################################################################################################################################
# Start the ptvsd remote-debug server as soon as the module is loaded.
# NOTE(review): the attach secret is hard-coded; confirm that this is
# acceptable outside of a development setup.
debug.enable_attach(secret="Debug")
- ##########################################################################################################################################
- # GPIO-Server-Setting #
- ##########################################################################################################################################
# GPIO numbers of the two status LEDs
LED1, LED2 = 18, 27
# The GPIO server is looked up under the name of the local machine
Hostname = socket.gethostname()
# GPIO state flag and GPIO error flag (both start cleared)
GPIO_ENABLE = GPIO_ERR = False
- ##########################################################################################################################################
- # COM-Port-Setting #
- ##########################################################################################################################################
# Serial-port parameters; 'Port' and 'Baudrate' may be overwritten by setup()
COM_PARA = dict(
    Port='/dev/ttyAMA0',
    Baudrate=115200,
    Bytesize=UART.EIGHTBITS,
    Parity=UART.PARITY_EVEN,
    Stopbits=UART.STOPBITS_ONE,
    Timeout=2,
    XON_XOFF=False,
    RTS_CTS=False,
    DSR_DTR=False,
)
# COM state flag
COM_ENABLE = False
# COM error flags
COM_ERR = COM_TIME_ERR = False
- ##########################################################################################################################################
- # Global Variable #
- ##########################################################################################################################################
# Boolean state shown in the banner: Xcom_API object created / CRC check active
object_set = crc = False
# Connection settings as text; they are filled in by setup()
port = baudrate = source = destination = ''
- ##########################################################################################################################################
- # Functions #
- ##########################################################################################################################################
# Terminate the program
def exit():
    """Stop the interpreter with exit status 0.

    The function keeps the name ``exit`` (shadowing the builtin) because the
    rest of the file calls it under that name.

    Raises:
        SystemExit: always, carrying the exit code 0.
    """
    raise SystemExit(0)
# Print the program banner
def header():
    """Clear the terminal and print the banner with the current settings.

    Only reads module-level state (port, baudrate, destination, source, crc,
    object_set, COM_ENABLE, GPIO_ENABLE, PROG_NAME, PROG_VERSION) and the
    name/version reported by Xcom_API; nothing is modified.  The text
    settings are expected to be strings and the flags to be booleans, which
    is how the rest of this file assigns them.
    """
    full_rule = '#' * 120

    def blank_row():
        # '## ' + 114 blanks + ' ##'
        print('##', ' ' * 114, '##')

    # Wipe the screen so the banner always starts at the top
    os.system('cls' if os.name == 'nt' else 'clear')

    print(full_rule)
    print(full_rule)
    blank_row()

    welcome = '## Welcome to {} version: {}. This program demonstrate the functionality of the {}-class. ##'
    print(welcome.format(PROG_NAME, PROG_VERSION[1:], Xcom_API.get_prog_name()))
    print('## It sends permanently a request for a known object_id and display it.', ' ' * 42, '##')
    blank_row()

    # Every value is left-justified to a fixed width so the columns line up
    row_port = '## COM-Port: {}destination-addr.: {}Version Xcom_API: {} ##'
    print(row_port.format(port.ljust(26),
                          destination.ljust(24),
                          Xcom_API.get_prog_version()[1:]))

    row_baud = '## Baudrate: {}source-addr.: {}Object of Xcom_API set: {} ##'
    print(row_baud.format(baudrate.ljust(26),
                          source.ljust(29),
                          str(object_set).ljust(5)))

    row_state = '## COM enable: {}CRC-Check enable: {}GPIO enable: {} ##'
    print(row_state.format(str(COM_ENABLE).ljust(24),
                           str(crc).ljust(25),
                           str(GPIO_ENABLE).ljust(5)))

    blank_row()
    print(full_rule)
    print(full_rule, '\n')
# Helper for setup(): ask a yes/no question
def _ask_yes_no(question):
    """Repeat `question` until the answer is yes/y or no/n; return True for yes."""
    while True:
        answer = input(question)
        if answer in ('yes', 'y'):
            return True
        if answer in ('no', 'n'):
            return False
        print('What do you mean?\n')


# Helper for setup(): ask for an integer value
def _ask_integer_text(prompt, label):
    """Repeat `prompt` until the input parses as an integer; return it as text."""
    while True:
        text = input(prompt)
        try:
            int(text)
        except ValueError:
            print('{}: {}, is not an integer.'.format(label, text))
        else:
            return text


# setup
def setup():
    """Collect the connection settings from the command line or interactively.

    Positional command-line arguments, all optional:
        1. COM-Port
        2. baudrate ('115200' or '38400')
        3. destination-address
        4. source-address
        5. CRC-Check state ('True', 'False', '1' or '0')

    Every setting that is missing (or, for baudrate and CRC state, not one of
    the accepted values) is asked for on the terminal.  The results are stored
    in the module-level variables port, baudrate, destination, source and crc;
    the port and baudrate are also written into COM_PARA.  The banner is
    reprinted after each setting.

    The CRC argument is accepted whenever at least five arguments are given,
    matching the ``>=`` checks used for the other arguments.
    """
    # Use Global-Variable
    global COM_PARA
    global port
    global baudrate
    global destination
    global crc
    global source

    supported_baudrates = ('115200', '38400')

    # get COM-Port
    if len(sys.argv) >= 2:
        port = sys.argv[1]
        COM_PARA['Port'] = port
    elif _ask_yes_no('Do you want to use COM-Port: /dev/ttyAMA0? (yes/no): '):
        port = COM_PARA['Port']
    else:
        port = input('Enter a COM-Port: ')
        COM_PARA['Port'] = port
    header()

    # get baudrate
    if len(sys.argv) >= 3 and sys.argv[2] in supported_baudrates:
        baudrate = sys.argv[2]
        COM_PARA['Baudrate'] = int(baudrate)
    elif _ask_yes_no('Do you want to use a Baudrate of: 115200? (yes/no): '):
        baudrate = str(COM_PARA['Baudrate'])
    else:
        while True:
            baudrate = input('Enter a Baudrate (115200 or 38400): ')
            if baudrate in supported_baudrates:
                COM_PARA['Baudrate'] = int(baudrate)
                break
            print('baudrate: {}, is not supported.'.format(baudrate))
    header()

    # get destination-address
    if len(sys.argv) >= 4:
        destination = sys.argv[3]
    elif _ask_yes_no('Do you want to use the destination-address: 101? (yes/no): '):
        destination = str(101)
    else:
        destination = _ask_integer_text('Enter a destination-address: ',
                                        'destination-address')
    header()

    # get source-address
    if len(sys.argv) >= 5:
        source = sys.argv[4]
    elif _ask_yes_no('Do you want to use the source-address: 1? (yes/no): '):
        source = str(1)
    else:
        source = _ask_integer_text('Enter a source-address: ', 'source-address')
    header()

    # get crc check state
    if len(sys.argv) >= 6 and sys.argv[5] in ('True', 'False', '1', '0'):
        crc = sys.argv[5] in ('True', '1')
    else:
        crc = _ask_yes_no('Do you want to activate CRC-Check? (yes/no): ')
    header()
# open GPIOs
def open_gpios(LED1,LED2,Hostname):
    """Connect to the pigpio daemon on `Hostname` and prepare both LED pins.

    Both pins are configured as outputs and driven low.  On success the
    module-level flag GPIO_ENABLE is set and the pigpio handle is returned.

    A failed connection is detected explicitly through the handle's
    ``connected`` attribute; an AttributeError raised while talking to the
    daemon is still treated as "no connection" as well.  In both cases an
    explanation is printed, GPIO_ERR is set and the program exits.

    Args:
        LED1: GPIO number of the first LED.
        LED2: GPIO number of the second LED.
        Hostname: host on which the pigpio daemon is expected to run.

    Returns:
        The connected pigpio handle.
    """
    # Use Global-Variable
    global GPIO_ERR
    global GPIO_ENABLE

    try:
        # Enable GPIO-Server
        LED_Handle = GPIO.pi(Hostname)
        # Handles without a ``connected`` attribute are given the benefit of
        # the doubt; the AttributeError fallback below still covers them.
        connected = getattr(LED_Handle, 'connected', True)
        if connected:
            # GPIO-Mode
            LED_Handle.set_mode(LED1, GPIO.OUTPUT)
            LED_Handle.set_mode(LED2, GPIO.OUTPUT)
            # GPIO-Start-State
            LED_Handle.write(LED1, GPIO.LOW)
            LED_Handle.write(LED2, GPIO.LOW)
    except AttributeError:
        connected = False

    # Handle Errors
    if not connected:
        print('There is no connection to the \"{}\" possible.\nEnsure that the Deamon-Host-Process\"pigpiod\" has started with root privileges on the Raspberry Pi to connect to the GPIOs.'.format(Hostname))
        GPIO_ERR = True
        exit()

    GPIO_ENABLE = True
    # print completed task
    print('GPIOs on \"{}\" are configured.'.format(Hostname))
    # return LED-Handle
    return LED_Handle
# Release the GPIOs
def close_gpios(LED_Handle,Hostname,LED1,LED2):
    """Clear both LED pins and disconnect from the GPIO server.

    On success GPIO_ENABLE is cleared.  If an AttributeError occurs while
    talking to the server, a message is printed, GPIO_ERR is set and the
    program exits.

    Args:
        LED_Handle: handle returned by open_gpios().
        Hostname: host name used in the error message.
        LED1: GPIO number of the first LED.
        LED2: GPIO number of the second LED.
    """
    global GPIO_ENABLE, GPIO_ERR

    try:
        # One bank-clear call per LED, in the order LED1 then LED2
        for led_pin in (LED1, LED2):
            LED_Handle.clear_bank_1(1 << led_pin)
        # Drop the connection to the GPIO server
        LED_Handle.stop()
        GPIO_ENABLE = False
        print('GPIOs closed.')
    except AttributeError:
        message = 'There is no connection to the "{}" possible.\n GPIOs can not be closed safely.'
        print(message.format(Hostname))
        GPIO_ERR = True
        exit()
# Open the serial port
def open_com_port(LED_Handle,LED1,COM_PARA):
    """Open the serial port described by `COM_PARA` and switch LED1 on.

    On success COM_ENABLE is set and the open port handle is returned.  If
    the port cannot be opened (SerialException) or the parameters are
    rejected (ValueError / TypeError), a message is printed, COM_ERR is set
    and the program exits.

    Args:
        LED_Handle: GPIO handle used to drive LED1.
        LED1: GPIO number of the LED that signals an open port.
        COM_PARA: mapping with the keys 'Port', 'Baudrate', 'Bytesize',
            'Parity', 'Stopbits', 'Timeout', 'XON_XOFF', 'RTS_CTS', 'DSR_DTR'.

    Returns:
        The opened serial port handle.
    """
    global COM_ENABLE

    def _give_up(message):
        # Shared failure path: report, remember the error, leave the program
        global COM_ERR
        print(message)
        COM_ERR = True
        exit()

    try:
        serial_settings = {
            'port': COM_PARA['Port'],
            'baudrate': COM_PARA['Baudrate'],
            'bytesize': COM_PARA['Bytesize'],
            'parity': COM_PARA['Parity'],
            'stopbits': COM_PARA['Stopbits'],
            'timeout': COM_PARA['Timeout'],
            'xonxoff': COM_PARA['XON_XOFF'],
            'rtscts': COM_PARA['RTS_CTS'],
            'dsrdtr': COM_PARA['DSR_DTR'],
        }
        COM_Handle = UART.Serial(**serial_settings)
        COM_ENABLE = True

        # LED1 signals that the port is open
        LED_Handle.write(LED1, GPIO.HIGH)
        print('COM-Port: "{}", is opened.'.format(COM_Handle.port))
        return COM_Handle
    except UART.SerialException:
        _give_up('COM-Port: "{}", can not be opened. Make sure it is available and not used by other programs.'.format(COM_PARA['Port']))
    except ValueError:
        _give_up('COM-Port-Parameters are out of spezifications.')
    except TypeError:
        _give_up('Baudrate: {}, is not supported.'.format(COM_PARA['Baudrate']))
# Close the serial port
def close_com_port(COM_Handle,LED_Handle,LED1):
    """Close the serial port and switch LED1 off.

    On success COM_ENABLE is cleared.  If closing raises a SerialException,
    a message is printed, COM_ERR is set and the program exits.

    Args:
        COM_Handle: open serial port handle.
        LED_Handle: GPIO handle used to drive LED1.
        LED1: GPIO number of the LED that signals an open port.
    """
    global COM_ENABLE, COM_ERR

    try:
        # Remember the name first; it is needed for both messages below
        port_name = COM_Handle.port
        COM_Handle.close()
        COM_ENABLE = False
        # LED1 off: the port is no longer open
        LED_Handle.write(LED1, GPIO.LOW)
        print('COM-Port: "{}", is closed.'.format(port_name))
    except UART.SerialException:
        print('COM-Port: "{}", can not be closed.'.format(port_name))
        COM_ERR = True
        exit()
# write to COM-Port
def com_wite(LED_Handle,LED2,COM_Handle,BUFFER):
    """Send `BUFFER` over the serial port while LED2 is lit.

    LED2 is switched on before the write and switched off again afterwards,
    even if the write raises, so the LED never stays on after a failed
    transfer.  An exception raised by the write is passed on to the caller.

    Args:
        LED_Handle: GPIO handle used to drive LED2.
        LED2: GPIO number of the activity LED.
        COM_Handle: open serial port handle.
        BUFFER: byte frame to send.
    """
    # LED2 on
    LED_Handle.write(LED2, GPIO.HIGH)
    try:
        # sent Byte-Frame
        COM_Handle.write(BUFFER)
    finally:
        # LED2 off
        LED_Handle.write(LED2, GPIO.LOW)
# read from COM-Port
def com_read(LED_Handle,LED2,COM_Handle,size=30):
    """Read a byte frame from the serial port while LED2 is lit.

    LED2 is switched on before the read and switched off again afterwards,
    even if the read raises.  An exception raised by the read is passed on
    to the caller.

    Args:
        LED_Handle: GPIO handle used to drive LED2.
        LED2: GPIO number of the activity LED.
        COM_Handle: open serial port handle.
        size: value passed to ``COM_Handle.read``; defaults to 30, the value
            that was previously hard-coded.

    Returns:
        Whatever ``COM_Handle.read(size)`` returns.
    """
    # LED2 on
    LED_Handle.write(LED2, GPIO.HIGH)
    try:
        # read Byteframe
        return COM_Handle.read(size)
    finally:
        # LED2 off
        LED_Handle.write(LED2, GPIO.LOW)
# Create object of class Xcom_API
def create_obj_of_class(destination,source,crc):
    """Create the Xcom_API object used to build and decode frames.

    `destination` and `source` are converted with ``int()`` before they are
    handed to Xcom_API.  On success the module-level flag object_set is set.
    Any ordinary error during conversion or construction prints a message
    and exits the program; KeyboardInterrupt and SystemExit are deliberately
    not intercepted, so Ctrl+C still reaches the caller.

    Args:
        destination: destination-address (text or number).
        source: source-address (text or number).
        crc: whether the CRC check is enabled.

    Returns:
        The new Xcom_API object.
    """
    # Use Global-Variable
    global object_set
    try:
        # Create an object of class Xcom_API
        OBJ_Handle = Xcom_API(crc=crc,destination=int(destination),source=int(source))
    except Exception:
        print('Destination- and/or source-address are not supported.')
        exit()
    object_set = True
    return OBJ_Handle
- ##########################################################################################################################################
- # Main-Program #
- ##########################################################################################################################################
# Main script.
# Both handles stay None until the matching resource has been opened, so the
# cleanup code at the end can tell what has to be released.
LED_Handle = None
COM_Handle = None
try:
    # print Header
    header()
    # run setup to check the arguments / ask for the missing settings
    setup()

    # Configure GPIOs and COM-Port and object of class
    LED_Handle = open_gpios(LED1,LED2,Hostname)
    header()
    COM_Handle = open_com_port(LED_Handle,LED1,COM_PARA)
    header()
    OBJ_Handle = create_obj_of_class(destination,source,crc)
    header()

    # Ask for Object_ID and create the request frame
    while True:
        try:
            header()
            print('Known Parameter-Numbers (object_id):')
            print(' maximum current of ac source = 1107\n battery charge current = 1138\n smart boost allowed = 1126')
            print(' inverter allowed = 1124\n type of detection of grid loss = 1552\n charger allowed = 1125')
            print(' charger uses only power from ac = 1646\n ac output voltage = 1286\n inverter frequency = 1112')
            print(' transfer relay allowed = 1128\n limitation of the power boost = 1607\n remote entry active = 1545')
            print('\nKnown Information-Numbers (object_id):')
            print(' battery voltage = 3000\n battery charge current = 3005\n battery temperature = 3001')
            print(' battery voltage ripple = 3006\n state of charge = 3007\n number of battery elements = 3050')
            print(' input voltage = 3011\n input current = 3012\n input frequency = 3084')
            print(' input power = 3138\n output voltage = 3021\n output current = 3022')
            print(' output frequency = 3085\n output power = 3139\n operating state = 3028')
            print(' boost active = 3019\n state of inverter = 3049\n state of transfer relay = 3020')
            print(' state of output relay = 3030\n state of aux relay 1 = 3031\n state of aux relay 2 = 3032')
            print(' state of ground relay = 3074\n state of neutral transfer relay = 3075\n state of remote entry = 3086')
            Buffer = int(input('\nType in an object_id: '))
            Buffer = OBJ_Handle.get_read_frame(Buffer)
            header()
            break
        # Only ordinary errors mean "bad object_id"; Ctrl+C must still reach
        # the KeyboardInterrupt handler below instead of being swallowed here.
        except Exception:
            while True:
                Buffer = input('\nobject_id unknown, do you want to try it again (yes/no): ')
                if Buffer == 'yes' or Buffer == 'y':
                    header()
                    break
                elif Buffer == 'no' or Buffer == 'n':
                    header()
                    exit()
                else:
                    print('\nWhat do you mean?')

    # main while loop: send the same request over and over and print each answer
    while True:
        # send Byteframe
        com_wite(LED_Handle,LED2,COM_Handle,Buffer)
        # read from COM-Port
        Buffer2 = com_read(LED_Handle,LED2,COM_Handle)

        # get data from frame and print the value; element 0 is used as the
        # error indicator and element 1 as the value or error id
        Buffer2 = OBJ_Handle.get_data_from_frame(Buffer2)
        if not Buffer2[0]:
            print('Answer from xtender:',Buffer2[1])
        else:
            print('Answer from xtender:',OBJ_Handle.get_text_from_error_id(Buffer2[1]))
# Handle Errors
except KeyboardInterrupt:
    print('\n\nProgram closed with Strg+c.')
# Exit program safely.
finally:
    print('\nExit Program.')
    # A resource is cleaned up only if it was opened and no error was flagged for it
    com_needs_close = COM_Handle is not None and COM_ENABLE and not COM_ERR
    gpio_needs_close = LED_Handle is not None and GPIO_ENABLE and not GPIO_ERR
    if com_needs_close or gpio_needs_close:
        print('\nCleanup work:')
    # The COM-Port goes first because closing it still drives LED1
    if com_needs_close:
        close_com_port(COM_Handle,LED_Handle,LED1)
    if gpio_needs_close:
        close_gpios(LED_Handle,Hostname,LED1,LED2)
|