ArslansahBot/ArslansahBot.py

555 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

TOKEN_Telegram = "7511591614:AAH3bPBxrukeLriSCuXNI02TWKzEpNUM8oE"
from telebot import TeleBot
from telebot import types
from mastodon import Mastodon
import mysql.connector
bot = TeleBot(TOKEN_Telegram)
bot_tid = "7511591614"
messages = {
"start_welcome": "Arslanşah Mastodon Botuna Hoşgeldiniz.\nBu bot ile kanalınızı Mastodon'a bağlayabilirsiniz. Arslanşah Sunucumuzda hesap açmak isteyebilirsiniz. :)\n\n https://arslansah.com.tr/ ",
"ccon_username": "Bağlamak istediğiniz kanalın kullanıcı adını başında @ işareti olarak gönderiniz.",
"ccon_warn": "Arslanşah Mastodon Botunu bağlamak istediğiniz kanala 'mesaj gönderme izni' ile yönetici yapmanız gerekmektedir. Mesaj gönderme izni yeterlidir.",
"ccon_none_username": "Bu kullanıcı adına sahip kanal bulunamadı.",
"ccon_alread_username": "Bu kullanıcı adına sahip kanal veritabanımıza zaten kayıtlı.",
"ccon_success": "Veritabanına kanalınız başarıyla eklendi.",
"sc_step1": "İstemci anahtarını giriniz.",
"sc_step2": "İstemci gizli anahtarını giriniz.",
"sc_step3": "Erişim belirtecini giriniz.",
"sc_invalid": "Anahtar bilgilerini kontrol ediniz, doğruluklarından emin olunuz. Hesabınızın arslansah.com.tr adresinde kayıtlı olduğuna emin olunuz.",
"sc_success": "Servis başarıyla eklendi.",
"sm_active_success": "Hızlı mod aktifleştirildi.",
"sm_disable_success": "Hızlı mod devre dışı bırakıldı.",
"sp_success": "<b>Arslanşah Mastodon Sunucusunda Gönderi Paylaşıldı.</b>\n%s",
"sp_failed": "*Mastodon'da gönderi paylaşılamadı.*\n Lütfen bot yöneticisi ile iletişime geçiniz. bc1428@vuhuv.com"
}
class MastodonConnect:
def __init__(self, data):
self.mastodon = Mastodon(
client_id=data["client_id"],
client_secret=data["client_secret"],
access_token=data["access_token"],
api_base_url='https://arslansah.com.tr'
)
def getAccountInfo(self):
account_info = self.mastodon.account_verify_credentials()
print(account_info)
return {
"id": account_info.id,
"username": account_info.username,
"display_name": account_info.display_name,
"url": account_info.uri
}
def status_post(self, text):
try:
return self.mastodon.status_post(text)
except:
return False
class DatabaseConnect:
def __init__(self):
self.db = mysql.connector.connect(
host="localhost",
user="botes",
passwd="m%Kiq$AQTeO6@9%neqCscxlKms^MIU7LQbA!NCHJ%993plYXgFRvW*G&h0aYjB*b",
db="botesdb"
)
self.cursor = self.db.cursor()
self.create()
def create(self):
self.cursor.execute("CREATE TABLE IF NOT EXISTS channels (id INT PRIMARY KEY AUTO_INCREMENT, t_id VARCHAR(255) NOT NULL UNIQUE)")
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS
admins
(id INT PRIMARY KEY AUTO_INCREMENT,
t_id VARCHAR(20) NOT NULL UNIQUE,
speed_mode BOOLEAN DEFAULT '0')
""")
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS
channel_admins
(id INT PRIMARY KEY AUTO_INCREMENT,
channel_id INT,
admin_id INT,
role BOOLEAN,
FOREIGN KEY (channel_id) REFERENCES channels(id) ON DELETE CASCADE,
FOREIGN KEY (admin_id) REFERENCES admins(id) ON DELETE CASCADE)
""")
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS
services
(id INT PRIMARY KEY AUTO_INCREMENT,
client_id VARCHAR(100) NOT NULL UNIQUE,
client_secret VARCHAR(100) NOT NULL UNIQUE,
access_token VARCHAR(100) NOT NULL UNIQUE,
channel_id INT NOT NULL UNIQUE,
FOREIGN KEY (channel_id) REFERENCES channels(id) ON DELETE CASCADE)
""")
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS
speed_modes
(id INT PRIMARY KEY AUTO_INCREMENT,
admin_id INT NOT NULL UNIQUE,
service_id INT NOT NULL UNIQUE,
FOREIGN KEY (admin_id) REFERENCES admins(id) ON DELETE CASCADE,
FOREIGN KEY (service_id) REFERENCES services(id) ON DELETE CASCADE)
""")
def getMy(self, admin_tid):
self.cursor.execute("SELECT * FROM admins WHERE t_id = %s", (admin_tid,))
result = self.cursor.fetchone()
return {
"id": result[0],
"tid": result[1],
"speed": result[2]
}
def getMyChannels(self, admin_tid):
self.cursor.execute("""SELECT c.*, a.*, ca.role FROM channels c
JOIN channel_admins ca ON ca.channel_id = c.id
JOIN admins a ON a.id = ca.admin_id
WHERE a.t_id=%s""", (admin_tid, ))
result = self.cursor.fetchall()
datas = []
for get in result:
data = {
"channel": {
"id": get[0],
"tid": get[1]
},
"admin": {
"id": get[2],
"tid": get[3],
"speed": get[4]
},
"channel_admin": {
"role": get[5]
}
}
datas.append(data)
return datas
def getMyServices(self, admin_tid):
self.cursor.execute("""SELECT s.* FROM admins a
JOIN channel_admins ca ON a.id = ca.admin_id
JOIN services s ON s.channel_id = ca.channel_id
WHERE a.t_id = %s""", (admin_tid,))
result = self.cursor.fetchall()
datas = []
for get in result:
data = {
"id": get[0],
"client_id": get[1],
"client_secret": get[2],
"access_token": get[3],
"channel_id": get[4]
}
datas.append(data)
return datas
class Channels:
def __init__(self, admin_tid):
self.admin_tid = admin_tid
self.db = DatabaseConnect()
try:
self.db_admin = self.db.getMy(admin_tid)
except:
self.db_admin = []
if(len(self.db_admin) > 0):
self.db_datas = self.db.getMyChannels(admin_tid)
self.db_channels = []
self.db_services = self.db.getMyServices(admin_tid)
self.myChannelLists()
def myChannelLists(self):
for get in self.db_datas:
self.db_channels.append(get["channel"])
class Main():
def __init__(self):
self.message = ""
self.Database = DatabaseConnect()
self.db = self.Database.db
self.cursor = self.Database.cursor
def update_data(self, message):
self.data = Channels(message)
if(len(self.data.db_admin) == 0):
self.cursor.execute("INSERT INTO admins(t_id) VALUES (%s)", (self.message_tid,))
self.db.commit()
if(self.cursor.lastrowid):
self.data = Channels(message)
bot.send_message(self.message_cid, "Kullanıcı oluşturuldu.", parse_mode='Html')
def status_post(self, service, message):
try:
mastodon = MastodonConnect(service)
result = mastodon.status_post(message)
if(result):
bot.send_message(self.message_cid, messages["sp_success"] % result.uri, parse_mode='Html')
else:
bot.send_message(self.message_cid, messages["sp_failed"], parse_mode='Html')
except:
bot.send_message(self.message_cid, "<b>Mastodon'da gönderi paylaşılamadı.</b>\n Lütfen bot yöneticisi ile iletişime geçiniz. bc1428@vuhuv.com", parse_mode='Html')
def set_message(self, message):
self.message = message
self.message_cid = message.chat.id
self.message_tid = message.from_user.id
self.message_text = message.text
self.update_data(self.message_tid)
def start(self):
@bot.message_handler(commands=['start'])
def start(message):
self.set_message(message)
c_cone = types.InlineKeyboardButton('Kanal Bağla', callback_data='kanal_bagla')
c_list = types.InlineKeyboardButton('Bağlı Kanalları Görüntüle', callback_data='kanallar')
c_del = types.InlineKeyboardButton('Kanal Sil', callback_data='kanal_sil')
s_cone = types.InlineKeyboardButton('Servis Bağla', callback_data='servis_bagla')
s_list = types.InlineKeyboardButton('Bağlı Servisleri Görüntüle', callback_data='servisler')
s_del = types.InlineKeyboardButton('Servis Sil', callback_data='servis_sil')
m_toggle = types.InlineKeyboardButton('Hızlı Mod', callback_data='hizli_mod')
p_post = types.InlineKeyboardButton('Gönderi Paylaş', callback_data='gonderi_paylas')
buttons = types.InlineKeyboardMarkup()
buttons.add(c_cone, c_list)
buttons.add(s_cone, s_list)
buttons.add(c_del, s_del)
buttons.add(m_toggle)
buttons.add(p_post)
bot.send_message(self.message_cid, "Bir eylem seçiniz.", parse_mode='Html', reply_markup=buttons)
@bot.callback_query_handler(func=lambda call: True)
def redirect(call):
command = call.data
if (command == "kanal_bagla"):
self.channelConnect()
elif (command == "kanallar"):
self.channelsList()
elif (command == "servis_bagla"):
self.serviceConnect()
elif (command == "servisler"):
self.servicesList()
elif (command == "hizli_mod"):
self.speedMode()
elif (command == "gonderi_paylas"):
self.statusPost()
elif (command == "kanal_sil"):
self.channelDelete()
elif (command == "servis_sil"):
self.serviceDelete()
bot.answer_callback_query(call.id)
self.set_message(message)
@bot.callback_query_handler(func=lambda call: len(call.data.split(",")) > 1)
def redirect(call):
print(5)
split = call.data.split(",")
key = split[0]
if(key == "sc"):
value = split[2]
data = dict({
"client_id": "",
"client_secret": "",
"access_token": "",
})
def step1(call):
data["client_id"] = call.text
bot.send_message(self.message_cid, messages["sc_step2"], parse_mode='Html')
bot.register_next_step_handler(self.message, step2)
def step2(call):
data["client_secret"] = call.text
bot.send_message(self.message_cid, messages["sc_step3"], parse_mode='Html')
bot.register_next_step_handler(self.message, step3)
def step3(call):
data["access_token"] = call.text
try:
mt = MastodonConnect(data)
account = mt.getAccountInfo()
if(account):
self.cursor.execute("INSERT INTO services(client_id, client_secret, access_token, channel_id) VALUES (%s, %s, %s, %s)", (data["client_id"], data["client_secret"], data["access_token"], value))
self.db.commit()
if(self.cursor.lastrowid):
bot.send_message(self.message_cid, messages["sc_success"], parse_mode='Html')
except:
bot.send_message(self.message_cid, messages["sc_invalid"], parse_mode='Html')
bot.send_message(self.message_cid, messages["sc_step1"], parse_mode='Html')
bot.register_next_step_handler(self.message, step1)
elif(key == "sm"):
admin_id = self.data.db_datas[0]['admin']['id']
channel_id = split[2]
self.cursor.execute("UPDATE admins SET speed_mode=%s WHERE id=%s", (channel_id, admin_id))
self.db.commit()
if(self.cursor.rowcount):
bot.send_message(self.message_cid, messages["sm_active_success"], parse_mode='Html')
elif(key == "sp"):
bot.send_message(self.message_cid, "Paylaşmak istediğiniz gönderiyi gönderiniz.", parse_mode='Html')
def step1(call):
service_id = split[2]
for service in self.data.db_services:
self.status_post(service, call.text)
try:
self.cursor.execute("SELECT * FROM channels c JOIN channel_admins ca ON ca.channel_id = c.id WHERE c.id = %s", (service["channel_id"],))
result = self.cursor.fetchone()
if(result):
bot.send_message(result[1], call.text)
bot.send_message(self.message_cid, "<b>Telegram kanalınızda gönderi paylaşıldı.</b>", parse_mode='Html')
else:
bot.send_message(self.message_cid, "Kanal bulunamadı.")
except:
bot.send_message(self.message_cid, messages["sm_active_success"], parse_mode='Html')
bot.register_next_step_handler(self.message, step1)
elif (key == "cd"):
try:
value = split[2]
self.cursor.execute("DELETE FROM channels WHERE id=%s", (value,))
if(self.cursor.rowcount):
self.db.commit()
bot.send_message(self.message_cid, f"{value} id adresli kanal silindi.")
except:
bot.send_message(self.message_cid, f"Bir hata oluştu. Yöneticiye ulaşın.")
elif (key == "sd"):
value = split[2]
self.cursor.execute("DELETE FROM services WHERE id=%s", (value,))
try:
if(self.cursor.rowcount):
self.db.commit()
bot.send_message(self.message_cid, f"{value} id adresli servis silindi.")
except:
bot.send_message(self.message_cid, f"Bir hata oluştu. Yöneticiye ulaşın.")
def textStart(self):
@bot.message_handler(content_types=["text"])
def textStart(message):
self.set_message(message)
speed_mode = self.data.db_admin["speed"]
if(speed_mode):
services = self.data.db_services
for service in services:
if(service["channel_id"] == speed_mode):
self.cursor.execute("SELECT * FROM channels WHERE id=%s", (service["channel_id"],))
result = self.cursor.fetchone()
if(result):
bot.send_message(result[1], message.text)
bot.send_message(message.chat.id, "<b>Telegram kanalınızda gönderi paylaşıldı.</b>", parse_mode='Html')
else:
bot.send_message(message.chat.id, "<b>Telegram kanalınız bulunamadı.</b>", parse_mode='Html')
mastodon = MastodonConnect(service)
result = mastodon.status_post(message.text)
if(result):
bot.send_message(message.chat.id, messages["sp_success"] % result.uri, parse_mode='Html')
else:
bot.send_message(message.chat.id, messages["sp_failed"], parse_mode='Html')
else:
print(99)
def channelConnect(self):
bot.send_message(self.message_cid, messages["ccon_username"], parse_mode='Html')
def step_one(message):
message_text = message.text
isAt = message_text.startswith("@") # Atınlan mesajın bir kullanıcı adı olup olmadığını test eder.
if(isAt):
try:
chat = bot.get_chat(message.text) # Kanalın mevcut olup olmadığını sorgular.
if(chat): # Kanal mevcutsa burası çalışır.
try:
try:
isMember = bot.get_chat_member(chat.id, bot_tid) # Botun kanala üye olup olmadığını sorgular
if(isMember): # Bot kanala üyeyse burası çalışır.
self.cursor.execute("INSERT INTO channels(t_id) VALUES (%s)", (chat.id,))
self.cursor.execute("INSERT INTO channel_admins(channel_id, admin_id, role) VALUES (%s, %s, %s)", (self.cursor.lastrowid, self.data.db_admin["id"], True))
self.db.commit()
if(self.cursor.lastrowid):
bot.send_message(self.message_cid, messages["ccon_success"], parse_mode='Html')
else: # Bot kanala üye değilse çalışır.
bot.send_message(self.message_cid, messages["ccon_warn"], parse_mode='Html')
except:
bot.send_message(self.message_cid, messages["ccon_warn"], parse_mode='Html')
except: # Üyelik doğrulaması sırasında kodda ata keşfedilirse bursı çalışır.
bot.send_message(self.message_cid, messages["ccon_alread_username"], parse_mode='Html')
except: # Kanalın mevcutluğu denetlenirken bir hata ile karşılaşılırsa bursı çalışır.
bot.send_message(self.message_cid, messages["ccon_none_username"], parse_mode='Html')
bot.register_next_step_handler(self.message, step_one) # Kanal kaydının birinci adımını çalıştırır. Bu adımda kanalın mevcut olup olmadığı ve botun kanala üye olup olmadığı sorgulanarak veritabanına kayıt işlemi gerçekleştirilir.
def channelDelete(self):
channels = self.data.db_channels
buttons = types.InlineKeyboardMarkup()
print(self.data)
if(len(channels) > 0):
for channel in channels:
chat = bot.get_chat(channel["tid"])
button = types.InlineKeyboardButton(f"{chat.title} (@{chat.username})", callback_data=f'cd,step-1,{channel["id"]}')
buttons.add(button)
bot.send_message(self.message_cid, "Silmek istediğiniz kanalı seçiniz.", parse_mode='Html', reply_markup=buttons)
else:
bot.send_message(self.message_cid, "Hesabınıza bağlı kanal bulunamadı.", parse_mode='Html')
def channelsList(self): # Hesaba bağlı olan kanlları listelemek için bir fontsiyon
text = "<b>Hesabınıza bağlı olan kanallar:</b>\n"
count = 1
for get in self.data.db_channels:
chat = bot.get_chat(get["tid"])
text += f'{count}. {chat.title} (@{chat.username})\n'
count += 1
bot.send_message(self.message_cid, text, parse_mode='Html')
def serviceConnect(self):
channels = self.data.db_channels
buttons = types.InlineKeyboardMarkup()
if(len(channels) > 0):
for channel in channels:
chat = bot.get_chat(channel["tid"])
button = types.InlineKeyboardButton(f"{chat.title} (@{chat.username})", callback_data=f'sc,step-1,{channel["id"]}')
buttons.add(button)
bot.send_message(self.message_cid, "Servisi bağlamak istediğiniz kanalı seçiniz.", parse_mode='Html', reply_markup=buttons)
else:
bot.send_message(self.message_cid, "Hesabınıza bağlı kanal bulunamadı.", parse_mode='Html')
def serviceDelete(self):
services = self.data.db_services
buttons = types.InlineKeyboardMarkup()
if(len(services) > 0):
for service in services:
# Mastodon
mastodon = MastodonConnect({
'client_id': service["client_id"],
'client_secret': service["client_secret"],
'access_token': service["access_token"]
}).getAccountInfo()
# Telegram
self.cursor.execute("SELECT * FROM channels WHERE id=%s", (service["channel_id"],))
chat = bot.get_chat(self.cursor.fetchone()[1])
button = types.InlineKeyboardButton(f"M: {mastodon['display_name']} (@{mastodon['username']}) - T: {chat.title} (@{chat.username})", callback_data=f'sd,step-1,{service["id"]}')
buttons.add(button)
bot.send_message(self.message_cid, "Silmek istediğiniz servisi seçiniz.", parse_mode='Html', reply_markup=buttons)
else:
bot.send_message(self.message_cid, "Hesabınıza bağlı servis bulunamadı.", parse_mode='Html')
def servicesList(self):
text = "<b>Bağlı servisler</b>\n--------------\n"
services = self.data.db_services
if(len(services)):
for service in services:
mt = MastodonConnect({
'client_id': service["client_id"],
'client_secret': service["client_secret"],
'access_token': service["access_token"]
})
self.cursor.execute("SELECT * FROM channels WHERE id=%s", (service["channel_id"], ))
resultChannel = self.cursor.fetchone()
if(resultChannel):
text += f"<b>SERVICE ID {service['id']}</b>\n\n"
text += f"<b>Mastodon</b>\n"
try:
account = mt.getAccountInfo()
# text += f"<b>İsim:</b> {account['display_name']}"
text += f"<b>Kullanıcı Adı:</b> {account['username']}\n\n"
except:
text += f"Mastodon hesabında bağlantı hatası var. Lütfen kontrol ediniz.\n\n"
text += f"<b>client_id:</b> {service['client_id']}\n"
text += f"<b>client_secret:</b> {service['client_secret']}\n"
text += f"<b>access_token:</b> {service['access_token']}\n"
channel_tid = resultChannel[1]
channel = bot.get_chat(channel_tid)
text += f"\n<b>Telegram</b>\n"
text += f"<b>İsim:</b> {channel.title}\n"
text += f"<b>Kullanıcı Adı:</b> @{channel.username}\n"
text += "--------------"
bot.send_message(self.message_cid, text, parse_mode='Html')
else:
bot.send_message(self.message_cid, "Bağlı servis kaydı bulunmadı.", parse_mode='Html')
def speedMode(self):
speed_mode = self.data.db_admin["speed"]
if(speed_mode == 0):
buttons = types.InlineKeyboardMarkup()
if(len(self.data.db_services) > 0):
for service in self.data.db_services:
self.cursor.execute("SELECT t_id, id FROM channels WHERE id=%s", (service['channel_id'],))
channel = self.cursor.fetchone()
if(channel):
result = bot.get_chat(channel[0])
button = types.InlineKeyboardButton(f"{result.title} (@{result.username})", callback_data=f'sm,channel_select,{channel[1]}')
buttons.add(button)
bot.send_message(self.message_cid, "Hızlı moda bağlamak istediğiniz kanalı seçiniz.", parse_mode='Html', reply_markup=buttons)
else:
bot.send_message(self.message_cid, "Hesabınıza bağlı kanal bulunamadı.", parse_mode='Html')
elif(speed_mode > 0):
admin_id = self.data.db_admin["id"]
self.cursor.execute("UPDATE admins SET speed_mode=%s WHERE id=%s", (0, admin_id))
self.db.commit()
bot.send_message(self.message_cid, messages["sm_disable_success"], parse_mode='Html')
def statusPost(self):
buttons = types.InlineKeyboardMarkup()
if(len(self.data.db_services) > 0):
for service in self.data.db_services:
print(service)
self.cursor.execute("SELECT t_id, id FROM channels WHERE id=%s", (service['channel_id'],))
channel = self.cursor.fetchone()
if(channel):
result = bot.get_chat(channel[0])
button = types.InlineKeyboardButton(f"{result.title} (@{result.username})", callback_data=f"sp,sp_post,{service['id']}")
buttons.add(button)
bot.send_message(self.message_cid, "Gönderi paylaşacağınız servisi seçiniz.", parse_mode='Html', reply_markup=buttons)
else:
bot.send_message(self.message_cid, "Hesabınıza bağlı kanal bulunamadı.", parse_mode='Html')
if __name__ == "__main__":
main = Main()
main.start()
main.textStart()
bot.polling()