Pythonでpingの実装してみる

Python
ネットワーク
Published

August 1, 2021

はじめに

ネットワークの疎通確認を行う際など,pingコマンドにお世話になっている方は多いと思います.私も勿論その一人です. ですが,実際pingは何をしているのか?という点は前々理解していませんでした.そこで,pythonにてpingのサブセット(エコー要求とエコー応答のみ)を実装してみました.

やったこと

  • Pythonでpingコマンドのサブセットを実装
  • エコー要求に対してエコー応答が返信されることを確認

ICMPについて

ICMPとはInternet Control Message Protocolの略称です.名前の通り,インターネットの通信に関する情報の送信に用いられます. pingコマンドは,このプロトコルのエコー要求とエコー応答という二つのメッセージをやりとりすることで,端末間の疎通確認を行います.

エコー要求とエコー応答のパケットフォーマットは以下の通りです.

icmp spec

なお,これらのメッセージはIPデータグラムのペイロードとして送受信されます.

Pythonによる実装

エコー要求とエコー応答に対応するデータ構造

上述のパケットフォーマットにおけるTypeは,エコー要求では8が,エコー応答では0が設定されます. 従って,以下のようにICMPTypeとして定義することができます.

from enum import Enum

class ICMPType(Enum):
    ECHOREPLY = 0
    ECHO = 8

    def __int__(self):
        return self.value

また,パケットフォーマットからチェックサムフィールドを確認することができます. このチェックサムは以下の手順で計算します.

  1. チェックサムフィールドを0で埋める
  2. パケットを16ビット単位で区切る(パケット長が奇数バイトである場合は0x00を末尾に追加)
  3. 16ビット単位で区切ったデータを1の補数で加算していき合計値を求める
  4. 加算結果の1の補数を最終結果とする.この時,0の表現としては0xffffを用いる.

以上の手順でチェックサムを計算する関数calc_checksumを以下のように定義します.

def calc_checksum(data: bytes) -> int:
    if len(data) % 2 == 1:
        data += b"\x00"
    u16_counts = len(data) // 2
    checksum = sum(struct.unpack(f"!{u16_counts}H", data))
    while 0xFFFF < checksum:
        checksum = (checksum & 0xFFFF) + (checksum >> 16)
    if checksum != 0xFFFF:
        checksum = ~checksum
    return checksum & 0xFFFF

以下に示すICMPEchoがエコー要求とエコー応答を表現するデータ構造となります.checksumがコンストラクタ引数として指定されなかった場合は,calc_checksumにて計算します.

ICMPEchoはネットワークを通して送受信されます.従って,シリアライズを行うto_bytesと,デシリアライズを行うfrom_bytesを実装しています.

from dataclasses import dataclass
from typing import Optional
import struct

@dataclass(frozen=True)
class ICMPEcho:
    type: ICMPType
    code: int
    id: int
    seq: int
    data: bytes
    checksum: Optional[int] = None

    def __post_init__(self):
        if self.checksum is None:
            object.__setattr__(self, "checksum", 0)
            object.__setattr__(self, "checksum", calc_checksum(self.to_bytes()))

    def to_bytes(self) -> bytes:
        return struct.pack(
            f"!BBHHH{len(self.data)}s",
            int(self.type),
            self.code,
            self.checksum,
            self.id,
            self.seq,
            self.data,
        )

    @classmethod
    def from_bytes(cls, packed: bytes) -> "ICMPEcho":
        _type, code, checksum, id, seq = struct.unpack("!BBHHH", packed[:8])
        type = ICMPType(_type)
        data = packed[8:]
        return ICMPEcho(type, code, id, seq, data, checksum=checksum)

Rawソケットの作成

TCPやUDPを用いた時と同様,ICMPでもソケットを用いてネットワークプログラミングを行います. しかしながら,ICMPパケットはIPデータグラムのペイロードとするため,IPデータグラムを操作する必要があります.

IPデータグラムの操作はRawソケットを用いることで実現できます.そこで,ソケットを作成する関数raw_socketを以下のように定義しました. socket.socketの第2引数にはsocket.SOCK_RAWを第3引数にはsocket.IPPROTO_ICMPを指定します.これは,RawソケットをICMPから利用することを表していいます.

import socket
from contextlib import contextmanager

@contextmanager
def raw_socket():
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    try:
        yield sock
    finally:
        sock.close()

IPデータグラムのパース

Rawソケットによって受信したデータはIPデータグラムです.従って,これを適切にパースし,エコー応答をペイロードとして取り出す必要があります. 今回,以下のようにIPヘッダを表現するIPHeaderと,IPデータグラムをパースするparse_ip_datagram関数を実装しました. parse_ip_datagram関数では,受信したデータの先頭20バイトからIPHeaderインスタンスを作成し,残りのデータをペイロードとします.

from typing import Tuple

@dataclass(frozen=True)
class IPHeader:
    v: int
    hl: int
    tos: int
    len: int
    id: int
    off: int
    ttl: int
    p: int
    sum: int
    src: str
    dst: str

    @staticmethod
    def from_bytes(packed: bytes) -> "IPHeader":
        v_hl, tos, len, id, off, ttl, p, sum, src, dst = struct.unpack(
            "!BBHHHBBHII", packed
        )
        v = v_hl >> 4
        hl = v_hl & 0x0F

        return IPHeader(
            v,
            hl,
            tos,
            len,
            id,
            off,
            ttl,
            p,
            sum,
            socket.inet_ntoa(src.to_bytes(4, byteorder="big")),
            socket.inet_ntoa(dst.to_bytes(4, byteorder="big")),
        )

def parse_ip_datagram(data: bytes) -> Tuple[IPHeader, bytes]:
    ip_header = IPHeader.from_bytes(data[:20])
    payload = data[20:]
    return (ip_header, payload)

エコー要求の送信とエコー応答の受信

これまで実装してきた関数とデータ構造を用いて,エコー要求を送信しエコー応答を受信する関数pingを実装しました. 以下に示されるように,エコー要求をソケットに書き込み,ソケットから読み取ったエコー応答とIPヘッダを出力します.

def print_response(ip_header: IPHeader, echo_reply: ICMPEcho) -> None:
    print(
        f"ping echo reply from {ip_header.src}: icmp_seq={echo_reply.seq} ttl={ip_header.ttl}"
    )


def ping(host: str, seq: int) -> None:
    with raw_socket() as sock:
        packet = ICMPEcho(ICMPType.ECHO, 0, 0, seq, b"\xff").to_bytes()
        sock.sendto(packet, (host, 0))
        ip_header, payload = parse_ip_datagram(sock.recvfrom(4096)[0])
        echo_reply = ICMPEcho.from_bytes(payload)
        print_response(ip_header, echo_reply)

実行結果

実際に8.8.8.8に対してpingを呼び出した結果を以下に示します. エコー要求に対して適切なエコー応答が帰ってきていることが確認できます.

import time
for i in range(10):
    ping("8.8.8.8", i)
    time.sleep(1)
ping echo reply from 8.8.8.8: icmp_seq=0 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=1 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=2 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=3 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=4 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=5 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=6 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=7 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=8 ttl=37
ping echo reply from 8.8.8.8: icmp_seq=9 ttl=37

参考