-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathmessage-file-bot.py
102 lines (83 loc) · 3.14 KB
/
message-file-bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""send contact card to specific contact"""
# pylint: disable=R0801
import asyncio
import logging
from typing import Optional, Union
from wechaty_puppet import FileBox, ScanStatus # type: ignore
from wechaty_puppet import MessageType
from wechaty import Wechaty, Contact
from wechaty.user import Message, Room
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
class MyBot(Wechaty):
"""
listen wechaty event with inherited functions, which is more friendly for
oop developer
"""
def __init__(self):
super().__init__()
async def on_message(self, msg: Message):
"""
listen for message event
"""
from_contact = msg.talker()
text = msg.text()
room = msg.room()
if room:
await room.ready()
# send contact-card
if msg.type() == MessageType.MESSAGE_TYPE_CONTACT:
# we can receive the contact-card event, and get the contact from message
contact = await msg.to_contact()
if text == 'send card':
# find one of my friend to send to `from_contact`
contacts = await bot.Contact.find_all()
if contacts:
# send one of my friend to the talker
# !! this interface is not supported now
await from_contact.say(contacts[0])
print('have sended')
elif msg.type() == MessageType.MESSAGE_TYPE_IMAGE:
img = await msg.to_file_box()
# save the image as local file
await img.to_file(f'./{img.name}')
# send image file to the room
if room:
await room.say(img)
elif msg.type() == MessageType.MESSAGE_TYPE_VIDEO:
video = await msg.to_file_box()
# save the video as local file
await video.to_file(f'./{video.name}')
# send video file to the room
if room:
await room.say(video)
elif msg.type() == MessageType.MESSAGE_TYPE_AUDIO:
audio = await msg.to_file_box()
# save the audio file as local file
await audio.to_file(f'./{audio.name}')
# !! we can't send audio to room/contact
print('done')
async def on_login(self, contact: Contact):
"""login event. It will be triggered every time you login"""
log.info(f'user: {contact} has login')
async def on_scan(self,
qr_code: str,
status: ScanStatus,
data: Optional[str] = None):
"""scan event, It will be triggered when you scan the qrcode to login.
And it will not be triggered when you have logined
"""
if status == ScanStatus.Waiting:
print("qr_code: ", "https://wechaty.js.org/qrcode/" + qr_code)
else:
contact = self.Contact.load(self.contact_id)
print(f'user <{contact}> scan status: {status.name} , '
f'qr_code: {qr_code}')
bot: Optional[MyBot] = None
async def main():
"""doc"""
# pylint: disable=W0603
global bot
bot = MyBot()
await bot.start()
asyncio.run(main())