Packet Sniffing với Python và Scapy

Học cách phân tích network traffic với Scapy - capture, analyze, và craft packets.

Packet Sniffing là gì?

Packet Sniffing là kỹ thuật capture và phân tích network traffic. Scapy là thư viện Python mạnh mẽ cho network manipulation.

Cài đặt Scapy

pip install scapy

⚠️ Note: Cần quyền root/admin để capture packets!

Scapy Basics

from scapy.all import *

# Xem các layers có sẵn
ls()  # List tất cả protocols

# Xem chi tiết một protocol
ls(IP)
ls(TCP)

# Tạo packet đơn giản
packet = IP(dst="8.8.8.8")/ICMP()
print(packet.summary())

# Gửi packet
send(packet)  # Layer 3 (IP)
sendp(packet) # Layer 2 (Ethernet)

Packet Sniffing

from scapy.all import sniff, IP, TCP, UDP

def packet_callback(packet):
    """Process each captured packet."""
    
    if IP in packet:
        src_ip = packet[IP].src
        dst_ip = packet[IP].dst
        proto = packet[IP].proto
        
        print(f"{src_ip} -> {dst_ip} [Protocol: {proto}]")

# Capture packets
# count: số packets cần capture
# filter: BPF filter
# iface: interface

# Capture 10 packets
packets = sniff(count=10, prn=packet_callback)

# Capture với filter
packets = sniff(filter="tcp port 80", count=5)

# Capture từ specific interface
# packets = sniff(iface="eth0", count=10)

Filtering Packets

from scapy.all import sniff, IP, TCP, UDP, Raw

def analyze_packet(packet):
    """Detailed packet analysis."""
    
    # Check for TCP
    if TCP in packet:
        src_port = packet[TCP].sport
        dst_port = packet[TCP].dport
        flags = packet[TCP].flags
        
        print(f"TCP: {packet[IP].src}:{src_port} -> {packet[IP].dst}:{dst_port} [{flags}]")
        
        # Check for HTTP
        if dst_port == 80 or src_port == 80:
            if Raw in packet:
                payload = packet[Raw].load.decode(errors='ignore')
                if "HTTP" in payload or "GET" in payload or "POST" in payload:
                    print(f"  HTTP Request/Response detected")
                    print(f"  {payload[:100]}...")
    
    # Check for UDP
    if UDP in packet:
        src_port = packet[UDP].sport
        dst_port = packet[UDP].dport
        print(f"UDP: {packet[IP].src}:{src_port} -> {packet[IP].dst}:{dst_port}")

# Run sniffer
sniff(filter="tcp or udp", prn=analyze_packet, count=20)

HTTP Sniffer

from scapy.all import sniff, TCP, Raw

def http_sniffer(packet):
    """Capture HTTP requests and responses."""
    
    if packet.haslayer(Raw):
        try:
            payload = packet[Raw].load.decode('utf-8', errors='ignore')
            
            # HTTP Request
            if any(method in payload for method in ['GET ', 'POST ', 'PUT ', 'DELETE ']):
                print("\n" + "="*60)
                print("HTTP REQUEST")
                print("="*60)
                
                # Extract first line (method, path)
                first_line = payload.split('\r\n')[0]
                print(f"Request: {first_line}")
                
                # Extract Host header
                for line in payload.split('\r\n'):
                    if line.startswith('Host:'):
                        print(f"Host: {line.split(': ')[1]}")
                        break
            
            # HTTP Response
            if 'HTTP/' in payload and payload.startswith('HTTP'):
                print("\n" + "="*60)
                print("HTTP RESPONSE")
                print("="*60)
                
                status_line = payload.split('\r\n')[0]
                print(f"Status: {status_line}")
                
        except Exception as e:
            pass

# Run HTTP sniffer
print("Starting HTTP sniffer... (Ctrl+C to stop)")
sniff(filter="tcp port 80", prn=http_sniffer)

DNS Sniffer

from scapy.all import sniff, DNS, DNSQR, DNSRR, IP, UDP

def dns_sniffer(packet):
    """Capture DNS queries and responses."""
    
    if DNS in packet:
        # DNS Query
        if packet[DNS].qr == 0:  # Query
            if DNSQR in packet:
                query_name = packet[DNSQR].qname.decode()
                query_type = packet[DNSQR].qtype
                
                type_names = {1: 'A', 2: 'NS', 5: 'CNAME', 28: 'AAAA', 15: 'MX'}
                type_str = type_names.get(query_type, str(query_type))
                
                print(f"[DNS Query] {query_name} (Type: {type_str})")
        
        # DNS Response
        else:
            if DNSRR in packet:
                for i in range(packet[DNS].ancount):
                    rr = packet[DNS].an[i]
                    name = rr.rrname.decode()
                    rdata = rr.rdata
                    
                    if isinstance(rdata, bytes):
                        rdata = rdata.decode(errors='ignore')
                    
                    print(f"[DNS Response] {name} -> {rdata}")

# Run DNS sniffer
print("Starting DNS sniffer...")
sniff(filter="udp port 53", prn=dns_sniffer, store=0)

Credential Sniffer

⚠️ Warning: Chỉ dùng trên network của bạn và có sự đồng ý!

from scapy.all import sniff, TCP, Raw, IP
import re

def credential_sniffer(packet):
    """Detect potential credentials in cleartext traffic."""
    
    if packet.haslayer(Raw) and packet.haslayer(TCP):
        payload = packet[Raw].load.decode(errors='ignore').lower()
        
        # Common credential patterns
        patterns = [
            r'user(name)?[=:]\s*\w+',
            r'pass(word)?[=:]\s*\w+',
            r'login[=:]\s*\w+',
            r'email[=:]\s*[\w@.]+',
        ]
        
        for pattern in patterns:
            matches = re.findall(pattern, payload)
            if matches:
                src = packet[IP].src
                dst = packet[IP].dst
                
                print(f"\n[!] Potential credentials detected!")
                print(f"    From: {src} -> {dst}")
                print(f"    Matches: {matches}")
                print(f"    Payload snippet: {payload[:200]}")

# Run credential sniffer
# sniff(filter="tcp", prn=credential_sniffer, store=0)

ARP Scanner

from scapy.all import ARP, Ether, srp
import ipaddress

def arp_scan(network: str) -> list:
    """
    Scan network using ARP to discover hosts.
    
    Args:
        network: Network range (e.g., "192.168.1.0/24")
    
    Returns:
        List of (IP, MAC) tuples
    """
    
    # Create ARP request
    arp = ARP(pdst=network)
    ether = Ether(dst="ff:ff:ff:ff:ff:ff")  # Broadcast
    packet = ether/arp
    
    # Send and receive
    result = srp(packet, timeout=3, verbose=False)[0]
    
    # Parse results
    hosts = []
    for sent, received in result:
        hosts.append({
            "ip": received.psrc,
            "mac": received.hwsrc
        })
    
    return hosts

# Run scan
if __name__ == "__main__":
    network = "192.168.1.0/24"
    print(f"Scanning {network}...")
    
    hosts = arp_scan(network)
    
    print(f"\nFound {len(hosts)} hosts:")
    print("-" * 40)
    for host in hosts:
        print(f"{host['ip']:15} {host['mac']}")

Packet Crafting

from scapy.all import *

# Create custom ICMP ping
def custom_ping(target: str, count: int = 4):
    """Send custom ICMP packets."""
    
    for i in range(count):
        packet = IP(dst=target)/ICMP(id=1234, seq=i)
        response = sr1(packet, timeout=2, verbose=False)
        
        if response:
            print(f"Reply from {target}: seq={i} ttl={response.ttl}")
        else:
            print(f"Request timeout for seq={i}")

# TCP SYN packet
def syn_packet(target: str, port: int):
    """Create TCP SYN packet."""
    
    packet = IP(dst=target)/TCP(dport=port, flags="S")
    response = sr1(packet, timeout=2, verbose=False)
    
    if response:
        if response[TCP].flags == "SA":  # SYN-ACK
            print(f"Port {port} is OPEN")
        elif response[TCP].flags == "RA":  # RST-ACK
            print(f"Port {port} is CLOSED")
    else:
        print(f"Port {port} is FILTERED")

Save và Load Captures

from scapy.all import sniff, wrpcap, rdpcap

# Capture và save
packets = sniff(count=100)
wrpcap("capture.pcap", packets)
print("Saved to capture.pcap")

# Load capture
loaded_packets = rdpcap("capture.pcap")
print(f"Loaded {len(loaded_packets)} packets")

# Analyze loaded packets
for packet in loaded_packets:
    print(packet.summary())

Complete Sniffer Tool

#!/usr/bin/env python3
"""
Network Packet Sniffer
"""

import argparse
from datetime import datetime
from scapy.all import sniff, IP, TCP, UDP, DNS, Raw, wrpcap

class PacketSniffer:
    def __init__(self, interface=None, output_file=None):
        self.interface = interface
        self.output_file = output_file
        self.packets = []
        self.packet_count = 0
    
    def process_packet(self, packet):
        self.packet_count += 1
        self.packets.append(packet)
        
        timestamp = datetime.now().strftime("%H:%M:%S")
        
        if IP in packet:
            src = packet[IP].src
            dst = packet[IP].dst
            proto = "TCP" if TCP in packet else "UDP" if UDP in packet else "OTHER"
            
            info = ""
            if TCP in packet:
                info = f":{packet[TCP].sport} -> :{packet[TCP].dport}"
            elif UDP in packet:
                info = f":{packet[UDP].sport} -> :{packet[UDP].dport}"
            
            print(f"[{timestamp}] #{self.packet_count} {proto} {src}{info.split(' ->')[0]} -> {dst}{info.split('-> ')[-1] if ' -> ' in info else ''}")
    
    def start(self, count=0, filter_expr=None):
        print(f"Starting sniffer...")
        print(f"Interface: {self.interface or 'default'}")
        print(f"Filter: {filter_expr or 'none'}")
        print("-" * 60)
        
        try:
            sniff(
                iface=self.interface,
                filter=filter_expr,
                prn=self.process_packet,
                count=count,
                store=True
            )
        except KeyboardInterrupt:
            print("\nStopping...")
        
        if self.output_file and self.packets:
            wrpcap(self.output_file, self.packets)
            print(f"Saved {len(self.packets)} packets to {self.output_file}")

def main():
    parser = argparse.ArgumentParser(description="Packet Sniffer")
    parser.add_argument("-i", "--interface", help="Network interface")
    parser.add_argument("-c", "--count", type=int, default=0, help="Packet count (0=infinite)")
    parser.add_argument("-f", "--filter", help="BPF filter expression")
    parser.add_argument("-o", "--output", help="Output PCAP file")
    
    args = parser.parse_args()
    
    sniffer = PacketSniffer(
        interface=args.interface,
        output_file=args.output
    )
    
    sniffer.start(count=args.count, filter_expr=args.filter)

if __name__ == "__main__":
    main()

Bước tiếp theo

Bạn đã hoàn thành khóa học Python for Cybersecurity! Tiếp tục:

  • Thực hành với CTF challenges
  • Xây dựng tools của riêng bạn
  • Học thêm về reverse engineering

⚠️ Legal Notice: Chỉ sniff traffic trên networks bạn sở hữu hoặc có permission!