Bye Bye Moore

PoCソルジャーな零細事業主が作業メモを残すブログ

ROS2のlifecycleサブコマンドで遊ぶ その1:とりあえず状態遷移を試す

ROS2から導入されたライフサイクルノードという機能をつかうと、
特定の条件下ですこ~しだけ稼働させたい激重センサーをハンドリングしたり、
メンテナンス時に一定時間ノードを止めるといった処理を一定の仕組みに則って書く事ができます。

実際のところ

参考もとの、公式サンプルをベースにとりあえず最低限の実装を写経

# This file is based on https://github.com/ros2/demos/blob/rolling/lifecycle_py/lifecycle_py/talker.py. 
# Like the original file, it is distributed under the "Apache License, Version 2.0."
from typing import Optional

import rclpy

from rclpy.lifecycle import Node
from rclpy.lifecycle import Publisher
from rclpy.lifecycle import State
from rclpy.lifecycle import TransitionCallbackReturn
from rclpy.timer import Timer

import std_msgs.msg


class LifecycleTalker(Node):
    def __init__(self, node_name, **kwargs):
        """アプリ用のパラメータ設定"""
        self._count: int = 0
        self._pub: Optional[Publisher] = None
        self._timer: Optional[Timer] = None
        super().__init__(node_name, **kwargs)

    def publish(self):
        msg = std_msgs.msg.String()
        msg.data = 'Lifecycle HelloWorld #' + str(self._count)
        self._count += 1

        # 状態を確認し、許可されている場合はアプリケーションを実行
        if self._pub is None or not self._pub.is_activated:
            self.get_logger().info('Lifecycle publisher is inactive. Messages are not published.')
        else:
            self.get_logger().info(f'Lifecycle publisher is active. Publishing: [{msg.data}]')

        if self._pub is not None:
            self._pub.publish(msg)

    def on_configure(self, state: State) -> TransitionCallbackReturn:
        # 設定処理をここに追加
        self._pub = self.create_lifecycle_publisher(std_msgs.msg.String, 'lifecycle_chatter', 10)
        self._timer = self.create_timer(1.0, self.publish)

        self.get_logger().info('on_configure() is called.')
        return TransitionCallbackReturn.SUCCESS

    def on_activate(self, state: State) -> TransitionCallbackReturn:
        # アクティブになる際の処理をここに追加
        self.get_logger().info('on_activate() is called.')
        return super().on_activate(state)

    def on_deactivate(self, state: State) -> TransitionCallbackReturn:
        # 非アクティブになる際の処理をここに追加
        self.get_logger().info('on_deactivate() is called.')
        return super().on_deactivate(state)

    def on_cleanup(self, state: State) -> TransitionCallbackReturn:
        # クリーンアップ処理をここに追加
        self.destroy_timer(self._timer)
        self.destroy_publisher(self._pub)

        self.get_logger().info('on_cleanup() is called.')
        return TransitionCallbackReturn.SUCCESS

    def on_shutdown(self, state: State) -> TransitionCallbackReturn:
        # シャットダウン処理をここに追加
        self.destroy_timer(self._timer)
        self.destroy_publisher(self._pub)

        self.get_logger().info('on_shutdown() is called.')
        return TransitionCallbackReturn.SUCCESS

def main():
    rclpy.init()

    executor = rclpy.executors.SingleThreadedExecutor()
    lc_node = LifecycleTalker('lc_talker')
    executor.add_node(lc_node)
    try:
        executor.spin()
    except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
        lc_node.destroy_node()


if __name__ == '__main__':
    main()

ros2 lifecycleコマンドで遊ぶ

ノードの存在の確認

先ほどのスクリプトを実行した状態でros2 lifecycleを実行すると

$ ros2 lifecycle nodes
/my_lifecycle_node

この状態では、状態

$ ros2 lifecycle list /my_lifecycle_node
- configure [1]
	Start: unconfigured
	Goal: configuring
- shutdown [5]
	Start: unconfigured
	Goal: shuttingdown
状態遷移

*** 一番最初の"configure"

まず、configureで立ち上げを行います。

$ ros2 lifecycle set /lc_talker configure
Transitioning successful
[INFO] [1700963000.422042057] [lc_talker]: Lifecycle publisher is inactive. Messages are not published.
[INFO] [1700963001.421987112] [lc_talker]: Lifecycle publisher is inactive. Messages are not published.
...

*** activate

$ ros2 lifecycle set /lc_talker activate
Transitioning successful
[INFO] [1700963002.568506340] [lc_talker]: on_activate() is called.
[INFO] [1700963003.422002575] [lc_talker]: Lifecycle publisher is active. Publishing: [Lifecycle HelloWorld #1]
[INFO] [1700963004.423245426] [lc_talker]: Lifecycle publisher is active. Publishing: [Lifecycle HelloWorld #2]
[INFO] [1700963005.421925774] [lc_talker]: Lifecycle publisher is active. Publishing: [Lifecycle HelloWorld #3]
...

*** shutdown

$ ros2 lifecycle set /lc_talker shutdown
Transitioning successful
[INFO] [1700963018.784170186] [lc_talker]: on_shutdown() is called.