-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathsaver.py
1150 lines (962 loc) · 47 KB
/
saver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
import json
import logging
from telethon import TelegramClient
from telethon.errors import SessionPasswordNeededError
import asyncio
from datetime import datetime
from dotenv import load_dotenv
# Pull variables from a local .env file so os.getenv() can see API_ID / API_HASH.
load_dotenv()

# Configure the root logger once for the whole script.
logging.basicConfig(
    level=logging.INFO,
    format='[%(levelname) 5s/%(asctime)s] %(name)s: %(message)s',
)
logger = logging.getLogger(__name__)

# --- Message download tuning ---------------------------------------------
MESSAGES_BATCH_SIZE = 100  # Number of messages to process in one batch
BATCH_DELAY = 2            # Delay between batches in seconds
SAVE_INTERVAL = 300        # Seconds between periodic database saves (5 minutes)
MAX_RETRIES = 3            # Maximum retries for failed message fetches
class ChannelSaver:
def __init__(self):
    """Set up local storage and read the Telegram API credentials.

    Creates ``temp/channel_saver`` if needed, loads (or creates) the JSON
    database kept there and reads ``API_ID`` / ``API_HASH`` from the
    environment. ``client`` and ``phone`` start as None; they are set
    later when a session is restored or a login happens.

    Raises:
        ValueError: if ``API_ID`` is missing, empty, zero or not an
            integer, or if ``API_HASH`` is missing or empty.
    """
    # Working directory for the database and the Telegram session files
    self.temp_dir = 'temp/channel_saver'
    os.makedirs(self.temp_dir, exist_ok=True)

    self.db_path = os.path.join(self.temp_dir, 'database.json')
    self.db = self.load_database()

    try:
        # Check the raw string first: int(None) would raise a TypeError whose
        # message says nothing about API_ID being missing.
        raw_api_id = (os.getenv('API_ID') or '').strip()
        if not raw_api_id:
            raise ValueError("API_ID not found in environment variables")
        self.api_id = int(raw_api_id)
        if not self.api_id:
            raise ValueError("API_ID must be a non-zero integer")

        self.api_hash = os.getenv('API_HASH')
        if not self.api_hash:
            raise ValueError("API_HASH not found in environment variables")
    except (TypeError, ValueError) as e:
        logger.error(f"Error loading API credentials: {e}")
        print("\nError: Please make sure API_ID and API_HASH are properly set in .env file")
        raise

    self.client = None
    self.phone = None
def load_database(self):
    """Load the JSON database stored at ``self.db_path``.

    Returns:
        The stored dict. When the file does not exist, the database built by
        ``create_new_database()`` is returned instead. When the file cannot
        be parsed as a JSON object, it is first renamed to
        ``<db_path>.corrupt`` so the old contents survive for manual
        recovery rather than being overwritten, and then a new database is
        created.
    """
    if not os.path.exists(self.db_path):
        return self.create_new_database()

    try:
        with open(self.db_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError):
        data = None

    # Only a JSON object can serve as the database (callers index it by key)
    if isinstance(data, dict):
        return data

    backup_path = f"{self.db_path}.corrupt"
    logger.warning(f"Corrupted database file, creating new (old file kept as {backup_path})")
    try:
        os.replace(self.db_path, backup_path)
    except OSError as e:
        logger.error(f"Could not back up corrupted database file: {e}")
    return self.create_new_database()
def create_new_database(self):
    """Build an empty database, write it to disk right away and return it."""
    fresh = dict(
        users={},
        last_login=None,
        sessions={},
        active_channel=None,  # channel picked through the selection menu
    )
    self.save_database(fresh)
    return fresh
def save_database(self, db=None):
    """Write the database to ``self.db_path`` as indented JSON.

    Args:
        db: the structure to write; defaults to ``self.db``.

    Values json cannot encode natively (e.g. ``datetime``) are written via
    ``str()``. The JSON goes to a sibling ``.tmp`` file first and is then
    moved over the real file with ``os.replace``. An interrupted or failed
    write therefore never leaves a truncated ``database.json`` behind, which
    the loader would otherwise have to treat as corrupt.
    """
    if db is None:
        db = self.db
    tmp_path = f"{self.db_path}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(db, f, indent=4, default=str)
        os.replace(tmp_path, self.db_path)
    except BaseException:
        # Remove the half-written temp file, keep the previous database intact
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
async def check_authorized(self):
    """Report whether the current client holds an authorized user session.

    Returns False when no client exists yet, or when the check itself
    raises (the error is logged).
    """
    client = self.client
    if client:
        try:
            return await client.is_user_authorized()
        except Exception as err:
            logger.error(f"Error checking authorization: {err}")
            return False
    return False
async def login(self, force=False):
    """Connect to Telegram, logging in interactively when necessary.

    Args:
        force: when False, first try ``restore_session()`` and return as soon
            as it succeeds; when True, always prompt for a phone number.

    Returns:
        True once the client is connected and authorized and the session has
        been recorded with ``save_session``. Errors from the Telegram calls
        propagate to the caller.
    """
    if not force and await self.restore_session():
        return True

    # New login required; strip so stray spaces don't end up in the session
    # key and file name
    self.phone = input('Please enter your phone number (international format): ').strip()

    # Disconnect any client left from an earlier attempt before replacing it,
    # otherwise its connection would be leaked
    if self.client:
        await self.client.disconnect()
        self.client = None

    self.client = TelegramClient(
        self.get_session_path(self.phone),
        self.api_id,
        self.api_hash
    )
    await self.client.connect()

    if not await self.client.is_user_authorized():
        try:
            await self.client.send_code_request(self.phone)
            code = input('Enter the code you received: ').strip()
            await self.client.sign_in(self.phone, code)
        except SessionPasswordNeededError:
            # 2FA is enabled (password is used verbatim, not stripped)
            password = input('Please enter your 2FA password: ')
            await self.client.sign_in(password=password)

    # Save session after successful login
    me = await self.client.get_me()
    await self.save_session(me)
    logger.info(f"Successfully logged in as {me.first_name} (@{me.username})")
    return True
async def list_channels(self):
    """Print and return the channels/groups the account belongs to.

    Entries are sorted by member count (largest first; a missing count
    sorts as 0) and numbered from 1 in that order. That number is stored
    under ``'index'`` and is what the selection prompt asks for.

    Returns:
        A list of dicts with keys ``id``, ``title``, ``username``,
        ``participants_count``, ``type`` and ``index``. The list is empty
        when nothing is found or when fetching the dialogs fails; a failure
        is logged.
    """
    try:
        dialogs = await self.client.get_dialogs()

        channels = []
        for dialog in dialogs:
            if not (dialog.is_channel or dialog.is_group):
                continue
            entity = dialog.entity
            channels.append({
                'id': entity.id,
                'title': entity.title,
                'username': getattr(entity, 'username', None),
                'participants_count': getattr(entity, 'participants_count', 0),
                'type': 'Channel' if dialog.is_channel else 'Group',
            })

        channels.sort(key=lambda c: c['participants_count'] or 0, reverse=True)
        # Number the entries by their position after sorting
        for position, channel in enumerate(channels, 1):
            channel['index'] = position

        if not channels:
            print("\nNo channels/groups found!")
            return []

        print("\nAvailable Channels and Groups:")
        print(f"{'#':>3} | {'Members':>7} | {'Type':^7} | {'Title':<30} | {'Username':<15}")
        print("-" * 70)

        for channel in channels:
            count = channel['participants_count']
            if not count:
                members = 'N/A'
            elif count >= 1_000_000:
                # "1,234,567" is 9 chars; cutting it to the 7-char column shows
                # a wrong number, so abbreviate instead (e.g. "1.2M")
                members = f"{count / 1_000_000:.1f}M"
            else:
                members = f"{count:,}"

            username = f"@{channel['username']}" if channel['username'] else '-'
            username = username[:15]  # Limit length

            title = channel['title']
            if len(title) > 30:
                title = title[:27] + "..."

            print(f"{channel['index']:3} | {members:>7} | {channel['type']:<7} | {title:<30} | {username:<15}")

        print("-" * 70)
        print(f"Total: {len(channels)} channels/groups")
        return channels
    except Exception as e:
        logger.error(f"Error listing channels: {e}")
        return []
async def select_active_channel(self):
    """Let the user choose one of the listed channels as the active one.

    The chosen channel dict (as returned by ``list_channels``) is stored in
    ``self.db['active_channel']`` and the database is saved.

    Returns:
        True when a channel was selected. False when no channels are
        available or the user cancels with ``0``.
    """
    channels = await self.list_channels()
    if not channels:
        print("\nNo channels/groups found!")
        return False

    by_index = {channel['index']: channel for channel in channels}

    while True:
        # Strip so that e.g. " 0" still cancels instead of being rejected
        choice = input("\nEnter channel number to select (or 0 to cancel): ").strip()
        if choice == '0':
            return False

        try:
            index = int(choice)
        except ValueError:
            print("\nPlease enter a valid number!")
            continue

        selected = by_index.get(index)
        if selected is None:
            print("\nInvalid channel number!")
            continue

        # Update active channel in database
        self.db['active_channel'] = selected
        self.save_database()
        print(f"\nSelected channel: {selected['title']}")
        return True
def get_active_channel(self):
    """Return the active-channel dict kept in the database, or None if unset."""
    active_channel = self.db.get('active_channel')
    return active_channel
async def show_active_channel(self):
    """Print the stored details of the active channel/group.

    Prints a short notice instead when nothing has been selected yet.
    """
    channel = self.get_active_channel()
    if not channel:
        print("\nNo active channel selected!")
        return

    print("\nActive Channel/Group:")
    print("--------------------")
    for label, field in (("Title", 'title'), ("Type", 'type')):
        print(f"{label}: {channel[field]}")
    if channel['username']:
        print(f"Username: @{channel['username']}")
    print(f"ID: {channel['id']}")
    print(f"Members: {channel['participants_count']}")
def get_session_path(self, phone):
    """Return the session path used for *phone*: ``<temp_dir>/user_<phone>``.

    NOTE(review): Telethon's SQLite session storage usually appends a
    ``.session`` extension to this name on disk; confirm before treating
    the returned value as an exact file name.
    """
    session_name = f'user_{phone}'
    return os.path.join(self.temp_dir, session_name)
async def save_session(self, me):
    """Record the current phone's session as the active one and save the db.

    Args:
        me: the logged-in user object; its ``id`` and ``username`` are
            stored.

    Does nothing when ``self.phone`` is not set. An existing session keeps
    its ``created_at``. ``last_used``, a new session's ``created_at`` and
    ``last_login['date']`` all share one timestamp. Every other stored
    session is marked inactive.
    """
    if not self.phone:
        return

    # One timestamp so the related fields agree with each other
    now = str(datetime.now())
    sessions = self.db['sessions']
    previous = sessions.get(self.phone, {})

    sessions[self.phone] = {
        'session_file': f'user_{self.phone}',
        'created_at': previous.get('created_at', now),
        'last_used': now,
        'user_id': me.id,
        'username': me.username,
        'active': True
    }

    self.db['last_login'] = {
        'phone': self.phone,
        'user_id': me.id,
        'username': me.username,
        'date': now
    }

    # Only one session may be active at a time
    for phone, session in sessions.items():
        if phone != self.phone:
            session['active'] = False

    self.save_database()
async def restore_session(self):
    """Reconnect using the session marked active in the database.

    Returns:
        True if the stored session is still authorized; its ``last_used``
        is then refreshed through ``save_session``. False if no session is
        active, the session is no longer authorized, or reconnecting raises
        (logged). On every False path after a client was created, that
        client is disconnected and ``self.client`` is reset to None.
    """
    sessions = self.db.get('sessions')
    if not sessions:
        return False

    # Find the phone of the first session flagged active
    active_phone = next(
        (phone for phone, session in sessions.items() if session.get('active')),
        None
    )
    if active_phone is None:
        return False

    try:
        # Ensure any existing client is disconnected
        if self.client:
            await self.client.disconnect()
            self.client = None

        self.phone = active_phone
        self.client = TelegramClient(
            self.get_session_path(self.phone),
            self.api_id,
            self.api_hash
        )
        await self.client.connect()

        if await self.check_authorized():
            # Update last used time
            me = await self.client.get_me()
            await self.save_session(me)
            logger.info(f"Restored session for {self.phone}")
            return True
        logger.info(f"Stored session for {self.phone} is no longer authorized")
    except Exception as e:
        logger.error(f"Failed to restore session: {e}")

    # Not restored: do not leave a connected but unusable client assigned
    if self.client:
        try:
            await self.client.disconnect()
        except Exception as e:
            logger.debug(f"Error while disconnecting client: {e}")
        self.client = None
    return False
async def list_sessions(self):
    """Print every saved session with its status, username and timestamps.

    A session without a username is shown as ``-`` (matching the channel
    and user listings) instead of ``@None``.
    """
    sessions = self.db['sessions']
    if not sessions:
        print("\nNo saved sessions found!")
        return

    print("\nSaved Sessions:")
    print("--------------")
    for phone, session in sessions.items():
        status = "ACTIVE" if session['active'] else "inactive"
        username = f"@{session['username']}" if session.get('username') else '-'
        print(f"\nPhone: {phone} [{status}]")
        print(f"Username: {username}")
        print(f"Created: {session['created_at']}")
        print(f"Last used: {session['last_used']}")
async def switch_session(self):
    """Interactively switch the client to another saved session.

    Lists the saved sessions and asks for a phone number until a known one
    (or ``0`` to cancel) is entered.

    Returns:
        True after switching to a still-authorized session. That session
        becomes the only active one and gets a fresh ``last_used``. False
        when there are no saved sessions or the user cancels. False also
        when the chosen session is no longer authorized; in that case the
        active flags are left untouched and the previous client/phone are
        restored.
    """
    sessions = self.db['sessions']
    if not sessions:
        print("\nNo saved sessions found!")
        return False

    await self.list_sessions()

    while True:
        phone = input("\nEnter phone number to switch to (or 0 to cancel): ").strip()
        if phone == '0':
            return False
        if phone not in sessions:
            print("\nInvalid phone number!")
            continue

        previous_client, previous_phone = self.client, self.phone
        if previous_client:
            await previous_client.disconnect()

        # Create new client with selected session
        self.phone = phone
        self.client = TelegramClient(
            self.get_session_path(phone),
            self.api_id,
            self.api_hash
        )
        await self.client.connect()

        if await self.check_authorized():
            # Only now mark the chosen session as the single active one
            for other_phone, session in sessions.items():
                session['active'] = (other_phone == phone)
            # Stored as a string like every other timestamp in the database
            sessions[phone]['last_used'] = str(datetime.now())
            self.save_database()
            print(f"\nSwitched to session: {phone}")
            return True

        print("\nSession is no longer valid!")
        # Drop the unauthorized client and go back to the previous session so
        # the caller is not left holding a client it cannot use
        await self.client.disconnect()
        self.client, self.phone = previous_client, previous_phone
        if previous_client:
            try:
                await previous_client.connect()
            except Exception as e:
                logger.error(f"Failed to reconnect previous session: {e}")
        return False
async def cleanup_sessions(self):
    """Find inactive saved sessions that are no longer authorized and, after
    confirmation, delete their session files and database entries.

    The active session is never checked or removed. A session counts as
    invalid when connecting raises or the client reports it is not
    authorized.
    """
    sessions = self.db['sessions']
    if not sessions:
        print("\nNo sessions to clean up!")
        return

    print("\nChecking sessions validity...")
    invalid = []
    for phone, session in sessions.items():
        # Skip active session
        if session['active']:
            continue

        client = TelegramClient(
            self.get_session_path(phone),
            self.api_id,
            self.api_hash
        )
        try:
            await client.connect()
            if not await client.is_user_authorized():
                invalid.append(phone)
        except Exception:
            invalid.append(phone)
        finally:
            await client.disconnect()

    if not invalid:
        print("\nAll sessions are valid!")
        return

    print(f"\nFound {len(invalid)} invalid sessions")
    if input("Remove them? (y/N): ").strip().lower() != 'y':
        return

    for phone in invalid:
        base_path = self.get_session_path(phone)
        # Telethon stores a string session name as "<name>.session"; also try
        # the bare path in case a file exists under exactly that name
        for path in (f"{base_path}.session", base_path):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
            except OSError as e:
                logger.warning(f"Could not remove session file {path}: {e}")
        del sessions[phone]

    self.save_database()
    print("\nInvalid sessions removed!")
async def start(self):
    """Run the interactive Channel Saver main menu until the user exits.

    Restores the last active session when possible, otherwise logs in
    interactively, then repeatedly shows the menu and runs the chosen
    action. The Telegram client is disconnected when the loop ends,
    including when an action raises.
    """
    print("\nWelcome to Channel Saver!")
    print("------------------------")

    # Ensure clean start
    if self.client:
        await self.client.disconnect()
        self.client = None

    restored = await self.restore_session()
    if restored:
        print(f"\nRestored session for {self.phone}")

    try:
        # A restored session is already connected and authorized; only fall
        # back to an interactive login otherwise. login(force=False) would call
        # restore_session() a second time and reconnect for nothing.
        if restored or await self.login(force=True):
            print("\nSuccessfully connected!")

            while True:
                # Show active channel in menu if selected
                active = self.get_active_channel()
                if active:
                    print(f"\nActive: {active['title']} ({active['type']})")

                print("\nOptions:")
                print("1. Show account info")
                print("2. List channels/groups")
                print("3. Select active channel")
                print("4. Show active channel info")
                print("5. Save channel users")
                print("6. Show users statistics")
                print("7. List saved sessions")
                print("8. Switch session")
                print("9. Cleanup invalid sessions")
                print("10. Save channel messages")
                print("11. List saved users")
                print("12. Search messages")
                print("13. Logout")
                print("14. Exit")

                choice = input("\nEnter your choice (1-14): ").strip()

                if choice == '1':
                    me = await self.client.get_me()
                    print("\nAccount Information:")
                    print(f"Phone: {self.phone}")
                    print(f"Username: @{me.username}")
                    print(f"First Name: {me.first_name}")
                    print(f"Last Name: {me.last_name}")
                    print(f"User ID: {me.id}")
                elif choice == '2':
                    await self.list_channels()
                elif choice == '3':
                    await self.select_active_channel()
                elif choice == '4':
                    await self.show_active_channel()
                elif choice == '5':
                    await self.save_channel_users()
                elif choice == '6':
                    await self.show_channel_users_stats()
                elif choice == '7':
                    await self.list_sessions()
                elif choice == '8':
                    await self.switch_session()
                elif choice == '9':
                    await self.cleanup_sessions()
                elif choice == '10':
                    await self._message_download_menu()
                elif choice == '11':
                    await self.list_saved_users()
                elif choice == '12':
                    await self.search_messages()
                elif choice == '13':
                    await self.client.log_out()
                    print("\nLogged out successfully!")
                    if self.phone in self.db['sessions']:
                        del self.db['sessions'][self.phone]
                    self.db['last_login'] = None
                    self.db['active_channel'] = None
                    self.save_database()
                    break
                elif choice == '14':
                    break
                else:
                    print("\nInvalid choice!")
    finally:
        if self.client:
            await self.client.disconnect()
            self.client = None

async def _message_download_menu(self):
    """Show the message-download submenu and run save_channel_messages accordingly."""
    print("\nMessage Download Options:")
    print("1. Download new messages only")
    print("2. Force redownload all messages")
    print("3. Back to main menu")
    dl_choice = input("\nEnter choice (1-3): ").strip()

    if dl_choice == '1':
        limit = self._prompt_message_limit()
        await self.save_channel_messages(limit=limit, force_redownload=False)
    elif dl_choice == '2':
        confirm = input("\nThis will redownload all messages. Continue? (y/N): ").lower()
        if confirm == 'y':
            limit = self._prompt_message_limit()
            await self.save_channel_messages(limit=limit, force_redownload=True)
    # '3' or anything else: back to the main menu

def _prompt_message_limit(self):
    """Ask for a message limit: return a positive int, or None for "all".

    Re-prompts on invalid input instead of letting int() raise ValueError
    out of the main menu loop.
    """
    while True:
        raw = input("\nEnter number of messages to save (or press Enter for all): ").strip()
        if not raw:
            return None
        try:
            limit = int(raw)
        except ValueError:
            limit = 0
        if limit > 0:
            return limit
        print("\nPlease enter a positive number!")
async def save_channel_users(self):
    """Store every participant of the active channel in the database.

    Records live in ``self.db['users'][str(channel id)][str(user id)]``.
    A user already stored keeps its ``first_seen`` value and has the rest of
    its record refreshed. A new user gets ``first_seen`` set to the current
    time. Progress is printed every 10 users, and the database is saved
    once after all participants are processed.

    Returns:
        True on success. False when no channel is active or anything raises
        (the error is logged and shown to the user).
    """
    channel = self.get_active_channel()
    if not channel:
        print("\nNo active channel selected!")
        return False

    try:
        print(f"\nFetching users from {channel['title']}...")

        channel_key = str(channel['id'])
        channel_users = self.db['users'].setdefault(channel_key, {})

        members = await self.client.get_participants(channel['id'])
        total = len(members)
        new_count = updated_count = 0

        print(f"\nProcessing {total} users...")
        for member in members:
            record = dict(
                id=member.id,
                username=member.username,
                first_name=member.first_name,
                last_name=member.last_name,
                phone=getattr(member, 'phone', None),
                bot=member.bot,
                scam=member.scam,
                fake=member.fake,
                premium=member.premium,
                verified=member.verified,
                restricted=member.restricted,
                last_seen=str(datetime.now()),
            )

            member_key = str(member.id)
            if member_key in channel_users:
                # Known user: keep the original first_seen, refresh the rest
                known = channel_users[member_key]
                record['first_seen'] = known['first_seen']
                known.update(record)
                updated_count += 1
            else:
                record['first_seen'] = str(datetime.now())
                channel_users[member_key] = record
                new_count += 1

            handled = new_count + updated_count
            if handled % 10 == 0:
                print(f"Progress: {handled}/{total}")

        self.save_database()

        print("\nOperation completed!")
        print(f"New users saved: {new_count}")
        print(f"Users updated: {updated_count}")
        return True
    except Exception as e:
        logger.error(f"Error saving channel users: {e}")
        print(f"\nError saving users: {str(e)}")
        return False
async def show_channel_users_stats(self):
    """Print statistics about the users saved for the active channel.

    Shows how many users are saved, how many of them are bots, premium or
    verified, and the largest ``last_seen`` value. Prints a notice instead
    when no channel is active or no users are saved for it.
    """
    active = self.get_active_channel()
    if not active:
        print("\nNo active channel selected!")
        return

    # The channel entry can exist but be empty (it is created before the
    # participants are fetched); max() below would raise on an empty dict
    users = self.db['users'].get(str(active['id']))
    if not users:
        print("\nNo saved users for this channel!")
        return

    total = len(users)
    bots = sum(1 for u in users.values() if u['bot'])
    premium = sum(1 for u in users.values() if u['premium'])
    verified = sum(1 for u in users.values() if u['verified'])

    print("\nChannel Users Statistics:")
    print("------------------------")
    print(f"Total users saved: {total}")
    print(f"Bots: {bots}")
    print(f"Premium users: {premium}")
    print(f"Verified users: {verified}")
    print(f"\nLast update: {max(u['last_seen'] for u in users.values())}")
async def save_channel_messages(self, limit: int = None, force_redownload: bool = False):
    """Download messages of the active channel into the database.

    Messages are fetched newest-first in batches of ``MESSAGES_BATCH_SIZE``,
    with a ``BATCH_DELAY``-second pause between successful batches. They are
    stored under ``self.db['messages'][str(channel id)][str(message id)]``.
    The database is saved every ``SAVE_INTERVAL`` seconds and once at the
    end. A batch that raises is retried after ``2 * BATCH_DELAY`` seconds;
    after ``MAX_RETRIES`` consecutive failures the download stops early.

    Args:
        limit: maximum number of messages to fetch. ``None``, 0 or a
            negative value means no limit.
        force_redownload: when True, already-stored messages are replaced.
            Otherwise a stored message is only updated if its views,
            forwards or reactions changed, and is counted as skipped if not.

    Returns:
        True once the download loop has finished. False when no channel is
        active, the channel has no messages, the user declines, or an
        unexpected error occurs (logged and printed).
    """
    active = self.get_active_channel()
    if not active:
        print("\nNo active channel selected!")
        return False

    # Messages still allowed to be fetched; None means unlimited
    remaining = limit if limit and limit > 0 else None

    try:
        print("\n" + "="*50)
        print(f"Channel: {active['title']}")
        print(f"Type: {active['type']}")
        print("="*50)

        channel_id = str(active['id'])
        channel_messages = self.db.setdefault('messages', {}).setdefault(channel_id, {})

        print("\nAnalyzing channel messages...")
        first_message = None
        last_message = None

        # Oldest message
        async for msg in self.client.iter_messages(active['id'], limit=1, reverse=True):
            first_message = msg
            print(f"First message found: #{msg.id} ({msg.date})")

        # Newest message
        async for msg in self.client.iter_messages(active['id'], limit=1):
            last_message = msg
            print(f"Last message found: #{msg.id} ({msg.date})")

        if not first_message or not last_message:
            print("\nNo messages found in channel!")
            return False

        # Estimated from the ID range, so any gaps in the IDs are counted too
        total = last_message.id - first_message.id + 1
        if remaining is not None:
            total = min(total, remaining)

        print("\nChannel Information:")
        print(f"First Message: #{first_message.id} ({first_message.date})")
        print(f"Last Message: #{last_message.id} ({last_message.date})")
        print(f"Total Messages: {total}")
        print(f"Batch Size: {MESSAGES_BATCH_SIZE} messages")
        print(f"Delay between batches: {BATCH_DELAY} seconds")

        confirm = input("\nProceed with message download? (y/N): ").lower()
        if confirm != 'y':
            print("\nOperation cancelled!")
            return False

        print("\nStarting message download...")
        print("="*50)

        saved = updated = skipped = errors = 0
        retry_count = 0    # consecutive failed batches; reset after a success
        total_retries = 0  # every failed batch, reported in the summary

        start_time = datetime.now()
        last_save_time = start_time

        # Telethon's iter_messages(max_id=N) excludes messages with ID >= N, so
        # this bound is exclusive: start just above the newest message and
        # lower it to the smallest ID of each fetched batch.
        upper_bound = last_message.id + 1
        while upper_bound > first_message.id:
            if remaining is not None and remaining <= 0:
                break
            batch_limit = MESSAGES_BATCH_SIZE if remaining is None else min(MESSAGES_BATCH_SIZE, remaining)

            try:
                print(f"\nFetching batch for messages < {upper_bound}")
                print("Batch parameters:")
                print(f"- Channel ID: {active['id']}")
                print(f"- Limit: {batch_limit}")
                print(f"- Max ID (exclusive): {upper_bound}")

                batch_messages = []
                async for message in self.client.iter_messages(
                    active['id'],
                    limit=batch_limit,
                    max_id=upper_bound
                ):
                    batch_messages.append(message)

                print(f"Retrieved {len(batch_messages)} messages in batch")
                if not batch_messages:
                    print("\nNo more messages in batch, breaking loop")
                    break
                print(f"First message in batch: #{batch_messages[0].id}")
                print(f"Last message in batch: #{batch_messages[-1].id}")

                # Next batch: everything older than the oldest message fetched
                upper_bound = min(msg.id for msg in batch_messages)
                print(f"Next batch will fetch messages below ID: {upper_bound}")
                if remaining is not None:
                    remaining -= len(batch_messages)

                for message in batch_messages:
                    try:
                        message_dict = self._message_to_dict(message)
                        msg_id = str(message.id)
                        if msg_id in channel_messages and not force_redownload:
                            # Only rewrite a stored message when its counters changed
                            existing = channel_messages[msg_id]
                            if (existing.get('views') != message_dict['views'] or
                                    existing.get('forwards') != message_dict['forwards'] or
                                    existing.get('reactions') != message_dict['reactions']):
                                existing.update(message_dict)
                                updated += 1
                            else:
                                skipped += 1
                        else:
                            # New message, or forced overwrite
                            channel_messages[msg_id] = message_dict
                            saved += 1
                    except Exception as msg_error:
                        logger.error(f"Error processing message {message.id}: {str(msg_error)}")
                        errors += 1

                current_time = datetime.now()
                elapsed = current_time - start_time
                processed = saved + updated + skipped
                seconds = elapsed.total_seconds()
                speed = processed / seconds if seconds > 0 else 0

                # Save database periodically
                if (current_time - last_save_time).total_seconds() > SAVE_INTERVAL:
                    self.save_database()
                    last_save_time = current_time

                print("\033[F\033[K" * 7)
                print(f"Progress: {processed}/{total} messages ({elapsed})")
                print(f"New: {saved} | Updated: {updated} | Skipped: {skipped} | Errors: {errors}")
                print(f"Speed: {speed:.1f} messages/second")
                print(f"Elapsed: {str(elapsed).split('.')[0]}")
                print(f"Current Batch: {len(batch_messages)} messages (ID: {upper_bound})")
                print(f"Retries: {retry_count}/{MAX_RETRIES}")
                print("-"*50)

                retry_count = 0

                # Rate limiting: pause only if another batch will follow
                if upper_bound > first_message.id and (remaining is None or remaining > 0):
                    await asyncio.sleep(BATCH_DELAY)

            except Exception as batch_error:
                print("\nDebug: Batch error details:")
                print(f"- Error type: {type(batch_error).__name__}")
                print(f"- Error message: {str(batch_error)}")
                print(f"- Current upper bound (exclusive): {upper_bound}")
                logger.error(f"Error processing batch: {str(batch_error)}", exc_info=True)

                retry_count += 1
                total_retries += 1
                if retry_count >= MAX_RETRIES:
                    print(f"\nToo many errors, stopping download below message {upper_bound}")
                    break

                print(f"\nRetrying batch in {BATCH_DELAY * 2} seconds... ({retry_count}/{MAX_RETRIES})")
                await asyncio.sleep(BATCH_DELAY * 2)

        # Final save
        self.save_database()

        elapsed = datetime.now() - start_time
        seconds = elapsed.total_seconds()
        processed = saved + updated + skipped
        speed = processed / seconds if seconds > 0 else 0

        print("\n" + "="*50)
        print("Download Completed!")
        print("="*50)
        print("\nFinal Statistics:")
        print(f"Total Processed: {processed}")
        print(f"New Messages: {saved}")
        print(f"Updated Messages: {updated}")
        print(f"Skipped Messages: {skipped}")
        print(f"Errors: {errors}")
        print(f"Total Retries: {total_retries}")
        print(f"\nTime Elapsed: {str(elapsed).split('.')[0]}")
        print(f"Average Speed: {speed:.1f} messages/second")
        print("="*50)
        return True

    except Exception as e:
        logger.error(f"Error saving channel messages: {e}")
        print(f"\nError saving messages: {str(e)}")
        return False

def _message_to_dict(self, message):
    """Convert a fetched message into the plain dict stored in the database.

    Optional fields are read with ``getattr`` defaults, so a message object
    lacking one of them is still stored instead of being counted as an
    error. ``from_id`` keeps only a ``user_id``; a sender peer without one
    is stored as None (NOTE(review): e.g. posts sent on behalf of a channel;
    confirm whether those senders should be kept too).
    """
    from_id = getattr(message, 'from_id', None)
    reply_to = getattr(message, 'reply_to', None)
    media = getattr(message, 'media', None)
    edit_date = getattr(message, 'edit_date', None)
    grouped_id = getattr(message, 'grouped_id', None)

    return {
        'id': message.id,
        'date': str(message.date),
        'edit_date': str(edit_date) if edit_date else None,
        'from_id': getattr(from_id, 'user_id', None) if from_id else None,
        'text': getattr(message, 'text', None),
        'raw_text': getattr(message, 'raw_text', None),
        'out': getattr(message, 'out', None),
        'mentioned': getattr(message, 'mentioned', None),
        'media_unread': getattr(message, 'media_unread', None),
        'silent': getattr(message, 'silent', None),
        'post': getattr(message, 'post', None),
        'from_scheduled': getattr(message, 'from_scheduled', None),
        'legacy': getattr(message, 'legacy', None),
        'edit_hide': getattr(message, 'edit_hide', None),
        'pinned': getattr(message, 'pinned', None),
        'noforwards': getattr(message, 'noforwards', None),
        'views': getattr(message, 'views', 0),
        'forwards': getattr(message, 'forwards', 0),
        'has_media': bool(media),
        'media_type': type(media).__name__ if media else None,
        'grouped_id': str(grouped_id) if grouped_id else None,
        'reactions': self._reactions_to_list(message),
        'reply_to': getattr(reply_to, 'reply_to_msg_id', None) if reply_to else None,
        'last_update': str(datetime.now())
    }

def _reactions_to_list(self, message):
    """Return the message's reactions as a list of plain dicts.

    Each entry has ``emoticon``, ``document_id``, ``count`` and ``chosen``.
    A reaction that cannot be read is skipped and logged at debug level,
    without affecting the other reactions of the same message.
    """
    reactions = getattr(message, 'reactions', None)
    results = getattr(reactions, 'results', None) or []
    parsed = []
    for reaction in results:
        try:
            kind = reaction.reaction
            parsed.append({
                'emoticon': getattr(kind, 'emoticon', None),
                'document_id': getattr(kind, 'document_id', None),
                'count': reaction.count,
                'chosen': getattr(reaction, 'chosen', False)
            })
        except Exception as e:
            logger.debug(f"Skipping malformed reaction in message {message.id}: {str(e)}")
    return parsed
async def list_saved_users(self):
    """Print a table of the users saved for the active channel.

    Rows are sorted by username; users without one sort first. Prints a
    notice instead when no channel is active or nothing is saved for it.
    """
    channel = self.get_active_channel()
    if not channel:
        print("\nNo active channel selected!")
        return

    key = str(channel['id'])
    saved_per_channel = self.db.get('users', {})
    if key not in saved_per_channel:
        print("\nNo saved users for this channel!")
        return

    users = saved_per_channel[key]
    if not users:
        print("\nNo users found!")
        return

    rule = "-" * 80
    print("\nSaved Users:")
    print(rule)
    print(f"{'ID':<12} | {'Username':<15} | {'Name':<20} | {'Type':<8} | {'Status'}")
    print(rule)

    # (record field, label shown in the Status column), in display order
    flag_labels = (
        ('premium', 'Premium'),
        ('verified', 'Verified'),
        ('scam', 'Scam'),
        ('fake', 'Fake'),
        ('restricted', 'Restricted'),
    )
    for user_id, user in sorted(users.items(), key=lambda item: item[1].get('username') or ''):
        username = f"@{user['username']}" if user['username'] else '-'
        full_name = f"{user['first_name'] or ''} {user['last_name'] or ''}".strip() or '-'
        kind = 'Bot' if user['bot'] else 'User'
        flags = [label for field, label in flag_labels if user[field]]
        status_str = ', '.join(flags) or '-'
        print(f"{user_id:<12} | {username:<15} | {full_name[:20]:<20} | {kind:<8} | {status_str}")

    print(rule)
    print(f"Total Users: {len(users)}")
async def search_messages(self):
"""Search in saved messages"""
active = self.get_active_channel()
if not active:
print("\nNo active channel selected!")
return
channel_id = str(active['id'])
if channel_id not in self.db.get('messages', {}):
print("\nNo saved messages for this channel!")
return
messages = self.db['messages'][channel_id]
if not messages:
print("\nNo messages found!")
return
print("\nSearch Options:")
print("1. Search by text")
print("2. Search by date range")
print("3. Search by message ID")
print("4. Show messages with reactions")
print("5. Show messages with media")
print("6. Show user's last messages")
print("7. Back to main menu")
choice = input("\nEnter your choice (1-7): ")
if choice == '1':
query = input("\nEnter search text: ").lower()
results = []
for msg_id, msg in messages.items():
if msg.get('text') and query in msg['text'].lower():
results.append(msg)
self._display_message_results(results, f"Messages containing '{query}'")
elif choice == '2':
from_date = input("\nEnter start date (YYYY-MM-DD): ")
to_date = input("Enter end date (YYYY-MM-DD): ")
try:
from_dt = datetime.strptime(from_date, '%Y-%m-%d')
to_dt = datetime.strptime(to_date, '%Y-%m-%d')
results = []
for msg_id, msg in messages.items():
msg_date = datetime.strptime(msg['date'].split('+')[0], '%Y-%m-%d %H:%M:%S')
if from_dt <= msg_date <= to_dt:
results.append(msg)
self._display_message_results(results, f"Messages from {from_date} to {to_date}")
except ValueError:
print("\nInvalid date format! Use YYYY-MM-DD")
elif choice == '3':
msg_id = input("\nEnter message ID: ")
if msg_id in messages:
self._display_message_results([messages[msg_id]], "Message found")
else:
print("\nMessage not found!")
elif choice == '4':
results = []
for msg_id, msg in messages.items():
if msg.get('reactions') and len(msg['reactions']) > 0:
results.append(msg)
self._display_message_results(results, "Messages with reactions")
elif choice == '5':
results = []
for msg_id, msg in messages.items():
if msg.get('has_media'):