load lines commented

This commit is contained in:
jothi
2025-09-09 17:28:23 +05:30
parent 6f3f8568ea
commit d0914d2c0e
14 changed files with 1701 additions and 3220 deletions

484
rppds.py Normal file
View File

@@ -0,0 +1,484 @@
#!/usr/bin/env python3
# main.py
import sys
import os
import json
import threading
import queue
import subprocess
from pathlib import Path
from PyQt5.QtWidgets import (
QApplication, QWidget, QStackedWidget, QVBoxLayout, QPushButton, QLabel,
QLineEdit, QTextEdit, QHBoxLayout, QFileDialog, QMessageBox
)
from PyQt5.QtCore import Qt, pyqtSignal, QObject, QThread
from PyQt5.QtWebEngineWidgets import QWebEngineView, QWebEngineProfile, QWebEngineDownloadItem
# VOSK imports (make sure vosk and sounddevice installed)
try:
from vosk import Model, KaldiRecognizer
import sounddevice as sd
except Exception as e:
Model = None
KaldiRecognizer = None
sd = None
print("Vosk/sounddevice not available:", e)
# ---------- Helpers & Workers ----------
class WorkerSignals(QObject):
finished = pyqtSignal()
error = pyqtSignal(str)
result = pyqtSignal(object)
progress = pyqtSignal(int)
class GenericWorker(QThread):
"""Simple generic worker that runs a function in a QThread."""
result_ready = pyqtSignal(object)
error = pyqtSignal(str)
def __init__(self, fn, *args, **kwargs):
super().__init__()
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
try:
res = self.fn(*self.args, **self.kwargs)
self.result_ready.emit(res)
except Exception as e:
self.error.emit(str(e))
# ---------- Vosk Voice Worker ----------
class VoskWorker(QThread):
text_ready = pyqtSignal(str)
error = pyqtSignal(str)
def __init__(self, model_path, device=None, samplerate=16000, parent=None):
super().__init__(parent)
self.model_path = model_path
self.device = device
self.samplerate = samplerate
self._running = True
def stop(self):
self._running = False
def run(self):
if Model is None or sd is None:
self.error.emit("Vosk or sounddevice not installed")
return
if not Path(self.model_path).exists():
self.error.emit(f"Vosk model not found at {self.model_path}")
return
model = Model(self.model_path)
rec = KaldiRecognizer(model, self.samplerate)
try:
with sd.RawInputStream(samplerate=self.samplerate, blocksize=8000, dtype='int16',
channels=1, device=self.device) as stream:
while self._running:
data = stream.read(4000)[0]
if rec.AcceptWaveform(data):
text = rec.Result()
parsed = json.loads(text).get("text", "")
if parsed:
self.text_ready.emit(parsed)
else:
# partial = rec.PartialResult()
pass
except Exception as e:
self.error.emit(str(e))
# ---------- TTS helper ----------
def speak_text(text):
# Try pyttsx3 first (if installed); else fallback to espeak
try:
import pyttsx3
engine = pyttsx3.init()
engine.say(text)
engine.runAndWait()
except Exception:
# fallback: use espeak command-line
try:
subprocess.run(['espeak', text], check=False)
except Exception as e:
print("TTS failed:", e)
# ---------- Screens ----------
class LoginWindow(GradientWidget):
API_URL = "https://pds1.iotsignin.com/api/testing/user/get-data"
AUTH_TOKEN = "Bearer sb-eba140ab-74bb-44a4-8d92-70a636940def!b1182|it-rt-dev-cri-stjllphr!b68:616d8991-307b-4ab1-be37-7894a8c6db9d$0p0fE2I7w1Ve23-lVSKQF0ka3mKrTVcKPJYELr-i4nE="
def __init__(self, stack):
super().__init__()
self.stack = stack
self._init_ui()
def _init_ui(self):
layout = QVBoxLayout(self)
layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout.setSpacing(40)
logo = QLabel()
pix = QPixmap("cri_logo.png.png")
if not pix.isNull():
pix = pix.scaled(150, 150, Qt.AspectRatioMode.KeepAspectRatio)
logo.setPixmap(pix)
else:
logo.setText("CRI Logo")
logo.setStyleSheet("font-size: 24px; font-weight: bold; color: #333;")
layout.addWidget(logo, alignment=Qt.AlignmentFlag.AlignCenter)
self.email_input = QLineEdit()
self.email_input.setPlaceholderText("USER NAME")
self.email_input.setFixedWidth(400)
self.email_input.setStyleSheet("""
background: rgba(255,255,255,0.7);
border: 2px solid #F3C2C2;
border-radius: 10px;
padding: 12px;
font-size: 20px;
color: #333;
""")
layout.addWidget(self.email_input, alignment=Qt.AlignmentFlag.AlignCenter)
self.password_input = QLineEdit()
self.password_input.setPlaceholderText("PASSWORD")
self.password_input.setEchoMode(QLineEdit.EchoMode.Password)
self.password_input.setFixedWidth(400)
self.password_input.setStyleSheet("""
background: rgba(255,255,255,0.7);
border: 2px solid #F3C2C2;
border-radius: 10px;
padding: 12px;
font-size: 20px;
color: #333;
""")
layout.addWidget(self.password_input, alignment=Qt.AlignmentFlag.AlignCenter)
eye_action = QAction(QIcon("eye_closed.png"), "", self.password_input)
eye_action.setCheckable(True)
self.password_input.addAction(eye_action, QLineEdit.ActionPosition.TrailingPosition)
def toggle_password_visibility(checked):
self.password_input.setEchoMode(
QLineEdit.EchoMode.Normal if checked else QLineEdit.EchoMode.Password
)
eye_action.setIcon(QIcon("eye.png") if checked else QIcon("eye_closed.png"))
eye_action.toggled.connect(toggle_password_visibility)
self.login_btn = QPushButton("LOGIN")
self.login_btn.setFixedWidth(300)
self.login_btn.setStyleSheet("""
QPushButton {
font-size: 22px;
font-weight: bold;
color: white;
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #C71585, stop:1 #DB7093);
border: none;
border-radius: 14px;
padding: 16px;
}
QPushButton:hover {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #D87093, stop:1 #FF69B4);
}
""")
layout.addWidget(self.login_btn, alignment=Qt.AlignmentFlag.AlignCenter)
self.email_input.returnPressed.connect(self.password_input.setFocus)
self.password_input.returnPressed.connect(self.login_btn.setFocus)
self.login_btn.clicked.connect(self.perform_login)
def perform_login(self):
email_input = self.email_input.text().strip()
password = self.password_input.text().strip()
if not email_input or not password:
QMessageBox.warning(self, "Error", "Please enter email and password")
return
headers = {
"Authorization": self.AUTH_TOKEN,
"User-Name": email_input,
"User-Pass": password
}
try:
resp = requests.get(self.API_URL, headers=headers, timeout=10)
resp.raise_for_status()
data = resp.json()
except requests.exceptions.RequestException as e:
QMessageBox.critical(self, "Network Error", f"Failed to connect:\n{e}")
return
except ValueError as e:
QMessageBox.critical(self, "Response Error", f"Invalid response:\n{e}")
return
if data.get("status_code") == "ERROR":
desc = data.get("status_description", "Login failed")
QMessageBox.warning(self, "Login Failed", desc)
else:
user_email = data.get("email", email_input)
QMessageBox.information(self, "Success", "Login successful!")
selector = self.stack.widget(1)
selector.email = user_email
selector.password = password
self.stack.setCurrentIndex(1)
class SelectorScreen(QWidget):
select_bot = pyqtSignal(str)
def __init__(self):
super().__init__()
self._build_ui()
def _build_ui(self):
layout = QVBoxLayout()
layout.addWidget(QLabel("Selector Screen - choose action"))
btn_web = QPushButton("Open WebAssistant")
btn_prod = QPushButton("Production Bot")
btn_invoice = QPushButton("Invoice Bot")
btn_web.clicked.connect(lambda: self.select_bot.emit("webassistant"))
btn_prod.clicked.connect(lambda: self.select_bot.emit("production"))
btn_invoice.clicked.connect(lambda: self.select_bot.emit("invoice"))
layout.addWidget(btn_web)
layout.addWidget(btn_prod)
layout.addWidget(btn_invoice)
self.setLayout(layout)
class WebAssistantScreen(QWidget):
# navigation signals
go_back = pyqtSignal()
def __init__(self, download_folder=None, model_path=None):
super().__init__()
self.download_folder = download_folder or str(Path.home() / "Downloads")
self.model_path = model_path
self.vosk_worker = None
self._build_ui()
def _build_ui(self):
layout = QVBoxLayout()
header = QHBoxLayout()
back_btn = QPushButton("Back")
back_btn.clicked.connect(lambda: self.go_back.emit())
self.url_edit = QLineEdit("https://www.example.com")
go_btn = QPushButton("Go")
go_btn.clicked.connect(self._load_url)
header.addWidget(back_btn)
header.addWidget(self.url_edit)
header.addWidget(go_btn)
layout.addLayout(header)
# Web view
self.webview = QWebEngineView()
profile = QWebEngineProfile.defaultProfile()
profile.downloadRequested.connect(self._on_download_requested)
self.webview.setUrl(self.url_edit.text())
layout.addWidget(self.webview)
# Voice area
vbox = QHBoxLayout()
self.voice_out = QPushButton("Speak Selected Text")
self.voice_out.clicked.connect(self._speak_selected)
self.voice_in = QPushButton("Start Voice Input")
self.voice_in.setCheckable(True)
self.voice_in.clicked.connect(self._toggle_voice)
vbox.addWidget(self.voice_in)
vbox.addWidget(self.voice_out)
layout.addLayout(vbox)
# log
self.log = QTextEdit()
self.log.setReadOnly(True)
layout.addWidget(self.log)
self.setLayout(layout)
def _load_url(self):
url = self.url_edit.text().strip()
if not url.startswith("http"):
url = "http://" + url
self.webview.setUrl(url)
def _on_download_requested(self, download_item: QWebEngineDownloadItem):
# This handles downloads initiated by the web page
suggested = download_item.downloadFileName()
dest, _ = QFileDialog.getSaveFileName(self, "Save File", str(Path(self.download_folder) / suggested))
if not dest:
download_item.cancel()
return
download_item.setPath(dest)
download_item.accept()
download_item.finished.connect(lambda: self.log.append(f"Downloaded: {dest}"))
def _speak_selected(self):
# get selected text from webview (async)
def got_selection(text):
if not text:
self.log.append("No text selected")
return
self.log.append("TTS: " + text)
threading.Thread(target=speak_text, args=(text,), daemon=True).start()
# run JS to get selection
self.webview.page().runJavaScript("window.getSelection().toString();", got_selection)
def _toggle_voice(self, checked):
if checked:
self.voice_in.setText("Stop Voice Input")
self.log.append("Starting voice input...")
self.vosk_worker = VoskWorker(self.model_path or "models/vosk-model-small-en-us-0.15")
self.vosk_worker.text_ready.connect(self._on_voice_text)
self.vosk_worker.error.connect(self._on_voice_error)
self.vosk_worker.start()
else:
self.voice_in.setText("Start Voice Input")
if self.vosk_worker:
self.vosk_worker.stop()
self.vosk_worker = None
self.log.append("Stopped voice input")
def _on_voice_text(self, text):
self.log.append(f"Voice: {text}")
# You can send the recognized text to the web page or your bot API
# Example: inject into active text field
js = f"""
(function() {{
var el = document.activeElement;
if (el && ('value' in el)) {{
el.value = el.value + " {text}";
}} else {{
console.log("No active element");
}}
}})();
"""
self.webview.page().runJavaScript(js)
def _on_voice_error(self, err):
self.log.append("Voice error: " + str(err))
class BotBaseScreen(QWidget):
go_back = pyqtSignal()
def __init__(self, bot_name="bot"):
super().__init__()
self.bot_name = bot_name
self._build_ui()
def _build_ui(self):
layout = QVBoxLayout()
header = QHBoxLayout()
back_btn = QPushButton("Back")
back_btn.clicked.connect(lambda: self.go_back.emit())
header.addWidget(QLabel(f"{self.bot_name.capitalize()}"))
header.addWidget(back_btn)
layout.addLayout(header)
self.log = QTextEdit()
self.log.setReadOnly(True)
layout.addWidget(self.log)
# Example controls for sending API calls
self.send_btn = QPushButton("Run Bot Task (API call)")
self.send_btn.clicked.connect(self._run_task)
layout.addWidget(self.send_btn)
self.setLayout(layout)
def _run_task(self):
self.log.append(f"Starting {self.bot_name} job...")
# placeholder for heavy work - use GenericWorker to avoid blocking UI
def fake_job():
import time
time.sleep(2)
return {"status": "ok", "bot": self.bot_name}
worker = GenericWorker(fake_job)
worker.result_ready.connect(lambda r: self.log.append(str(r)))
worker.error.connect(lambda e: self.log.append("Error: " + e))
worker.start()
class ProductionBotScreen(BotBaseScreen):
def __init__(self):
super().__init__(bot_name="production")
class InvoiceBotScreen(BotBaseScreen):
def __init__(self):
super().__init__(bot_name="invoice")
# ---------- Application Controller ----------
class AppController(QWidget):
def __init__(self):
super().__init__()
self.setWindowTitle("Pi Assistant App")
self.resize(1024, 720)
layout = QVBoxLayout()
self.stack = QStackedWidget()
layout.addWidget(self.stack)
self.setLayout(layout)
# screens
self.login = LoginScreen()
self.selector = SelectorScreen()
self.webassistant = WebAssistantScreen(model_path="models/vosk-model-small-en-us-0.15")
self.production = ProductionBotScreen()
self.invoice = InvoiceBotScreen()
# add to stack
self.stack.addWidget(self.login) # idx 0
self.stack.addWidget(self.selector) # idx 1
self.stack.addWidget(self.webassistant) # idx 2
self.stack.addWidget(self.production) # idx 3
self.stack.addWidget(self.invoice) # idx 4
# wire signals
self.login.login_success.connect(self.on_login)
self.selector.select_bot.connect(self.on_select)
self.webassistant.go_back.connect(self.show_selector)
self.production.go_back.connect(self.show_selector)
self.invoice.go_back.connect(self.show_selector)
self.show_login()
def show_login(self):
self.stack.setCurrentWidget(self.login)
def show_selector(self):
self.stack.setCurrentWidget(self.selector)
def show_webassistant(self):
self.stack.setCurrentWidget(self.webassistant)
def show_production(self):
self.stack.setCurrentWidget(self.production)
def show_invoice(self):
self.stack.setCurrentWidget(self.invoice)
def on_login(self, user_info):
# Called after login: you can check roles and direct user
print("Logged in:", user_info)
self.show_selector()
def on_select(self, name):
if name == "webassistant":
self.show_webassistant()
elif name == "production":
self.show_production()
elif name == "invoice":
self.show_invoice()
# ---------- main ----------
def main():
app = QApplication(sys.argv)
# ensure WebEngine resources are loaded properly
controller = AppController()
controller.show()
sys.exit(app.exec_())
if __name__ == "__main__":
main()