-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path!Module_Template.py
More file actions
220 lines (189 loc) · 7.14 KB
/
!Module_Template.py
File metadata and controls
220 lines (189 loc) · 7.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
"""
Universal Task Module Template
------------------------------
Instructions:
1. Copy this file and rename it for a new task module.
2. Implement your logic in process_item().
3. The module automatically handles:
   - Async Telegram client connection
   - Groups from CLI (file or direct links)
   - Per-group output folders
   - Optional --limit
   - Live progress tracking
"""
import os
import sys
from telethon import TelegramClient
from telethon.tl.types import User # or other types your module needs
# ---------------------------------------------------------------------------
# Import path bootstrap: put this file's own directory at the front of
# sys.path so that config.py can be imported regardless of the caller's
# working directory.
# ---------------------------------------------------------------------------
_HERE = os.path.dirname(os.path.abspath(__file__))
if _HERE not in sys.path:
    sys.path.insert(0, _HERE)
# Prefer the project's config.py. When it cannot be imported, build an
# in-module _cfg namespace from .env files, environment variables, or
# interactive prompts.
try:
    import config as _cfg
except ImportError:
    import getpass
    from pathlib import Path

    def _load_dotenv(path):
        """Parse KEY=VALUE lines from *path* into a dict.

        Blank lines, lines starting with ``#`` and lines without ``=`` are
        skipped. Keys and values are stripped of surrounding whitespace and
        values additionally of surrounding quote characters. An OSError while
        reading is swallowed and whatever was parsed so far is returned.
        """
        parsed = {}
        try:
            with open(path, encoding="utf-8") as handle:
                for raw_line in handle:
                    entry = raw_line.strip()
                    if entry and not entry.startswith("#") and "=" in entry:
                        name, _sep, value = entry.partition("=")
                        parsed[name.strip()] = value.strip().strip('"').strip("'")
        except OSError:
            pass
        return parsed

    class _cfg:
        # Working values, filled in by the lookup steps below.
        _api_id = None
        _api_hash = None
        _session = "session"

        # Output switches read by the logging helpers in this module.
        VERBOSE = True
        INFO = True
        SUCCESS = True
        PROGRESS = True
        WARNING = False
        ERROR = False

        GROUP_FILE = "groups.txt"
        DEFAULT_LIMIT = 1000
        REPLY_ITER_LIMIT = 500

        # Step 1: .env files next to this script, in the home directory, then
        # in the working directory. dict.fromkeys drops duplicate paths while
        # keeping that order. Values accumulate across files until both
        # credentials are known.
        _here = Path(__file__).resolve().parent
        for _p in dict.fromkeys([_here / ".env", Path.home() / ".env", Path.cwd() / ".env"]):
            if not _p.is_file():
                continue
            _d = _load_dotenv(_p)
            _api_id = _d.get("API_ID") or _api_id
            _api_hash = _d.get("API_HASH") or _api_hash
            _session = _d.get("SESSION_NAME", _session)
            if _api_id and _api_hash:
                print(f"[*] Credentials loaded from {_p}")
                break

        # Step 2: process environment variables fill in whatever is missing.
        if not _api_id or not _api_hash:
            _api_id = os.getenv("API_ID") or _api_id
            _api_hash = os.getenv("API_HASH") or _api_hash

        # Step 3: ask the user, re-prompting until each value is acceptable.
        if not _api_id or not _api_hash:
            print("\n[!] Telegram API credentials not found.")
            print(" Get yours at https://my.telegram.org/apps\n")
            while not _api_id:
                _raw = input(" API_ID (numeric): ").strip()
                if not _raw.isdigit():
                    print(" API_ID must be a number — please try again.")
                    continue
                _api_id = _raw
            while not _api_hash:
                _raw = getpass.getpass(" API_HASH (hidden): ").strip()
                if not _raw:
                    print(" API_HASH cannot be empty — please try again.")
                    continue
                _api_hash = _raw
            if input("\n Save to .env in current directory? [y/N] ").strip().lower() == "y":
                _env_path = Path.cwd() / ".env"
                # Append so that existing entries in the file are preserved.
                with open(_env_path, "a", encoding="utf-8") as _f:
                    _f.write(f"\nAPI_ID={_api_id}\nAPI_HASH={_api_hash}\n")
                print(f" Saved to {_env_path}\n")

        # Public attributes, mirroring what the module reads from config.py.
        API_ID = int(_api_id)
        API_HASH = str(_api_hash)
        SESSION_NAME = _session

# Re-export the credentials at module level, whichever branch supplied _cfg.
API_ID = _cfg.API_ID
API_HASH = _cfg.API_HASH
SESSION_NAME = _cfg.SESSION_NAME
def info(message):
    """Print ``[*] message`` when _cfg.VERBOSE or _cfg.INFO is truthy."""
    if _cfg.VERBOSE or _cfg.INFO:
        print(f"[*] {message}")
def error(message):
    """Print ``[!] message`` when _cfg.VERBOSE or _cfg.ERROR is truthy."""
    if _cfg.VERBOSE or _cfg.ERROR:
        print(f"[!] {message}")
def warning(message):
    """Print ``[!] message`` when _cfg.VERBOSE or _cfg.WARNING is truthy."""
    if _cfg.VERBOSE or _cfg.WARNING:
        print(f"[!] {message}")
def success(message):
    """Print ``[✓] message`` when _cfg.VERBOSE or _cfg.SUCCESS is truthy."""
    if _cfg.VERBOSE or _cfg.SUCCESS:
        print(f"[✓] {message}")
def progress(message):
    """Print ``[+] message`` when _cfg.VERBOSE or _cfg.PROGRESS is truthy."""
    if _cfg.VERBOSE or _cfg.PROGRESS:
        print(f"[+] {message}")
def get_client():
    """Build a TelegramClient from the module-level session name and API credentials.

    The client is only constructed here; starting it is left to the caller.
    """
    client = TelegramClient(SESSION_NAME, API_ID, API_HASH)
    return client
async def connect_client():
    """Create a client with get_client(), start it, and return it.

    Any exception raised while starting is reported through error() and then
    re-raised unchanged to the caller.
    """
    client = get_client()
    try:
        await client.start()
        info("Connected to Telegram API")
    except Exception as exc:
        error(f"Failed to connect to Telegram API: {exc}")
        raise
    return client
def read_groups_from_file(file_path=None):
    """Return the non-blank lines of a groups file, stripped of whitespace.

    Args:
        file_path: Path to read; when falsy, _cfg.GROUP_FILE is used.

    Returns:
        A list of stripped lines in file order, or an empty list when the
        path is not an existing file.
    """
    path = file_path or _cfg.GROUP_FILE
    if not os.path.isfile(path):
        return []
    # utf-8-sig reads plain UTF-8 unchanged but drops a leading byte-order
    # mark; str.strip() does not remove U+FEFF, so with plain utf-8 a BOM
    # would stay attached to the first group name.
    with open(path, "r", encoding="utf-8-sig") as f:
        return [line.strip() for line in f if line.strip()]
# Base directory for output: the working directory at the time this module is
# loaded. run() derives the module's output folder from this path.
OUTPUT_DIR = os.getcwd()
# ------------------------
# CLI Arguments
# ------------------------
def get_args(parser):
    """Register this module's command-line options on *parser*.

    Add module-specific arguments here, for example:
        parser.add_argument("--keyword", type=str, help="Search keyword")

    Args:
        parser: Parser object exposing add_argument(), e.g. an
            argparse.ArgumentParser.
    """
    limit_help = "Maximum number of messages/items to scan per group (0 = all)"
    parser.add_argument("--limit", type=int, default=0, help=limit_help)
    parser.add_argument("--example", type=str, default="default", help="Example argument")
# ------------------------
# Main entry point
# ------------------------
async def run(args):
    """Connect to Telegram and process every configured group in turn.

    Groups come from ``args.groups`` when that attribute exists and is
    non-empty, otherwise from read_groups_from_file().

    Args:
        args: Parsed CLI namespace. ``groups`` is optional; the namespace is
            passed on unchanged to process_group().

    Raises:
        Whatever connect_client() or process_group() raises; the client is
        disconnected before the exception propagates.
    """
    client = await connect_client()
    try:
        # The standalone parser in this file never defines --groups, so a
        # namespace without that attribute must not crash here.
        groups = getattr(args, "groups", None) or read_groups_from_file()
        if not groups:
            warning("No groups to process")
        module_output = os.path.join(OUTPUT_DIR, "module_template")  # adjust name
        for group in groups:
            await process_group(client, group, args, module_output)
    finally:
        # Release the connection even when a group fails midway.
        await client.disconnect()
# ------------------------
# Process each group
# ------------------------
async def process_group(client, group, args, module_output):
    """Scan one group's messages and hand each one to process_item().

    Creates a per-group folder under *module_output*, iterates the group's
    messages (capped by ``args.limit`` when it is non-zero), prints a live
    progress line while scanning, and finishes with a summary line.

    Args:
        client: Telegram client as returned by connect_client().
        group: Group identifier; passed to the client as-is, and used with
            ``/`` replaced by ``_`` as the folder name.
        args: Parsed CLI namespace; ``args.limit`` of 0 means no cap.
        module_output: Base output directory for this module.
    """
    group_safe = group.replace("/", "_")
    output_dir = os.path.join(module_output, group_safe)
    os.makedirs(output_dir, exist_ok=True)

    # Starts empty for every group and is handed to process_item(), which
    # may use it for its own bookkeeping.
    existing_items = set()

    limit = args.limit or None
    if limit is None:
        # TelegramClient has no get_messages_count(); a zero-limit fetch
        # returns an empty result whose .total carries the history size.
        total_messages = (await client.get_messages(group, limit=0)).total
    else:
        total_messages = limit

    scanned = 0
    new_items = 0
    async for msg in client.iter_messages(group, limit=limit):
        scanned += 1
        added = await process_item(client, msg, output_dir, args, existing_items)
        if added:
            new_items += 1
        # Live progress display: \r rewrites the same terminal line.
        print(f"\rScanning messages: {scanned}/{total_messages} | New items: {new_items}", end="")
    print(f"\n[✓] Processed {new_items} new items from {group} to {output_dir}")
# ------------------------
# Module-specific logic
# ------------------------
async def process_item(client, msg, output_dir, args, existing_items):
    """Handle a single message; this template implementation does nothing.

    Replace the body with your module logic, for example:
    - Saving messages to CSV/JSON
    - Downloading media
    - Filtering by keyword

    Returns:
        True when a new item was created (this feeds the progress counter),
        False otherwise. The template always returns False.
    """
    # Placeholder: nothing is created, so nothing is counted.
    created = False
    return created
if __name__ == "__main__":
    import argparse
    import asyncio

    # Standalone entry point: build the CLI from get_args() and drive run().
    parser = argparse.ArgumentParser(description="Universal Task Module (Template)")
    get_args(parser)
    args = parser.parse_args()

    try:
        asyncio.run(run(args))
    except KeyboardInterrupt:
        # Ctrl+C is treated as a normal way to stop; exit without a traceback.
        pass