ROS2から導入されたライフサイクルノードという機能をつかうと、
特定の条件下ですこ~しだけ稼働させたい激重センサーをハンドリングしたり、
メンテナンス時に一定時間ノードを止めるといった処理を一定の仕組みに則って書く事ができます。
実際のところ
参考もとの、公式サンプルをベースにとりあえず最低限の実装を写経
# This file is based on https://github.com/ros2/demos/blob/rolling/lifecycle_py/lifecycle_py/talker.py. # Like the original file, it is distributed under the "Apache License, Version 2.0." from typing import Optional import rclpy from rclpy.lifecycle import Node from rclpy.lifecycle import Publisher from rclpy.lifecycle import State from rclpy.lifecycle import TransitionCallbackReturn from rclpy.timer import Timer import std_msgs.msg class LifecycleTalker(Node): def __init__(self, node_name, **kwargs): """アプリ用のパラメータ設定""" self._count: int = 0 self._pub: Optional[Publisher] = None self._timer: Optional[Timer] = None super().__init__(node_name, **kwargs) def publish(self): msg = std_msgs.msg.String() msg.data = 'Lifecycle HelloWorld #' + str(self._count) self._count += 1 # 状態を確認し、許可されている場合はアプリケーションを実行 if self._pub is None or not self._pub.is_activated: self.get_logger().info('Lifecycle publisher is inactive. Messages are not published.') else: self.get_logger().info(f'Lifecycle publisher is active. Publishing: [{msg.data}]') if self._pub is not None: self._pub.publish(msg) def on_configure(self, state: State) -> TransitionCallbackReturn: # 設定処理をここに追加 self._pub = self.create_lifecycle_publisher(std_msgs.msg.String, 'lifecycle_chatter', 10) self._timer = self.create_timer(1.0, self.publish) self.get_logger().info('on_configure() is called.') return TransitionCallbackReturn.SUCCESS def on_activate(self, state: State) -> TransitionCallbackReturn: # アクティブになる際の処理をここに追加 self.get_logger().info('on_activate() is called.') return super().on_activate(state) def on_deactivate(self, state: State) -> TransitionCallbackReturn: # 非アクティブになる際の処理をここに追加 self.get_logger().info('on_deactivate() is called.') return super().on_deactivate(state) def on_cleanup(self, state: State) -> TransitionCallbackReturn: # クリーンアップ処理をここに追加 self.destroy_timer(self._timer) self.destroy_publisher(self._pub) self.get_logger().info('on_cleanup() is called.') return TransitionCallbackReturn.SUCCESS def on_shutdown(self, state: State) -> TransitionCallbackReturn: # シャットダウン処理をここに追加 self.destroy_timer(self._timer) self.destroy_publisher(self._pub) self.get_logger().info('on_shutdown() is called.') return TransitionCallbackReturn.SUCCESS def main(): rclpy.init() executor = rclpy.executors.SingleThreadedExecutor() lc_node = LifecycleTalker('lc_talker') executor.add_node(lc_node) try: executor.spin() except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException): lc_node.destroy_node() if __name__ == '__main__': main()
ros2 lifecycleコマンドで遊ぶ
ノードの存在の確認
先ほどのスクリプトを実行した状態でros2 lifecycleを実行すると
$ ros2 lifecycle nodes /my_lifecycle_node
この状態では、状態
$ ros2 lifecycle list /my_lifecycle_node - configure [1] Start: unconfigured Goal: configuring - shutdown [5] Start: unconfigured Goal: shuttingdown
状態遷移
*** 一番最初の"configure"
まず、configureで立ち上げを行います。
$ ros2 lifecycle set /lc_talker configure Transitioning successful
[INFO] [1700963000.422042057] [lc_talker]: Lifecycle publisher is inactive. Messages are not published. [INFO] [1700963001.421987112] [lc_talker]: Lifecycle publisher is inactive. Messages are not published. ...
*** activate
$ ros2 lifecycle set /lc_talker activate Transitioning successful
[INFO] [1700963002.568506340] [lc_talker]: on_activate() is called. [INFO] [1700963003.422002575] [lc_talker]: Lifecycle publisher is active. Publishing: [Lifecycle HelloWorld #1] [INFO] [1700963004.423245426] [lc_talker]: Lifecycle publisher is active. Publishing: [Lifecycle HelloWorld #2] [INFO] [1700963005.421925774] [lc_talker]: Lifecycle publisher is active. Publishing: [Lifecycle HelloWorld #3] ...
*** shutdown
$ ros2 lifecycle set /lc_talker shutdown Transitioning successful
[INFO] [1700963018.784170186] [lc_talker]: on_shutdown() is called.