Bye Bye Moore

PoCソルジャーな零細事業主が作業メモを残すブログ

非同期実行につかえるconcurrent.futures モジュールはJupyter notebookのセルの中でもつかえる

concurrent.futuresは、Pythonで並列処理を簡単に実装するための標準実装のライブラリです。
With構文とあわせて使うことで、タスクの投入から完了管理までを自動的に処理できます。
あとに残らないのでJupyter notebookのセル内で自己完結できるわけです。

二種類実装があるのですが、今回はCAN通信を多用するためI/O処理に強いらしいThreadPoolExecutorの方を使用してみようと思います。

実際のところ

CANポートを各一個専有する二本のロボットアームを非同期に実行するサンプル

from concurrent.futures import ThreadPoolExecutor
import threading

def execute_arm_movement(arm_0, arm_1):
    # 両アームの動作パターンを定義
    movement_patterns = [
        # 略
    ]

    def move_arm(arm, params):
        arm.move_all_joints(**params, total_time=0.5)

    for i in range(1, 3):  # 2回繰り返し
        print(f"サイクル {i} 開始")
        for pattern in movement_patterns:
            # 各パターンで両アームを同時に動作
            with ThreadPoolExecutor(max_workers=2) as executor:
                future_0 = executor.submit(move_arm, arm_0, pattern['arm_0'])
                future_1 = executor.submit(move_arm, arm_1, pattern['arm_1'])
                # 両方の動作が完了するのを待つ
                futures = [future_0, future_1]
                for future in futures:
                    future.result()
                
def main():
    # アーム制御の実行
    execute_arm_movement(arm_0, arm_1)

# メイン処理の実行
if __name__ == "__main__":
    main()