-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathbusy-bot.py
89 lines (75 loc) · 2.88 KB
/
busy-bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""
Python Wechaty - https://github.com/wechaty/python-wechaty
Authors: Huan LI (李卓桓) <https://github.com/huan>
Jingjing WU (吴京京) <https://github.com/wj-Mcat>
2020 @ Copyright Wechaty Contributors <https://github.com/wechaty>
Licensed under the Apache License, Version 2.0 (the 'License');
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an 'AS IS' BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import asyncio
import logging
from typing import Optional
from wechaty import (
Wechaty, Contact, Message
)
# waiting for wechaty-puppet new version
from wechaty_puppet import ScanStatus # type: ignore
# Configure the root logger so that records at INFO level and above are shown.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class MyBot(Wechaty):
    """
    Busy-mode auto-reply bot that listens to wechaty events through inherited
    ``on_*`` methods, which is friendlier for OOP developers.

    Direct (non-room) messages addressed to the bot's own contact id are
    handled as follows:

    * ``#status`` - report whether busy mode is on and show the auto-reply text
    * ``#busy``   - turn busy mode on
    * ``#free``   - turn busy mode off
    * any other text - answered with the auto-reply text while busy mode is on
    """

    def __init__(self):
        super().__init__()
        # True while the automatic reply is enabled; toggled by #busy / #free.
        self.busy = False
        # Text sent back to the talker while busy mode is on.
        self.auto_reply_comment = "Automatic Reply: I cannot read your message because I'm busy now, will talk to you when I get back."

    async def on_message(self, msg: Message):
        """
        Handle an incoming message.

        Only direct messages (no room) whose receiver is this bot's own
        contact are processed; everything else is ignored.

        Args:
            msg: the received message.
        """
        from_contact = msg.talker()
        text = msg.text()
        room = msg.room()
        to = msg.to()

        # Ignore room messages, messages without a receiver, and messages
        # that are not addressed to the bot itself.
        if room is not None or to is None or to.contact_id != self.contact_id:
            return

        if text == '#status':
            # Use a dedicated name so the `msg` parameter is not shadowed.
            status = 'busy' if self.busy else 'free'
            await from_contact.say(f'My status: {status}')
            await from_contact.say(self.auto_reply_comment)
        elif text == '#free':
            # Actually leave busy mode; previously the flag was never reset,
            # so the bot kept reporting 'busy' after announcing it had stopped.
            self.busy = False
            await from_contact.say('auto reply stopped.')
        elif text == '#busy':
            self.busy = True
            await from_contact.say('auto reply enabled')
        elif self.busy:
            # Only auto-reply while busy mode is on.
            await from_contact.say(self.auto_reply_comment)

    async def on_login(self, contact: Contact):
        """
        Print a notice when a user has logged in.

        Args:
            contact: the contact that logged in.
        """
        print(f'user: {contact} has login')

    async def on_scan(self,
                      qr_code: str,
                      status: ScanStatus,
                      data: Optional[str] = None):
        """
        Print the QR-code link while waiting for a scan, otherwise print the
        current scan status.

        Args:
            qr_code: QR-code payload appended to the online QR-code viewer URL.
            status: current scan status.
            data: optional extra data; not used by this handler.
        """
        if status == ScanStatus.Waiting:
            print("qr_code: ", "https://wechaty.js.org/qrcode/" + qr_code)
        else:
            contact = self.Contact.load(self.contact_id)
            print(f'user <{contact}> scan status: {status.name} , '
                  f'qr_code: {qr_code}')
# Module-level handle to the running bot; assigned inside main().
bot: Optional[MyBot] = None


async def main() -> None:
    """Create the bot, publish it as the module-level ``bot`` and start it."""
    global bot
    instance = MyBot()
    bot = instance
    await instance.start()


# Script entry point: run main() on a fresh event loop.
asyncio.run(main())