Port Scanner là gì?
Port Scanner là công cụ để khám phá các cổng mạng đang mở trên một host. Đây là bước đầu tiên trong reconnaissance khi thực hiện penetration testing.
Basic Port Scanner
#!/usr/bin/env python3
"""
Basic Port Scanner
Author: PION Learning
"""
import socket
import sys
from datetime import datetime
def scan_port(host: str, port: int) -> bool:
"""
Scan a single port on target host.
Returns True if port is open, False otherwise.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
try:
result = sock.connect_ex((host, port))
return result == 0
except socket.error:
return False
finally:
sock.close()
def basic_scanner(host: str, start_port: int, end_port: int):
"""Scan a range of ports on target host."""
print("-" * 50)
print(f"Scanning target: {host}")
print(f"Time started: {datetime.now()}")
print("-" * 50)
open_ports = []
for port in range(start_port, end_port + 1):
if scan_port(host, port):
print(f"[+] Port {port} is OPEN")
open_ports.append(port)
print("-" * 50)
print(f"Scan completed in {datetime.now()}")
print(f"Open ports: {open_ports}")
return open_ports
if __name__ == "__main__":
target = input("Enter target host: ")
basic_scanner(target, 1, 1024)
Threaded Port Scanner
Scanner cơ bản chậm vì quét tuần tự. Sử dụng threading để scan nhanh hơn:
#!/usr/bin/env python3
"""
Threaded Port Scanner
Author: PION Learning
"""
import socket
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import List, Tuple, Dict
# Common ports and services
COMMON_PORTS = {
21: "FTP",
22: "SSH",
23: "Telnet",
25: "SMTP",
53: "DNS",
80: "HTTP",
110: "POP3",
135: "MSRPC",
139: "NetBIOS",
143: "IMAP",
443: "HTTPS",
445: "SMB",
993: "IMAPS",
995: "POP3S",
1433: "MSSQL",
1521: "Oracle",
3306: "MySQL",
3389: "RDP",
5432: "PostgreSQL",
5900: "VNC",
6379: "Redis",
8080: "HTTP-Alt",
8443: "HTTPS-Alt",
27017: "MongoDB"
}
def scan_port(host: str, port: int, timeout: float = 1.0) -> Tuple[int, bool, str]:
"""
Scan a single port and return (port, is_open, service_name)
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
result = sock.connect_ex((host, port))
is_open = result == 0
service = COMMON_PORTS.get(port, "Unknown")
return (port, is_open, service)
except socket.error:
return (port, False, "")
finally:
sock.close()
def threaded_scanner(
host: str,
ports: List[int],
threads: int = 100,
timeout: float = 1.0
) -> List[Dict]:
"""
Scan multiple ports using thread pool.
"""
print("=" * 60)
print(f" PION Port Scanner")
print("=" * 60)
print(f" Target: {host}")
print(f" Ports: {len(ports)}")
print(f" Threads: {threads}")
print(f" Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)
results = []
open_count = 0
with ThreadPoolExecutor(max_workers=threads) as executor:
# Submit all scan tasks
futures = {
executor.submit(scan_port, host, port, timeout): port
for port in ports
}
# Process results as they complete
for future in as_completed(futures):
port, is_open, service = future.result()
if is_open:
open_count += 1
print(f" [+] {port:5d}/tcp open {service}")
results.append({
"port": port,
"state": "open",
"service": service
})
print("=" * 60)
print(f" Scan complete: {open_count} open ports found")
print(f" Finished: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)
return sorted(results, key=lambda x: x["port"])
def main():
"""Main function with CLI interface."""
if len(sys.argv) < 2:
print("Usage: python port_scanner.py <target> [port_range]")
print("Examples:")
print(" python port_scanner.py 192.168.1.1")
print(" python port_scanner.py 192.168.1.1 1-1000")
print(" python port_scanner.py scanme.nmap.org common")
sys.exit(1)
target = sys.argv[1]
# Resolve hostname
try:
ip = socket.gethostbyname(target)
print(f"Resolved {target} to {ip}")
except socket.gaierror:
print(f"Could not resolve hostname: {target}")
sys.exit(1)
# Parse port range
if len(sys.argv) >= 3:
port_arg = sys.argv[2]
if port_arg == "common":
ports = list(COMMON_PORTS.keys())
elif "-" in port_arg:
start, end = map(int, port_arg.split("-"))
ports = list(range(start, end + 1))
else:
ports = [int(port_arg)]
else:
# Default: common ports
ports = list(COMMON_PORTS.keys())
# Run scan
results = threaded_scanner(target, ports)
return results
if __name__ == "__main__":
main()
Scanner với Banner Grabbing
#!/usr/bin/env python3
"""
Advanced Port Scanner with Banner Grabbing
Author: PION Learning
"""
import socket
from concurrent.futures import ThreadPoolExecutor
from typing import Tuple, Optional
# Probes for different services
SERVICE_PROBES = {
21: b"", # FTP sends banner
22: b"", # SSH sends banner
25: b"EHLO scanner\r\n", # SMTP
80: b"GET / HTTP/1.1\r\nHost: target\r\n\r\n", # HTTP
110: b"", # POP3 sends banner
143: b"", # IMAP sends banner
443: b"", # HTTPS - need SSL
3306: b"", # MySQL sends banner
}
def grab_banner(
host: str,
port: int,
timeout: float = 2.0
) -> Tuple[int, bool, Optional[str]]:
"""
Connect to port and grab service banner.
Returns (port, is_open, banner)
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
sock.connect((host, port))
# Send probe if available
probe = SERVICE_PROBES.get(port, b"")
if probe:
sock.send(probe)
# Receive banner
banner = sock.recv(1024)
banner_str = banner.decode(errors='ignore').strip()
# Clean up banner (first line only)
banner_str = banner_str.split('\n')[0][:80]
return (port, True, banner_str)
except socket.timeout:
return (port, True, None) # Open but no banner
except socket.error:
return (port, False, None)
finally:
sock.close()
def banner_scanner(host: str, ports: list, threads: int = 50):
"""Scan ports and grab banners."""
print(f"\n[*] Banner Grabbing Scanner")
print(f"[*] Target: {host}")
print("-" * 70)
with ThreadPoolExecutor(max_workers=threads) as executor:
futures = [executor.submit(grab_banner, host, p) for p in ports]
for future in futures:
port, is_open, banner = future.result()
if is_open:
if banner:
print(f"[+] {port:5d} OPEN {banner}")
else:
print(f"[+] {port:5d} OPEN (no banner)")
print("-" * 70)
# Run
# banner_scanner("scanme.nmap.org", [22, 80, 443])
Scanner dạng Class
#!/usr/bin/env python3
"""
Object-Oriented Port Scanner
Author: PION Learning
"""
import socket
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
import json
@dataclass
class ScanResult:
"""Data class for scan results"""
port: int
state: str # open, closed, filtered
service: str
banner: Optional[str] = None
def to_dict(self) -> dict:
return {
"port": self.port,
"state": self.state,
"service": self.service,
"banner": self.banner
}
class PortScanner:
"""
Professional Port Scanner class with multiple scan modes.
"""
COMMON_PORTS = {
21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP",
53: "DNS", 80: "HTTP", 443: "HTTPS", 445: "SMB",
3306: "MySQL", 3389: "RDP", 5432: "PostgreSQL"
}
def __init__(
self,
target: str,
timeout: float = 1.0,
threads: int = 100
):
self.target = target
self.timeout = timeout
self.threads = threads
self.results: List[ScanResult] = []
self.start_time: Optional[datetime] = None
self.end_time: Optional[datetime] = None
# Resolve target
try:
self.ip = socket.gethostbyname(target)
except socket.gaierror:
raise ValueError(f"Cannot resolve: {target}")
def _scan_port(self, port: int) -> ScanResult:
"""Scan single port."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
try:
result = sock.connect_ex((self.ip, port))
if result == 0:
service = self.COMMON_PORTS.get(port, "unknown")
return ScanResult(port, "open", service)
else:
return ScanResult(port, "closed", "")
except socket.timeout:
return ScanResult(port, "filtered", "")
except socket.error:
return ScanResult(port, "error", "")
finally:
sock.close()
def scan(self, ports: List[int]) -> List[ScanResult]:
"""
Scan list of ports using thread pool.
"""
self.start_time = datetime.now()
self.results = []
with ThreadPoolExecutor(max_workers=self.threads) as executor:
futures = [executor.submit(self._scan_port, p) for p in ports]
for future in futures:
result = future.result()
if result.state == "open":
self.results.append(result)
self.end_time = datetime.now()
self.results.sort(key=lambda x: x.port)
return self.results
def scan_range(self, start: int, end: int) -> List[ScanResult]:
"""Scan port range."""
return self.scan(list(range(start, end + 1)))
def scan_common(self) -> List[ScanResult]:
"""Scan common ports."""
return self.scan(list(self.COMMON_PORTS.keys()))
def summary(self) -> str:
"""Generate scan summary."""
duration = self.end_time - self.start_time if self.end_time else None
lines = [
"=" * 50,
f"Scan Report: {self.target} ({self.ip})",
"=" * 50,
f"Start: {self.start_time}",
f"End: {self.end_time}",
f"Duration: {duration}",
"-" * 50,
"PORT STATE SERVICE",
"-" * 50
]
for r in self.results:
lines.append(f"{r.port:5d}/tcp {r.state:8s} {r.service}")
lines.append("-" * 50)
lines.append(f"Total open ports: {len(self.results)}")
lines.append("=" * 50)
return "\n".join(lines)
def to_json(self) -> str:
"""Export results to JSON."""
data = {
"target": self.target,
"ip": self.ip,
"scan_time": str(self.start_time),
"open_ports": [r.to_dict() for r in self.results]
}
return json.dumps(data, indent=2)
# Usage
if __name__ == "__main__":
scanner = PortScanner("scanme.nmap.org")
# Scan common ports
results = scanner.scan_common()
# Print summary
print(scanner.summary())
# Export to JSON
# print(scanner.to_json())
Sử dụng Scanner
# Basic usage
python port_scanner.py 192.168.1.1
# Scan range
python port_scanner.py 192.168.1.1 1-1000
# Scan common ports
python port_scanner.py scanme.nmap.org common
Bước tiếp theo
Trong các bài tiếp theo:
- Web Scraping cho reconnaissance
- Password Tools với hashlib
- Packet Sniffing với Scapy
⚠️ Legal Notice: Chỉ scan các hệ thống bạn có quyền. Scan không phép là bất hợp pháp!