Python File I/O: Đọc và Ghi Files

Làm việc với files trong Python - đọc, ghi, xử lý text và binary files, CSV, JSON.

Mở và Đọc Files

# Read the whole file into a single string.
# An explicit encoding keeps the result independent of the platform's locale default.
with open("data.txt", "r", encoding="utf-8") as file:
    content = file.read()
    print(content)

# Read line by line; iterating the file object streams it instead of loading it all.
with open("data.txt", "r", encoding="utf-8") as file:
    for line in file:
        # strip() removes the trailing newline (and any surrounding whitespace)
        print(line.strip())

# Read every line into a list (each element keeps its trailing newline).
with open("data.txt", "r", encoding="utf-8") as file:
    lines = file.readlines()

File Modes

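Mode | Mô tả
r    | Read (default)
w    | Write (overwrite)
a    | Append
x    | Create (fail if exists)
b    | Binary mode (kết hợp với mode khác, ví dụ "rb", "wb")
+    | Read and write (kết hợp với mode khác, ví dụ "r+", "w+")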

Ghi Files

# Write a new file ("w" truncates any existing content).
# An explicit encoding keeps the bytes on disk independent of the platform's locale default.
with open("output.txt", "w", encoding="utf-8") as file:
    file.write("Line 1\n")
    file.write("Line 2\n")

# Write several lines; writelines() adds no separators, so append "\n" to each.
lines = ["Line 1", "Line 2", "Line 3"]
with open("output.txt", "w", encoding="utf-8") as file:
    file.writelines(line + "\n" for line in lines)

# Append to an existing file (the file is created if it does not exist).
with open("log.txt", "a", encoding="utf-8") as file:
    file.write("New log entry\n")

Binary Files

# Read a binary file: "rb" returns raw bytes with no text decoding
with open("image.png", "rb") as file:
    data = file.read()
    print(f"File size: {len(data)} bytes")

# Write a binary file: the bytes read above are written out as a copy
with open("copy.png", "wb") as file:
    file.write(data)

# Read a file chunk by chunk (for large files)
def read_chunks(filename, chunk_size=8192):
    """Yield successive blocks of bytes from *filename*.

    The file is opened in binary mode and read ``chunk_size`` bytes at a
    time. Iteration stops at the first empty read, i.e. at end of file.
    """
    with open(filename, "rb") as stream:
        # iter(callable, sentinel) keeps calling read() until it returns b"".
        yield from iter(lambda: stream.read(chunk_size), b"")

# Usage: consume the generator one chunk at a time
for chunk in read_chunks("large_file.bin"):
    # Placeholder: process each chunk here
    pass

Làm việc với JSON

import json

# Read a JSON file.
# An explicit encoding keeps decoding independent of the platform's locale default.
with open("config.json", "r", encoding="utf-8") as file:
    config = json.load(file)

# Write a JSON file
data = {
    "target": "192.168.1.1",
    "ports": [22, 80, 443],
    "timeout": 1.0
}

with open("scan_config.json", "w", encoding="utf-8") as file:
    json.dump(data, file, indent=2)

# Parse a JSON string into Python objects
json_string = '{"name": "test", "value": 123}'
data = json.loads(json_string)

# Convert Python objects back to a JSON string
json_output = json.dumps(data, indent=2)

Làm việc với CSV

import csv

# Read a CSV file. The csv module expects files opened with newline=""
# so that newlines embedded in quoted fields are parsed correctly.
with open("hosts.csv", "r", newline="", encoding="utf-8") as file:
    reader = csv.reader(file)
    for row in reader:
        print(row)  # each row is a list of strings, e.g. ['host', 'port', 'status']

# Read a CSV file with a header row (DictReader maps column name -> value)
with open("hosts.csv", "r", newline="", encoding="utf-8") as file:
    reader = csv.DictReader(file)
    for row in reader:
        print(f"Host: {row['host']}, Port: {row['port']}")

# Write a CSV file; newline="" lets the csv writer control line endings itself
with open("results.csv", "w", newline="", encoding="utf-8") as file:
    writer = csv.writer(file)
    writer.writerow(["host", "port", "status"])
    writer.writerow(["192.168.1.1", 80, "open"])
    writer.writerow(["192.168.1.1", 443, "open"])

# Write a CSV file with DictWriter (rows are dicts keyed by column name)
with open("results.csv", "w", newline="", encoding="utf-8") as file:
    fieldnames = ["host", "port", "status"]
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerow({"host": "192.168.1.1", "port": 80, "status": "open"})

Path Operations với pathlib

from pathlib import Path

# Create a Path object (no filesystem access happens here)
path = Path("data/logs/app.log")

# Properties
print(path.name)        # app.log
print(path.stem)        # app
print(path.suffix)      # .log
print(path.parent)      # data/logs
print(path.exists())    # True/False
print(path.is_file())   # True/False
print(path.is_dir())    # True/False

# Read/write through the Path object.
# An explicit encoding keeps the result independent of the platform's locale default.
content = path.read_text(encoding="utf-8")
path.write_text("New content", encoding="utf-8")

# Create a directory, including missing parents; no error if it already exists
Path("output/reports").mkdir(parents=True, exist_ok=True)

# List files matching a pattern in one directory
for file in Path("logs").glob("*.log"):
    print(file)

# Recursive glob: also descends into subdirectories
for file in Path(".").rglob("*.py"):
    print(file)

Exception Handling

# Handle file errors, most specific exception first
try:
    # An explicit encoding keeps decoding independent of the platform's locale default.
    with open("data.txt", "r", encoding="utf-8") as file:
        content = file.read()
except FileNotFoundError:
    print("File không tồn tại!")
except PermissionError:
    print("Không có quyền đọc file!")
except OSError as e:
    # IOError is only a legacy alias of OSError in Python 3; this branch
    # catches any remaining OS-level failure.
    print(f"IO Error: {e}")

Ứng dụng: Log Parser

import re
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

# Compiled once at module level so the pattern is not rebuilt or looked up per call.
# Format: [2025-01-23 10:30:45] ERROR: Connection failed
_LOG_LINE_RE = re.compile(
    r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] (\w+): (.+)'
)


def parse_log_line(line: str) -> Optional[Dict]:
    """Parse a single log line.

    Args:
        line: One line of text, expected to start with
            ``[YYYY-MM-DD HH:MM:SS] LEVEL: message``.

    Returns:
        A dict with string values under "timestamp", "level" and "message",
        or ``None`` when the line does not match the expected format.
    """
    match = _LOG_LINE_RE.match(line)
    if match is None:
        return None

    timestamp, level, message = match.groups()
    return {
        "timestamp": timestamp,
        "level": level,
        "message": message,
    }

def parse_log_file(filepath: str) -> List[Dict]:
    """Parse an entire log file.

    Args:
        filepath: Path of the log file to read.

    Returns:
        One dict per line that ``parse_log_line`` recognises, in file order.
        Lines it does not recognise are skipped.
    """
    entries = []

    # Explicit encoding: without it, decoding depends on the platform's locale default.
    with open(filepath, "r", encoding="utf-8") as file:
        for line in file:
            entry = parse_log_line(line.strip())
            if entry is not None:
                entries.append(entry)

    return entries

def filter_errors(entries: List[Dict]) -> List[Dict]:
    """Return the entries whose "level" is exactly "ERROR", in input order."""
    error_entries = []
    for entry in entries:
        if entry["level"] == "ERROR":
            error_entries.append(entry)
    return error_entries

# Usage: runs only when the file is executed as a script
if __name__ == "__main__":
    logs = parse_log_file("app.log")
    errors = filter_errors(logs)
    
    # Summary counts
    print(f"Total entries: {len(logs)}")
    print(f"Errors: {len(errors)}")
    
    # Show at most the first five errors
    for error in errors[:5]:
        print(f"[{error['timestamp']}] {error['message']}")

Ứng dụng: Scan Results Exporter

import json
import csv
from datetime import datetime

class ScanResultsExporter:
    """Write a list of scan results to JSON, CSV or plain-text files.

    ``to_csv`` and ``to_txt`` work with the "host", "port" and "status" keys
    of each result dict. The timestamp is captured once at construction and
    reused by every export. All files are written as UTF-8 so the output
    does not depend on the platform's locale default.
    """

    def __init__(self, results: list):
        """Store *results* (not copied) and record the creation time."""
        self.results = results
        # ISO-8601 string of the local time at construction
        self.timestamp = datetime.now().isoformat()

    def to_json(self, filepath: str):
        """Export to JSON file.

        Writes an object with "timestamp" and "results" keys, indented by
        two spaces.
        """
        data = {
            "timestamp": self.timestamp,
            "results": self.results
        }
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

    def to_csv(self, filepath: str):
        """Export to CSV file.

        Columns are fixed to host, port, status. With csv.DictWriter's
        defaults, a result containing any other key raises ValueError and
        a missing key is written as an empty field.
        """
        # newline="" lets the csv writer control line endings itself
        with open(filepath, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=["host", "port", "status"])
            writer.writeheader()
            writer.writerows(self.results)

    def to_txt(self, filepath: str):
        """Export to plain text.

        Writes a header with the timestamp, a 40-character rule, then one
        ``host:port - status`` line per result. Raises KeyError if a result
        lacks one of those keys.
        """
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(f"Scan Results - {self.timestamp}\n")
            f.write("=" * 40 + "\n\n")
            for r in self.results:
                f.write(f"{r['host']}:{r['port']} - {r['status']}\n")

# Usage: build a small result list and export it in two formats
results = [
    {"host": "192.168.1.1", "port": 22, "status": "open"},
    {"host": "192.168.1.1", "port": 80, "status": "open"},
]

exporter = ScanResultsExporter(results)
exporter.to_json("results.json")
exporter.to_csv("results.csv")

Bước tiếp theo

Tiếp theo trong course:

  • Network Programming: Sockets và HTTP
  • Web Scraping: BeautifulSoup, requests

💡 Security tip: Luôn validate file paths để tránh path traversal attacks!