Pythonでpingの実装してみる
Writing simple ping example
from enum import Enum
class ICMPType(Enum):
ECHOREPLY = 0
ECHO = 8
def __int__(self):
return self.value
また,パケットフォーマットからチェックサムフィールドを確認することができます. このチェックサムは以下の手順で計算します.
- チェックサムフィールドを
0
で埋める - パケットを16ビット単位で区切る(パケット長が奇数バイトである場合は
0x00
を末尾に追加) - 16ビット単位で区切ったデータを1の補数で加算していき合計値を求める
- 加算結果の1の補数を最終結果とする.この時,0の表現としては
0xffff
を用いる.
以上の手順でチェックサムを計算する関数calc_checksum
を以下のように定義します.
def calc_checksum(data: bytes) -> int:
if len(data) % 2 == 1:
data += b"\x00"
u16_counts = len(data) // 2
checksum = sum(struct.unpack(f"!{u16_counts}H", data))
while 0xFFFF < checksum:
checksum = (checksum & 0xFFFF) + (checksum >> 16)
if checksum != 0xFFFF:
checksum = ~checksum
return checksum & 0xFFFF
以下に示すICMPEcho
がエコー要求とエコー応答を表現するデータ構造となります.checksum
がコンストラクタ引数として指定されなかった場合は,calc_checksum
にて計算します.
ICMPEcho
はネットワークを通して送受信されます.従って,シリアライズを行うto_bytes
と,デシリアライズを行うfrom_bytes
を実装しています.
from dataclasses import dataclass
from typing import Optional
import struct
@dataclass(frozen=True)
class ICMPEcho:
type: ICMPType
code: int
id: int
seq: int
data: bytes
checksum: Optional[int] = None
def __post_init__(self):
if self.checksum is None:
object.__setattr__(self, "checksum", 0)
object.__setattr__(self, "checksum", calc_checksum(self.to_bytes()))
def to_bytes(self) -> bytes:
return struct.pack(
f"!BBHHH{len(self.data)}s",
int(self.type),
self.code,
self.checksum,
self.id,
self.seq,
self.data,
)
@classmethod
def from_bytes(cls, packed: bytes) -> "ICMPEcho":
_type, code, checksum, id, seq = struct.unpack("!BBHHH", packed[:8])
type = ICMPType(_type)
data = packed[8:]
return ICMPEcho(type, code, id, seq, data, checksum=checksum)
import socket
from contextlib import contextmanager
@contextmanager
def raw_socket():
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
try:
yield sock
finally:
sock.close()
from typing import Tuple
@dataclass(frozen=True)
class IPHeader:
v: int
hl: int
tos: int
len: int
id: int
off: int
ttl: int
p: int
sum: int
src: str
dst: str
@staticmethod
def from_bytes(packed: bytes) -> "IPHeader":
v_hl, tos, len, id, off, ttl, p, sum, src, dst = struct.unpack(
"!BBHHHBBHII", packed
)
v = v_hl >> 4
hl = v_hl & 0x0F
return IPHeader(
v,
hl,
tos,
len,
id,
off,
ttl,
p,
sum,
socket.inet_ntoa(src.to_bytes(4, byteorder="big")),
socket.inet_ntoa(dst.to_bytes(4, byteorder="big")),
)
def parse_ip_datagram(data: bytes) -> Tuple[IPHeader, bytes]:
ip_header = IPHeader.from_bytes(data[:20])
payload = data[20:]
return (ip_header, payload)
def print_response(ip_header: IPHeader, echo_reply: ICMPEcho) -> None:
print(
f"ping echo reply from {ip_header.src}: icmp_seq={echo_reply.seq} ttl={ip_header.ttl}"
)
def ping(host: str, seq: int) -> None:
with raw_socket() as sock:
packet = ICMPEcho(ICMPType.ECHO, 0, 0, seq, b"\xff").to_bytes()
sock.sendto(packet, (host, 0))
ip_header, payload = parse_ip_datagram(sock.recvfrom(4096)[0])
echo_reply = ICMPEcho.from_bytes(payload)
print_response(ip_header, echo_reply)
import time
for i in range(10):
ping("8.8.8.8", i)
time.sleep(1)