-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontrol.py
211 lines (184 loc) · 6.25 KB
/
control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# Librerias:
# Libraries:
import asyncio
import os
import signal
import subprocess
import sys
import threading
import time

import psutil
import requests
import uvicorn
from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
# Configuración de FastAPI:
# Configure FastAPI:
# Single application instance; the routes, static mounts and WebSocket
# endpoint defined below all register on it.
app = FastAPI()
# Placeholder credential: must be replaced with a real YouTube Data API key.
YOUTUBE_API_KEY = "YOUR_YOUTUBE_API_KEY"
# Endpoint queried by both the key check and the /search-music route.
YOUTUBE_SEARCH_URL = "https://www.googleapis.com/youtube/v3/search"
# WebSocket connections accepted on /ws; each received command is broadcast
# to every entry in this list.
clients = []
# Entries cycled through by the "next"/"prev" WebSocket commands.
# NOTE(review): nothing in this file appends to it — presumably populated
# elsewhere or not yet implemented; confirm.
playlist = []
# Index into `playlist` of the entry last selected by "next"/"prev".
current_index = 0
# Verificar la validez de la clave de API de YouTube:
# Verify YouTube API key validity:
def verify_youtube_api_key():
    """Check whether ``YOUTUBE_API_KEY`` is accepted by the YouTube search API.

    Sends a minimal one-result search request. An HTTP error status or any
    transport failure (including a timeout) is reported as an invalid key.

    Returns:
        bool: ``True`` if the request succeeded, ``False`` otherwise.
    """
    params = {
        "part": "snippet",
        "q": "test",
        "key": YOUTUBE_API_KEY,
        "type": "video",
        "maxResults": 1,
    }
    try:
        # requests never times out by default; bound the wait so startup
        # cannot hang forever on a dead network.
        response = requests.get(YOUTUBE_SEARCH_URL, params=params, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"❌ Error verifying YouTube API key: {e}")
        return False
    print("✅ YouTube API key is valid")
    return True
# Configuración de CORS:
# CORS:
# Register the CORS middleware on the application itself. Building a
# standalone CORSMiddleware(app=app, ...) object does not affect requests,
# because the server is started with `app`, not with the wrapper.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Kept so existing references to `cors` still resolve to a CORS-enabled
# ASGI application.
cors = app
# Montar directorios estáticos:
# Mount static directories:
# Serve front-end assets from the local "player" and "control" directories
# under the /player and /control URL prefixes. The directory paths are
# relative, so they resolve against the process working directory.
app.mount("/player", StaticFiles(directory="player"), name="player")
app.mount("/control", StaticFiles(directory="control"), name="control")
# Buscar música:
# Search music:
def _redact_api_key(value):
    """Return ``str(value)`` with the YouTube API key masked, for safe logging."""
    text = str(value)
    # Guard against an empty key: str.replace("", ...) would mangle the text.
    return text.replace(YOUTUBE_API_KEY, "***") if YOUTUBE_API_KEY else text


@app.get("/search-music")
def search_music(q: str = Query(..., min_length=1)):
    """Search YouTube for music videos matching ``q``.

    Args:
        q: Search text (required, at least one character).

    Returns:
        ``{"results": [...]}`` where each entry has ``video_id``, ``title``,
        ``artist`` (the channel title) and ``thumbnail`` (default thumbnail
        URL). Items missing any of those fields are skipped. On failure a
        ``JSONResponse`` with status 500 and an ``error`` message is returned.
    """
    params = {
        "part": "snippet",
        "q": q,
        "key": YOUTUBE_API_KEY,
        "type": "video",
        "maxResults": 15,
        "videoCategoryId": "10",
    }
    try:
        # Debug output; the API key is masked so it never reaches the logs.
        print(f"Sending request to YouTube with parameters: {_redact_api_key(params)}")
        # Bound the wait so a stalled upstream cannot tie up a worker forever.
        response = requests.get(YOUTUBE_SEARCH_URL, params=params, timeout=10)
        print(f"Complete URL: {_redact_api_key(response.url)}")
        response.raise_for_status()

        data = response.json()
        results = []
        for item in data.get("items", []):
            try:
                video_id = item["id"]["videoId"]
                snippet = item["snippet"]
                results.append({
                    "video_id": video_id,
                    "title": snippet["title"],
                    "artist": snippet["channelTitle"],
                    "thumbnail": snippet["thumbnails"]["default"]["url"],
                })
            except KeyError as e:
                # Skip malformed items rather than failing the whole search.
                print(f"Error processing item: {e}")
                continue
        return {"results": results}
    except requests.RequestException as e:
        # HTTP error messages embed the request URL, which carries the key.
        print(f"Error in YouTube request: {_redact_api_key(e)}")
        return JSONResponse(
            status_code=500,
            content={"error": "Error connecting to YouTube API"},
        )
    except Exception as e:
        print(f"Unexpected error: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": "Internal server error"},
        )
# Página del reproductor:
# Player page:
@app.get("/player", response_class=HTMLResponse)
def player():
    """Return the contents of ``player/index.html`` as the HTML response."""
    # Context manager closes the file handle instead of leaking it per request.
    with open("player/index.html", encoding="utf-8") as page:
        return page.read()
# Página de control:
# Control page:
@app.get("/control", response_class=HTMLResponse)
def control():
    """Return the contents of ``control/index.html`` as the HTML response."""
    # Context manager closes the file handle instead of leaking it per request.
    with open("control/index.html", encoding="utf-8") as page:
        return page.read()
# Conexión WebSocket:
# WebSocket connection:
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept a WebSocket client and relay its commands to all clients.

    Each received text message is broadcast to every connection in
    ``clients``. The commands ``"next"`` and ``"prev"`` first move
    ``current_index`` through ``playlist`` (wrapping around) and are rewritten
    to ``"play:<entry>"``; when the playlist is empty they are broadcast
    unchanged.

    Args:
        websocket: The incoming connection.
    """
    global current_index
    await websocket.accept()
    clients.append(websocket)
    try:
        while True:
            command = await websocket.receive_text()
            if command == "next":
                if playlist:
                    current_index = (current_index + 1) % len(playlist)
                    command = f"play:{playlist[current_index]}"
            elif command == "prev":
                if playlist:
                    # Python's modulo is non-negative here, so this wraps to
                    # the last entry when stepping back from index 0.
                    current_index = (current_index - 1) % len(playlist)
                    command = f"play:{playlist[current_index]}"
            # Iterate over a snapshot: the list can change while we await.
            for client in list(clients):
                try:
                    await client.send_text(command)
                except Exception:
                    # A dead peer must not end the sender's session; drop
                    # only the connection that failed.
                    if client in clients:
                        clients.remove(client)
    except WebSocketDisconnect:
        # Normal termination: the client went away.
        pass
    except Exception as e:
        print(f"WebSocket error: {e}")
    finally:
        # /shutdown may already have cleared the list, so check membership.
        if websocket in clients:
            clients.remove(websocket)
# Apagar el control y el reproductor:
# Shutdown the control and the player:
@app.get("/shutdown", response_class=HTMLResponse)
async def exit_server():
    """Notify clients, kill the player process and terminate this server.

    Steps, all best-effort:
      1. Send ``"server_shutdown"`` to every connected WebSocket client.
      2. Close every client connection and empty ``clients``.
      3. Kill every process whose command line contains ``player.py``.
      4. Send SIGTERM to this process shortly after returning, so the HTTP
         response can still be delivered.

    Returns:
        str: Confirmation text sent as the HTML response body.
    """
    # Snapshots are used because the list may change while awaiting.
    for client in list(clients):
        try:
            await client.send_text("server_shutdown")
        except Exception:
            # Peer already gone; nothing left to notify.
            continue
    for client in list(clients):
        try:
            await client.close()
        except Exception:
            # Connection already closed or broken.
            continue
    clients.clear()

    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        try:
            # cmdline can be None when the process info is not accessible.
            cmdline = proc.info['cmdline'] or []
            if 'player.py' in ' '.join(cmdline):
                proc.kill()
        except psutil.Error:
            # Process vanished or access was denied; skip it.
            continue

    # Killing the process synchronously would abort this request before the
    # response is written, so defer the signal slightly.
    asyncio.get_running_loop().call_later(
        0.5, os.kill, os.getpid(), signal.SIGTERM
    )
    return "Server and processes terminated."
# Ejecutar el control y el reproductor:
# Run the control and player:
if __name__ == "__main__":
    def run_api():
        """Serve the FastAPI app on all interfaces, port 8000 (blocking)."""
        uvicorn.run(app, host="0.0.0.0", port=8000)

    # Warn, but keep going, when the API key is rejected: only music search
    # depends on it.
    if not verify_youtube_api_key():
        print("❌ YouTube API key is not valid. Music search will not work correctly.")
        print("⚠️ Please update the API key in control.py")
        print("🌐 For more information visit: https://developers.google.com/youtube?hl=es_419")

    # Run the API server in a background thread so this thread can launch
    # the player.
    api_thread = threading.Thread(target=run_api)
    api_thread.start()

    # Give the server a moment to start before the player is launched.
    time.sleep(3)

    # Launch the player with the interpreter running this script. Using
    # sys.executable works on every platform, unlike a hard-coded
    # "python.exe" next to it.
    subprocess.run([sys.executable, "player.py"])