"""Telegram video uploader bot built on Pyrogram.

Two clients are created: ``Bot`` (a bot-token session that receives the
``/start``, ``/send_video`` and ``/batch`` commands) and ``User`` (a user
session that performs the actual uploads).  Video metadata is read with the
external ``ffprobe`` executable, so it must be available on ``PATH``;
when probing fails, fallback values are used instead of aborting the upload.
"""

import asyncio
import json
import logging
import os
import subprocess
import time
from typing import Optional, Tuple

from dotenv import load_dotenv
from pyrogram import Client, filters
from pyrogram.errors import FloodWait

from utils import (
    get_local_videos,
    process_local_thumbnail,
    get_batch_directories,
    get_batch_videos
)

load_dotenv()

BOT_TOKEN = os.getenv("BOT_TOKEN")
API_HASH = os.getenv("API_HASH")
# Convert API_ID only when it is present: int(None) would raise TypeError
# before the explicit configuration check below gets a chance to run.
_raw_api_id = os.getenv("API_ID")
API_ID = int(_raw_api_id) if _raw_api_id else 0

if not BOT_TOKEN or not API_ID or not API_HASH:
    raise ValueError("Please configure BOT_TOKEN, API_ID and API_HASH in .env")

USER_SESSION = "user_session"

SESSION_DIR = os.path.join(os.path.dirname(__file__), "sessions")
os.makedirs(SESSION_DIR, exist_ok=True)

TEMP_DIR = os.path.join(os.path.dirname(__file__), "temp")
os.makedirs(TEMP_DIR, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

Bot = Client(
    os.path.join(SESSION_DIR, "Thumb-Bot"),
    bot_token=BOT_TOKEN,
    api_id=API_ID,
    api_hash=API_HASH
)

User = Client(
    os.path.join(SESSION_DIR, USER_SESSION),
    api_id=API_ID,
    api_hash=API_HASH
)

# Fallback resolution reported when ffprobe cannot tell us the real one.
DEFAULT_WIDTH = 1280
DEFAULT_HEIGHT = 720

# Everything a failed probe can reasonably raise: ffprobe missing or not
# executable (OSError), unparsable output (ValueError, which includes
# json.JSONDecodeError), unexpected JSON shape (TypeError, AttributeError,
# LookupError) and subprocess-level failures.
_PROBE_ERRORS = (
    OSError,
    ValueError,
    TypeError,
    AttributeError,
    LookupError,
    subprocess.SubprocessError,
)


def remove_extension(name: str) -> str:
    """Return *name* without its final file extension."""
    return os.path.splitext(name)[0]


def get_video_dimensions(path: str) -> Tuple[int, int]:
    """Return ``(width, height)`` of the first video stream of *path*.

    The values are read with ``ffprobe``.  This is best-effort: if ffprobe
    produces no output, reports no streams, or the probe fails in any
    expected way, ``(1280, 720)`` is returned instead of raising.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=width,height",
        "-of", "json",
        path
    ]
    fallback = (DEFAULT_WIDTH, DEFAULT_HEIGHT)

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=False
        )
        if not result.stdout:
            return fallback

        streams = json.loads(result.stdout).get("streams")
        if not streams:
            return fallback

        first = streams[0]
        width = first.get("width", DEFAULT_WIDTH)
        height = first.get("height", DEFAULT_HEIGHT)
        return int(width), int(height)
    except _PROBE_ERRORS as exc:
        logger.warning(
            "Could not read dimensions of %s (%s); using %dx%d",
            path, exc, DEFAULT_WIDTH, DEFAULT_HEIGHT
        )
        return fallback


def get_video_duration(path: str) -> int:
    """Return the duration of *path* in whole seconds, or 0 if unknown.

    The container duration is read with ``ffprobe`` and truncated to an
    integer.  Probe failures are logged and reported as ``0``.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        path
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        return int(float(result.stdout.strip()))
    except _PROBE_ERRORS as exc:
        # Deliberately narrower than a bare ``except`` so that
        # KeyboardInterrupt / SystemExit are never swallowed here.
        logger.warning("Could not read duration of %s (%s); using 0", path, exc)
        return 0


def _remove_file_if_exists(path: Optional[str]) -> None:
    """Delete *path* when it is set and still exists on disk."""
    if path and os.path.exists(path):
        os.remove(path)


async def send_video_safe(
    client,
    chat_id,
    video_path,
    caption=None,
    thumb=None
):
    """Upload *video_path* to *chat_id*, retrying on ``FloodWait``.

    Width, height and duration are probed once up front and passed to
    ``client.send_video``.  When a ``FloodWait`` is raised, the coroutine
    sleeps for the requested time plus one second and tries again; the
    loop only ends after a successful upload.  Any other exception
    propagates to the caller.
    """
    width, height = get_video_dimensions(video_path)
    duration = get_video_duration(video_path)

    while True:
        try:
            await client.send_video(
                chat_id=chat_id,
                video=video_path,
                caption=caption,
                thumb=thumb,
                width=width,
                height=height,
                duration=duration,
                supports_streaming=True
            )
            break
        except FloodWait as e:
            delay = e.value + 1
            logger.warning(
                "FloodWait while uploading %s. Sleeping %s seconds...",
                os.path.basename(video_path), delay
            )
            await asyncio.sleep(delay)


@Bot.on_message(filters.command("start"))
async def start(_, m):
    """Reply to ``/start`` with the list of supported commands."""
    await m.reply(
        "Video uploader bot ready.\n\n"
        "Commands:\n"
        "/send_video - Upload videos from single folder\n"
        "/batch - Upload videos from batch folders"
    )


@Bot.on_message(filters.command("send_video"))
async def send_local(_, m):
    """Handle ``/send_video``: upload every video from ``get_local_videos``.

    Each video gets its own processed thumbnail (when it has a
    ``thumb_path``), which is removed again once that upload finishes or
    fails.
    """
    status = await m.reply("Scanning local videos...")

    # get_local_videos() returns a pair; only the first item is used here.
    videos, _unused = get_local_videos()

    if not videos:
        await status.edit("No videos found.")
        return

    for v in videos:
        msg = await m.reply(f"Uploading {v['filename']}...")

        thumb = None
        try:
            if v["thumb_path"]:
                thumb = process_local_thumbnail(v["thumb_path"])

            await send_video_safe(
                User,
                m.chat.id,
                v["video_path"],
                caption=remove_extension(v["filename"]),
                thumb=thumb
            )

            await msg.delete()
        finally:
            # Never leave the processed thumbnail behind, even when the
            # upload or the message deletion raises.
            _remove_file_if_exists(thumb)

    await status.edit("All videos uploaded.")


@Bot.on_message(filters.command("batch"))
async def batch(_, m):
    """Handle ``/batch``: upload the videos of every batch directory.

    Within one batch, the thumbnail of the first video is processed once
    and reused for all videos of that batch, then removed.
    """
    status = await m.reply("Scanning batch folders...")

    batches = get_batch_directories()

    if not batches:
        await status.edit("No batch folders found.")
        return

    for batch_dir in batches:
        videos = get_batch_videos(batch_dir)

        if not videos:
            continue

        thumb = None
        try:
            if videos[0]["thumb_path"]:
                thumb = process_local_thumbnail(videos[0]["thumb_path"])

            for v in videos:
                msg = await m.reply(f"Uploading {v['filename']}...")

                await send_video_safe(
                    User,
                    m.chat.id,
                    v["video_path"],
                    caption=remove_extension(v["filename"]),
                    thumb=thumb
                )

                await msg.delete()
        finally:
            # The shared batch thumbnail is cleaned up even if an upload
            # in the middle of the batch fails.
            _remove_file_if_exists(thumb)

    await status.edit("Batch upload complete.")


print("Bot started successfully")

# The user client must be running while the bot handles commands, because
# the handlers upload through ``User``.
with User:
    Bot.run()