Python Automation: Scripts tự động hóa

Viết automation scripts với Python - system tasks, file operations, scheduling, và CLI tools.

Automation với Python

Python là ngôn ngữ lý tưởng cho automation vì đơn giản, thư viện phong phú, và cross-platform.

OS và System Operations

import os
import subprocess
import shutil

# Read environment variables (returns None if the variable is unset)
home = os.environ.get("HOME")
path = os.environ.get("PATH")

# Set an environment variable (visible to this process and its children only)
os.environ["MY_VAR"] = "my_value"

# Run a system command as an argument list (no shell involved)
result = subprocess.run(["ls", "-la"], capture_output=True, text=True)
print(result.stdout)

# Run through the shell so $HOME is expanded — avoid with untrusted input
result = subprocess.run("echo $HOME", shell=True, capture_output=True, text=True)

# Capture command output; raises CalledProcessError on non-zero exit status
output = subprocess.check_output(["whoami"]).decode().strip()
print(f"Current user: {output}")

# File operations
shutil.copy("source.txt", "dest.txt")
shutil.move("old.txt", "new.txt")
shutil.rmtree("directory")  # Recursively remove a directory tree

File System Monitoring

import os
import time
from pathlib import Path

def watch_directory(path: str, interval: float = 1.0):
    """Watch directory for changes."""
    
    known_files = set(Path(path).rglob("*"))
    
    while True:
        current_files = set(Path(path).rglob("*"))
        
        # New files
        new = current_files - known_files
        for f in new:
            print(f"[+] New: {f}")
        
        # Deleted files
        deleted = known_files - current_files
        for f in deleted:
            print(f"[-] Deleted: {f}")
        
        known_files = current_files
        time.sleep(interval)

# Usage
# watch_directory("/path/to/watch")

Batch File Operations

from pathlib import Path
import shutil

def organize_downloads(download_folder: str):
    """Sort loose files in *download_folder* into per-category subfolders.

    A file whose extension matches a known category is moved into a
    subfolder named after that category (created on demand); files with
    unrecognized extensions are left where they are.
    """
    categories = {
        "Images": [".jpg", ".jpeg", ".png", ".gif", ".svg"],
        "Documents": [".pdf", ".doc", ".docx", ".txt", ".xlsx"],
        "Videos": [".mp4", ".mkv", ".avi", ".mov"],
        "Archives": [".zip", ".rar", ".7z", ".tar", ".gz"],
        "Code": [".py", ".js", ".html", ".css", ".json"],
    }

    root = Path(download_folder)

    for entry in root.iterdir():
        if not entry.is_file():
            continue

        suffix = entry.suffix.lower()
        # First category whose extension list contains this suffix wins.
        category = next(
            (name for name, exts in categories.items() if suffix in exts),
            None,
        )
        if category is None:
            continue

        target = root / category
        target.mkdir(exist_ok=True)
        shutil.move(str(entry), str(target / entry.name))
        print(f"Moved {entry.name} to {category}")

def bulk_rename(folder: str, pattern: str, replacement: str):
    """Rename every entry in *folder* whose name contains *pattern*.

    Args:
        folder: Directory whose entries (files and subdirectories) are renamed.
        pattern: Literal substring to search for in each name.
        replacement: Text substituted for every occurrence of *pattern*.

    Fix: the directory listing is materialized (and sorted for
    deterministic order) before any rename — renaming entries while the
    live ``iterdir()``/``scandir`` iterator is open is unspecified and
    can skip or double-visit entries on some platforms.
    """
    for file in sorted(Path(folder).iterdir()):
        if pattern in file.name:
            new_name = file.name.replace(pattern, replacement)
            file.rename(file.parent / new_name)
            print(f"Renamed: {file.name} -> {new_name}")

CLI Tools với argparse

import argparse

def main():
    """Parse scanner CLI arguments and echo the effective settings."""
    parser = argparse.ArgumentParser(description="Network Security Scanner")

    # One positional target plus optional tuning flags.
    parser.add_argument("target", help="Target host to scan")
    parser.add_argument("-p", "--ports", default="1-1000",
                        help="Port range (default: 1-1000)")
    parser.add_argument("-t", "--threads", type=int, default=100,
                        help="Number of threads")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Verbose output")
    parser.add_argument("-o", "--output", help="Output file")

    args = parser.parse_args()

    # Echo the configuration before scanning starts.
    for label, value in (
        ("Target", args.target),
        ("Ports", args.ports),
        ("Threads", args.threads),
        ("Verbose", args.verbose),
    ):
        print(f"{label}: {value}")

    # Start scanning...


if __name__ == "__main__":
    main()

Scheduling Tasks

import schedule
import time

def job():
    """Placeholder task executed by the scheduler."""
    print("Running scheduled job...")

# Register recurring jobs with the default scheduler of the third-party
# `schedule` library (pip install schedule).
schedule.every(10).seconds.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.at("09:00").do(job)

# Polling loop: run any due jobs, then sleep briefly. Blocks forever.
while True:
    schedule.run_pending()
    time.sleep(1)

Log Analyzer Script

#!/usr/bin/env python3
"""
Log Analyzer - Analyze and summarize log files
"""

import re
import argparse
from pathlib import Path
from collections import Counter
from datetime import datetime

def parse_apache_log(line: str) -> dict | None:
    """Parse Apache access log line."""
    
    pattern = r'(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) \S+" (\d+) (\d+)'
    match = re.match(pattern, line)
    
    if match:
        return {
            "ip": match.group(1),
            "timestamp": match.group(2),
            "method": match.group(3),
            "path": match.group(4),
            "status": int(match.group(5)),
            "size": int(match.group(6))
        }
    return None

def analyze_logs(logfile: str) -> dict:
    """Aggregate request statistics from an Apache access-log file.

    Returns a dict with the total request count, Counters keyed by
    status code, client IP, request path and HTTP method, and the full
    parsed entries of all 4xx/5xx responses. Lines that fail to parse
    are skipped without affecting the totals.
    """
    stats = {
        "total_requests": 0,
        "status_codes": Counter(),
        "top_ips": Counter(),
        "top_paths": Counter(),
        "methods": Counter(),
        "errors": [],
    }

    with open(logfile, "r") as f:
        for raw_line in f:
            entry = parse_apache_log(raw_line.strip())
            if entry is None:
                continue

            stats["total_requests"] += 1
            stats["status_codes"][entry["status"]] += 1
            stats["top_ips"][entry["ip"]] += 1
            stats["top_paths"][entry["path"]] += 1
            stats["methods"][entry["method"]] += 1

            # Error entries are kept whole for detailed inspection.
            if entry["status"] >= 400:
                stats["errors"].append(entry)

    return stats

def print_report(stats: dict):
    """Pretty-print the statistics produced by analyze_logs to stdout."""
    banner = "=" * 50
    print(banner)
    print("LOG ANALYSIS REPORT")
    print(banner)
    print(f"\nTotal Requests: {stats['total_requests']}")

    # Each section: a title plus (key, count) rows, most frequent first.
    sections = [
        ("Status Codes", stats["status_codes"].most_common()),
        ("Top 10 IPs", stats["top_ips"].most_common(10)),
        ("Top 10 Paths", stats["top_paths"].most_common(10)),
    ]
    for title, rows in sections:
        print(f"\n--- {title} ---")
        for key, count in rows:
            print(f"  {key}: {count}")

    print(f"\nErrors (4xx/5xx): {len(stats['errors'])}")

if __name__ == "__main__":
    # CLI entry point: analyze the log file given as the sole positional argument.
    parser = argparse.ArgumentParser(description="Analyze log files")
    parser.add_argument("logfile", help="Path to log file")
    args = parser.parse_args()

    stats = analyze_logs(args.logfile)
    print_report(stats)

Backup Script

#!/usr/bin/env python3
"""
Automated backup script
"""

import os
import shutil
import tarfile
from datetime import datetime
from pathlib import Path

def create_backup(source_dirs: list, backup_dir: str, compress: bool = True):
    """Back up each directory in *source_dirs* under *backup_dir*.

    With ``compress=True`` (default) all sources go into one
    ``backup_<timestamp>.tar.gz`` archive; otherwise each source tree is
    copied into a ``backup_<timestamp>`` folder, keyed by its basename.

    Returns:
        The created archive/folder path as a string.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    target_root = Path(backup_dir)
    target_root.mkdir(parents=True, exist_ok=True)

    if not compress:
        # Plain copy: one subtree per source directory.
        dest = target_root / f"backup_{stamp}"
        dest.mkdir()
        for src in source_dirs:
            shutil.copytree(src, dest / Path(src).name)
        print(f"Created: {dest}")
        return str(dest)

    # Compressed: bundle every source into a single gzipped tarball.
    archive_path = target_root / f"backup_{stamp}.tar.gz"
    with tarfile.open(archive_path, "w:gz") as tar:
        for src in source_dirs:
            tar.add(src, arcname=Path(src).name)
    print(f"Created: {archive_path}")
    return str(archive_path)

def cleanup_old_backups(backup_dir: str, keep_count: int = 5):
    """Delete all but the *keep_count* newest backups in *backup_dir*.

    Backup names embed a sortable timestamp (``backup_YYYYMMDD_HHMMSS``),
    so a reverse lexicographic sort puts the newest first.
    """
    candidates = sorted(Path(backup_dir).glob("backup_*"), reverse=True)

    for stale in candidates[keep_count:]:
        # Archives are single files; uncompressed backups are whole trees.
        if stale.is_file():
            stale.unlink()
        else:
            shutil.rmtree(stale)
        print(f"Removed old backup: {stale}")

if __name__ == "__main__":
    # Directories to back up — adjust these paths for your machine.
    sources = [
        "/home/user/documents",
        "/home/user/projects",
    ]

    backup_location = "/home/user/backups"

    # Create a fresh compressed backup, then prune to the 5 newest.
    create_backup(sources, backup_location)
    cleanup_old_backups(backup_location, keep_count=5)

Bước tiếp theo

Tiếp theo:

  • Cryptography: Hashing và encryption
  • Password Tools: Password generators và crackers

💡 Pro tip: Test automation scripts trong môi trường sandbox trước khi chạy thật!