forked from BuddyChewChew/My-Streams
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathupdate_AX1S.py
More file actions
77 lines (64 loc) · 2.47 KB
/
update_AX1S.py
File metadata and controls
77 lines (64 loc) · 2.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import requests
import os
import re
import tempfile
import shutil
from datetime import datetime
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
def update_playlist():
# Get M3U URL from environment variable
m3u_url = os.getenv('AX1S_SOURCE_URL')
if not m3u_url:
raise ValueError("AX1S_SOURCE_URL environment variable not set")
# Configure retry strategy
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
http.mount("https://", adapter)
http.mount("http://", adapter)
try:
# Fetch the M3U content with timeout
response = http.get(
m3u_url,
timeout=30, # 30 seconds timeout
headers={'User-Agent': 'M3U-Playlist-Updater/1.0'}
)
response.raise_for_status() # Raise an exception for bad status codes
# Use the content as-is since it already includes the EPG URL and header
m3u_content = response.text
# Write to the repository root
output_filename = "AX1S.m3u8"
# Use the current working directory (repository root)
output_path = os.path.join(os.getcwd(), output_filename)
print(f"Writing to: {output_path}")
# Write directly to the output file
try:
with open(output_path, 'w', encoding='utf-8') as f:
f.write(m3u_content)
# Verify the file was written
if not os.path.exists(output_path):
raise Exception(f"Failed to create {output_filename}")
file_size = os.path.getsize(output_path)
if file_size == 0:
raise Exception(f"Created empty file {output_filename}")
print(f"Successfully updated {output_filename}")
print(f"File size: {file_size} bytes")
except Exception as e:
print(f"Error writing to {output_path}: {str(e)}")
if os.path.exists(output_path):
os.remove(output_path)
raise
return True # Success
except Exception as e:
print(f"Error updating M3U playlist: {str(e)}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
update_playlist()