Automation with Python
Python is an ideal language for automation: it is simple, ships with a rich ecosystem of libraries, and runs cross-platform.
OS and System Operations
import os
import subprocess
import shutil
# Environment variables
home = os.environ.get("HOME")
path = os.environ.get("PATH")
# Set environment variable
os.environ["MY_VAR"] = "my_value"
# Run system commands
result = subprocess.run(["ls", "-la"], capture_output=True, text=True)
print(result.stdout)
# Run via the shell
result = subprocess.run("echo $HOME", shell=True, capture_output=True, text=True)
# Check command output
output = subprocess.check_output(["whoami"]).decode().strip()
print(f"Current user: {output}")
# File operations
shutil.copy("source.txt", "dest.txt")
shutil.move("old.txt", "new.txt")
shutil.rmtree("directory") # Remove directory
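In real automation scripts it is worth handling command failures explicitly rather than ignoring them. A minimal sketch using subprocess's check and timeout options (the ping command and the 5-second timeout are just example values):
import subprocess

try:
    # check=True raises CalledProcessError on a non-zero exit code
    result = subprocess.run(
        ["ping", "-c", "1", "example.com"],
        capture_output=True,
        text=True,
        check=True,
        timeout=5,  # raises TimeoutExpired if the command hangs
    )
    print(result.stdout)
except subprocess.CalledProcessError as e:
    print(f"Command failed with exit code {e.returncode}: {e.stderr}")
except subprocess.TimeoutExpired:
    print("Command timed out")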
File System Monitoring
import time
from pathlib import Path

def watch_directory(path: str, interval: float = 1.0):
    """Watch directory for changes."""
    known_files = set(Path(path).rglob("*"))
    while True:
        current_files = set(Path(path).rglob("*"))
        # New files
        new = current_files - known_files
        for f in new:
            print(f"[+] New: {f}")
        # Deleted files
        deleted = known_files - current_files
        for f in deleted:
            print(f"[-] Deleted: {f}")
        known_files = current_files
        time.sleep(interval)
# Usage
# watch_directory("/path/to/watch")
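The polling loop above only notices files appearing and disappearing. A variation of the same idea, sketched here as a hypothetical watch_modifications helper, also reports changes by tracking modification times; for event-driven monitoring without polling, the third-party watchdog package is a common alternative.
from pathlib import Path
import time

def watch_modifications(path: str, interval: float = 1.0):
    """Report files whose modification time has changed (polling sketch)."""
    mtimes = {f: f.stat().st_mtime for f in Path(path).rglob("*") if f.is_file()}
    while True:
        for f in Path(path).rglob("*"):
            if not f.is_file():
                continue
            mtime = f.stat().st_mtime
            if f in mtimes and mtime != mtimes[f]:
                print(f"[*] Modified: {f}")
            mtimes[f] = mtime
        time.sleep(interval)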
Batch File Operations
from pathlib import Path
import shutil
def organize_downloads(download_folder: str):
    """Organize files by extension."""
    categories = {
        "Images": [".jpg", ".jpeg", ".png", ".gif", ".svg"],
        "Documents": [".pdf", ".doc", ".docx", ".txt", ".xlsx"],
        "Videos": [".mp4", ".mkv", ".avi", ".mov"],
        "Archives": [".zip", ".rar", ".7z", ".tar", ".gz"],
        "Code": [".py", ".js", ".html", ".css", ".json"],
    }
    download_path = Path(download_folder)
    for file in download_path.iterdir():
        if file.is_file():
            ext = file.suffix.lower()
            for category, extensions in categories.items():
                if ext in extensions:
                    dest_folder = download_path / category
                    dest_folder.mkdir(exist_ok=True)
                    shutil.move(str(file), str(dest_folder / file.name))
                    print(f"Moved {file.name} to {category}")
                    break

def bulk_rename(folder: str, pattern: str, replacement: str):
    """Bulk rename files."""
    for file in Path(folder).iterdir():
        if pattern in file.name:
            new_name = file.name.replace(pattern, replacement)
            file.rename(file.parent / new_name)
            print(f"Renamed: {file.name} -> {new_name}")
CLI Tools with argparse
import argparse
def main():
    parser = argparse.ArgumentParser(
        description="Network Security Scanner"
    )
    parser.add_argument(
        "target",
        help="Target host to scan"
    )
    parser.add_argument(
        "-p", "--ports",
        default="1-1000",
        help="Port range (default: 1-1000)"
    )
    parser.add_argument(
        "-t", "--threads",
        type=int,
        default=100,
        help="Number of threads"
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Verbose output"
    )
    parser.add_argument(
        "-o", "--output",
        help="Output file"
    )
    args = parser.parse_args()
    print(f"Target: {args.target}")
    print(f"Ports: {args.ports}")
    print(f"Threads: {args.threads}")
    print(f"Verbose: {args.verbose}")
    # Start scanning...

if __name__ == "__main__":
    main()
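The skeleton above still has to turn the args.ports string into actual port numbers before scanning. One possible approach, sketched as a hypothetical parse_port_range helper that also accepts comma-separated lists:
def parse_port_range(spec: str) -> list[int]:
    """Turn a spec like "1-1000" or "22,80,443" into a list of ports."""
    ports: list[int] = []
    for part in spec.split(","):
        if "-" in part:
            start, end = part.split("-", 1)
            ports.extend(range(int(start), int(end) + 1))
        else:
            ports.append(int(part))
    return ports

# parse_port_range("1-1000")    -> [1, 2, ..., 1000]
# parse_port_range("22,80,443") -> [22, 80, 443]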
Scheduling Tasks
import schedule  # third-party package: pip install schedule
import time

def job():
    print("Running scheduled job...")

# Schedule jobs
schedule.every(10).seconds.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.at("09:00").do(job)

# Run scheduler
while True:
    schedule.run_pending()
    time.sleep(1)
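schedule can also forward arguments to the job function through do(); a small example (the report_job function and the log path are made up for illustration):
def report_job(logfile: str):
    print(f"Analyzing {logfile}...")

# Extra positional/keyword arguments to do() are passed on to the job function
schedule.every().day.at("07:00").do(report_job, "/var/log/apache2/access.log")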
Log Analyzer Script
#!/usr/bin/env python3
"""
Log Analyzer - Analyze and summarize log files
"""
import re
import argparse
from pathlib import Path
from collections import Counter
from datetime import datetime
def parse_apache_log(line: str) -> dict | None:
    """Parse an Apache access log line (Common Log Format)."""
    pattern = r'(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) \S+" (\d+) (\d+|-)'
    match = re.match(pattern, line)
    if match:
        size = match.group(6)
        return {
            "ip": match.group(1),
            "timestamp": match.group(2),
            "method": match.group(3),
            "path": match.group(4),
            "status": int(match.group(5)),
            # Apache logs "-" as the size when no body was sent (e.g. 304 responses)
            "size": int(size) if size != "-" else 0
        }
    return None
def analyze_logs(logfile: str) -> dict:
    """Analyze log file and return statistics."""
    stats = {
        "total_requests": 0,
        "status_codes": Counter(),
        "top_ips": Counter(),
        "top_paths": Counter(),
        "methods": Counter(),
        "errors": []
    }
    with open(logfile, "r") as f:
        for line in f:
            entry = parse_apache_log(line.strip())
            if entry:
                stats["total_requests"] += 1
                stats["status_codes"][entry["status"]] += 1
                stats["top_ips"][entry["ip"]] += 1
                stats["top_paths"][entry["path"]] += 1
                stats["methods"][entry["method"]] += 1
                if entry["status"] >= 400:
                    stats["errors"].append(entry)
    return stats
def print_report(stats: dict):
    """Print analysis report."""
    print("=" * 50)
    print("LOG ANALYSIS REPORT")
    print("=" * 50)
    print(f"\nTotal Requests: {stats['total_requests']}")
    print("\n--- Status Codes ---")
    for code, count in stats["status_codes"].most_common():
        print(f" {code}: {count}")
    print("\n--- Top 10 IPs ---")
    for ip, count in stats["top_ips"].most_common(10):
        print(f" {ip}: {count}")
    print("\n--- Top 10 Paths ---")
    for path, count in stats["top_paths"].most_common(10):
        print(f" {path}: {count}")
    print(f"\nErrors (4xx/5xx): {len(stats['errors'])}")
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Analyze log files")
    parser.add_argument("logfile", help="Path to log file")
    args = parser.parse_args()
    stats = analyze_logs(args.logfile)
    print_report(stats)
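If you want to keep the 4xx/5xx entries around for follow-up work, an optional addition could dump them to CSV with the standard csv module (the errors.csv filename is just an example):
import csv

def save_errors(stats: dict, outfile: str = "errors.csv"):
    """Write the collected error entries to a CSV file."""
    fields = ["ip", "timestamp", "method", "path", "status", "size"]
    with open(outfile, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()
        writer.writerows(stats["errors"])
    print(f"Wrote {len(stats['errors'])} error entries to {outfile}")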
Backup Script
#!/usr/bin/env python3
"""
Automated backup script
"""
import os
import shutil
import tarfile
from datetime import datetime
from pathlib import Path
def create_backup(source_dirs: list, backup_dir: str, compress: bool = True):
    """Create backup of specified directories."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = Path(backup_dir)
    backup_path.mkdir(parents=True, exist_ok=True)
    if compress:
        # Create compressed archive
        archive_name = f"backup_{timestamp}.tar.gz"
        archive_path = backup_path / archive_name
        with tarfile.open(archive_path, "w:gz") as tar:
            for source in source_dirs:
                tar.add(source, arcname=Path(source).name)
        print(f"Created: {archive_path}")
        return str(archive_path)
    else:
        # Copy directories
        dest = backup_path / f"backup_{timestamp}"
        dest.mkdir()
        for source in source_dirs:
            shutil.copytree(source, dest / Path(source).name)
        print(f"Created: {dest}")
        return str(dest)
def cleanup_old_backups(backup_dir: str, keep_count: int = 5):
    """Remove old backups, keeping only the most recent."""
    backup_path = Path(backup_dir)
    backups = sorted(backup_path.glob("backup_*"), reverse=True)
    for old_backup in backups[keep_count:]:
        if old_backup.is_file():
            old_backup.unlink()
        else:
            shutil.rmtree(old_backup)
        print(f"Removed old backup: {old_backup}")
if __name__ == "__main__":
    # Backup these directories
    sources = [
        "/home/user/documents",
        "/home/user/projects",
    ]
    backup_location = "/home/user/backups"
    create_backup(sources, backup_location)
    cleanup_old_backups(backup_location, keep_count=5)
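To run this backup automatically instead of by hand, it could be combined with the schedule pattern shown earlier; a sketch, assuming the same example paths and an arbitrary 02:00 run time:
import time
import schedule  # third-party package: pip install schedule

sources = ["/home/user/documents", "/home/user/projects"]  # example paths
backup_location = "/home/user/backups"

def nightly_backup():
    path = create_backup(sources, backup_location)
    cleanup_old_backups(backup_location, keep_count=5)
    print(f"Nightly backup finished: {path}")

schedule.every().day.at("02:00").do(nightly_backup)
while True:
    schedule.run_pending()
    time.sleep(60)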
Next Steps
Coming up:
- Cryptography: Hashing and encryption
- Password Tools: Password generators and crackers
💡 Pro tip: Test your automation scripts in a sandbox environment before running them for real!