В прошлом гайде мы создали класс для ведения логов и запускатор нашего бота.
В этой части мы создадим "движок" и тело бота, так же добавим базу для плагинов, приступим.
Перед тем, как продолжить, сразу создадим класс для удобного использования апи вк, я буду использоваю
Теперь приступим к
Здесь будет основное "тело" бота, с которого начинается прочитывание сообщений и передача данных движку (Опять много кода)
Теперь бот умеет запускаться, но обрабатывать сообщения он не будет, исправим это:
Движок будет инициализировать плагины при старте и обрабатывать сообщения, если это команда, то отправлять плагину
Основное есть, теперь он будет обрабатывать все сообщения, которые являются командами, напишем теперь базу для плагинов:
Все готово,
На этом все, в следующей части мы напишем первые простенькие плагины и посмотрим как работает наш бот.
В этой части мы создадим "движок" и тело бота, так же добавим базу для плагинов, приступим.
Перед тем, как продолжить, сразу создадим класс для удобного использования апи вк, я буду использоваю
Пожалуйста, авторизуйтесь для просмотра ссылки.
(Много кода, но это не весь):
Python:
from Settings import BotSettings as Set
from Log import Log
import random
import vk_api
import time
vk = vk_api.VkApi(login=Set.Bot.Login, password=Set.Bot.Password)
vk.auth()
class VK(object):
def __init__(self):
self.vk = vk
def exec(self, method, parameters):
time.sleep(Set.Bot.SleepTime)
return self.vk.method(method, parameters)
class Messages(object):
def __init__(self):
self.chatID = Set.Chat.ChatID
self.chatPeer = Set.Chat.ChatPeer
self.parameters = dict()
self.method = ''
def send(self, message=None, lat=None, long=None, attachment=None, forward_messages=None):
self.method = 'messages.send'
self.parameters = {'peer_id': self.chatPeer, 'chat_id': self.chatID, 'random_id': random.randint(1, 2**64)}
if message:
self.parameters['message'] = message
if lat:
self.parameters['lat'] = lat
if long:
self.parameters['long'] = long
if attachment:
self.parameters['attachment'] = attachment
if forward_messages:
self.parameters['forward_messages'] = forward_messages
try:
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
def get_history(self, count=None, offset=None, start_message_id=None, fields=None, rev=None):
self.method = 'messages.getHistory'
self.parameters['peer_id'] = self.chatPeer
if count is not None:
self.parameters['count'] = count
else:
self.parameters['count'] = 5
if offset:
self.parameters['offset'] = offset
if fields:
self.parameters['fields'] = fields
if rev:
self.parameters['rev'] = rev
if start_message_id:
self.parameters['start_message_id'] = start_message_id
try:
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
def remove_chat_user(self, member_id=None):
try:
self.method = 'messages.removeChatUser'
self.parameters['chat_id'] = self.chatID
self.parameters['member_id'] = member_id
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
def set_chat_photo(self, file=None):
try:
self.method = 'messages.setChatPhoto'
if file:
self.parameters['file'] = file
else:
self.parameters['file'] = Set.Chat.Photo
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
def edit_chat(self, title=None):
try:
self.method = 'messages.editChat'
self.parameters['chat_id'] = self.chatID
if title:
self.parameters['title'] = title
else:
self.parameters['title'] = Set.Chat.Title
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
def add_chat_user(self, from_id=None):
self.method = 'messages.addChatUser'
self.parameters['chat_id'] = self.chatID
if from_id:
self.parameters['from_id'] = from_id
try:
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
def edit(self, message=None, message_id=None, lat=None, long=None, attachment=None):
self.method = 'messages.edit'
self.parameters = {'peer_id': self.chatPeer}
if message:
self.parameters['message'] = message
if lat:
self.parameters['lat'] = lat
if long:
self.parameters['long'] = long
if attachment:
self.parameters['attachment'] = attachment
if message_id:
self.parameters['message_id'] = message_id
try:
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
def get_by_id(self, message_ids=None):
self.method = 'messages.getById'
self.parameters['message_ids'] = message_ids
try:
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
def get_chat(self, chat_id=None, fields=None):
self.method = 'messages.getChat'
self.parameters['chat_id'] = self.chatID
if chat_id:
self.parameters['chat_id'] = chat_id
if fields:
self.parameters['fields'] = fields
try:
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
def get_invite_link(self, reset=None):
self.method = 'messages.getInviteLink'
self.parameters['peer_id'] = self.chatPeer
if reset:
self.parameters['reset'] = reset
try:
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
def pin(self, message_id=None):
self.method = 'messages.pin'
self.parameters['peer_id'] = self.chatPeer
if message_id:
self.parameters['message_id'] = message_id
try:
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
def unpin(self, group_id=None):
self.method = 'messages.pin'
self.parameters['peer_id'] = self.chatPeer
if group_id:
self.parameters['group_id'] = group_id
try:
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
class Users(object):
def __init__(self):
self.parameters = dict()
self.method = ''
def get(self, user_ids=None, fields=None, name_case=None):
self.method = 'users.get'
if user_ids:
self.parameters['user_ids'] = user_ids
if fields:
self.parameters['fields'] = fields
if name_case:
self.parameters['name_case'] = name_case
try:
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
def get_followers(self):
self.method = 'users.getFollowers'
try:
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
def get_nearby(self, latitude=None, longitude=None, radius=None, fields=None):
self.method = 'users.getNearby'
if latitude:
self.parameters['latitude'] = latitude
if longitude:
self.parameters['longitude'] = longitude
if radius:
self.parameters['radius'] = radius
if fields:
self.parameters['fields'] = fields
try:
return VK().exec(self.method, self.parameters)
except Exception as e:
Log.show_error(e)
return False
Main.py
:Здесь будет основное "тело" бота, с которого начинается прочитывание сообщений и передача данных движку (Опять много кода)
Python:
from Settings import BotSettings as Set
from Settings import DataBase as DB
from threading import Thread
from Engine import Engine
from Log import Log
from VK import VK
import Utils
class Main(object):
@staticmethod
def start():
try:
Log.show_info(f'Бот запущен.')
if not Set.Bot.Start:
VK().Messages().send(message=f'{Set.Bot.Name}: Успешный запуск.')
Set.Bot.GlobalID = VK().Messages().get_history(count=1)['items'][0]['id']
Set.Bot.Start = True
Set.Bot.Users = VK().Messages().get_chat()['users']
while True:
chat_title = VK().Messages().get_chat()['title']
if chat_title != Set.Chat.Title:
VK().Messages().edit_chat(title=Set.Chat.Title)
response = VK().Messages().get_history(count=10, offset=-10, start_message_id=Set.Chat.GlobalID, fields='first_name, last_name')
if response is not False: # Если запрос правильный
response = response['items'][::-1]
if len(response):
for item in response:
if int(item['id']) > Set.Chat.GlobalID and item['from_id'] != Set.Bot.MainID:
Set.Chat.GlobalID = int(item['id']) # проверяем только новые сообщения
s = ''
p = None
if 'text' in item: # если сообщение содержит текст
if len(item['text']) > 0:
s += str.encode(item['text']).decode('utf-8')
else:
s += '(None)'
log = s
u = item['from_id']
if 'attachments' in item: # если в сообщении есть прикрепленные материалы
if len(item['attachments']):
log += ' ('
for a in item['attachments']:
tp = a['type']
a = a[str(tp)]
if tp == 'doc': # документ
log += f'doc: {a["title"]} (doc{a["owner_id"]}_{a["id"]}'
if 'access_key' in a:
log += '_' + a["access_key"]
log += ')'
if tp == 'photo': # фото
p = a['sizes'][len(a['sizes']) - 1]['url']
log += f'photo: photo{a["owner_id"]}_{a["id"]}'
if 'access_key' in a:
log += '_' + a["access_key"]
if tp == 'audio': # аудио
log += f'audio: {a["artist"]} - ' \
f'{a["title"]} (audio{a["owner_id"]}_{a["id"]}'
if 'access_key' in a:
log += '_' + a["access_key"]
log += ')'
if tp == 'video': # видео
log += f'video: {a["title"]} (video{a["owner_id"]}_{a["id"]}'
if 'access_key' in a:
log += '_' + a["access_key"]
log += ')'
if tp == 'link': # ссылка
log += f'link: {a["title"]} ({a["url"]})'
if tp == 'sticker': # стикер
log += f'sticker: product_{a["product_id"]} id_{a["sticker_id"]}'
log += ', '
log = log[:len(log) - 2]
log += ')'
if 'geo' in item: # если кто-то отправил гео-положение
log += f' (geo: {item["geo"]["coordinates"]})'
if 'place' in item['geo']:
log += f' ({item["geo"]["place"]["country"]}, {item["geo"]["place"]["city"]})'
if 'fwd_messages' in item: # если кто-то переслал сообщение
if len(item['fwd_messages']):
log += f' (forward: '
for mes in item['fwd_messages']:
log += f'{Utils.VKEngine.get_username_by_id(mes["from_id"])} ({mes["text"]}), '
log = log[:len(log) - 2]
log += ')'
Log.show_vk(Utils.VKEngine.get_username_by_id(u) + ': ' + log)
if 'action' in item: # если произошло какое-то действие
if item['action']['type'] == 'chat_title_update': # если изменили имя чата
VK().Messages().send(message='Название менять нельзя.')
VK().Messages().remove_chat_user(member_id=item['action']['member_id'])
VK().Messages().edit_chat(title=Set.Chat.Title)
if item['action']['type'] == 'chat_photo_update': # если изменили фото чата
VK().Messages().send(message='Фото менять нельзя.')
VK().Messages().remove_chat_user(member_id=item['action']['member_id'])
VK().Messages().set_chat_photo(file=Set.Chat.Photo)
if item['action']['type'] == 'chat_invite_user_by_link': # если пользователь зашел в чат по ссылке
if not int(item['from_id']) in DB.BlackList:
VK().Messages().send(message='Добро пожаловать, ' + Utils.VKEngine.get_link_by_id(item['from_id']) + '.')
else:
VK().Messages().send(message='Сорен, ты в ЧС.')
VK().Messages().remove_chat_user(member_id=item['action']['member_id'])
if item['action']['type'] == 'chat_invite_user' or item['action']['type'] == 'chat_kick_user': # если кто-то пригласил или кикнул кого-то
kick = False
if not int(item['from_id']) in DB.Privilege:
VK().Messages().remove_chat_user(member_id=item['from_id'])
kick = True
if int(item['from_id']) != int(item['action']['member_id']):
VK().Messages().remove_chat_user(member_id=item['action']['member_id'])
response = VK().Messages().get_chat()['users']
for user in response:
if int(user) in DB.BlackList:
kick = True
VK().Messages().remove_chat_user(member_id=user)
if not kick:
VK().Messages().send(message='Добро пожаловать, ' + Utils.VKEngine.get_username_by_id(item['action']['member_id'] + '.'))
if item['action']['type'] == 'chat_pin_message': # если кто-то закрепил сообщение
if not int(item['action']['member_id']) in DB.Admins:
VK().Messages().send(message='Закреп менять нельзя.')
VK().Messages().remove_chat_user(member_id=item['action']['member_id'])
VK().Messages().unpin()
VK().Messages().pin(message_id=Set.Chat.PinID['message_id'])
else:
Set.Chat.PinID['message_id'] = item['id']
open('DB\\pin.txt', 'w').write(str(Set.Chat.PinID))
if item['action'] == 'chat_unpin_message': # если кто-то открепил сообщение
if not int(item['action_mid']) in DB.Admins:
VK().Messages().send(message='Закреп удалять нельзя.')
VK().Messages().remove_chat_user(member_id=item['action']['member_id'])
VK().Messages().pin(message_id=Set.Chat.PinID['message_id'])
elif len(s) > 2: # если длина сообщения больше 2, то она будет обрабатываться
if (Set.Bot.Debug and int(u) == Set.Bot.CreatorID) or not Set.Bot.Debug:
c = s[0]
if c in Set.Chat.Prefixes:
s = s[1:len(s)]
md = 'a' if int(u) in DB.Admins else ('m' if int(u) in DB.Moders else 'u')
def start_thread(): # запускаем обработку в отдельном потоке, чтобы бот дальше принимал сообщения
Engine().cmd(s, p, u, md, item['id'])
Thread(target=start_thread).start()
else:
if s[0] in Set.Chat.Prefixes:
VK().Messages().send('Бот в режиме дебага.')
except Exception as e:
Log.show_error(e)
Движок будет инициализировать плагины при старте и обрабатывать сообщения, если это команда, то отправлять плагину
Python:
import os
import inspect
from Plugins.BasePlugin import BasePlugin as Base
from Settings import BotSettings as Set
from Log import Log
from VK import VK
import Utils
import collections
class Engine(object):
def __init__(self):
self.plugin_dir = Set.Bot.PluginsDir
self.modules = []
self.package_obj = None
def initialize(self):
check1 = []
check2 = []
check3 = []
if not Set.Bot.Run:
tr = 0
fl = 0
al = 0
Set.Bot.Run = True
len_plugins = len(os.listdir(self.plugin_dir)) - 4
counter_plugins = 1
for fname in os.listdir(self.plugin_dir):
if fname.endswith(".py"):
module_name = fname[: -3]
if module_name != "Base" and module_name != "__init__":
try:
self.package_obj = __import__(self.plugin_dir + "." + module_name)
self.modules.append(module_name)
Log.show_service(f"[{counter_plugins}/{len_plugins}] {module_name} loaded")
tr += 1
except Exception as e:
Log.show_error(f"[{counter_plugins}/{len_plugins}] {module_name} not loaded: {e}")
fl += 1
al += 1
counter_plugins += 1
Log.show_service(f'Loaded: {tr}/{al} ({fl} errors)')
for module_name in self.modules:
module_obj = getattr(self.package_obj, module_name)
for elem in dir(module_obj):
obj = getattr(module_obj, elem)
if inspect.isclass(obj):
if issubclass(obj, Base):
a = obj()
if 'BasePlugin' not in str(a):
Set.Bot.Plugins.append(a)
name = str(a)[str(a).find('.')+1:str(a).find(' ')]
name = name[name.find('.')+1:]
check1.append(name)
check3.append(a.name)
for word in a.words:
check2.append(word)
if len(check1) != len(set(check1)):
_mas = [item for item, count in collections.Counter(check1).items() if count > 1]
Log.show_error('There are plugins with the same plugin name: ' + str(_mas)) # проверяем, есть ли плагины с одинаковым именем
raise KeyboardInterrupt
if len(check2) != len(set(check2)):
_mas = [item for item, count in collections.Counter(check2).items() if count > 1]
Log.show_error('There are plugins with the same keywords: ' + str(_mas)) # проверяем, есть ли плагины с одинаковыми ключевыми словами
raise KeyboardInterrupt
if len(check3) != len(set(check3)):
_mas = [item for item, count in collections.Counter(check1).items() if count > 1]
Log.show_error('There are plugins with the same name: ' + str(_mas)) # проверяем, есть ли плагины с одинаковым названием
raise KeyboardInterrupt
Set.Bot.Words = check2
@staticmethod
def cmd(t=None, p=None, u=None, m=None, d=None):
md = f"{Set.Bot.Name} [v{Set.Bot.Version}]"
Set.Bot.Answer = False
Set.Bot.StartNow = False
for a in Set.Bot.Plugins:
res = dict()
res['message'] = ''
res['attachment'] = ''
res = a.cmd(t, p, u, m, d) # тут он отправляет плагину сообщение для обработки
if res is not False: # если ответ есть, то обрабатывает дальше
if len(str(res)) > 0:
p = str(a)[str(a).find('.')+1:str(a).find(' ')]
p = p[p.find('.')+1:]
Log.show_plugin(res['plugin'] + ' (' + p + ')')
Set.Bot.Answer = True
if not res['message'] is None or not res['attachment'] is None:
if res['message']: # отправляет текст
res['message'] = f"{res['message']}\n{md}"
VK().Messages().send(message=res['message'], forward_messages=d)
elif res['attachment']: # или приложение
VK().Messages().send(message=md, attachment=res['attachment'], forward_messages=d)
if not res['lat'] is None and not res['long'] is None:
Set.Bot.Answer = True # здесь местоположение
VK().Messages().send(message=md, lat=res['lat'], long=res['long'], forward_messages=d)
if not res['forward_messages'] is None: # пересланные сообщения
VK().Messages().send(message=md, forward_messages=res['forward_messages'])
if not Set.Bot.Answer and m != 'b' and not Set.Bot.StartNow:
Utils.HelpEngine.what() # если такой команды нет
Log.show_plugin('None')
Python:
import Utils
from Settings import BotSettings as Set
class BasePlugin(object):
def __init__(self): # тут основные свойства плагина
self.name = 'name' # его имя
self.description = 'description' # описание
self.words = ['word'] # ключевые слова
self.mode = '' # режим
self.text = '' # текст
self.photo = '' # фото
self.result = {'message': '', 'attachment': None, 'lat': None, 'long': None, 'forward_messages': None, 'plugin': None}
self.level = 'bamu' # уровень доступа
self.message = None # сообщение
self.ulvl = None # опять доступ
self.user = None # пользователь
self.hide = False # если True, то плагин скрывается
self.job = True # если False, то плагин не работает
self.debug = False # если True, то плагин в режиме дебага, с ним работать могут только админы
def run(self):
self.result['plugin'] = self.name
if self.mode == '?': # отправляет информацию о плагине
self.get_info()
else: # иначе запускает его
self.func()
return self.result
def cmd(self, text, photo, user, mode, message_id):
self.message = message_id
self.result['message'] = ''
self.result['attachment'] = ''
self.user = user
self.photo = photo
self.text = str.encode(text).decode('utf-8')
if text[0] == '?':
self.mode = '?'
text = text[1:len(text)]
elif text[0] == '-':
self.mode = '-'
text = text[1:len(text)]
elif text[0] == '+':
self.mode = '+'
text = text[1:len(text)]
else:
self.mode = 'None'
helping = ''
if ' ' in text:
helping = text[text.find(' ')+1:len(text)]
text = text[0:text.find(' ')]
if text.lower() in self.words:
if self.job or self.debug:
if not self.debug:
if mode in self.level:
self.ulvl = mode
self.text = helping
return self.run()
else:
Set.Bot.StartNow = True
Utils.HelpEngine.non()
return False
else:
if mode == 'a':
self.ulvl = mode
self.text = helping
return self.run()
else:
Set.Bot.StartNow = True
Utils.HelpEngine.debug()
return False
else:
Utils.HelpEngine.job()
return False
def func(self):
pass
def get_info(self):
self.result['message'] = 'Название плагина: ' + self.name + \
'\nОписание: ' + self.description + \
'\nКоманды: ' + ', '.join(self.words)
Settings.py
изменился по ходу написания на:
Python:
class BotSettings:
class Log:
All = True
Text = True
Console = True
class Bot:
Login = '' # логин
Password = '' # пароль
Start = False
Name = 'DataStockBot'
Version = '1'
SleepTime = 0.25
Plugins = list()
MainID = None # ид бота
Debug = False
CreatorID = None # ваш ид
StartNow = False
Run = False
Answer = False
Words = None
PluginsDir = 'Plugins' # папка с плагинами
class Chat:
Users = list()
Title = 'DataStock Chat' # название чата
Photo = '' # тут должно быть фото, как его загрузить я покажу в следующей части
ChatID = 1 # ид чата от лица бота
ChatPeer = 2000000000 + ChatID
GlobalID = None # ид последнего сообщения
PinID = None # ид закрепленного сообщения
Prefixes = ('!', '/', '\\')
class DataBase:
BlackList = list()
WhiteList = list()
Admins = list()
Moders = list()
Stuff = Admins + Moders
Privilege = WhiteList + Stuff