Packet Sniffing là gì?
Packet Sniffing là kỹ thuật capture và phân tích network traffic. Scapy là thư viện Python mạnh mẽ cho network manipulation.
Cài đặt Scapy
pip install scapy
⚠️ Note: Cần quyền root/admin để capture packets!
Scapy Basics
from scapy.all import *
# List every protocol layer Scapy knows about
ls()

# Show the fields of a specific protocol
ls(IP)
ls(TCP)

# Build a simple ICMP echo request
packet = IP(dst="8.8.8.8")/ICMP()
print(packet.summary())

# Send at layer 3 (IP): Scapy takes care of routing and the link-layer header
send(packet)

# Send at layer 2 (Ethernet): sendp() puts the frame on the wire as given,
# so the packet must carry its own Ethernet header.
sendp(Ether()/packet)
Packet Sniffing
from scapy.all import sniff, IP, TCP, UDP
def packet_callback(packet):
    """Print source, destination and protocol number of an IP packet.

    Packets without an IP layer are ignored.
    """
    if IP not in packet:
        return

    ip_layer = packet[IP]
    print(f"{ip_layer.src} -> {ip_layer.dst} [Protocol: {ip_layer.proto}]")
# sniff() parameters used in these examples:
#   count  - number of packets to capture
#   filter - BPF filter expression
#   iface  - interface to listen on

# Capture 10 packets, handing each one to packet_callback as it arrives
packets = sniff(prn=packet_callback, count=10)

# Capture 5 packets that match a BPF filter
packets = sniff(count=5, filter="tcp port 80")

# Capture from a specific interface
# packets = sniff(iface="eth0", count=10)
Filtering Packets
from scapy.all import sniff, IP, TCP, UDP, Raw
def analyze_packet(packet):
    """Print a one-line summary of a TCP or UDP packet carried over IP.

    For TCP traffic to or from port 80 that has a payload, also report
    whether the payload looks like HTTP and show its first 100 characters.

    Args:
        packet: A captured Scapy packet.
    """
    # A "tcp or udp" BPF filter also matches IPv6 traffic, which has no IP
    # (IPv4) layer; indexing packet[IP] on such a packet raises IndexError.
    if IP not in packet:
        return

    # Check for TCP
    if TCP in packet:
        src_port = packet[TCP].sport
        dst_port = packet[TCP].dport
        flags = packet[TCP].flags
        print(f"TCP: {packet[IP].src}:{src_port} -> {packet[IP].dst}:{dst_port} [{flags}]")

        # Check for HTTP
        if 80 in (src_port, dst_port) and Raw in packet:
            payload = packet[Raw].load.decode(errors='ignore')
            if "HTTP" in payload or "GET" in payload or "POST" in payload:
                print(" HTTP Request/Response detected")
                print(f" {payload[:100]}...")

    # Check for UDP
    if UDP in packet:
        src_port = packet[UDP].sport
        dst_port = packet[UDP].dport
        print(f"UDP: {packet[IP].src}:{src_port} -> {packet[IP].dst}:{dst_port}")
# Capture 20 TCP/UDP packets and pass each one to analyze_packet
sniff(prn=analyze_packet, filter="tcp or udp", count=20)
HTTP Sniffer
from scapy.all import sniff, TCP, Raw
# Request-line prefixes recognised as the start of an HTTP request.
_HTTP_METHODS = ('GET ', 'POST ', 'PUT ', 'DELETE ')


def http_sniffer(packet):
    """Print the request line / Host header or the status line of HTTP traffic.

    A payload is treated as a request only when it *starts* with a known
    method, and as a response only when it starts with ``HTTP/``; matching
    these tokens anywhere in the payload would misclassify message bodies.

    Args:
        packet: A captured Scapy packet.
    """
    if not packet.haslayer(Raw):
        return

    payload = packet[Raw].load.decode('utf-8', errors='ignore')
    lines = payload.split('\r\n')

    # HTTP Request
    if payload.startswith(_HTTP_METHODS):
        print("\n" + "="*60)
        print("HTTP REQUEST")
        print("="*60)

        # First line holds the method and path
        print(f"Request: {lines[0]}")

        # Extract Host header
        for line in lines[1:]:
            if not line:
                # Blank line ends the header section
                break
            # Header names are case-insensitive and the space after the
            # colon is optional, so do not rely on an exact "Host: " prefix.
            if line.lower().startswith('host:'):
                print(f"Host: {line.partition(':')[2].strip()}")
                break

    # HTTP Response
    elif payload.startswith('HTTP/'):
        print("\n" + "="*60)
        print("HTTP RESPONSE")
        print("="*60)
        print(f"Status: {lines[0]}")
# Run the HTTP sniffer until interrupted. store=False keeps Scapy from
# accumulating every captured packet in memory during an unbounded capture.
print("Starting HTTP sniffer... (Ctrl+C to stop)")
sniff(filter="tcp port 80", prn=http_sniffer, store=False)
DNS Sniffer
from scapy.all import sniff, DNS, DNSQR, DNSRR, IP, UDP
# Human-readable names for the DNS record types this sniffer labels.
_DNS_TYPE_NAMES = {1: 'A', 2: 'NS', 5: 'CNAME', 28: 'AAAA', 15: 'MX'}


def _dns_text(value):
    """Return *value* as text, decoding bytes leniently; other types pass through."""
    if isinstance(value, bytes):
        return value.decode(errors='ignore')
    return value


def dns_sniffer(packet):
    """Print DNS queries and the answer records of DNS responses.

    Args:
        packet: A captured Scapy packet.
    """
    if DNS not in packet:
        return
    dns = packet[DNS]

    # DNS Query
    if dns.qr == 0:
        if DNSQR in packet:
            question = packet[DNSQR]
            query_name = _dns_text(question.qname)
            type_str = _DNS_TYPE_NAMES.get(question.qtype, str(question.qtype))
            print(f"[DNS Query] {query_name} (Type: {type_str})")
        return

    # DNS Response
    if DNSRR not in packet:
        return
    for i in range(dns.ancount):
        try:
            rr = dns.an[i]
        except (IndexError, TypeError):
            # ancount claims more answers than were actually parsed
            break
        name = _dns_text(rr.rrname)
        # Not every record class exposes an rdata field; fall back to the
        # record's summary instead of raising inside the sniff callback.
        rdata = _dns_text(getattr(rr, 'rdata', None))
        if rdata is None:
            rdata = rr.summary()
        print(f"[DNS Response] {name} -> {rdata}")
# Start the DNS sniffer; captured packets are not kept in memory
print("Starting DNS sniffer...")
sniff(prn=dns_sniffer, filter="udp port 53", store=False)
Credential Sniffer
⚠️ Warning: Chỉ dùng trên network của bạn hoặc khi đã có sự đồng ý của chủ sở hữu!
from scapy.all import sniff, TCP, Raw, IP
import re
# Common credential patterns, compiled once. The optional parts use
# non-capturing groups so that findall() returns the whole match
# (e.g. "username=admin") rather than just the group text ("name" or "").
_CREDENTIAL_PATTERNS = [
    re.compile(r'user(?:name)?[=:]\s*\w+'),
    re.compile(r'pass(?:word)?[=:]\s*\w+'),
    re.compile(r'login[=:]\s*\w+'),
    re.compile(r'email[=:]\s*[\w@.]+'),
]


def credential_sniffer(packet):
    """Report credential-looking key/value pairs in cleartext TCP payloads.

    The payload is lower-cased before matching. One report is printed for
    each pattern that matches. Packets without an IP layer are skipped,
    because the report is labelled with the IP source and destination.

    Args:
        packet: A captured Scapy packet.
    """
    if not (packet.haslayer(Raw) and packet.haslayer(TCP) and packet.haslayer(IP)):
        return

    payload = packet[Raw].load.decode(errors='ignore').lower()
    src = packet[IP].src
    dst = packet[IP].dst

    for pattern in _CREDENTIAL_PATTERNS:
        matches = pattern.findall(payload)
        if matches:
            print("\n[!] Potential credentials detected!")
            print(f" From: {src} -> {dst}")
            print(f" Matches: {matches}")
            print(f" Payload snippet: {payload[:200]}")
# Run credential sniffer
# sniff(filter="tcp", prn=credential_sniffer, store=0)
ARP Scanner
from scapy.all import ARP, Ether, srp
import ipaddress
def arp_scan(network: str) -> list:
    """
    Discover hosts on a network by broadcasting ARP requests.

    Args:
        network: Network range (e.g., "192.168.1.0/24")

    Returns:
        List of dicts, one per answering host, with keys "ip" and "mac"
    """
    # Broadcast Ethernet frame carrying the ARP request for the whole range
    request = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=network)

    # srp() returns (answered, unanswered); only the answered pairs matter
    answered, _unanswered = srp(request, timeout=3, verbose=False)

    return [
        {"ip": reply.psrc, "mac": reply.hwsrc}
        for _query, reply in answered
    ]
# Scan the example network and print one line per discovered host
if __name__ == "__main__":
    network = "192.168.1.0/24"
    print(f"Scanning {network}...")

    hosts = arp_scan(network)

    print(f"\nFound {len(hosts)} hosts:", "-" * 40, sep="\n")
    for host in hosts:
        print(f"{host['ip']:15} {host['mac']}")
Packet Crafting
from scapy.all import *
# Create custom ICMP ping
def custom_ping(target: str, count: int = 4):
    """Send `count` ICMP echo requests to `target` and report each outcome.

    Each probe uses ICMP id 1234 and its loop index as the sequence number,
    and waits up to 2 seconds for a reply.
    """
    for i in range(count):
        probe = IP(dst=target)/ICMP(id=1234, seq=i)
        reply = sr1(probe, timeout=2, verbose=False)

        if not reply:
            print(f"Request timeout for seq={i}")
            continue

        print(f"Reply from {target}: seq={i} ttl={reply.ttl}")
# TCP SYN packet
def syn_packet(target: str, port: int):
    """Probe a TCP port with a single SYN and print its state.

    Prints OPEN when the reply has SYN and ACK set, CLOSED when it has RST
    set, and FILTERED when there is no reply within 2 seconds or the reply
    is not a TCP segment (for example an ICMP unreachable message).

    Args:
        target: Destination host or IP address.
        port: Destination TCP port.
    """
    # TCP flag bits
    syn, rst, ack = 0x02, 0x04, 0x10

    packet = IP(dst=target)/TCP(dport=port, flags="S")
    response = sr1(packet, timeout=2, verbose=False)

    # A non-TCP reply has no TCP layer, so response[TCP] would raise
    # IndexError; treat it like a missing reply.
    if response is None or not response.haslayer(TCP):
        print(f"Port {port} is FILTERED")
        return

    # Test individual bits rather than comparing against "SA"/"RA" so that
    # a bare RST or a SYN-ACK carrying extra flags is still classified.
    flags = int(response[TCP].flags)
    if flags & syn and flags & ack:  # SYN-ACK
        print(f"Port {port} is OPEN")
    elif flags & rst:  # RST or RST-ACK
        print(f"Port {port} is CLOSED")
Save và Load Captures
from scapy.all import sniff, wrpcap, rdpcap
# Capture 100 packets and write them to a pcap file
packets = sniff(count=100)
wrpcap("capture.pcap", packets)
print("Saved to capture.pcap")

# Read the capture back from disk
loaded_packets = rdpcap("capture.pcap")
print(f"Loaded {len(loaded_packets)} packets")

# Print a one-line summary of every loaded packet
for packet in loaded_packets:
    summary_line = packet.summary()
    print(summary_line)
Complete Sniffer Tool
#!/usr/bin/env python3
"""
Network Packet Sniffer
"""
import argparse
from datetime import datetime
from scapy.all import sniff, IP, TCP, UDP, DNS, Raw, wrpcap
class PacketSniffer:
    """Console packet sniffer that can save what it captured to a pcap file.

    Attributes are plain and public:
        interface: Interface name passed to sniff(), or None for the default.
        output_file: Path of the pcap file to write on exit, or None.
        packets: Every packet seen so far, in capture order.
        packet_count: Number of packets seen so far.
    """

    def __init__(self, interface=None, output_file=None):
        self.interface = interface
        self.output_file = output_file
        self.packets = []
        self.packet_count = 0

    def process_packet(self, packet):
        """Record `packet` and, if it has an IP layer, print a one-line summary."""
        self.packet_count += 1
        self.packets.append(packet)
        timestamp = datetime.now().strftime("%H:%M:%S")

        if IP not in packet:
            return

        src = packet[IP].src
        dst = packet[IP].dst

        # Pick the transport layer, if any, to label the line and add ports
        if TCP in packet:
            proto, transport = "TCP", packet[TCP]
        elif UDP in packet:
            proto, transport = "UDP", packet[UDP]
        else:
            proto, transport = "OTHER", None

        if transport is not None:
            src = f"{src}:{transport.sport}"
            dst = f"{dst}:{transport.dport}"

        print(f"[{timestamp}] #{self.packet_count} {proto} {src} -> {dst}")

    def start(self, count=0, filter_expr=None):
        """Sniff until `count` packets are seen (0 = until interrupted).

        Args:
            count: Number of packets to capture; 0 means no limit.
            filter_expr: Optional BPF filter expression.

        When an output file was configured and at least one packet was
        captured, the packets are written to it after sniffing ends.
        """
        print("Starting sniffer...")
        print(f"Interface: {self.interface or 'default'}")
        print(f"Filter: {filter_expr or 'none'}")
        print("-" * 60)
        try:
            sniff(
                iface=self.interface,
                filter=filter_expr,
                prn=self.process_packet,
                count=count,
                # process_packet already keeps every packet in self.packets;
                # letting sniff() store them too would hold a second copy
                # whose return value is never used.
                store=False,
            )
        except KeyboardInterrupt:
            print("\nStopping...")

        if self.output_file and self.packets:
            wrpcap(self.output_file, self.packets)
            print(f"Saved {len(self.packets)} packets to {self.output_file}")
def main():
    """Parse the command-line options and run a PacketSniffer with them."""
    # (flags, keyword arguments) for each option, in the order they are registered
    option_specs = (
        (("-i", "--interface"), {"help": "Network interface"}),
        (("-c", "--count"), {"type": int, "default": 0, "help": "Packet count (0=infinite)"}),
        (("-f", "--filter"), {"help": "BPF filter expression"}),
        (("-o", "--output"), {"help": "Output PCAP file"}),
    )

    parser = argparse.ArgumentParser(description="Packet Sniffer")
    for flags, settings in option_specs:
        parser.add_argument(*flags, **settings)
    args = parser.parse_args()

    sniffer = PacketSniffer(interface=args.interface, output_file=args.output)
    sniffer.start(count=args.count, filter_expr=args.filter)


if __name__ == "__main__":
    main()
Bước tiếp theo
Bạn đã hoàn thành khóa học Python for Cybersecurity! Tiếp tục:
- Thực hành với CTF challenges
- Xây dựng tools của riêng bạn
- Học thêm về reverse engineering
⚠️ Legal Notice: Chỉ sniff traffic trên networks bạn sở hữu hoặc có permission!