123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- import pygame
- import speech_recognition as sr
- from openai import OpenAI
- from pathlib import Path
- import os
- import io
- import soundfile as sf
- import sounddevice as sd
- import random
- import csv
# Initialize Pygame mixer for audio playback
pygame.mixer.init()

# Read the OpenAI API key from the environment instead of embedding it in the
# source. A key written into a file is readable by anyone who can see the
# file, so the previously hard-coded key should be revoked and replaced.
# Set it before launching, e.g.:  export OPENAI_API_KEY="sk-..."
api_key = os.environ.get("OPENAI_API_KEY")
client = OpenAI(api_key=api_key)

# Cache for content and previous responses.
# response_cache maps (question, combined_content) -> answer text.
# NOTE(review): content_cache is not referenced in the visible part of this
# file; kept so any external users of the name keep working.
content_cache = {}
response_cache = {}
# Function to read text file
def read_text_file(file_path):
    """Return the entire contents of a UTF-8 text file.

    Any exception raised while opening or reading the file is reported on
    stdout and an empty string is returned instead of propagating the error.
    """
    try:
        with open(file_path, encoding='utf-8') as handle:
            contents = handle.read()
    except Exception as e:
        print(f"Error reading text file: {e}")
        return ""
    return contents
# Function to read CSV file
def read_csv_file(file_path):
    """Flatten a CSV file into one space-separated string.

    The cells of each row are joined with single spaces and every row is
    followed by one trailing space, so ``a,b`` / ``c,d`` becomes
    ``"a b c d "``.

    Any exception raised while opening or parsing is reported on stdout;
    whatever rows were read before the failure are still returned (an empty
    string if nothing could be read).
    """
    row_texts = []
    try:
        # newline='' hands raw line endings to the csv module, as its
        # documentation requires; otherwise line breaks embedded in quoted
        # fields are silently rewritten by the text layer.
        with open(file_path, mode='r', encoding='utf-8', newline='') as file:
            for row in csv.reader(file):
                # Collect pieces and join once at the end instead of growing
                # a string with += on every row.
                row_texts.append(' '.join(row) + ' ')
    except Exception as e:
        print(f"Error reading CSV file: {e}")
    return ''.join(row_texts)
# Function to recognize speech from the microphone
def recognize_speech():
    """Capture one utterance from the microphone and return it as text.

    The recorded audio is sent to the Google recognizer with language
    'en-US'. On success a randomly chosen acknowledgement clip is started
    through the pygame mixer and the recognized text is returned. If the
    audio cannot be understood, or the recognition service fails, a message
    is printed and None is returned.
    """
    recognizer = sr.Recognizer()
    with sr.Microphone() as source:
        print("Listening...")
        audio = recognizer.listen(source)

    print("Recognizing...")
    try:
        text = recognizer.recognize_google(audio, language='en-US')
    except sr.UnknownValueError:
        print("Sorry, I did not understand that.")
        return None
    except sr.RequestError:
        print("Sorry, there was an error with the speech recognition service.")
        return None

    # Play a random audio response
    acknowledgement_clips = ["ty.mp3", "th.mp3", "sure.mp3", "sure1.mp3"]
    pygame.mixer.music.load(random.choice(acknowledgement_clips))
    pygame.mixer.music.play()
    print(f"You said: {text}")
    return text
# Function to create messages for OpenAI API
def create_messages(question, file_content):
    """Build the two-message chat payload for one question.

    The first message is the fixed system prompt; the second is a user
    message holding the reference content followed by the question in a
    ``Q: ... / A:`` layout.
    """
    system_message = {"role": "system", "content": "Your name is Futurebot..."}
    user_message = {
        "role": "user",
        "content": f"{file_content}\n\nQ: {question}\nA:",
    }
    return [system_message, user_message]
# Function to get a response from OpenAI
def get_response_from_openai(messages):
    """Yield the streamed chat-completion chunks for ``messages``.

    This is a generator: the request to the module-level ``client`` is only
    issued once iteration starts, and each chunk is passed through unchanged.
    """
    request_options = {
        "model": "gpt-3.5-turbo",
        "max_tokens": 150,
        "temperature": 0.5,
        "messages": messages,
        "stream": True,
    }
    yield from client.chat.completions.create(**request_options)
# Function to generate and play speech in chunks
def generate_speech(text):
    """Synthesize ``text`` with the TTS endpoint and play it to completion.

    Text that is empty or only whitespace is ignored. Otherwise the audio is
    downloaded in 4096-byte pieces into memory, decoded with soundfile as
    int16 samples, and played through sounddevice; the call returns only
    after playback has finished.
    """
    if not text.strip():
        return

    spoken_response = client.audio.speech.create(
        model="tts-1",
        voice="alloy",
        input=text,
    )
    # Gather the whole clip in memory so soundfile can read it like a file.
    audio_bytes = b"".join(spoken_response.iter_bytes(chunk_size=4096))
    with sf.SoundFile(io.BytesIO(audio_bytes), 'r') as sound_file:
        samples = sound_file.read(dtype='int16')
        sd.play(samples, sound_file.samplerate)
        sd.wait()
# Function to load files once at startup
def load_files_once(text_file_path, csv_file_path):
    """Read the text file and the CSV file and return them as one string.

    The text content comes first, then a single space, then the flattened
    CSV content.
    """
    sections = (
        read_text_file(text_file_path),
        read_csv_file(csv_file_path),
    )
    return ' '.join(sections)
# Function to reload files if they have been modified since the last load
def load_files_if_updated(text_file_path, csv_file_path, last_mod_time):
    """Reload both files when either one is newer than the recorded mtimes.

    ``last_mod_time`` is a dict with 'text' and 'csv' modification times; it
    is updated in place when a reload happens. Returns the freshly combined
    content (text, a space, flattened CSV) after a reload, or None when
    neither file has a newer modification time.
    """
    current = {
        'text': os.path.getmtime(text_file_path),
        'csv': os.path.getmtime(csv_file_path),
    }
    is_stale = any(current[kind] > last_mod_time[kind] for kind in ('text', 'csv'))
    if not is_stale:
        return None  # No update

    print("Files updated, reloading...")
    last_mod_time['text'] = current['text']
    last_mod_time['csv'] = current['csv']
    return read_text_file(text_file_path) + ' ' + read_csv_file(csv_file_path)
# Function to cache and retrieve responses
def get_cached_response(question, combined_content):
    """Return the complete answer to ``question``, using the cache if possible.

    Cache hit: the stored answer is returned without calling the API and
    without speaking it.
    Cache miss: the answer is streamed from the API, printed and spoken
    segment by segment as it arrives, then stored and returned in full.

    The original version reset its only accumulator after each spoken
    segment, so it returned and cached just the unspoken tail (often an
    empty string); later cache hits replayed that truncated text.
    """
    # Generate a unique key based on question and content
    cache_key = (question, combined_content)
    if cache_key in response_cache:
        print("Using cached response")
        return response_cache[cache_key]

    messages = create_messages(question, combined_content)
    full_response = _stream_and_speak(get_response_from_openai(messages))

    # Do not remember an empty answer; otherwise every repeat of the
    # question would be answered with silence from the cache.
    if full_response:
        response_cache[cache_key] = full_response
    return full_response


def _stream_and_speak(response_generator):
    """Print and speak a streamed reply in segments; return the whole text."""
    finished_segments = []
    pending = ""
    for response_chunk in response_generator:
        chunk_content = response_chunk.choices[0].delta.content if response_chunk.choices else None
        if not chunk_content:
            continue
        pending += chunk_content
        # Flush at a sentence end (a '.' in the newest chunk) or once the
        # pending text exceeds 600 characters, so speech starts early.
        if '.' in chunk_content or len(pending) > 600:
            print(pending, end="", flush=True)
            generate_speech(pending)
            finished_segments.append(pending)
            pending = ""
    if pending:  # Speak whatever is left after the stream ends
        print(pending, end="", flush=True)
        generate_speech(pending)
        finished_segments.append(pending)
    return "".join(finished_segments)


# Main function to handle user query
def chatbot(question, combined_content):
    """Answer ``question`` aloud and print the full answer.

    A freshly generated answer has already been spoken while streaming, so
    it is only spoken here when it came from the cache; this avoids the
    answer (or its tail) being played twice.
    """
    served_from_cache = (question, combined_content) in response_cache
    response = get_cached_response(question, combined_content)
    print("Answer: ", response)
    if served_from_cache:
        generate_speech(response)
if __name__ == "__main__":
    text_file_path = 'Allinone.txt'  # Path to your text file
    csv_file_path = 'device_data.csv'  # Path to your CSV file

    # Load the files once at startup, then record their modification times
    # so later edits to either file can be detected.
    combined_content = load_files_once(text_file_path, csv_file_path)
    last_mod_time = {
        'text': os.path.getmtime(text_file_path),
        'csv': os.path.getmtime(csv_file_path),
    }

    # Main loop: listen, refresh the content if the files changed, answer.
    while True:
        question = recognize_speech()
        if not question:
            print("Sorry, I didn't get that. Please ask again.")
            continue

        # Check if files have been updated, and reload if necessary
        updated_content = load_files_if_updated(text_file_path, csv_file_path, last_mod_time)
        if updated_content:
            combined_content = updated_content

        # Call the chatbot with the current content
        chatbot(question, combined_content)
|