Python script for downloading Telegram videos


Recently I needed a way to download all the videos from a Telegram channel I came across, and there was no way I was going to open the web version and click the download button on hundreds of videos one by one.

So I wrote a script to do it for me.

There are some prerequisites for the script to work. First, you need Telegram API keys. This is how to get them:

Getting your API keys (API ID / hash pair):

  • Visit https://my.telegram.org/apps and log in with your Telegram account.
  • Fill out the form to register a new Telegram application. Done! The API key consists of two parts: api_id and api_hash.

Prerequisites

Before running the script, you'll need:

  1. Python 3.7+ installed on your system
  2. Required Python packages:
    • telethon
    • aiofiles
    • tqdm
    • python-dotenv
    • requests

Installation

  1. Install the required packages:

    pip install telethon aiofiles tqdm python-dotenv requests
    
  2. Create a .env file in the same directory as the script with your Telegram API credentials:

    TELEGRAM_API_ID=your_api_id
    TELEGRAM_API_HASH=your_api_hash
    TELEGRAM_CHANNEL=your_channel_id_or_username
    TELEGRAM_BOT_API_KEY=your_bot_api_key  # For notifications
    TELEGRAM_CHAT_ID=your_chat_id  # Where notifications will be sent
    
  3. To get your API credentials, follow the steps under "Getting your API keys" above (https://my.telegram.org/apps).

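  4. For notifications (optional):

    • Create a Telegram bot via @BotFather and put the token it gives you in TELEGRAM_BOT_API_KEY
    • Send your bot a message (for example /start); bots can't message users who haven't contacted them first
    • Get your chat ID (you can use @userinfobot) and put it in TELEGRAM_CHAT_ID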
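Before the first run it can help to check the .env values. Below is a minimal sketch in plain Python; the helper check_config is hypothetical and not part of the script. It encodes two rules: TELEGRAM_API_ID and TELEGRAM_API_HASH are required, and the API ID is an integer. The bot key and chat ID only switch notifications on.

```python
import os
from typing import List, Mapping, Tuple

# Variable names taken from the .env file above
REQUIRED = ("TELEGRAM_API_ID", "TELEGRAM_API_HASH")
NOTIFY = ("TELEGRAM_BOT_API_KEY", "TELEGRAM_CHAT_ID")


def check_config(env: Mapping[str, str]) -> Tuple[List[str], bool]:
    """Hypothetical helper: return (list of problems, whether notifications are configured)."""
    problems = [f"{key} is not set" for key in REQUIRED if not env.get(key)]
    api_id = env.get("TELEGRAM_API_ID", "")
    if api_id and not api_id.isdigit():
        problems.append("TELEGRAM_API_ID must be a number")
    # Notifications need both the bot token and the chat ID to send to.
    notify = all(env.get(key) for key in NOTIFY)
    return problems, notify


if __name__ == "__main__":
    # Assumes the .env values are already in the environment (e.g. via dotenv.load_dotenv()).
    problems, notify = check_config(os.environ)
    for problem in problems:
        print("Config error:", problem)
    print("Notifications enabled:", notify)
```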

Basic Usage

Run the script with default settings:

python telegram_downloader.py

This will download every video of at least 100 MB from the channel set in your .env file into the default videos_output folder.
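The size filter keeps a video when its document size in bytes is at least --min-size megabytes, and it skips message IDs already recorded as downloaded. The script counts 1 MB as 1024 × 1024 bytes. Below is a minimal sketch of the same rule. Plain (message_id, size_in_bytes) tuples stand in for Telethon message objects, and select_videos is a hypothetical name:

```python
from typing import Iterable, List, Set, Tuple

MB = 1024 * 1024  # the script uses binary megabytes


def select_videos(videos: Iterable[Tuple[int, int]], min_size_mb: int,
                  already_downloaded: Set[str]) -> List[int]:
    """Return IDs of videos that are large enough and not yet downloaded."""
    min_bytes = min_size_mb * MB
    return [
        message_id
        for message_id, size in videos
        # IDs are stored as text in downloaded_videos.txt, hence str()
        if str(message_id) not in already_downloaded and size >= min_bytes
    ]


videos = [(1, 50 * MB), (2, 100 * MB), (3, 300 * MB)]
print(select_videos(videos, 100, already_downloaded={"3"}))  # [2]
```

Video 2 is exactly 100 MB and is kept, because the comparison is "at least", not "larger than".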

Command Line Options

The script supports several command-line arguments to customize its behavior:

python telegram_downloader.py --channel @channelname --output my_videos --min-size 200 --concurrent 10

Available options:

  • --channel: Channel username or ID (default: from .env file)
  • --output: Output folder (default: videos_output)
  • --min-size: Minimum video size in MB (default: 100)
  • --concurrent: Maximum concurrent downloads (default: 15)
  • --retries: Maximum number of download attempts per video (default: 5)
  • --retry-delay: Initial retry delay in seconds (default: 5)
  • --max-retry-delay: Maximum retry delay in seconds (default: 40)
  • --session: Session name for Telegram authentication (default: anon)
  • --disk-threshold: Disk space threshold in GB (default: 10)
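--retries, --retry-delay and --max-retry-delay work together. After each failed attempt the script sleeps, starting at --retry-delay seconds and doubling the wait each time, capped at --max-retry-delay. This small sketch computes those waits; the function name retry_delays is hypothetical:

```python
from typing import List


def retry_delays(attempts: int, initial: int, maximum: int) -> List[int]:
    """Seconds slept after each failed attempt: start at `initial`, double, cap at `maximum`."""
    delays = []
    wait = initial
    for _ in range(attempts):
        delays.append(wait)
        wait = min(wait * 2, maximum)  # same update rule as the script's retry loop
    return delays


delays = retry_delays(5, 5, 40)  # --retries 5 --retry-delay 5 --max-retry-delay 40
print(delays, sum(delays))  # [5, 10, 20, 40, 40] 115
```

With the defaults, a video that fails every attempt spends just under two minutes (115 s) waiting before the script gives up on it.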

Examples

  1. Download only very large videos:

    python telegram_downloader.py --min-size 500
    
  2. Reduce concurrency for slower connections:

    python telegram_downloader.py --concurrent 5
    
  3. Keep more disk space in reserve (videos are skipped once free space drops below 20 GB):

    python telegram_downloader.py --disk-threshold 20
    
  4. Download from a specific channel with custom output:

    python telegram_downloader.py --channel @channelname --output channel_videos
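--concurrent sets the size of an asyncio.Semaphore. The script starts all download tasks at once with asyncio.gather, but only that many can be inside the semaphore at the same time. In the sketch below, asyncio.sleep replaces the Telegram download (an assumption so the example runs offline), and the code records the peak number of downloads running at once:

```python
import asyncio


async def run(n_videos: int, concurrent: int) -> int:
    """Start n_videos fake downloads and return the peak number running at once."""
    semaphore = asyncio.Semaphore(concurrent)
    active = 0
    peak = 0

    async def fake_download(video_id: int) -> None:
        nonlocal active, peak
        async with semaphore:  # waits here while `concurrent` downloads are running
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stands in for client.iter_download(...)
            active -= 1

    await asyncio.gather(*(fake_download(i) for i in range(n_videos)))
    return peak


print(asyncio.run(run(n_videos=20, concurrent=5)))  # 5
```

All 20 tasks exist from the start, but at most 5 are ever downloading. Lowering --concurrent lowers that ceiling.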
    

Monitoring

Once running, the script will:

  1. Log all activity to console and a log file in the logs directory
  2. Send Telegram notifications about progress if configured
  3. Show progress bars for each download
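Notifications are plain calls to the Bot API sendMessage method. One way to build the request URL is to let urllib.parse.urlencode escape both parameters, and to skip the call entirely when the bot is not configured. The sketch below only builds the URL, so it runs without network access. The helper name build_send_message_url is hypothetical:

```python
from typing import Optional
from urllib.parse import urlencode


def build_send_message_url(bot_token: Optional[str], chat_id: Optional[str],
                           text: str) -> Optional[str]:
    """Return the Bot API sendMessage URL, or None if notifications are not configured."""
    if not bot_token or not chat_id:
        return None
    # urlencode escapes spaces, emoji and characters like '&' in the message text
    query = urlencode({"chat_id": chat_id, "text": text})
    return f"https://api.telegram.org/bot{bot_token}/sendMessage?{query}"


print(build_send_message_url("123:ABC", "42", "✅ Downloaded 10 videos"))
```

Sending it is then a single requests.get(url, timeout=10). The timeout keeps a stalled request from blocking the script indefinitely.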

Session Management

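The script creates a Telethon session file (default: anon.session) that stores your login. On the first run, Telethon asks for your phone number and the login code Telegram sends you (plus your two-step verification password, if you have one). After that, you can run the script again without re-authenticating.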
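With Telethon's default SQLite sessions, the --session name becomes a file called <name>.session in the working directory; Telethon adds the extension if it is missing. A hypothetical pre-flight check can use this to warn you that a login prompt is coming. A missing file means client.start() will ask you to log in. A file that exists but was never authorized will still prompt, so this is only a hint:

```python
import os


def session_file(name: str) -> str:
    """Path of the SQLite session file Telethon uses for a session name."""
    return name if name.endswith(".session") else name + ".session"


def will_prompt_for_login(name: str) -> bool:
    """True if no session file exists yet, so the first run has to log in."""
    return not os.path.exists(session_file(name))


if __name__ == "__main__":
    if will_prompt_for_login("anon"):
        print("No anon.session yet: have your phone ready for the login code.")
```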

Interrupting and Resuming

  • You can interrupt the script at any time with Ctrl+C
  • When restarted, it will:
    • Skip videos whose message IDs are already listed in downloaded_videos.txt in the output folder
    • Resume partial downloads from their .temp files instead of starting over

This makes it resilient to network interruptions or system restarts.
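Resuming relies on two files in the output folder:

  • downloaded_videos.txt lists the message IDs of finished videos.
  • The .temp file of an unfinished download holds the partial data. Its current size is the byte offset to continue from, and the script passes that offset to Telethon's iter_download.

Here is a stdlib-only sketch of both lookups; the function names are hypothetical:

```python
import os
from typing import Set


def load_done_ids(output_folder: str) -> Set[str]:
    """Message IDs already recorded in downloaded_videos.txt (one per line)."""
    path = os.path.join(output_folder, "downloaded_videos.txt")
    if not os.path.exists(path):
        return set()
    with open(path, encoding="utf-8") as f:
        return {line.strip() for line in f if line.strip()}


def resume_offset(temp_path: str) -> int:
    """Bytes already on disk for an unfinished download (0 if there is no .temp file)."""
    return os.path.getsize(temp_path) if os.path.exists(temp_path) else 0
```

Reading the size right before each attempt, not only once at start-up, keeps a retry from appending the same bytes twice.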

The full script (telegram_downloader.py):

import os
import sys
import asyncio
import argparse
import aiofiles
import hashlib
import shutil
import requests
import urllib.parse
import traceback
from telethon import TelegramClient, errors
from telethon.tl.types import InputMessagesFilterVideo
import logging
from tqdm.asyncio import tqdm
import dotenv
from datetime import datetime

# Load environment variables from .env file
dotenv.load_dotenv()

# Get credentials from environment variables with fallbacks
api_id = os.getenv('TELEGRAM_API_ID')
api_hash = os.getenv('TELEGRAM_API_HASH')
channel_username = os.getenv('TELEGRAM_CHANNEL', '-100xxxx')

# Telegram notification settings
TELEGRAM_BOT_API_KEY = os.getenv('TELEGRAM_BOT_API_KEY')
TELEGRAM_CHAT_ID = os.getenv('TELEGRAM_CHAT_ID')

# Configuration with defaults (can be overridden by command line arguments)
DEFAULT_OUTPUT_FOLDER = 'videos_output'
DEFAULT_MIN_VIDEO_SIZE = 100 * 1024 * 1024  # 100MB in bytes
DEFAULT_MAX_CONCURRENT = 15
DEFAULT_MAX_RETRIES = 5
DEFAULT_RETRY_DELAY = 5  # in seconds
DEFAULT_MAX_RETRY_DELAY = 40  # in seconds
DEFAULT_DISK_SPACE_THRESHOLD = 10  # GB

# Global variables for tracking downloads
downloaded_files_count = 0
downloaded_files_list = []
download_errors = []

def send_telegram_notification(message):
    """Send a notification message to Telegram (does nothing if the bot is not configured)"""
    if not TELEGRAM_BOT_API_KEY or not TELEGRAM_CHAT_ID:
        return  # Notifications are optional
    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_API_KEY}/sendMessage"
        # requests URL-encodes the parameters; the Bot API rejects texts longer than 4096 characters
        params = {"chat_id": TELEGRAM_CHAT_ID, "text": message[:4096]}
        response = requests.get(url, params=params, timeout=10)  # don't hang on network problems
        if response.status_code == 200:
            logger.info("Telegram notification sent successfully")
        else:
            logger.error(f"Failed to send Telegram notification. Status code: {response.status_code}")
    except Exception as e:
        logger.error(f"Error sending Telegram notification: {str(e)}")

def check_disk_space(path, threshold_gb=10):
    """Check if available disk space is below threshold"""
    try:
        total, used, free = shutil.disk_usage(path)
        free_gb = free / (1024 * 1024 * 1024)  # Convert bytes to GB
        
        if free_gb < threshold_gb:
            message = f"⚠️ WARNING: Low disk space! Only {free_gb:.2f} GB remaining on server (threshold: {threshold_gb} GB)"
            logger.warning(message)
            send_telegram_notification(message)
            return False
        return True
    except Exception as e:
        error_msg = f"Error checking disk space: {str(e)}"
        logger.error(error_msg)
        send_telegram_notification(f"⚠️ ERROR: {error_msg}")
        return True  # Continue despite error

def notify_downloaded_files():
    """Send notification with the list of downloaded files"""
    global downloaded_files_count, downloaded_files_list
    
    if downloaded_files_count > 0:
        files_str = "\n".join(downloaded_files_list[-10:])  # Last 10 files
        message = f"✅ Downloaded {downloaded_files_count} videos.\nLast {min(10, len(downloaded_files_list))} files:\n{files_str}"
        send_telegram_notification(message)
        downloaded_files_list = []  # Reset the list
        downloaded_files_count = 0  # Reset the counter

# Parse command line arguments
def parse_args():
    parser = argparse.ArgumentParser(description='Download large videos from a Telegram channel')
    parser.add_argument('--channel', type=str, help='Telegram channel username or ID', default=channel_username)
    parser.add_argument('--output', type=str, help='Output folder for downloaded videos', default=DEFAULT_OUTPUT_FOLDER)
    parser.add_argument('--min-size', type=int, help='Minimum video size in MB', default=DEFAULT_MIN_VIDEO_SIZE // (1024 * 1024))
    parser.add_argument('--concurrent', type=int, help='Maximum concurrent downloads', default=DEFAULT_MAX_CONCURRENT)
    parser.add_argument('--retries', type=int, help='Maximum number of retries', default=DEFAULT_MAX_RETRIES)
    parser.add_argument('--retry-delay', type=int, help='Initial retry delay in seconds', default=DEFAULT_RETRY_DELAY)
    parser.add_argument('--max-retry-delay', type=int, help='Maximum retry delay in seconds', default=DEFAULT_MAX_RETRY_DELAY)
    parser.add_argument('--session', type=str, help='Session name', default='anon')
    parser.add_argument('--disk-threshold', type=int, help='Disk space threshold in GB', default=DEFAULT_DISK_SPACE_THRESHOLD)
    return parser.parse_args()

# Setup logging
def setup_logging():
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)
    log_filename = os.path.join(log_dir, f'telegram_downloader_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
    
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_filename),
            logging.StreamHandler(sys.stdout)
        ]
    )
    return logging.getLogger(__name__)

async def load_downloaded_videos(downloaded_videos_file):
    try:
        if os.path.exists(downloaded_videos_file):
            async with aiofiles.open(downloaded_videos_file, 'r') as f:
                content = await f.read()
                return set(line.strip() for line in content.splitlines())
        return set()
    except Exception as e:
        error_msg = f"Error loading downloaded videos: {str(e)}"
        logger.error(error_msg)
        send_telegram_notification(f"⚠️ ERROR: {error_msg}")
        return set()

async def save_downloaded_video(video_id, downloaded_videos_file):
    try:
        async with aiofiles.open(downloaded_videos_file, 'a') as f:
            await f.write(f"{video_id}\n")
    except Exception as e:
        error_msg = f"Error saving downloaded video ID {video_id}: {str(e)}"
        logger.error(error_msg)
        send_telegram_notification(f"⚠️ ERROR: {error_msg}")

async def refetch_message(client, message_id, channel_entity):
    try:
        return await client.get_messages(channel_entity, ids=message_id)
    except Exception as e:
        error_msg = f"Error refetching message {message_id}: {str(e)}"
        logger.error(error_msg)
        send_telegram_notification(f"⚠️ ERROR: {error_msg}")
        return None

async def verify_file_integrity(file_path, expected_size):
    """Verify file integrity by checking file size"""
    try:
        file_stat = os.stat(file_path)
        return file_stat.st_size == expected_size
    except Exception as e:
        error_msg = f"Error verifying file integrity: {str(e)}"
        logger.error(error_msg)
        send_telegram_notification(f"⚠️ ERROR: {error_msg}")
        return False

async def download_video(client, message, output_folder, downloaded_videos_file, max_retries, initial_retry_delay, max_retry_delay, disk_threshold):
    global downloaded_files_count, downloaded_files_list, download_errors
    
    # Check disk space before downloading
    if not check_disk_space(output_folder, disk_threshold):
        return
    
    # Keep the ID and size separately: `message` is replaced (or None) after a refetch
    message_id = message.id
    expected_size = message.media.document.size
    # Some videos have no file name; fall back to the message ID and the MIME-derived extension
    file_name = message.file.name or f"video_{message_id}{message.file.ext or '.mp4'}"
    original_video_name = f"{message.date.strftime('%Y-%m-%d_%H-%M-%S')}_{file_name}"
    temp_video_path = os.path.join(output_folder, original_video_name + ".temp")
    final_video_path = os.path.join(output_folder, original_video_name)
    
    logger.info(f"Downloading {message_id}: {original_video_name}...")
    
    # Create progress bar
    pbar = tqdm(
        total=expected_size,
        unit='B',
        unit_scale=True,
        desc=f"{original_video_name[:30]}..." if len(original_video_name) > 30 else original_video_name
    )
    
    retry_count = 0
    wait_time = initial_retry_delay
    
    try:
        while retry_count < max_retries:
            try:
                # Check disk space before each attempt
                if not check_disk_space(output_folder, disk_threshold):
                    return
                
                # Resume from the bytes already in the .temp file. Reading the size on every
                # attempt (not once up front) stops a retry from appending the same data twice.
                offset = os.path.getsize(temp_video_path) if os.path.exists(temp_video_path) else 0
                if offset:
                    logger.info(f"Resuming {message_id} from {offset} bytes")
                pbar.reset(total=expected_size)
                pbar.update(offset)
                
                async with aiofiles.open(temp_video_path, 'ab') as file:
                    async for chunk in client.iter_download(message.media.document, offset=offset):
                        await file.write(chunk)
                        pbar.update(len(chunk))
                
                # Verify file integrity by comparing sizes
                if not await verify_file_integrity(temp_video_path, expected_size):
                    if os.path.getsize(temp_video_path) > expected_size:
                        os.remove(temp_video_path)  # Larger than expected means corrupt data: start over
                    raise ValueError(f"File size mismatch for {message_id}: {original_video_name}")
                
                os.replace(temp_video_path, final_video_path)  # Unlike os.rename, also overwrites on Windows
                await save_downloaded_video(message_id, downloaded_videos_file)
                logger.info(f"Successfully downloaded and verified {message_id}: {original_video_name}")
                
                # Update download tracking
                downloaded_files_count += 1
                downloaded_files_list.append(original_video_name)
                
                # Send a notification every 10 downloads
                if downloaded_files_count >= 10:
                    notify_downloaded_files()
                return
            
            except errors.FileReferenceExpiredError:
                # Counted as an attempt so a reference that keeps expiring cannot loop forever
                retry_count += 1
                logger.info(f"File reference expired for {message_id}: {original_video_name}. Refetching message...")
                message = await refetch_message(client, message_id, message.chat_id)
                if not message or not message.media:
                    error_msg = f"Failed to refetch message for {message_id}: {original_video_name}. Skipping..."
                    logger.error(error_msg)
                    download_errors.append(error_msg)
                    send_telegram_notification(f"⚠️ DOWNLOAD ERROR: {error_msg}")
                    return
            
            except errors.TimedOutError:
                retry_count += 1
                error_msg = f"Timeout on {message_id}: {original_video_name}. Retrying in {wait_time} seconds..."
                logger.error(error_msg)
                await asyncio.sleep(wait_time)
                wait_time = min(wait_time * 2, max_retry_delay)
                
                if retry_count == max_retries:
                    download_errors.append(error_msg)
                    send_telegram_notification(f"⚠️ DOWNLOAD ERROR: {error_msg} (After {max_retries} retries)")
            
            except Exception as e:
                retry_count += 1
                error_msg = f"Error downloading {message_id}: {original_video_name}. Error: {str(e)}. Retrying in {wait_time} seconds..."
                logger.error(error_msg)
                await asyncio.sleep(wait_time)
                wait_time = min(wait_time * 2, max_retry_delay)
                
                if retry_count == max_retries:
                    download_errors.append(error_msg)
                    send_telegram_notification(f"⚠️ DOWNLOAD ERROR: {error_msg} (After {max_retries} retries)")
        
        # Only reached when every attempt failed
        logger.error(f"Failed to download {message_id}: {original_video_name} after {max_retries} attempts.")
    finally:
        pbar.close()

async def main():
    args = parse_args()
    
    if not api_id or not api_hash:
        error_msg = "API credentials not found. Please set TELEGRAM_API_ID and TELEGRAM_API_HASH environment variables."
        logger.error(error_msg)
        send_telegram_notification(f"⚠️ ERROR: {error_msg}")
        print("\nTo set up your environment:")
        print("1. Create a .env file in the same directory as this script")
        print("2. Add the following lines to the .env file:")
        print("   TELEGRAM_API_ID=your_api_id")
        print("   TELEGRAM_API_HASH=your_api_hash")
        print("   TELEGRAM_CHANNEL=your_channel_id")
        print("3. Replace the values with your actual credentials")
        print("\nYou can get API credentials from https://my.telegram.org/apps")
        return
    
    # Convert min_size from MB to bytes
    min_video_size = args.min_size * 1024 * 1024
    
    # Setup output folder
    if not os.path.exists(args.output):
        os.makedirs(args.output)
    
    # Setup downloaded videos tracking
    downloaded_videos_file = os.path.join(args.output, 'downloaded_videos.txt')
    downloaded_videos = await load_downloaded_videos(downloaded_videos_file)
    
    # Create semaphore for concurrency control
    semaphore = asyncio.Semaphore(args.concurrent)
    
    # Create and connect client
    client = TelegramClient(args.session, api_id, api_hash)
    
    try:
        await client.start()
        
        # Validate channel
        try:
            channel_id = args.channel
            if not channel_id.startswith('-100') and not channel_id.isdigit():
                # Try to resolve username
                channel_entity = await client.get_entity(channel_id)
                channel_id = channel_entity.id
            else:
                channel_id = int(channel_id)
                channel_entity = await client.get_entity(channel_id)
        except Exception as e:
            error_msg = f"Error resolving channel {args.channel}: {str(e)}"
            logger.error(error_msg)
            send_telegram_notification(f"⚠️ ERROR: {error_msg}")
            return
        
        # Initial disk space check
        check_disk_space(args.output, args.disk_threshold)
        
        logger.info(f"Downloading videos larger than {args.min_size}MB from channel {channel_id}...")
        send_telegram_notification(f"🎬 Started downloading videos larger than {args.min_size}MB from channel {channel_id}")
        
        # Collect all video messages
        messages = []
        async for message in client.iter_messages(channel_entity, filter=InputMessagesFilterVideo()):
            if str(message.id) not in downloaded_videos and hasattr(message.media, 'document') and message.media.document.size >= min_video_size:
                messages.append(message)
        
        total_videos = len(messages)
        logger.info(f"Found {total_videos} videos to download")
        send_telegram_notification(f"🔍 Found {total_videos} videos to download")
        
        # Download videos with concurrency control
        async def download_with_semaphore(message):
            async with semaphore:
                await download_video(
                    client, 
                    message, 
                    args.output, 
                    downloaded_videos_file, 
                    args.retries, 
                    args.retry_delay, 
                    args.max_retry_delay,
                    args.disk_threshold
                )
        
        tasks = [download_with_semaphore(message) for message in messages]
        await asyncio.gather(*tasks)
        
        # Send final notification if there are any remaining downloads
        if downloaded_files_count > 0:
            notify_downloaded_files()
            
        # Notify about any errors
        if download_errors:
            error_count = len(download_errors)
            error_msg = f"⚠️ Completed with {error_count} errors. Check logs for details."
            logger.warning(error_msg)
            send_telegram_notification(error_msg)
        
        completion_msg = f"✅ Finished downloading videos from channel {channel_id}"
        logger.info(completion_msg)
        send_telegram_notification(completion_msg)
        
    except KeyboardInterrupt:
        interrupt_msg = "⚠️ Interrupted by user. Exiting..."
        logger.warning(interrupt_msg)
        send_telegram_notification(interrupt_msg)
    except Exception as e:
        crash_msg = f"💥 SCRIPT CRASHED: {str(e)}\n{traceback.format_exc()}"
        logger.critical(crash_msg)
        send_telegram_notification(crash_msg)
    finally:
        await client.disconnect()

if __name__ == "__main__":
    # Setup logger
    logger = setup_logging()
    
    # Send startup notification
    send_telegram_notification("🚀 Telegram Video Downloader script started")
    
    # Run the async main function
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.warning("Interrupted by user. Exiting...")
        send_telegram_notification("⚠️ Script stopped by keyboard interrupt")
    except Exception as e:
        crash_msg = f"💥 FATAL ERROR: {str(e)}\n{traceback.format_exc()}"
        logger.critical(crash_msg)
        send_telegram_notification(crash_msg)


Here are some of the things the script does for me:

  1. Asynchronous Downloads: The script uses Python's asyncio library to run downloads concurrently on a single thread. While one download is waiting for data from Telegram, the others keep making progress, which shortens the total time compared to downloading the videos one after another.

  2. Semaphore for Concurrent Download Limit: An asyncio semaphore caps the number of downloads running at the same time (--concurrent, 15 by default), so the script doesn't overload the network or the system.

  3. Telethon for Telegram API Interaction: The script uses Telethon, a Python client library for Telegram's MTProto API, to log in, resolve the channel, list its video messages and download the media.

  4. Resumable Downloads: The script can resume incomplete downloads. If a download is interrupted by a network issue or a restart, the partial data stays in its .temp file. The next run continues from the file's current size instead of downloading the whole video again.

  5. Exponential Backoff on Retries: When a download attempt fails (a timeout or any other error), the script waits before retrying and doubles the wait after each failure, capped at --max-retry-delay. With the defaults the waits are 5, 10, 20, then 40 seconds. This avoids hammering Telegram while the network is unstable.

  6. Error Handling: When a file reference expires, the script refetches the message. Timeouts and other errors are retried. Every error is logged, and failed downloads are reported through Telegram notifications, so one bad video doesn't stop the whole run.

  7. Progress Tracking and Display: Each download gets its own tqdm progress bar in the console. The bar shows the file name, the bytes downloaded, the percentage complete and the transfer speed.

  8. Filtering and Downloading Large Files: The script is specifically designed to filter and download large video files from Telegram channels. It allows setting a minimum file size threshold to target larger media files.

  9. Organized File Management: While a file is downloading, it is saved with a .temp extension. Once its size matches the size Telegram reports, the script renames it to its original name prefixed with the message date and time. It also records the message ID in downloaded_videos.txt.

  10. Customizable Parameters: Key parameters such as the number of concurrent downloads, minimum file size for downloads, and retry behavior are easily customizable, making the script adaptable to different user requirements and network conditions.
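Items 4 and 9 combine into a common pattern:

  1. Write to name.temp.
  2. Compare the size on disk with the size Telegram reports.
  3. Only then move the file into place and record its ID.

Below is a stdlib-only sketch of that finishing step, with hypothetical names. It uses os.replace, which, unlike os.rename, also overwrites an existing destination on Windows. As an addition, the sketch deletes a .temp file that is larger than expected, since appending more data can never fix it:

```python
import os


def finish_download(temp_path: str, final_path: str, expected_size: int,
                    message_id: int, done_file: str) -> bool:
    """Move a complete .temp file into place and record its message ID.

    Returns False when the size does not match (the download needs another attempt).
    """
    actual = os.path.getsize(temp_path)
    if actual != expected_size:
        if actual > expected_size:
            # More bytes than Telegram reported: the data is corrupt, start over.
            os.remove(temp_path)
        return False
    os.replace(temp_path, final_path)  # atomic on POSIX within one filesystem
    with open(done_file, "a", encoding="utf-8") as f:
        f.write(f"{message_id}\n")
    return True
```

Recording the ID only after the move means a crash in between at worst re-downloads one video. A video is never marked done while its file is still incomplete.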