-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
executable file
·557 lines (465 loc) · 19.7 KB
/
main.py
File metadata and controls
executable file
·557 lines (465 loc) · 19.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
import warnings
warnings.filterwarnings('ignore') # Suppress all warnings
import os
# Set environment variables to suppress warnings from various libraries
os.environ['PYTHONWARNINGS'] = 'ignore' # Suppress Python warnings
import sounddevice as sd
import numpy as np
import scipy.io.wavfile as wav
import time
import torch
from transformers import pipeline
from pynput import keyboard
import subprocess # Add subprocess module for running shell commands
# Replace PyQt5 with Tkinter
import tkinter as tk
from tkinter import scrolledtext
# Suppress PyTorch warnings
torch.set_warn_always(False)
# Global variables to store the loaded models
transcriber = None
text_generator = None
# Flag to control the application running state
running = True
# Track modifier key states
alt_pressed = False
key_1_pressed = False # Track when '1' key is pressed
key_2_pressed = False # Track when '2' key is pressed
key_4_pressed = False # Track when '4' key is pressed
key_5_pressed = False # Track when '5' key is pressed
# New variables for continuous recording
recording = False
recording_for_generation = False # Track if we're recording for text generation
audio_frames = []
gen_audio_frames = [] # Separate frames for text generation
sample_rate = 16000
stream = None
gen_stream = None # Separate stream for text generation
# Add variables to track shortcut press state
alt_1_pressed = False # Track if Alt+1 were pressed simultaneously
alt_2_pressed = False # Track if Alt+2 were pressed simultaneously
alt_4_pressed = False # Track if Alt+4 were pressed simultaneously
alt_5_pressed = False # Track if Alt+5 were pressed simultaneously
# Store the latest transcription
last_transcription = ""
# App launcher recording variables
app_recording = False
app_audio_frames = []
app_stream = None
# Replace PyQt5 app reference with Tkinter root
tk_root = None
def initialize_models():
"""Initialize both the transcription and text generation models at startup."""
global transcriber, text_generator
print("Initializing speech recognition model...")
# Set up device for torch
if torch.backends.mps.is_available():
device = torch.device("cuda")
print("Using CUDA")
else:
device = torch.device("cpu")
print("CUDA not available, using CPU")
# Initialize the pipeline for automatic speech recognition with the device
transcriber = pipeline("automatic-speech-recognition",
model="distil-whisper/distil-small.en",
device=device)
print("Speech recognition model loaded!")
# Initialize the text generation pipeline with Qwen
print("Initializing text generation model...")
text_generator = pipeline(
'text-generation',
model="Qwen/Qwen2.5-0.5B-Instruct",
torch_dtype=torch.float16,
device_map="auto",
trust_remote_code=True
)
print("Text generation model loaded and ready!")
def generate_text(prompt, system_instruction=None):
"""Generate text using the text generation model."""
global text_generator
if text_generator is None:
print("Error: Text generation model not initialized!")
return "Text generation failed - model not loaded"
print("Generating text from prompt...")
# Set default system instruction if none provided
if system_instruction is None:
system_instruction = "You are a helpful AI assistant. Directly address the query, without providing any extra information in the beginning or end."
try:
messages = [
{"role": "system", "content": system_instruction},
{"role": "user", "content": prompt}
]
result = text_generator(
messages,
max_length=1000,
do_sample=False
)
# Extract the assistant's response from the generated text
generated_content = result[0]['generated_text'][2]['content']
return generated_content
except Exception as e:
print(f"Text generation error: {e}")
return f"Text generation failed: {str(e)}"
def audio_callback(indata, frames, time, status):
"""This is called for each audio block during recording."""
if status:
print(f"Audio callback status: {status}")
audio_frames.append(indata.copy())
def gen_audio_callback(indata, frames, time, status):
"""This is called for each audio block during recording for text generation."""
if status:
print(f"Audio callback status: {status}")
gen_audio_frames.append(indata.copy())
def app_audio_callback(indata, frames, time, status):
"""Audio callback for app launcher recording."""
if status:
print(f"Audio callback status: {status}")
app_audio_frames.append(indata.copy())
def start_recording():
"""Start continuous recording."""
global recording, audio_frames, stream, sample_rate
recording = True
audio_frames = []
# Start audio stream for continuous recording
stream = sd.InputStream(callback=audio_callback, channels=1,
samplerate=sample_rate, dtype='int16')
stream.start()
print("Recording started... Speak while holding Alt+1")
def stop_recording():
"""Stop recording and process the captured audio."""
global recording, audio_frames, stream, last_transcription
if not recording or stream is None:
return
recording = False
stream.stop()
stream.close()
print("Recording stopped!")
if len(audio_frames) > 0:
# Convert audio frames to a single numpy array
audio_data = np.concatenate(audio_frames, axis=0)
# Save audio to file and transcribe
audio_file = save_audio(audio_data, sample_rate)
transcription = transcribe_audio(audio_file)
last_transcription = transcription # Store for potential text generation
print("\nTranscription:")
print("-" * 50)
print(transcription)
print("-" * 50)
# Use xdotool to type the transcribed text
try:
subprocess.run(["xdotool", "type", transcription], check=True)
print("Text inserted with xdotool")
except subprocess.CalledProcessError as e:
print(f"Error using xdotool: {e}")
except FileNotFoundError:
print("xdotool not found. Please install it with 'sudo apt-get install xdotool'")
else:
print("No audio recorded or recording was too short")
def start_recording_for_generation():
"""Start continuous recording for text generation."""
global recording_for_generation, gen_audio_frames, gen_stream, sample_rate
recording_for_generation = True
gen_audio_frames = []
# Start audio stream for continuous recording
gen_stream = sd.InputStream(callback=gen_audio_callback, channels=1,
samplerate=sample_rate, dtype='int16')
gen_stream.start()
print("Recording started for text generation... Speak while holding Alt+2")
def stop_recording_and_generate():
"""Stop recording, transcribe audio, and generate text from the transcription."""
global recording_for_generation, gen_audio_frames, gen_stream
if not recording_for_generation or gen_stream is None:
return
recording_for_generation = False
gen_stream.stop()
gen_stream.close()
print("Recording stopped for text generation!")
if len(gen_audio_frames) > 0:
# Convert audio frames to a single numpy array
audio_data = np.concatenate(gen_audio_frames, axis=0)
# Save audio to file and transcribe
audio_file = save_audio(audio_data, sample_rate, "gen_audio.wav")
transcription = transcribe_audio(audio_file)
print("\nTranscription for text generation:")
print("-" * 50)
print(transcription)
print("-" * 50)
# Generate text with the standard assistant system instruction
print("Generating text from transcription...")
generated_text = generate_text(transcription)
print("\nGenerated Text:")
print("-" * 50)
print(generated_text)
print("-" * 50)
# Split the generated text by newlines
lines = generated_text.split("\n")
try:
# Type each line separately with xdotool
for i, line in enumerate(lines):
# Type the current line
subprocess.run(["xdotool", "type", line], check=True)
# If not the last line, press Enter to create a new line
if i < len(lines) - 1:
subprocess.run(["xdotool", "key", "Return"], check=True)
print("Generated text inserted with xdotool line by line")
except subprocess.CalledProcessError as e:
print(f"Error using xdotool: {e}")
except FileNotFoundError:
print("xdotool not found. Please install it with 'sudo apt-get install xdotool'")
else:
print("No audio recorded or recording was too short")
def start_recording_for_app_launch():
"""Start recording for application launch."""
global app_recording, app_audio_frames, app_stream, sample_rate
app_recording = True
app_audio_frames = []
# Start audio stream for continuous recording
app_stream = sd.InputStream(callback=app_audio_callback, channels=1,
samplerate=sample_rate, dtype='int16')
app_stream.start()
print("Recording started for app launch... Speak while holding Alt+4")
def stop_recording_and_launch_app():
"""Stop recording, transcribe audio, and launch appropriate application."""
global app_recording, app_audio_frames, app_stream
if not app_recording or app_stream is None:
return
app_recording = False
app_stream.stop()
app_stream.close()
print("Recording stopped for app launch!")
if len(app_audio_frames) > 0:
# Convert audio frames to a single numpy array
audio_data = np.concatenate(app_audio_frames, axis=0)
# Save audio to file and transcribe
audio_file = save_audio(audio_data, sample_rate, "app_audio.wav")
transcription = transcribe_audio(audio_file)
print("\nTranscription for app launch:")
print("-" * 50)
print(transcription)
print("-" * 50)
# Launch the application based on transcription
open_applications(transcription)
print("Application launch command processed")
else:
print("No audio recorded or recording was too short")
def save_audio(audio_data, sample_rate, filename="recorded_audio.wav"):
"""Save audio data to a WAV file."""
wav.write(filename, sample_rate, audio_data)
return filename
def transcribe_audio(audio_file):
"""Transcribe audio using the pre-loaded model."""
print("Transcribing audio...")
# Use the global model that's already loaded
global transcriber
if transcriber is None:
print("Error: Model not initialized!")
return "Transcription failed - model not loaded"
try:
# Add return_timestamps=True to handle longer recordings (>30 seconds)
result = transcriber(audio_file, return_timestamps=True)
return result["text"]
except Exception as e:
print(f"Transcription error: {e}")
# For very long recordings, try chunking as fallback
print("Attempting to process audio with chunking...")
try:
result = transcriber(audio_file, return_timestamps=True, chunk_length_s=30)
return result["text"]
except Exception as e2:
print(f"Chunking attempt also failed: {e2}")
return "Transcription failed - audio may be too long or in incorrect format"
def open_applications(query):
"""Launch applications based on keywords in the query."""
app_mapping = {
"code": "code",
"browser": "firefox",
"file": "nautilus"
}
query = query.lower()
# Check for each application keyword in the query
for keyword, command in app_mapping.items():
if keyword in query:
try:
subprocess.Popen([command])
print(f"Launching {command}...")
except Exception as e:
print(f"Failed to launch {command}: {e}")
def extract_selected_text():
"""Extract currently selected text using xclip."""
try:
# Get the selected text from the primary selection
selected_text = subprocess.check_output(['xclip', '-o', '-selection', 'primary'],
universal_newlines=True)
return selected_text
except subprocess.CalledProcessError as e:
print(f"Error using xclip: {e}")
return None
except FileNotFoundError:
print("xclip not found. Please install it with 'sudo apt-get install xclip'")
return None
def summarize_selected_text():
"""Extract selected text and generate a summary."""
# Extract the selected text
selected_text = extract_selected_text()
if not selected_text or selected_text.strip() == "":
print("No text selected or clipboard is empty.")
return
print("Selected text extracted. Generating summary...")
# Generate summary with specific summarization instruction
system_instruction = "You are a summarization assistant. Provide a concise summary of the given text without adding any extra information or commentary."
summary = generate_text(selected_text, system_instruction)
print("\nSummary:")
print("-" * 50)
print(summary)
print("-" * 50)
# Split the summary by periods to get sentences
sentences = summary.split('.')
try:
# Type each sentence separately with xdotool, followed by newline
for i, sentence in enumerate(sentences):
# Skip empty sentences
sentence = sentence.strip()
if not sentence:
continue
# Add period back except for the last sentence (which might not have had one)
if i < len(sentences) - 1 or summary.endswith('.'):
sentence = sentence + '.'
# Type the current sentence
subprocess.run(["xdotool", "type", sentence], check=True)
# Press Enter to create a new line after each sentence
subprocess.run(["xdotool", "key", "Return"], check=True)
print("Summary inserted with xdotool sentence by sentence")
except subprocess.CalledProcessError as e:
print(f"Error using xdotool: {e}")
except FileNotFoundError:
print("xdotool not found. Please install it with 'sudo apt-get install xdotool'")
def on_key_press(key):
"""Handle key press events."""
global running, alt_pressed, key_1_pressed, key_2_pressed, key_4_pressed, key_5_pressed
global recording, recording_for_generation, app_recording
global alt_1_pressed, alt_2_pressed, alt_4_pressed, alt_5_pressed, last_transcription
# Check for Alt key (left Alt only)
if key == keyboard.Key.alt_l:
alt_pressed = True
# Check for '1', '2', '4', '5' keys
try:
if hasattr(key, 'char'):
if key.char == '1':
key_1_pressed = True
elif key.char == '2':
key_2_pressed = True
elif key.char == '4':
key_4_pressed = True
elif key.char == '5':
key_5_pressed = True
except AttributeError:
pass
# Toggle recording when Alt+1 are pressed
if alt_pressed and key_1_pressed and not alt_1_pressed:
alt_1_pressed = True
# Toggle recording state
if not recording:
start_recording()
else:
stop_recording()
# Toggle text generation recording when Alt+2 is pressed
if alt_pressed and key_2_pressed and not alt_2_pressed:
alt_2_pressed = True
# Toggle recording for generation state
if not recording_for_generation:
start_recording_for_generation()
else:
stop_recording_and_generate()
# Toggle app launcher recording when Alt+4 is pressed
if alt_pressed and key_4_pressed and not alt_4_pressed:
alt_4_pressed = True
# Toggle recording for app launch state
if not app_recording:
start_recording_for_app_launch()
else:
stop_recording_and_launch_app()
# Handle Alt+5 for text summarization
if alt_pressed and key_5_pressed and not alt_5_pressed:
alt_5_pressed = True
summarize_selected_text()
def on_key_release(key):
"""Handle key release events."""
global alt_pressed, key_1_pressed, key_2_pressed, key_4_pressed, key_5_pressed
global alt_1_pressed, alt_2_pressed, alt_4_pressed, alt_5_pressed
# Reset modifier key states when released (left Alt only)
if key == keyboard.Key.alt_l:
alt_pressed = False
# Reset key combination flags
alt_1_pressed = False if not key_1_pressed else alt_1_pressed
alt_2_pressed = False if not key_2_pressed else alt_2_pressed
alt_4_pressed = False if not key_4_pressed else alt_4_pressed
alt_5_pressed = False if not key_5_pressed else alt_5_pressed
# Check for key releases
try:
if hasattr(key, 'char'):
if key.char == '1':
key_1_pressed = False
if not alt_pressed:
alt_1_pressed = False
elif key.char == '2':
key_2_pressed = False
if not alt_pressed:
alt_2_pressed = False
elif key.char == '4':
key_4_pressed = False
if not alt_pressed:
alt_4_pressed = False
elif key.char == '5':
key_5_pressed = False
if not alt_pressed:
alt_5_pressed = False
except AttributeError:
pass
def main():
print("Voice Transcription & Text Generation Tool")
print("=" * 50)
# Initialize the models once at startup
initialize_models()
print("Commands:")
print(" Press 'Alt+1' once to start recording for transcription")
print(" Press 'Alt+1' again to stop recording and transcribe")
print(" Press 'Alt+2' once to start recording for text generation")
print(" Press 'Alt+2' again to stop recording, transcribe and generate text")
print(" Press 'Alt+4' once to start recording for app launch")
print(" Press 'Alt+4' again to stop recording and launch application")
print(" Press 'Alt+5' to summarize selected text")
print("\nWaiting for input...")
# Initialize the keyboard listener with both press and release callbacks
listener = keyboard.Listener(on_press=on_key_press, on_release=on_key_release)
listener.start()
global running, tk_root
try:
while running:
time.sleep(0.1) # Sleep to avoid high CPU usage
# Update tkinter UI if it exists
if tk_root is not None:
try:
tk_root.update()
except tk.TclError:
# If Tkinter is closed, just continue
pass
except KeyboardInterrupt:
print("Program interrupted")
finally:
if recording:
stop_recording()
if recording_for_generation:
gen_stream.stop()
gen_stream.close()
if app_recording:
app_stream.stop()
app_stream.close()
listener.stop()
# Clean up tkinter if it was initialized
if tk_root is not None:
try:
tk_root.destroy()
except:
pass
if __name__ == "__main__":
main()