Python script for downloading Telegram videos
Recently I needed to download all the videos from a Telegram channel I had come across, and I had no intention of opening the web version and clicking the download button on hundreds of videos one by one.
So I wrote a script to do it for me.
The script needs Telegram API keys to work. Here is how to get them:
Getting your API keys (API ID / hash pair):
- Visit https://my.telegram.org/apps and log in with your Telegram Account.
- Fill out the form to register a new Telegram application. Done! The API key consists of two parts: api_id and api_hash.
Prerequisites
Before running the script, you'll need:
- Python 3.7+ installed on your system
- Required Python packages:
  - telethon
  - aiofiles
  - tqdm
  - python-dotenv
  - requests
Installation
- Install the required packages:
pip install telethon aiofiles tqdm python-dotenv requests
- Create a .env file in the same directory as the script with your Telegram API credentials:
TELEGRAM_API_ID=your_api_id
TELEGRAM_API_HASH=your_api_hash
TELEGRAM_CHANNEL=your_channel_id_or_username
TELEGRAM_BOT_API_KEY=your_bot_api_key # For notifications
TELEGRAM_CHAT_ID=your_chat_id # Where notifications will be sent
- To get your API credentials:
  - Visit https://my.telegram.org/apps
  - Create a new application
  - Copy the API ID and API Hash
- For notifications:
  - Create a Telegram bot via @BotFather
  - Get your chat ID (you can use @userinfobot)
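Before anything talks to Telegram, it can help to confirm that the required .env values are actually present. The sketch below is a stdlib-only illustration: `read_credentials` and `MissingCredentials` are hypothetical names, not part of the script, and the function takes a plain mapping so it can be tried without a real .env file.

```python
import os
from typing import Mapping

# The two keys the script cannot run without; the bot keys are optional.
REQUIRED_KEYS = ("TELEGRAM_API_ID", "TELEGRAM_API_HASH")


class MissingCredentials(ValueError):
    """Hypothetical error raised when a required key is absent or empty."""


def read_credentials(env: Mapping[str, str] = os.environ) -> dict:
    """Return the API ID (as int) and hash, or raise if either is missing."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise MissingCredentials("missing: " + ", ".join(missing))
    return {
        "api_id": int(env["TELEGRAM_API_ID"]),  # the API ID is numeric
        "api_hash": env["TELEGRAM_API_HASH"],
    }


if __name__ == "__main__":
    print(read_credentials({"TELEGRAM_API_ID": "12345", "TELEGRAM_API_HASH": "abc"}))
```

Failing early with the names of the missing keys is usually easier to debug than an authentication error from the client later on.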
Basic Usage
Run the script with default settings:
python telegram_downloader.py
This will download videos larger than 100MB from the specified channel to the default videos_output folder.
Command Line Options
The script supports several command-line arguments to customize its behavior:
python telegram_downloader.py --channel @channelname --output my_videos --min-size 200 --concurrent 10
Available options:
- --channel: Channel username or ID (default: from .env file)
- --output: Output folder (default: videos_output)
- --min-size: Minimum video size in MB (default: 100)
- --concurrent: Maximum concurrent downloads (default: 15)
- --retries: Maximum number of retry attempts (default: 5)
- --retry-delay: Initial retry delay in seconds (default: 5)
- --max-retry-delay: Maximum retry delay in seconds (default: 40)
- --session: Session name for Telegram authentication (default: anon)
- --disk-threshold: Disk space threshold in GB (default: 10)
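The three retry options work together: the wait starts at --retry-delay, doubles after each failed attempt, and never exceeds --max-retry-delay. A minimal sketch of the resulting schedule follows; `retry_delays` is a hypothetical helper written for illustration, not a function in the script.

```python
from typing import List


def retry_delays(retries: int = 5, retry_delay: int = 5, max_retry_delay: int = 40) -> List[int]:
    """Seconds waited before each retry: start at retry_delay, double, cap at the maximum."""
    delays = []
    wait = retry_delay
    for _ in range(retries):
        delays.append(wait)
        wait = min(wait * 2, max_retry_delay)
    return delays


if __name__ == "__main__":
    print(retry_delays())         # [5, 10, 20, 40, 40]
    print(sum(retry_delays()))    # 115
```

With the defaults, a video that keeps failing is abandoned after about two minutes of waiting in total, on top of the time spent in the attempts themselves.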
Examples
- Download only very large videos:
python telegram_downloader.py --min-size 500
- Reduce concurrency for slower connections:
python telegram_downloader.py --concurrent 5
- Increase disk space threshold for low-storage systems:
python telegram_downloader.py --disk-threshold 20
- Download from a specific channel with custom output:
python telegram_downloader.py --channel @channelname --output channel_videos
Monitoring
Once running, the script will:
- Log all activity to the console and to a log file in the logs directory
- Send Telegram notifications about progress if configured
- Show progress bars for each download
Session Management
The script creates a Telethon session file (default: anon.session) that stores your authentication information. This allows you to run the script multiple times without re-authenticating.
Interrupting and Resuming
- You can interrupt the script at any time with Ctrl+C
- When restarted, it will:
  - Skip already downloaded videos
  - Resume partial downloads from where they left off
This makes it resilient to network interruptions or system restarts.
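The resume behaviour boils down to two steps: measure how many bytes of the partial file are already on disk, then ask the source to start at that offset and append. The sketch below shows the idea with local files only; `resume_offset`, `append_chunks`, and `fake_source` are hypothetical stand-ins, and `fake_source` replaces the real network download.

```python
import os
import tempfile
from typing import Iterable, Iterator


def resume_offset(temp_path: str) -> int:
    """Bytes already on disk for a partial download (0 if there is none)."""
    return os.path.getsize(temp_path) if os.path.exists(temp_path) else 0


def append_chunks(temp_path: str, chunks: Iterable[bytes]) -> int:
    """Append chunks to the partial file and return its new size."""
    with open(temp_path, "ab") as handle:
        for chunk in chunks:
            handle.write(chunk)
    return os.path.getsize(temp_path)


def fake_source(data: bytes, offset: int, chunk_size: int = 4) -> Iterator[bytes]:
    """Stand-in for a download that can start at a byte offset."""
    for start in range(offset, len(data), chunk_size):
        yield data[start:start + chunk_size]


if __name__ == "__main__":
    payload = b"0123456789abcdef"
    with tempfile.TemporaryDirectory() as folder:
        temp_path = os.path.join(folder, "video.mp4.temp")
        # The first run is "interrupted" after 6 bytes.
        append_chunks(temp_path, [payload[:6]])
        # The second run continues from the size of the partial file.
        offset = resume_offset(temp_path)
        append_chunks(temp_path, fake_source(payload, offset))
        with open(temp_path, "rb") as handle:
            print(handle.read() == payload)  # True
```

The offset has to be re-read from the file before every attempt; reusing a value captured earlier would append bytes that are already there.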
import os
import sys
import asyncio
import argparse
import aiofiles
import hashlib
import shutil
import requests
import urllib.parse
import traceback
from telethon import TelegramClient, errors
from telethon.tl.types import InputMessagesFilterVideo
import logging
from tqdm.asyncio import tqdm
import dotenv
from datetime import datetime
# Load environment variables from .env file
dotenv.load_dotenv()
# Get credentials from environment variables with fallbacks
api_id = os.getenv('TELEGRAM_API_ID')
api_hash = os.getenv('TELEGRAM_API_HASH')
channel_username = os.getenv('TELEGRAM_CHANNEL', '-100xxxx')
# Telegram notification settings
TELEGRAM_BOT_API_KEY = os.getenv('TELEGRAM_BOT_API_KEY')
TELEGRAM_CHAT_ID = os.getenv('TELEGRAM_CHAT_ID')
# Configuration with defaults (can be overridden by command line arguments)
DEFAULT_OUTPUT_FOLDER = 'videos_output'
DEFAULT_MIN_VIDEO_SIZE = 100 * 1024 * 1024 # 100MB in bytes
DEFAULT_MAX_CONCURRENT = 15
DEFAULT_MAX_RETRIES = 5
DEFAULT_RETRY_DELAY = 5 # in seconds
DEFAULT_MAX_RETRY_DELAY = 40 # in seconds
DEFAULT_DISK_SPACE_THRESHOLD = 10 # GB
# Global variables for tracking downloads
downloaded_files_count = 0
downloaded_files_list = []
download_errors = []
def send_telegram_notification(message):
    """Send a notification message to Telegram"""
    # Notifications are optional: skip quietly when the bot is not configured
    if not TELEGRAM_BOT_API_KEY or not TELEGRAM_CHAT_ID:
        return
    try:
        text_message = urllib.parse.quote(message)
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_API_KEY}/sendMessage?chat_id={TELEGRAM_CHAT_ID}&text={text_message}"
        # A timeout keeps an unreachable API from stalling the script
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            logger.info("Telegram notification sent successfully")
        else:
            logger.error(f"Failed to send Telegram notification. Status code: {response.status_code}")
    except Exception as e:
        logger.error(f"Error sending Telegram notification: {str(e)}")
def check_disk_space(path, threshold_gb=10):
    """Check if available disk space is below threshold"""
    try:
        total, used, free = shutil.disk_usage(path)
        free_gb = free / (1024 * 1024 * 1024)  # Convert bytes to GB
        if free_gb < threshold_gb:
            message = f"⚠️ WARNING: Low disk space! Only {free_gb:.2f} GB remaining on server (threshold: {threshold_gb} GB)"
            logger.warning(message)
            send_telegram_notification(message)
            return False
        return True
    except Exception as e:
        error_msg = f"Error checking disk space: {str(e)}"
        logger.error(error_msg)
        send_telegram_notification(f"⚠️ ERROR: {error_msg}")
        return True  # Continue despite error


def notify_downloaded_files():
    """Send notification with the list of downloaded files"""
    global downloaded_files_count, downloaded_files_list
    if downloaded_files_count > 0:
        files_str = "\n".join(downloaded_files_list[-10:])  # Last 10 files
        message = f"✅ Downloaded {downloaded_files_count} videos.\nLast {min(10, len(downloaded_files_list))} files:\n{files_str}"
        send_telegram_notification(message)
        downloaded_files_list = []  # Reset the list
        downloaded_files_count = 0  # Reset the counter
# Parse command line arguments
def parse_args():
    parser = argparse.ArgumentParser(description='Download large videos from a Telegram channel')
    parser.add_argument('--channel', type=str, help='Telegram channel username or ID', default=channel_username)
    parser.add_argument('--output', type=str, help='Output folder for downloaded videos', default=DEFAULT_OUTPUT_FOLDER)
    parser.add_argument('--min-size', type=int, help='Minimum video size in MB', default=DEFAULT_MIN_VIDEO_SIZE // (1024 * 1024))
    parser.add_argument('--concurrent', type=int, help='Maximum concurrent downloads', default=DEFAULT_MAX_CONCURRENT)
    parser.add_argument('--retries', type=int, help='Maximum number of retries', default=DEFAULT_MAX_RETRIES)
    parser.add_argument('--retry-delay', type=int, help='Initial retry delay in seconds', default=DEFAULT_RETRY_DELAY)
    parser.add_argument('--max-retry-delay', type=int, help='Maximum retry delay in seconds', default=DEFAULT_MAX_RETRY_DELAY)
    parser.add_argument('--session', type=str, help='Session name', default='anon')
    parser.add_argument('--disk-threshold', type=int, help='Disk space threshold in GB', default=DEFAULT_DISK_SPACE_THRESHOLD)
    return parser.parse_args()


# Setup logging
def setup_logging():
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)
    log_filename = os.path.join(log_dir, f'telegram_downloader_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_filename),
            logging.StreamHandler(sys.stdout)
        ]
    )
    return logging.getLogger(__name__)
async def load_downloaded_videos(downloaded_videos_file):
    try:
        if os.path.exists(downloaded_videos_file):
            async with aiofiles.open(downloaded_videos_file, 'r') as f:
                content = await f.read()
                return set(line.strip() for line in content.splitlines())
        return set()
    except Exception as e:
        error_msg = f"Error loading downloaded videos: {str(e)}"
        logger.error(error_msg)
        send_telegram_notification(f"⚠️ ERROR: {error_msg}")
        return set()


async def save_downloaded_video(video_id, downloaded_videos_file):
    try:
        async with aiofiles.open(downloaded_videos_file, 'a') as f:
            await f.write(f"{video_id}\n")
    except Exception as e:
        error_msg = f"Error saving downloaded video ID {video_id}: {str(e)}"
        logger.error(error_msg)
        send_telegram_notification(f"⚠️ ERROR: {error_msg}")


async def refetch_message(client, message_id, channel_entity):
    try:
        return await client.get_messages(channel_entity, ids=message_id)
    except Exception as e:
        error_msg = f"Error refetching message {message_id}: {str(e)}"
        logger.error(error_msg)
        send_telegram_notification(f"⚠️ ERROR: {error_msg}")
        return None


async def verify_file_integrity(file_path, expected_size):
    """Verify file integrity by checking file size"""
    try:
        file_stat = os.stat(file_path)
        return file_stat.st_size == expected_size
    except Exception as e:
        error_msg = f"Error verifying file integrity: {str(e)}"
        logger.error(error_msg)
        send_telegram_notification(f"⚠️ ERROR: {error_msg}")
        return False
async def download_video(client, message, output_folder, downloaded_videos_file, max_retries, initial_retry_delay, max_retry_delay, disk_threshold):
    global downloaded_files_count, downloaded_files_list, download_errors
    # Check disk space before downloading
    if not check_disk_space(output_folder, disk_threshold):
        return
    # Keep the IDs in local variables: a failed refetch returns None instead of a message
    message_id = message.id
    chat_id = message.chat_id
    original_video_name = f"{message.date.strftime('%Y-%m-%d_%H-%M-%S')}_{message.file.name}"
    temp_video_name = original_video_name + ".temp"
    temp_video_path = os.path.join(output_folder, temp_video_name)
    final_video_path = os.path.join(output_folder, original_video_name)
    logger.info(f"Downloading {message_id}: {original_video_name}...")
    # Check if temp file exists and get its size
    initial_offset = 0
    if os.path.exists(temp_video_path):
        initial_offset = os.path.getsize(temp_video_path)
        logger.info(f"Resuming download from {initial_offset} bytes")
    # Create progress bar
    pbar = tqdm(
        total=message.media.document.size,
        initial=initial_offset,
        unit='B',
        unit_scale=True,
        desc=f"{original_video_name[:30]}..." if len(original_video_name) > 30 else original_video_name
    )
    retry_count = 0
    wait_time = initial_retry_delay
    while retry_count < max_retries:
        try:
            # Check disk space before each attempt
            if not check_disk_space(output_folder, disk_threshold):
                pbar.close()
                return
            # The file is opened in append mode, so each attempt must continue from the
            # bytes already on disk, not from the offset measured before the first attempt
            offset = os.path.getsize(temp_video_path) if os.path.exists(temp_video_path) else 0
            async with aiofiles.open(temp_video_path, 'ab') as file:
                async for chunk in client.iter_download(message.media.document, offset=offset):
                    await file.write(chunk)
                    offset += len(chunk)
                    pbar.update(len(chunk))
            # Verify file integrity
            if await verify_file_integrity(temp_video_path, message.media.document.size):
                os.rename(temp_video_path, final_video_path)
                await save_downloaded_video(message_id, downloaded_videos_file)
                logger.info(f"Successfully downloaded and verified {message_id}: {original_video_name}")
                # Update download tracking
                downloaded_files_count += 1
                downloaded_files_list.append(original_video_name)
                # Check if we should send notification (every 10 downloads)
                if downloaded_files_count >= 10:
                    notify_downloaded_files()
                break
            else:
                logger.warning(f"File size mismatch for {message_id}: {original_video_name}. Retrying...")
                # Handled by the generic retry branch below
                raise OSError("downloaded size does not match expected size")
        except errors.FileReferenceExpiredError:
            logger.info(f"File reference expired for {message_id}: {original_video_name}. Refetching message...")
            refreshed = await refetch_message(client, message_id, chat_id)
            if refreshed:
                message = refreshed
                continue
            else:
                error_msg = f"Failed to refetch message for {message_id}: {original_video_name}. Skipping..."
                logger.error(error_msg)
                download_errors.append(error_msg)
                send_telegram_notification(f"⚠️ DOWNLOAD ERROR: {error_msg}")
                break
        except errors.TimedOutError:
            retry_count += 1
            error_msg = f"Timeout on {message_id}: {original_video_name}. Retrying in {wait_time} seconds..."
            logger.error(error_msg)
            await asyncio.sleep(wait_time)
            wait_time = min(wait_time * 2, max_retry_delay)
            if retry_count == max_retries:
                download_errors.append(error_msg)
                send_telegram_notification(f"⚠️ DOWNLOAD ERROR: {error_msg} (After {max_retries} retries)")
        except Exception as e:
            retry_count += 1
            error_msg = f"Error downloading {message_id}: {original_video_name}. Error: {str(e)}. Retrying in {wait_time} seconds..."
            logger.error(error_msg)
            await asyncio.sleep(wait_time)
            wait_time = min(wait_time * 2, max_retry_delay)
            if retry_count == max_retries:
                download_errors.append(error_msg)
                send_telegram_notification(f"⚠️ DOWNLOAD ERROR: {error_msg} (After {max_retries} retries)")
    if retry_count == max_retries:
        logger.error(f"Failed to download {message_id}: {original_video_name} after {max_retries} retries.")
    pbar.close()
async def main():
    args = parse_args()
    if not api_id or not api_hash:
        error_msg = "API credentials not found. Please set TELEGRAM_API_ID and TELEGRAM_API_HASH environment variables."
        logger.error(error_msg)
        send_telegram_notification(f"⚠️ ERROR: {error_msg}")
        print("\nTo set up your environment:")
        print("1. Create a .env file in the same directory as this script")
        print("2. Add the following lines to the .env file:")
        print(" TELEGRAM_API_ID=your_api_id")
        print(" TELEGRAM_API_HASH=your_api_hash")
        print(" TELEGRAM_CHANNEL=your_channel_id")
        print("3. Replace the values with your actual credentials")
        print("\nYou can get API credentials from https://my.telegram.org/apps")
        return
    # Convert min_size from MB to bytes
    min_video_size = args.min_size * 1024 * 1024
    # Setup output folder
    if not os.path.exists(args.output):
        os.makedirs(args.output)
    # Setup downloaded videos tracking
    downloaded_videos_file = os.path.join(args.output, 'downloaded_videos.txt')
    downloaded_videos = await load_downloaded_videos(downloaded_videos_file)
    # Create semaphore for concurrency control
    semaphore = asyncio.Semaphore(args.concurrent)
    # Create and connect client
    client = TelegramClient(args.session, api_id, api_hash)
    try:
        await client.start()
        # Validate channel
        try:
            channel_id = args.channel
            if not channel_id.startswith('-100') and not channel_id.isdigit():
                # Try to resolve username
                channel_entity = await client.get_entity(channel_id)
                channel_id = channel_entity.id
            else:
                channel_id = int(channel_id)
                channel_entity = await client.get_entity(channel_id)
        except Exception as e:
            error_msg = f"Error resolving channel {args.channel}: {str(e)}"
            logger.error(error_msg)
            send_telegram_notification(f"⚠️ ERROR: {error_msg}")
            return
        # Initial disk space check
        check_disk_space(args.output, args.disk_threshold)
        logger.info(f"Downloading videos larger than {args.min_size}MB from channel {channel_id}...")
        send_telegram_notification(f"🎬 Started downloading videos larger than {args.min_size}MB from channel {channel_id}")
        # Collect all video messages
        messages = []
        async for message in client.iter_messages(channel_entity, filter=InputMessagesFilterVideo()):
            if str(message.id) not in downloaded_videos and hasattr(message.media, 'document') and message.media.document.size >= min_video_size:
                messages.append(message)
        total_videos = len(messages)
        logger.info(f"Found {total_videos} videos to download")
        send_telegram_notification(f"🔍 Found {total_videos} videos to download")

        # Download videos with concurrency control
        async def download_with_semaphore(message):
            async with semaphore:
                await download_video(
                    client,
                    message,
                    args.output,
                    downloaded_videos_file,
                    args.retries,
                    args.retry_delay,
                    args.max_retry_delay,
                    args.disk_threshold
                )

        tasks = [download_with_semaphore(message) for message in messages]
        await asyncio.gather(*tasks)
        # Send final notification if there are any remaining downloads
        if downloaded_files_count > 0:
            notify_downloaded_files()
        # Notify about any errors
        if download_errors:
            error_count = len(download_errors)
            error_msg = f"⚠️ Completed with {error_count} errors. Check logs for details."
            logger.warning(error_msg)
            send_telegram_notification(error_msg)
        completion_msg = f"✅ Finished downloading videos from channel {channel_id}"
        logger.info(completion_msg)
        send_telegram_notification(completion_msg)
    except KeyboardInterrupt:
        interrupt_msg = "⚠️ Interrupted by user. Exiting..."
        logger.warning(interrupt_msg)
        send_telegram_notification(interrupt_msg)
    except Exception as e:
        crash_msg = f"💥 SCRIPT CRASHED: {str(e)}\n{traceback.format_exc()}"
        logger.critical(crash_msg)
        send_telegram_notification(crash_msg)
    finally:
        await client.disconnect()
if __name__ == "__main__":
    # Setup logger
    logger = setup_logging()
    # Send startup notification
    send_telegram_notification("🚀 Telegram Video Downloader script started")
    # Run the async main function
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.warning("Interrupted by user. Exiting...")
        send_telegram_notification("⚠️ Script stopped by keyboard interrupt")
    except Exception as e:
        crash_msg = f"💥 FATAL ERROR: {str(e)}\n{traceback.format_exc()}"
        logger.critical(crash_msg)
        send_telegram_notification(crash_msg)
The script does quite a few things for me. Some of them are:
- Asynchronous Downloads: The script uses Python's asyncio library to run downloads asynchronously. Several downloads can be in flight at once without blocking one another, which makes better use of the connection and shortens the overall download time.
- Semaphore for Concurrent Download Limit: A semaphore caps the number of concurrent downloads (15 by default), so the script does not overload the network or the system by fetching too many files at the same time.
- Telethon for Telegram API Interaction: The script uses the Telethon library, a Python client for Telegram's API, to access channels, messages, and media files.
- Resumable Downloads: If a download is interrupted by a network issue or a restart, the script continues from the size of the existing .temp file instead of re-downloading the whole video.
- Exponential Backoff on Retries: When a download fails, the script waits before retrying, and the wait doubles after each failed attempt until it reaches the maximum retry delay. This avoids hammering the server during network trouble.
- Error Handling: Timeouts, expired file references, and other exceptions are caught and logged. A failed download is retried up to the configured limit and then skipped, so one bad file does not stop the rest.
- Progress Tracking and Display: Each download gets its own progress bar in the console, showing which files are being downloaded and how far along they are.
- Filtering and Downloading Large Files: The script is designed to pick out large video files from a Telegram channel. A minimum file size threshold (100MB by default) decides which videos are downloaded.
- Organized File Management: Files are saved with a .temp extension while downloading. Once the size check passes, each file is renamed to its original name prefixed with the message timestamp.
- Customizable Parameters: The number of concurrent downloads, the minimum file size, and the retry behavior can all be set from the command line, so the script adapts to different requirements and network conditions.
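To make the concurrency limit concrete, here is a small stdlib-only sketch of the semaphore pattern. `run_limited` is a hypothetical function for illustration, and `asyncio.sleep` stands in for an actual download; it records the highest number of jobs that were ever running at once.

```python
import asyncio


async def run_limited(job_count: int, limit: int) -> int:
    """Run job_count dummy jobs with at most `limit` in flight; return the peak seen."""
    semaphore = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with semaphore:  # waits here while `limit` jobs are already running
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stands in for a download
            running -= 1

    await asyncio.gather(*(job() for _ in range(job_count)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_limited(job_count=20, limit=5)))  # 5
```

All 20 jobs are created up front, just as the script creates one task per video, but the semaphore lets only 5 of them past the `async with` line at any moment.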