concurrent.futuresは、Pythonで並列処理を簡単に実装するための標準実装のライブラリです。
With構文とあわせて使うことで、タスクの投入から完了管理までを自動的に処理できます。
あとに残らないのでJupyter notebookのセル内で自己完結できるわけです。
二種類実装があるのですが、今回はCAN通信を多用するためI/O処理に強いらしいThreadPoolExecutorの方を使用してみようと思います。
実際のところ
CANポートを各一個専有する二本のロボットアームを非同期に実行するサンプル
from concurrent.futures import ThreadPoolExecutor import threading def execute_arm_movement(arm_0, arm_1): # 両アームの動作パターンを定義 movement_patterns = [ # 略 ] def move_arm(arm, params): arm.move_all_joints(**params, total_time=0.5) for i in range(1, 3): # 2回繰り返し print(f"サイクル {i} 開始") for pattern in movement_patterns: # 各パターンで両アームを同時に動作 with ThreadPoolExecutor(max_workers=2) as executor: future_0 = executor.submit(move_arm, arm_0, pattern['arm_0']) future_1 = executor.submit(move_arm, arm_1, pattern['arm_1']) # 両方の動作が完了するのを待つ futures = [future_0, future_1] for future in futures: future.result() def main(): # アーム制御の実行 execute_arm_movement(arm_0, arm_1) # メイン処理の実行 if __name__ == "__main__": main()