"""Futurebot: a voice chatbot.

Pipeline: microphone speech -> Google speech recognition -> OpenAI chat
completion (streamed) -> OpenAI TTS played through the sound card.
Context is loaded from a text file and a CSV file and hot-reloaded when
either file changes on disk.
"""

import csv
import io
import os
import random
from pathlib import Path  # kept from original; currently unused

import pygame
import sounddevice as sd
import soundfile as sf
import speech_recognition as sr
from openai import OpenAI

# Initialize Pygame mixer for audio playback (acknowledgement clips).
pygame.mixer.init()

# SECURITY FIX: the key was previously hard-coded in source (a leaked
# credential). Read it from the environment instead; never commit secrets.
api_key = os.environ.get("OPENAI_API_KEY", "")
client = OpenAI(api_key=api_key)

# Cache for content and previous responses.
content_cache = {}   # reserved for file-content caching (not yet used)
response_cache = {}  # (question, combined_content) -> full answer text


def read_text_file(file_path):
    """Return the contents of a UTF-8 text file, or "" on any error.

    Errors are reported on stdout rather than raised so a missing context
    file does not kill the main loop.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except Exception as e:
        print(f"Error reading text file: {e}")
        return ""


def read_csv_file(file_path):
    """Flatten a CSV file into one space-separated string ("" on error)."""
    content = ""
    try:
        with open(file_path, mode='r', encoding='utf-8') as file:
            reader = csv.reader(file)
            for row in reader:
                content += ' '.join(row) + ' '
    except Exception as e:
        print(f"Error reading CSV file: {e}")
    return content


def recognize_speech():
    """Listen on the default microphone and return the recognized text.

    Returns None when speech is unintelligible or the recognition service
    fails. On success, plays a random acknowledgement clip first.
    """
    recognizer = sr.Recognizer()
    with sr.Microphone() as source:
        print("Listening...")
        audio = recognizer.listen(source)
    try:
        print("Recognizing...")
        text = recognizer.recognize_google(audio, language='en-US')
        # Play a random audio response; a missing clip must not crash
        # the assistant, so playback errors are swallowed with a notice.
        random_audio = random.choice(["ty.mp3", "th.mp3", "sure.mp3", "sure1.mp3"])
        try:
            pygame.mixer.music.load(random_audio)
            pygame.mixer.music.play()
        except (pygame.error, FileNotFoundError) as e:
            print(f"Could not play acknowledgement audio: {e}")
        print(f"You said: {text}")
        return text
    except sr.UnknownValueError:
        print("Sorry, I did not understand that.")
        return None
    except sr.RequestError:
        print("Sorry, there was an error with the speech recognition service.")
        return None


def create_messages(question, file_content):
    """Build the chat message list: system persona + context + question."""
    return [
        {"role": "system", "content": "Your name is Futurebot..."},
        {"role": "user", "content": f"{file_content}\n\nQ: {question}\nA:"},
    ]


def get_response_from_openai(messages):
    """Yield streamed chat-completion chunks for *messages*."""
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        max_tokens=150,
        temperature=0.5,
        messages=messages,
        stream=True,
    )
    for chunk in stream:
        yield chunk


def generate_speech(text):
    """Convert *text* to speech via OpenAI TTS and play it synchronously.

    Empty/whitespace-only text is silently skipped.
    """
    if not text.strip():
        return
    # FIX: request WAV explicitly. The tts-1 default is MP3, which
    # libsndfile (backing soundfile) frequently cannot decode.
    spoken_response = client.audio.speech.create(
        model="tts-1",
        voice="alloy",
        response_format="wav",
        input=text,
    )
    buffer = io.BytesIO()
    for chunk in spoken_response.iter_bytes(chunk_size=4096):
        buffer.write(chunk)
    buffer.seek(0)
    with sf.SoundFile(buffer, 'r') as sound_file:
        data = sound_file.read(dtype='int16')
        sd.play(data, sound_file.samplerate)
        sd.wait()  # block until playback finishes


def load_files_once(text_file_path, csv_file_path):
    """Load both context files and return their concatenated contents."""
    text_content = read_text_file(text_file_path)
    csv_content = read_csv_file(csv_file_path)
    return text_content + ' ' + csv_content


def load_files_if_updated(text_file_path, csv_file_path, last_mod_time):
    """Reload the context files if either changed since the last load.

    *last_mod_time* is a dict with 'text' and 'csv' mtime entries; it is
    updated in place. Returns the fresh combined content, or None when
    nothing changed.
    """
    text_mod_time = os.path.getmtime(text_file_path)
    csv_mod_time = os.path.getmtime(csv_file_path)
    if text_mod_time > last_mod_time['text'] or csv_mod_time > last_mod_time['csv']:
        print("Files updated, reloading...")
        last_mod_time['text'] = text_mod_time
        last_mod_time['csv'] = csv_mod_time
        return read_text_file(text_file_path) + ' ' + read_csv_file(csv_file_path)
    return None  # No update


def get_cached_response(question, combined_content):
    """Answer *question*, speaking fresh replies sentence-by-sentence.

    Returns the full answer text. Cache hits are returned without
    re-contacting the API; fresh answers are spoken as they stream in
    and then cached under (question, combined_content).
    """
    cache_key = (question, combined_content)
    if cache_key in response_cache:
        print("Using cached response")
        return response_cache[cache_key]

    messages = create_messages(question, combined_content)
    pending = ""        # text accumulated since the last spoken flush
    full_parts = []     # every chunk, so the complete answer can be cached

    for response_chunk in get_response_from_openai(messages):
        chunk_content = (
            response_chunk.choices[0].delta.content
            if response_chunk.choices else None
        )
        if not chunk_content:
            continue
        pending += chunk_content
        full_parts.append(chunk_content)
        # Speak at sentence boundaries or when the buffer grows too long.
        if '.' in chunk_content or len(pending) > 600:
            print(pending, end="", flush=True)
            generate_speech(pending)
            pending = ""

    if pending:  # speak any trailing text with no final period
        print(pending, end="", flush=True)
        generate_speech(pending)

    # BUG FIX: the original cached/returned only the post-flush tail
    # (usually ""), so cached answers were lost. Cache the whole reply.
    full_response = "".join(full_parts)
    response_cache[cache_key] = full_response
    return full_response


def chatbot(question, combined_content):
    """Handle one user question: print the answer and ensure it is spoken."""
    was_cached = (question, combined_content) in response_cache
    response = get_cached_response(question, combined_content)
    print("Answer: ", response)
    # BUG FIX: fresh responses are already spoken chunk-by-chunk inside
    # get_cached_response; re-speaking them here duplicated the audio.
    # Only cache hits (which skip streaming) need to be spoken now.
    if was_cached:
        generate_speech(response)


if __name__ == "__main__":
    text_file_path = 'Allinone.txt'      # Path to your text file
    csv_file_path = 'device_data.csv'    # Path to your CSV file

    # Load the files once at startup and record their mtimes.
    last_mod_time = {'text': 0, 'csv': 0}
    combined_content = load_files_once(text_file_path, csv_file_path)
    last_mod_time['text'] = os.path.getmtime(text_file_path)
    last_mod_time['csv'] = os.path.getmtime(csv_file_path)

    # Main loop: listen, hot-reload context if needed, answer.
    while True:
        question = recognize_speech()
        if question:
            updated_content = load_files_if_updated(
                text_file_path, csv_file_path, last_mod_time
            )
            if updated_content:
                combined_content = updated_content
            chatbot(question, combined_content)
        else:
            print("Sorry, I didn't get that. Please ask again.")