-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathmute.py
135 lines (114 loc) · 4.54 KB
/
mute.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import logging
from datetime import datetime, timedelta
from random import choice
from typing import List
from telegram import Update, User, ChatPermissions, TelegramError
from telegram.ext import (
Updater,
CommandHandler,
CallbackContext,
Filters,
MessageHandler,
)
from skills.towel_like_mode import catch_message, quarantine_user, unquarantine_user
from filters import admin_filter
from mode import cleanup_queue_update
from utils.time import get_duration
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Lower and upper bounds that mute_user_for_time clamps every requested
# mute duration to.
MIN_MUTE_TIME = timedelta(minutes=1)
MAX_MUTE_TIME = timedelta(days=365)
def add_mute(upd: Updater, handlers_group: int):
    """Register the mute-related handlers on the updater's dispatcher.

    All handlers are added to ``handlers_group`` in the order listed below.
    The admin-filtered ``/mute`` handler is registered before the unfiltered
    one; python-telegram-bot lets at most one handler per group process an
    update, so the registration order decides which ``/mute`` callback runs.

    Args:
        upd: Updater whose dispatcher receives the handlers.
        handlers_group: Dispatcher group number the handlers are added to.
    """
    logger.info("registering mute handlers")
    dispatcher = upd.dispatcher

    # Any group-chat message that is not a status update goes to catch_message.
    group_messages = Filters.chat_type.groups & ~Filters.status_update

    ordered_handlers = (
        CommandHandler("mute", mute, filters=admin_filter, run_async=True),
        CommandHandler("mute", mute_self, run_async=True),
        CommandHandler("unmute", unmute, filters=admin_filter, run_async=True),
        MessageHandler(group_messages, catch_message, run_async=True),
    )
    for handler in ordered_handlers:
        dispatcher.add_handler(handler, handlers_group)
def _get_minutes(args: List[str]) -> timedelta:
    """Parse the mute duration from the command arguments.

    The command is expected to be sent as a reply to the message of the user
    to mute, e.g. ``/mute 90``; only the first argument is used.

    Args:
        args: Command arguments (the words following the command).

    Returns:
        Whatever ``get_duration`` returns for the first argument
        (annotated as ``timedelta``).

    Raises:
        ValueError: If no argument was supplied. ``ValueError`` is a subclass
            of ``Exception``, so callers catching the previously raised
            generic ``Exception`` keep working.
    """
    # cmd should be a reply for going to mute user message like "/mute 90"
    if not args:
        raise ValueError(
            "mute cmd should be a reply for going to mute user message like '/mute 90', "
            "where '90' is duration of the mute"
        )
    return get_duration(args[0])
def mute_user_for_time(
    update: Update, context: CallbackContext, user: User, mute_duration: timedelta
):
    """Restrict ``user`` in the current chat for a clamped ``mute_duration``.

    The duration is clamped to ``[MIN_MUTE_TIME, MAX_MUTE_TIME]``. The chat is
    told about the timeout, then the bot asks Telegram to revoke the user's
    sending permissions until the computed deadline. If Telegram rejects a
    call, the error is logged and reported to the chat, and the user is
    handed to ``quarantine_user`` with the same clamped duration as a fallback.

    Args:
        update: Incoming update; its message is replied to and its effective
            chat is the chat where the restriction is applied.
        context: Callback context providing the bot instance.
        user: The user to restrict.
        mute_duration: Requested duration before clamping.
    """
    # Function-scope import: the module itself only imports ``datetime`` and
    # ``timedelta`` from the datetime package.
    from datetime import timezone

    mute_duration = min(max(mute_duration, MIN_MUTE_TIME), MAX_MUTE_TIME)
    try:
        # Use a timezone-aware UTC timestamp. python-telegram-bot interprets
        # naive datetimes in the bot's default timezone (UTC unless
        # configured otherwise), so a naive local ``datetime.now()`` would
        # shift the deadline by the host's UTC offset.
        until = datetime.now(timezone.utc) + mute_duration
        logger.info(
            "user: %s[%d] will be muted for %s", user.full_name, user.id, mute_duration
        )
        update.message.reply_text(f"Таймаут для {user.full_name} на {mute_duration}")
        mute_perm = ChatPermissions(
            can_add_web_page_previews=False,
            can_send_media_messages=False,
            can_send_other_messages=False,
            can_send_messages=False,
            can_send_polls=False,
        )
        context.bot.restrict_chat_member(
            update.effective_chat.id,
            user.id,
            permissions=mute_perm,
            until_date=until,
        )
    except TelegramError as err:
        logger.error("can't mute user %s: %s", user, err)
        update.message.reply_text(f"😿 не вышло, потому что: \n\n{err}")
        # Fallback when the restriction could not be applied: quarantine the
        # user instead (quarantine_user comes from skills.towel_like_mode).
        update.message.reply_text("Ну и что? Кинем полотенчик 🧻")
        quarantine_user(user, mute_duration)
def mute(update: Update, context: CallbackContext):
    """Handle ``/mute <duration>`` sent as a reply to the target's message.

    The user to mute is the author of the replied-to message; the duration is
    parsed from the first command argument by ``_get_minutes``.

    Args:
        update: Incoming update carrying the command message.
        context: Callback context; ``context.args`` holds the command arguments.
    """
    replied_to = update.message.reply_to_message
    if replied_to is None:
        # Without a reply there is no target user; previously this crashed
        # with AttributeError on ``None.from_user``.
        update.message.reply_text(
            "Команду /mute нужно отправлять ответом на сообщение пользователя, "
            "например: /mute 90"
        )
        return

    user: User = replied_to.from_user
    mute_minutes = _get_minutes(context.args)
    mute_user_for_time(update, context, user, mute_minutes)
def mute_self(update: Update, context: CallbackContext):
    """Mute the user who issued the command for one day.

    After muting, a randomly chosen remark mentioning the user is sent as a
    reply, and both the command message and that reply are handed to
    ``cleanup_queue_update`` together with the value 600 (presumably a delay
    in seconds before removal -- confirm against ``mode.cleanup_queue_update``).

    Args:
        update: Incoming update carrying the command message.
        context: Callback context providing the job queue.
    """
    offender: User = update.effective_user
    mute_user_for_time(update, context, offender, timedelta(days=1))

    mention = offender.name
    remarks = (
        f"Да как эта штука работает вообще, {mention}?",
        f"Не озоруй, {mention}, мало ли кто увидит",
        f"Зловив {mention} на вила!",
        f"Насилие порождает насилие, {mention}",
        f"Опять ты, {mention}!",
    )
    bot_reply = update.message.reply_text(choice(remarks))

    cleanup_queue_update(
        context.job_queue,
        update.message,
        bot_reply,
        600,
        remove_cmd=True,
        remove_reply=True,
    )
def unmute_user(update: Update, context: CallbackContext, user: User) -> None:
    """Restore ``user``'s sending permissions in the current chat.

    A notice is sent to the chat first, then the bot asks Telegram to grant
    the messaging, media, poll, link-preview and invite permissions again.
    If Telegram rejects a call, the error is logged and reported to the chat.

    Args:
        update: Incoming update; its message is replied to and its effective
            chat is the chat where permissions are restored.
        context: Callback context providing the bot instance.
        user: The user whose permissions are restored.
    """
    try:
        update.message.reply_text(f"{user.full_name}, не озоруй! Мало ли кто увидит 🧐")
        unmute_perm = ChatPermissions(
            can_add_web_page_previews=True,
            can_send_media_messages=True,
            can_send_other_messages=True,
            can_send_messages=True,
            can_send_polls=True,
            can_invite_users=True,
        )
        context.bot.restrict_chat_member(
            update.effective_chat.id, user.id, permissions=unmute_perm
        )
    except TelegramError as err:
        # Log the failure the same way mute_user_for_time does, so a failed
        # unmute leaves a trace outside the chat as well.
        logger.error("can't unmute user %s: %s", user, err)
        update.message.reply_text(f"😿 не вышло, потому что: \n\n{err}")
def unmute(update: Update, context: CallbackContext) -> None:
    """Handle ``/unmute`` sent as a reply to the target's message.

    The author of the replied-to message has their permissions restored via
    ``unmute_user`` and is then passed (by id) to ``unquarantine_user``.

    Args:
        update: Incoming update carrying the command message.
        context: Callback context providing the bot instance.
    """
    replied_to = update.message.reply_to_message
    if replied_to is None:
        # Without a reply there is no target user; previously this crashed
        # with AttributeError on ``None.from_user``.
        update.message.reply_text(
            "Команду /unmute нужно отправлять ответом на сообщение пользователя"
        )
        return

    user = replied_to.from_user
    unmute_user(update, context, user)
    unquarantine_user(user.id)