commit a548b5eba566177bb73f304f0f25c773aca7e579 Author: batuhancoskun Date: Sat Apr 12 14:13:53 2025 +0000 Upload files to "/" diff --git a/ArslansahBot.py b/ArslansahBot.py new file mode 100644 index 0000000..1a68e35 --- /dev/null +++ b/ArslansahBot.py @@ -0,0 +1,555 @@ +TOKEN_Telegram = "7511591614:AAH3bPBxrukeLriSCuXNI02TWKzEpNUM8oE" + +from telebot import TeleBot +from telebot import types +from mastodon import Mastodon +import mysql.connector + +bot = TeleBot(TOKEN_Telegram) +bot_tid = "7511591614" + +messages = { + "start_welcome": "Arslanşah Mastodon Botuna Hoşgeldiniz.\nBu bot ile kanalınızı Mastodon'a bağlayabilirsiniz. Arslanşah Sunucumuzda hesap açmak isteyebilirsiniz. :)\n\n https://arslansah.com.tr/ ", + "ccon_username": "Bağlamak istediğiniz kanalın kullanıcı adını başında @ işareti olarak gönderiniz.", + "ccon_warn": "Arslanşah Mastodon Botunu bağlamak istediğiniz kanala 'mesaj gönderme izni' ile yönetici yapmanız gerekmektedir. Mesaj gönderme izni yeterlidir.", + "ccon_none_username": "Bu kullanıcı adına sahip kanal bulunamadı.", + "ccon_alread_username": "Bu kullanıcı adına sahip kanal veritabanımıza zaten kayıtlı.", + "ccon_success": "Veritabanına kanalınız başarıyla eklendi.", + "sc_step1": "İstemci anahtarını giriniz.", + "sc_step2": "İstemci gizli anahtarını giriniz.", + "sc_step3": "Erişim belirtecini giriniz.", + "sc_invalid": "Anahtar bilgilerini kontrol ediniz, doğruluklarından emin olunuz. Hesabınızın arslansah.com.tr adresinde kayıtlı olduğuna emin olunuz.", + "sc_success": "Servis başarıyla eklendi.", + "sm_active_success": "Hızlı mod aktifleştirildi.", + "sm_disable_success": "Hızlı mod devre dışı bırakıldı.", + "sp_success": "Arslanşah Mastodon Sunucusunda Gönderi Paylaşıldı.\n%s", + "sp_failed": "*Mastodon'da gönderi paylaşılamadı.*\n Lütfen bot yöneticisi ile iletişime geçiniz. bc1428@vuhuv.com" +} + +class MastodonConnect: + def __init__(self, data): + self.mastodon = Mastodon( + client_id=data["client_id"], + client_secret=data["client_secret"], + access_token=data["access_token"], + api_base_url='https://arslansah.com.tr' + ) + def getAccountInfo(self): + account_info = self.mastodon.account_verify_credentials() + + print(account_info) + + return { + "id": account_info.id, + "username": account_info.username, + "display_name": account_info.display_name, + "url": account_info.uri + } + + def status_post(self, text): + try: + return self.mastodon.status_post(text) + except: + return False + +class DatabaseConnect: + def __init__(self): + self.db = mysql.connector.connect( + host="localhost", + user="botes", + passwd="m%Kiq$AQTeO6@9%neqCscxlKms^MIU7LQbA!NCHJ%993plYXgFRvW*G&h0aYjB*b", + db="botesdb" + ) + self.cursor = self.db.cursor() + self.create() + + + def create(self): + self.cursor.execute("CREATE TABLE IF NOT EXISTS channels (id INT PRIMARY KEY AUTO_INCREMENT, t_id VARCHAR(255) NOT NULL UNIQUE)") + self.cursor.execute(""" + CREATE TABLE IF NOT EXISTS + admins + (id INT PRIMARY KEY AUTO_INCREMENT, + t_id VARCHAR(20) NOT NULL UNIQUE, + speed_mode BOOLEAN DEFAULT '0') + """) + + + self.cursor.execute(""" + CREATE TABLE IF NOT EXISTS + channel_admins + (id INT PRIMARY KEY AUTO_INCREMENT, + channel_id INT, + admin_id INT, + role BOOLEAN, + FOREIGN KEY (channel_id) REFERENCES channels(id) ON DELETE CASCADE, + FOREIGN KEY (admin_id) REFERENCES admins(id) ON DELETE CASCADE) + """) + + self.cursor.execute(""" + CREATE TABLE IF NOT EXISTS + services + (id INT PRIMARY KEY AUTO_INCREMENT, + client_id VARCHAR(100) NOT NULL UNIQUE, + client_secret VARCHAR(100) NOT NULL UNIQUE, + access_token VARCHAR(100) NOT NULL UNIQUE, + channel_id INT NOT NULL UNIQUE, + FOREIGN KEY (channel_id) REFERENCES channels(id) ON DELETE CASCADE) + """) + + self.cursor.execute(""" + CREATE TABLE IF NOT EXISTS + speed_modes + (id INT PRIMARY KEY AUTO_INCREMENT, + admin_id INT NOT NULL UNIQUE, + service_id INT NOT NULL UNIQUE, + FOREIGN KEY (admin_id) REFERENCES admins(id) ON DELETE CASCADE, + FOREIGN KEY (service_id) REFERENCES services(id) ON DELETE CASCADE) + """) + + def getMy(self, admin_tid): + self.cursor.execute("SELECT * FROM admins WHERE t_id = %s", (admin_tid,)) + result = self.cursor.fetchone() + return { + "id": result[0], + "tid": result[1], + "speed": result[2] + } + + def getMyChannels(self, admin_tid): + self.cursor.execute("""SELECT c.*, a.*, ca.role FROM channels c + JOIN channel_admins ca ON ca.channel_id = c.id + JOIN admins a ON a.id = ca.admin_id + WHERE a.t_id=%s""", (admin_tid, )) + + result = self.cursor.fetchall() + + datas = [] + + for get in result: + data = { + "channel": { + "id": get[0], + "tid": get[1] + }, + "admin": { + "id": get[2], + "tid": get[3], + "speed": get[4] + }, + "channel_admin": { + "role": get[5] + } + } + datas.append(data) + return datas + + def getMyServices(self, admin_tid): + self.cursor.execute("""SELECT s.* FROM admins a + JOIN channel_admins ca ON a.id = ca.admin_id + JOIN services s ON s.channel_id = ca.channel_id + WHERE a.t_id = %s""", (admin_tid,)) + + result = self.cursor.fetchall() + + datas = [] + + for get in result: + data = { + "id": get[0], + "client_id": get[1], + "client_secret": get[2], + "access_token": get[3], + "channel_id": get[4] + } + datas.append(data) + return datas + + +class Channels: + def __init__(self, admin_tid): + self.admin_tid = admin_tid + + self.db = DatabaseConnect() + try: + self.db_admin = self.db.getMy(admin_tid) + except: + self.db_admin = [] + + if(len(self.db_admin) > 0): + self.db_datas = self.db.getMyChannels(admin_tid) + self.db_channels = [] + self.db_services = self.db.getMyServices(admin_tid) + + self.myChannelLists() + + def myChannelLists(self): + for get in self.db_datas: + self.db_channels.append(get["channel"]) + + +class Main(): + def __init__(self): + self.message = "" + self.Database = DatabaseConnect() + self.db = self.Database.db + self.cursor = self.Database.cursor + + def update_data(self, message): + self.data = Channels(message) + + if(len(self.data.db_admin) == 0): + self.cursor.execute("INSERT INTO admins(t_id) VALUES (%s)", (self.message_tid,)) + self.db.commit() + if(self.cursor.lastrowid): + self.data = Channels(message) + bot.send_message(self.message_cid, "Kullanıcı oluşturuldu.", parse_mode='Html') + + def status_post(self, service, message): + try: + mastodon = MastodonConnect(service) + result = mastodon.status_post(message) + if(result): + bot.send_message(self.message_cid, messages["sp_success"] % result.uri, parse_mode='Html') + else: + bot.send_message(self.message_cid, messages["sp_failed"], parse_mode='Html') + except: + bot.send_message(self.message_cid, "Mastodon'da gönderi paylaşılamadı.\n Lütfen bot yöneticisi ile iletişime geçiniz. bc1428@vuhuv.com", parse_mode='Html') + + def set_message(self, message): + self.message = message + self.message_cid = message.chat.id + self.message_tid = message.from_user.id + self.message_text = message.text + + self.update_data(self.message_tid) + + def start(self): + @bot.message_handler(commands=['start']) + def start(message): + self.set_message(message) + + c_cone = types.InlineKeyboardButton('Kanal Bağla', callback_data='kanal_bagla') + c_list = types.InlineKeyboardButton('Bağlı Kanalları Görüntüle', callback_data='kanallar') + c_del = types.InlineKeyboardButton('Kanal Sil', callback_data='kanal_sil') + + s_cone = types.InlineKeyboardButton('Servis Bağla', callback_data='servis_bagla') + s_list = types.InlineKeyboardButton('Bağlı Servisleri Görüntüle', callback_data='servisler') + s_del = types.InlineKeyboardButton('Servis Sil', callback_data='servis_sil') + + m_toggle = types.InlineKeyboardButton('Hızlı Mod', callback_data='hizli_mod') + + p_post = types.InlineKeyboardButton('Gönderi Paylaş', callback_data='gonderi_paylas') + + buttons = types.InlineKeyboardMarkup() + buttons.add(c_cone, c_list) + buttons.add(s_cone, s_list) + buttons.add(c_del, s_del) + buttons.add(m_toggle) + buttons.add(p_post) + + bot.send_message(self.message_cid, "Bir eylem seçiniz.", parse_mode='Html', reply_markup=buttons) + + @bot.callback_query_handler(func=lambda call: True) + def redirect(call): + command = call.data + if (command == "kanal_bagla"): + self.channelConnect() + elif (command == "kanallar"): + self.channelsList() + elif (command == "servis_bagla"): + self.serviceConnect() + elif (command == "servisler"): + self.servicesList() + elif (command == "hizli_mod"): + self.speedMode() + elif (command == "gonderi_paylas"): + self.statusPost() + elif (command == "kanal_sil"): + self.channelDelete() + elif (command == "servis_sil"): + self.serviceDelete() + bot.answer_callback_query(call.id) + self.set_message(message) + + @bot.callback_query_handler(func=lambda call: len(call.data.split(",")) > 1) + def redirect(call): + print(5) + split = call.data.split(",") + key = split[0] + if(key == "sc"): + value = split[2] + data = dict({ + "client_id": "", + "client_secret": "", + "access_token": "", + }) + + def step1(call): + data["client_id"] = call.text + bot.send_message(self.message_cid, messages["sc_step2"], parse_mode='Html') + bot.register_next_step_handler(self.message, step2) + + def step2(call): + data["client_secret"] = call.text + bot.send_message(self.message_cid, messages["sc_step3"], parse_mode='Html') + bot.register_next_step_handler(self.message, step3) + + def step3(call): + data["access_token"] = call.text + try: + mt = MastodonConnect(data) + account = mt.getAccountInfo() + if(account): + self.cursor.execute("INSERT INTO services(client_id, client_secret, access_token, channel_id) VALUES (%s, %s, %s, %s)", (data["client_id"], data["client_secret"], data["access_token"], value)) + self.db.commit() + if(self.cursor.lastrowid): + bot.send_message(self.message_cid, messages["sc_success"], parse_mode='Html') + except: + bot.send_message(self.message_cid, messages["sc_invalid"], parse_mode='Html') + + bot.send_message(self.message_cid, messages["sc_step1"], parse_mode='Html') + bot.register_next_step_handler(self.message, step1) + + elif(key == "sm"): + admin_id = self.data.db_datas[0]['admin']['id'] + channel_id = split[2] + self.cursor.execute("UPDATE admins SET speed_mode=%s WHERE id=%s", (channel_id, admin_id)) + self.db.commit() + if(self.cursor.rowcount): + bot.send_message(self.message_cid, messages["sm_active_success"], parse_mode='Html') + + elif(key == "sp"): + bot.send_message(self.message_cid, "Paylaşmak istediğiniz gönderiyi gönderiniz.", parse_mode='Html') + def step1(call): + service_id = split[2] + for service in self.data.db_services: + self.status_post(service, call.text) + try: + self.cursor.execute("SELECT * FROM channels c JOIN channel_admins ca ON ca.channel_id = c.id WHERE c.id = %s", (service["channel_id"],)) + result = self.cursor.fetchone() + if(result): + bot.send_message(result[1], call.text) + bot.send_message(self.message_cid, "Telegram kanalınızda gönderi paylaşıldı.", parse_mode='Html') + else: + bot.send_message(self.message_cid, "Kanal bulunamadı.") + except: + bot.send_message(self.message_cid, messages["sm_active_success"], parse_mode='Html') + bot.register_next_step_handler(self.message, step1) + + elif (key == "cd"): + try: + value = split[2] + self.cursor.execute("DELETE FROM channels WHERE id=%s", (value,)) + if(self.cursor.rowcount): + self.db.commit() + bot.send_message(self.message_cid, f"{value} id adresli kanal silindi.") + except: + bot.send_message(self.message_cid, f"Bir hata oluştu. Yöneticiye ulaşın.") + + elif (key == "sd"): + value = split[2] + self.cursor.execute("DELETE FROM services WHERE id=%s", (value,)) + try: + if(self.cursor.rowcount): + self.db.commit() + bot.send_message(self.message_cid, f"{value} id adresli servis silindi.") + except: + bot.send_message(self.message_cid, f"Bir hata oluştu. Yöneticiye ulaşın.") + + def textStart(self): + @bot.message_handler(content_types=["text"]) + def textStart(message): + self.set_message(message) + speed_mode = self.data.db_admin["speed"] + if(speed_mode): + services = self.data.db_services + for service in services: + if(service["channel_id"] == speed_mode): + self.cursor.execute("SELECT * FROM channels WHERE id=%s", (service["channel_id"],)) + result = self.cursor.fetchone() + if(result): + bot.send_message(result[1], message.text) + bot.send_message(message.chat.id, "Telegram kanalınızda gönderi paylaşıldı.", parse_mode='Html') + else: + bot.send_message(message.chat.id, "Telegram kanalınız bulunamadı.", parse_mode='Html') + + mastodon = MastodonConnect(service) + result = mastodon.status_post(message.text) + if(result): + bot.send_message(message.chat.id, messages["sp_success"] % result.uri, parse_mode='Html') + else: + bot.send_message(message.chat.id, messages["sp_failed"], parse_mode='Html') + else: + print(99) + + def channelConnect(self): + bot.send_message(self.message_cid, messages["ccon_username"], parse_mode='Html') + def step_one(message): + message_text = message.text + isAt = message_text.startswith("@") # Atınlan mesajın bir kullanıcı adı olup olmadığını test eder. + if(isAt): + try: + chat = bot.get_chat(message.text) # Kanalın mevcut olup olmadığını sorgular. + if(chat): # Kanal mevcutsa burası çalışır. + try: + try: + isMember = bot.get_chat_member(chat.id, bot_tid) # Botun kanala üye olup olmadığını sorgular + if(isMember): # Bot kanala üyeyse burası çalışır. + self.cursor.execute("INSERT INTO channels(t_id) VALUES (%s)", (chat.id,)) + self.cursor.execute("INSERT INTO channel_admins(channel_id, admin_id, role) VALUES (%s, %s, %s)", (self.cursor.lastrowid, self.data.db_admin["id"], True)) + self.db.commit() + if(self.cursor.lastrowid): + bot.send_message(self.message_cid, messages["ccon_success"], parse_mode='Html') + else: # Bot kanala üye değilse çalışır. + bot.send_message(self.message_cid, messages["ccon_warn"], parse_mode='Html') + except: + bot.send_message(self.message_cid, messages["ccon_warn"], parse_mode='Html') + except: # Üyelik doğrulaması sırasında kodda ata keşfedilirse bursı çalışır. + bot.send_message(self.message_cid, messages["ccon_alread_username"], parse_mode='Html') + except: # Kanalın mevcutluğu denetlenirken bir hata ile karşılaşılırsa bursı çalışır. + bot.send_message(self.message_cid, messages["ccon_none_username"], parse_mode='Html') + bot.register_next_step_handler(self.message, step_one) # Kanal kaydının birinci adımını çalıştırır. Bu adımda kanalın mevcut olup olmadığı ve botun kanala üye olup olmadığı sorgulanarak veritabanına kayıt işlemi gerçekleştirilir. + + def channelDelete(self): + channels = self.data.db_channels + buttons = types.InlineKeyboardMarkup() + print(self.data) + if(len(channels) > 0): + for channel in channels: + chat = bot.get_chat(channel["tid"]) + button = types.InlineKeyboardButton(f"{chat.title} (@{chat.username})", callback_data=f'cd,step-1,{channel["id"]}') + buttons.add(button) + bot.send_message(self.message_cid, "Silmek istediğiniz kanalı seçiniz.", parse_mode='Html', reply_markup=buttons) + else: + bot.send_message(self.message_cid, "Hesabınıza bağlı kanal bulunamadı.", parse_mode='Html') + + def channelsList(self): # Hesaba bağlı olan kanlları listelemek için bir fontsiyon + text = "Hesabınıza bağlı olan kanallar:\n" + + count = 1 + for get in self.data.db_channels: + chat = bot.get_chat(get["tid"]) + text += f'{count}. {chat.title} (@{chat.username})\n' + count += 1 + + bot.send_message(self.message_cid, text, parse_mode='Html') + + def serviceConnect(self): + channels = self.data.db_channels + buttons = types.InlineKeyboardMarkup() + + if(len(channels) > 0): + for channel in channels: + chat = bot.get_chat(channel["tid"]) + button = types.InlineKeyboardButton(f"{chat.title} (@{chat.username})", callback_data=f'sc,step-1,{channel["id"]}') + buttons.add(button) + bot.send_message(self.message_cid, "Servisi bağlamak istediğiniz kanalı seçiniz.", parse_mode='Html', reply_markup=buttons) + else: + bot.send_message(self.message_cid, "Hesabınıza bağlı kanal bulunamadı.", parse_mode='Html') + + def serviceDelete(self): + services = self.data.db_services + buttons = types.InlineKeyboardMarkup() + + if(len(services) > 0): + for service in services: + # Mastodon + mastodon = MastodonConnect({ + 'client_id': service["client_id"], + 'client_secret': service["client_secret"], + 'access_token': service["access_token"] + }).getAccountInfo() + + # Telegram + self.cursor.execute("SELECT * FROM channels WHERE id=%s", (service["channel_id"],)) + chat = bot.get_chat(self.cursor.fetchone()[1]) + + button = types.InlineKeyboardButton(f"M: {mastodon['display_name']} (@{mastodon['username']}) - T: {chat.title} (@{chat.username})", callback_data=f'sd,step-1,{service["id"]}') + buttons.add(button) + bot.send_message(self.message_cid, "Silmek istediğiniz servisi seçiniz.", parse_mode='Html', reply_markup=buttons) + else: + bot.send_message(self.message_cid, "Hesabınıza bağlı servis bulunamadı.", parse_mode='Html') + + def servicesList(self): + text = "Bağlı servisler\n--------------\n" + services = self.data.db_services + if(len(services)): + for service in services: + mt = MastodonConnect({ + 'client_id': service["client_id"], + 'client_secret': service["client_secret"], + 'access_token': service["access_token"] + }) + + self.cursor.execute("SELECT * FROM channels WHERE id=%s", (service["channel_id"], )) + resultChannel = self.cursor.fetchone() + + if(resultChannel): + text += f"SERVICE ID {service['id']}\n\n" + text += f"Mastodon\n" + try: + account = mt.getAccountInfo() + # text += f"İsim: {account['display_name']}" + text += f"Kullanıcı Adı: {account['username']}\n\n" + except: + text += f"Mastodon hesabında bağlantı hatası var. Lütfen kontrol ediniz.\n\n" + text += f"client_id: {service['client_id']}\n" + text += f"client_secret: {service['client_secret']}\n" + text += f"access_token: {service['access_token']}\n" + + channel_tid = resultChannel[1] + channel = bot.get_chat(channel_tid) + + text += f"\nTelegram\n" + text += f"İsim: {channel.title}\n" + text += f"Kullanıcı Adı: @{channel.username}\n" + text += "--------------" + + bot.send_message(self.message_cid, text, parse_mode='Html') + else: + bot.send_message(self.message_cid, "Bağlı servis kaydı bulunmadı.", parse_mode='Html') + + def speedMode(self): + speed_mode = self.data.db_admin["speed"] + if(speed_mode == 0): + buttons = types.InlineKeyboardMarkup() + if(len(self.data.db_services) > 0): + for service in self.data.db_services: + self.cursor.execute("SELECT t_id, id FROM channels WHERE id=%s", (service['channel_id'],)) + channel = self.cursor.fetchone() + if(channel): + result = bot.get_chat(channel[0]) + button = types.InlineKeyboardButton(f"{result.title} (@{result.username})", callback_data=f'sm,channel_select,{channel[1]}') + buttons.add(button) + bot.send_message(self.message_cid, "Hızlı moda bağlamak istediğiniz kanalı seçiniz.", parse_mode='Html', reply_markup=buttons) + else: + bot.send_message(self.message_cid, "Hesabınıza bağlı kanal bulunamadı.", parse_mode='Html') + elif(speed_mode > 0): + admin_id = self.data.db_admin["id"] + self.cursor.execute("UPDATE admins SET speed_mode=%s WHERE id=%s", (0, admin_id)) + self.db.commit() + bot.send_message(self.message_cid, messages["sm_disable_success"], parse_mode='Html') + + def statusPost(self): + buttons = types.InlineKeyboardMarkup() + if(len(self.data.db_services) > 0): + for service in self.data.db_services: + print(service) + self.cursor.execute("SELECT t_id, id FROM channels WHERE id=%s", (service['channel_id'],)) + channel = self.cursor.fetchone() + if(channel): + result = bot.get_chat(channel[0]) + button = types.InlineKeyboardButton(f"{result.title} (@{result.username})", callback_data=f"sp,sp_post,{service['id']}") + buttons.add(button) + bot.send_message(self.message_cid, "Gönderi paylaşacağınız servisi seçiniz.", parse_mode='Html', reply_markup=buttons) + else: + bot.send_message(self.message_cid, "Hesabınıza bağlı kanal bulunamadı.", parse_mode='Html') + + +if __name__ == "__main__": + main = Main() + main.start() + main.textStart() + +bot.polling() \ No newline at end of file