from enum import Enum
class ICMPType(Enum):
= 0
ECHOREPLY = 8
ECHO
def __int__(self):
return self.value
Pythonでpingの実装してみる
はじめに
ネットワークの疎通確認を行う際など,pingコマンドにお世話になっている方は多いと思います.私も勿論その一人です. ですが,実際pingは何をしているのか?という点は前々理解していませんでした.そこで,pythonにてpingのサブセット(エコー要求とエコー応答のみ)を実装してみました.
やったこと
- Pythonでpingコマンドのサブセットを実装
- エコー要求に対してエコー応答が返信されることを確認
ICMPについて
ICMPとはInternet Control Message Protocolの略称です.名前の通り,インターネットの通信に関する情報の送信に用いられます. pingコマンドは,このプロトコルのエコー要求とエコー応答という二つのメッセージをやりとりすることで,端末間の疎通確認を行います.
エコー要求とエコー応答のパケットフォーマットは以下の通りです.
なお,これらのメッセージはIPデータグラムのペイロードとして送受信されます.
Pythonによる実装
エコー要求とエコー応答に対応するデータ構造
上述のパケットフォーマットにおけるType
は,エコー要求では8
が,エコー応答では0
が設定されます. 従って,以下のようにICMPType
として定義することができます.
また,パケットフォーマットからチェックサムフィールドを確認することができます. このチェックサムは以下の手順で計算します.
- チェックサムフィールドを
0
で埋める - パケットを16ビット単位で区切る(パケット長が奇数バイトである場合は
0x00
を末尾に追加) - 16ビット単位で区切ったデータを1の補数で加算していき合計値を求める
- 加算結果の1の補数を最終結果とする.この時,0の表現としては
0xffff
を用いる.
以上の手順でチェックサムを計算する関数calc_checksum
を以下のように定義します.
def calc_checksum(data: bytes) -> int:
if len(data) % 2 == 1:
+= b"\x00"
data = len(data) // 2
u16_counts = sum(struct.unpack(f"!{u16_counts}H", data))
checksum while 0xFFFF < checksum:
= (checksum & 0xFFFF) + (checksum >> 16)
checksum if checksum != 0xFFFF:
= ~checksum
checksum return checksum & 0xFFFF
以下に示すICMPEcho
がエコー要求とエコー応答を表現するデータ構造となります.checksum
がコンストラクタ引数として指定されなかった場合は,calc_checksum
にて計算します.
ICMPEcho
はネットワークを通して送受信されます.従って,シリアライズを行うto_bytes
と,デシリアライズを行うfrom_bytes
を実装しています.
from dataclasses import dataclass
from typing import Optional
import struct
@dataclass(frozen=True)
class ICMPEcho:
type: ICMPType
int
code: id: int
int
seq: bytes
data: int] = None
checksum: Optional[
def __post_init__(self):
if self.checksum is None:
object.__setattr__(self, "checksum", 0)
object.__setattr__(self, "checksum", calc_checksum(self.to_bytes()))
def to_bytes(self) -> bytes:
return struct.pack(
f"!BBHHH{len(self.data)}s",
int(self.type),
self.code,
self.checksum,
self.id,
self.seq,
self.data,
)
@classmethod
def from_bytes(cls, packed: bytes) -> "ICMPEcho":
id, seq = struct.unpack("!BBHHH", packed[:8])
_type, code, checksum, type = ICMPType(_type)
= packed[8:]
data return ICMPEcho(type, code, id, seq, data, checksum=checksum)
Rawソケットの作成
TCPやUDPを用いた時と同様,ICMPでもソケットを用いてネットワークプログラミングを行います. しかしながら,ICMPパケットはIPデータグラムのペイロードとするため,IPデータグラムを操作する必要があります.
IPデータグラムの操作はRawソケットを用いることで実現できます.そこで,ソケットを作成する関数raw_socket
を以下のように定義しました. socket.socket
の第2引数にはsocket.SOCK_RAW
を第3引数にはsocket.IPPROTO_ICMP
を指定します.これは,RawソケットをICMPから利用することを表していいます.
import socket
from contextlib import contextmanager
@contextmanager
def raw_socket():
= socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
sock try:
yield sock
finally:
sock.close()
IPデータグラムのパース
Rawソケットによって受信したデータはIPデータグラムです.従って,これを適切にパースし,エコー応答をペイロードとして取り出す必要があります. 今回,以下のようにIPヘッダを表現するIPHeader
と,IPデータグラムをパースするparse_ip_datagram
関数を実装しました. parse_ip_datagram
関数では,受信したデータの先頭20バイトからIPHeader
インスタンスを作成し,残りのデータをペイロードとします.
from typing import Tuple
@dataclass(frozen=True)
class IPHeader:
int
v: int
hl: int
tos: len: int
id: int
int
off: int
ttl: int
p: sum: int
str
src: str
dst:
@staticmethod
def from_bytes(packed: bytes) -> "IPHeader":
len, id, off, ttl, p, sum, src, dst = struct.unpack(
v_hl, tos, "!BBHHHBBHII", packed
)= v_hl >> 4
v = v_hl & 0x0F
hl
return IPHeader(
v,
hl,
tos,len,
id,
off,
ttl,
p,sum,
4, byteorder="big")),
socket.inet_ntoa(src.to_bytes(4, byteorder="big")),
socket.inet_ntoa(dst.to_bytes(
)
def parse_ip_datagram(data: bytes) -> Tuple[IPHeader, bytes]:
= IPHeader.from_bytes(data[:20])
ip_header = data[20:]
payload return (ip_header, payload)
エコー要求の送信とエコー応答の受信
これまで実装してきた関数とデータ構造を用いて,エコー要求を送信しエコー応答を受信する関数ping
を実装しました. 以下に示されるように,エコー要求をソケットに書き込み,ソケットから読み取ったエコー応答とIPヘッダを出力します.
def print_response(ip_header: IPHeader, echo_reply: ICMPEcho) -> None:
print(
f"ping echo reply from {ip_header.src}: icmp_seq={echo_reply.seq} ttl={ip_header.ttl}"
)
def ping(host: str, seq: int) -> None:
with raw_socket() as sock:
= ICMPEcho(ICMPType.ECHO, 0, 0, seq, b"\xff").to_bytes()
packet 0))
sock.sendto(packet, (host, = parse_ip_datagram(sock.recvfrom(4096)[0])
ip_header, payload = ICMPEcho.from_bytes(payload)
echo_reply print_response(ip_header, echo_reply)
実行結果
実際に8.8.8.8
に対してping
を呼び出した結果を以下に示します. エコー要求に対して適切なエコー応答が帰ってきていることが確認できます.
import time
for i in range(10):
"8.8.8.8", i)
ping(1) time.sleep(
ping echo reply from 8.8.8.8: icmp_seq=0 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=1 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=2 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=3 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=4 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=5 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=6 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=7 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=8 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=9 ttl=37