Xây dựng Port Scanner với Python

Tạo công cụ quét port chuyên nghiệp với Python - từ basic scanner đến threaded scanner với service detection.

Port Scanner là gì?

Port Scanner là công cụ để khám phá các cổng mạng đang mở trên một host. Đây là bước đầu tiên trong reconnaissance khi thực hiện penetration testing.

Basic Port Scanner

#!/usr/bin/env python3
"""
Basic Port Scanner
Author: PION Learning
"""

import socket
import sys
from datetime import datetime

def scan_port(host: str, port: int) -> bool:
    """
    Scan a single port on target host.
    Returns True if port is open, False otherwise.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(1)
    
    try:
        result = sock.connect_ex((host, port))
        return result == 0
    except socket.error:
        return False
    finally:
        sock.close()

def basic_scanner(host: str, start_port: int, end_port: int):
    """Scan a range of ports on target host."""
    
    print("-" * 50)
    print(f"Scanning target: {host}")
    print(f"Time started: {datetime.now()}")
    print("-" * 50)
    
    open_ports = []
    
    for port in range(start_port, end_port + 1):
        if scan_port(host, port):
            print(f"[+] Port {port} is OPEN")
            open_ports.append(port)
    
    print("-" * 50)
    print(f"Scan completed in {datetime.now()}")
    print(f"Open ports: {open_ports}")
    
    return open_ports

if __name__ == "__main__":
    target = input("Enter target host: ")
    basic_scanner(target, 1, 1024)

Threaded Port Scanner

Scanner cơ bản chậm vì quét tuần tự. Sử dụng threading để scan nhanh hơn:

#!/usr/bin/env python3
"""
Threaded Port Scanner
Author: PION Learning
"""

import socket
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import List, Tuple, Dict

# Common ports and services
COMMON_PORTS = {
    21: "FTP",
    22: "SSH", 
    23: "Telnet",
    25: "SMTP",
    53: "DNS",
    80: "HTTP",
    110: "POP3",
    135: "MSRPC",
    139: "NetBIOS",
    143: "IMAP",
    443: "HTTPS",
    445: "SMB",
    993: "IMAPS",
    995: "POP3S",
    1433: "MSSQL",
    1521: "Oracle",
    3306: "MySQL",
    3389: "RDP",
    5432: "PostgreSQL",
    5900: "VNC",
    6379: "Redis",
    8080: "HTTP-Alt",
    8443: "HTTPS-Alt",
    27017: "MongoDB"
}

def scan_port(host: str, port: int, timeout: float = 1.0) -> Tuple[int, bool, str]:
    """
    Scan a single port and return (port, is_open, service_name)
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    
    try:
        result = sock.connect_ex((host, port))
        is_open = result == 0
        service = COMMON_PORTS.get(port, "Unknown")
        return (port, is_open, service)
    except socket.error:
        return (port, False, "")
    finally:
        sock.close()

def threaded_scanner(
    host: str, 
    ports: List[int], 
    threads: int = 100,
    timeout: float = 1.0
) -> List[Dict]:
    """
    Scan multiple ports using thread pool.
    """
    
    print("=" * 60)
    print(f" PION Port Scanner")
    print("=" * 60)
    print(f" Target: {host}")
    print(f" Ports: {len(ports)}")
    print(f" Threads: {threads}")
    print(f" Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 60)
    
    results = []
    open_count = 0
    
    with ThreadPoolExecutor(max_workers=threads) as executor:
        # Submit all scan tasks
        futures = {
            executor.submit(scan_port, host, port, timeout): port 
            for port in ports
        }
        
        # Process results as they complete
        for future in as_completed(futures):
            port, is_open, service = future.result()
            
            if is_open:
                open_count += 1
                print(f" [+] {port:5d}/tcp   open    {service}")
                results.append({
                    "port": port,
                    "state": "open",
                    "service": service
                })
    
    print("=" * 60)
    print(f" Scan complete: {open_count} open ports found")
    print(f" Finished: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 60)
    
    return sorted(results, key=lambda x: x["port"])

def main():
    """Main function with CLI interface."""
    
    if len(sys.argv) < 2:
        print("Usage: python port_scanner.py <target> [port_range]")
        print("Examples:")
        print("  python port_scanner.py 192.168.1.1")
        print("  python port_scanner.py 192.168.1.1 1-1000")
        print("  python port_scanner.py scanme.nmap.org common")
        sys.exit(1)
    
    target = sys.argv[1]
    
    # Resolve hostname
    try:
        ip = socket.gethostbyname(target)
        print(f"Resolved {target} to {ip}")
    except socket.gaierror:
        print(f"Could not resolve hostname: {target}")
        sys.exit(1)
    
    # Parse port range
    if len(sys.argv) >= 3:
        port_arg = sys.argv[2]
        
        if port_arg == "common":
            ports = list(COMMON_PORTS.keys())
        elif "-" in port_arg:
            start, end = map(int, port_arg.split("-"))
            ports = list(range(start, end + 1))
        else:
            ports = [int(port_arg)]
    else:
        # Default: common ports
        ports = list(COMMON_PORTS.keys())
    
    # Run scan
    results = threaded_scanner(target, ports)
    
    return results

if __name__ == "__main__":
    main()

Scanner với Banner Grabbing

#!/usr/bin/env python3
"""
Advanced Port Scanner with Banner Grabbing
Author: PION Learning
"""

import socket
from concurrent.futures import ThreadPoolExecutor
from typing import Tuple, Optional

# Probes for different services
SERVICE_PROBES = {
    21: b"",                                    # FTP sends banner
    22: b"",                                    # SSH sends banner
    25: b"EHLO scanner\r\n",                    # SMTP
    80: b"GET / HTTP/1.1\r\nHost: target\r\n\r\n",  # HTTP
    110: b"",                                   # POP3 sends banner
    143: b"",                                   # IMAP sends banner
    443: b"",                                   # HTTPS - need SSL
    3306: b"",                                  # MySQL sends banner
}

def grab_banner(
    host: str, 
    port: int, 
    timeout: float = 2.0
) -> Tuple[int, bool, Optional[str]]:
    """
    Connect to port and grab service banner.
    Returns (port, is_open, banner)
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    
    try:
        sock.connect((host, port))
        
        # Send probe if available
        probe = SERVICE_PROBES.get(port, b"")
        if probe:
            sock.send(probe)
        
        # Receive banner
        banner = sock.recv(1024)
        banner_str = banner.decode(errors='ignore').strip()
        
        # Clean up banner (first line only)
        banner_str = banner_str.split('\n')[0][:80]
        
        return (port, True, banner_str)
        
    except socket.timeout:
        return (port, True, None)  # Open but no banner
    except socket.error:
        return (port, False, None)
    finally:
        sock.close()

def banner_scanner(host: str, ports: list, threads: int = 50):
    """Scan ports and grab banners."""
    
    print(f"\n[*] Banner Grabbing Scanner")
    print(f"[*] Target: {host}")
    print("-" * 70)
    
    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = [executor.submit(grab_banner, host, p) for p in ports]
        
        for future in futures:
            port, is_open, banner = future.result()
            
            if is_open:
                if banner:
                    print(f"[+] {port:5d}  OPEN  {banner}")
                else:
                    print(f"[+] {port:5d}  OPEN  (no banner)")
    
    print("-" * 70)

# Run
# banner_scanner("scanme.nmap.org", [22, 80, 443])

Scanner dạng Class

#!/usr/bin/env python3
"""
Object-Oriented Port Scanner
Author: PION Learning
"""

import socket
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
import json

@dataclass
class ScanResult:
    """Data class for scan results"""
    port: int
    state: str  # open, closed, filtered
    service: str
    banner: Optional[str] = None
    
    def to_dict(self) -> dict:
        return {
            "port": self.port,
            "state": self.state,
            "service": self.service,
            "banner": self.banner
        }

class PortScanner:
    """
    Professional Port Scanner class with multiple scan modes.
    """
    
    COMMON_PORTS = {
        21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP",
        53: "DNS", 80: "HTTP", 443: "HTTPS", 445: "SMB",
        3306: "MySQL", 3389: "RDP", 5432: "PostgreSQL"
    }
    
    def __init__(
        self, 
        target: str,
        timeout: float = 1.0,
        threads: int = 100
    ):
        self.target = target
        self.timeout = timeout
        self.threads = threads
        self.results: List[ScanResult] = []
        self.start_time: Optional[datetime] = None
        self.end_time: Optional[datetime] = None
        
        # Resolve target
        try:
            self.ip = socket.gethostbyname(target)
        except socket.gaierror:
            raise ValueError(f"Cannot resolve: {target}")
    
    def _scan_port(self, port: int) -> ScanResult:
        """Scan single port."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        
        try:
            result = sock.connect_ex((self.ip, port))
            
            if result == 0:
                service = self.COMMON_PORTS.get(port, "unknown")
                return ScanResult(port, "open", service)
            else:
                return ScanResult(port, "closed", "")
                
        except socket.timeout:
            return ScanResult(port, "filtered", "")
        except socket.error:
            return ScanResult(port, "error", "")
        finally:
            sock.close()
    
    def scan(self, ports: List[int]) -> List[ScanResult]:
        """
        Scan list of ports using thread pool.
        """
        self.start_time = datetime.now()
        self.results = []
        
        with ThreadPoolExecutor(max_workers=self.threads) as executor:
            futures = [executor.submit(self._scan_port, p) for p in ports]
            
            for future in futures:
                result = future.result()
                if result.state == "open":
                    self.results.append(result)
        
        self.end_time = datetime.now()
        self.results.sort(key=lambda x: x.port)
        
        return self.results
    
    def scan_range(self, start: int, end: int) -> List[ScanResult]:
        """Scan port range."""
        return self.scan(list(range(start, end + 1)))
    
    def scan_common(self) -> List[ScanResult]:
        """Scan common ports."""
        return self.scan(list(self.COMMON_PORTS.keys()))
    
    def summary(self) -> str:
        """Generate scan summary."""
        duration = self.end_time - self.start_time if self.end_time else None
        
        lines = [
            "=" * 50,
            f"Scan Report: {self.target} ({self.ip})",
            "=" * 50,
            f"Start: {self.start_time}",
            f"End: {self.end_time}",
            f"Duration: {duration}",
            "-" * 50,
            "PORT      STATE    SERVICE",
            "-" * 50
        ]
        
        for r in self.results:
            lines.append(f"{r.port:5d}/tcp {r.state:8s} {r.service}")
        
        lines.append("-" * 50)
        lines.append(f"Total open ports: {len(self.results)}")
        lines.append("=" * 50)
        
        return "\n".join(lines)
    
    def to_json(self) -> str:
        """Export results to JSON."""
        data = {
            "target": self.target,
            "ip": self.ip,
            "scan_time": str(self.start_time),
            "open_ports": [r.to_dict() for r in self.results]
        }
        return json.dumps(data, indent=2)

# Usage
if __name__ == "__main__":
    scanner = PortScanner("scanme.nmap.org")
    
    # Scan common ports
    results = scanner.scan_common()
    
    # Print summary
    print(scanner.summary())
    
    # Export to JSON
    # print(scanner.to_json())

Sử dụng Scanner

# Basic usage
python port_scanner.py 192.168.1.1

# Scan range
python port_scanner.py 192.168.1.1 1-1000

# Scan common ports
python port_scanner.py scanme.nmap.org common

Bước tiếp theo

Trong các bài tiếp theo:

  • Web Scraping cho reconnaissance
  • Password Tools với hashlib
  • Packet Sniffing với Scapy

⚠️ Legal Notice: Chỉ scan các hệ thống bạn có quyền. Scan không phép là bất hợp pháp!