Bye Bye Moore

PoCソルジャーな零細事業主が作業メモを残すブログ

Python3でUNIXソケットを経由して受信する

UNIXソケットはシステム内のプロセス間で完結する通信であれば、早くて安全です。
要はローカルファイルの読み書きなので

実際のところ

UNIXソケット"/tmp/my_socket"を受信し、
パケットのバリデーション(今回は12ケタで英小文字と数値)を実施するスクリプトは以下の通り

import os
import re
import socketserver

class MyUDPHandler(socketserver.BaseRequestHandler):

    def handle(self):
        data = self.request[0].strip()
        data_decoded = data.decode("utf-8")
        socket = self.request[1]
        if self.validate(data_decoded):
            print("{} wrote:".format(self.client_address[0]))
            print(data_decoded)
        else:
            print("Invalid data received.")

    def validate(self, data):
        # Check if length is 12
        if len(data) != 12:
            return False

        # Check if data contains only lowercase letters and numbers
        if not re.match('^[a-z0-9]*$', data):
            return False

        return True

if __name__ == "__main__":
    server_address = '/tmp/my_socket'
    # 既に存在する場合は削除
    if os.path.exists(server_address):
        os.remove(server_address)

    with socketserver.UnixDatagramServer(server_address, MyUDPHandler) as server:
        print("Serving on {}".format(server.server_address))
        server.serve_forever()

送信動作確認コマンド

送信は以前やったnetcatを利用した方法を使います

echo -n "abcdef123456" | nc -Uu /tmp/my_socket