Migration from Raw rclpy¶
This page shows a single before/after comparison: a subscriber node written with raw rclpy
rewritten using lifecore_ros2. The goal is to make the composition benefit concrete.
The before example represents common rclpy usage. The after example uses
LifecycleComponentNode and LifecycleSubscriberComponent.
Before: raw rclpy lifecycle node¶
A typical raw rclpy lifecycle subscriber node looks like this:
from rclpy.lifecycle import LifecycleNode, LifecycleState, TransitionCallbackReturn
from std_msgs.msg import String
class ChatterNode(LifecycleNode):
def __init__(self) -> None:
super().__init__("chatter_node")
self._sub = None
self._active = False
def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn:
self._sub = self.create_subscription(String, "/chatter", self._on_msg, 10)
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: LifecycleState) -> TransitionCallbackReturn:
self._active = True
return super().on_activate(state)
def on_deactivate(self, state: LifecycleState) -> TransitionCallbackReturn:
self._active = False
return super().on_deactivate(state)
def on_cleanup(self, state: LifecycleState) -> TransitionCallbackReturn:
self.destroy_subscription(self._sub)
self._sub = None
return TransitionCallbackReturn.SUCCESS
def _on_msg(self, msg: String) -> None:
if not self._active:
return
self.get_logger().info(f"Received: {msg.data}")
Problems with this pattern at scale:
_active,_sub, and_on_msgare all mixed into the node class. Adding a second subscriber doubles the noise._activeis a hand-rolled flag that shadows the lifecycle state but is not connected to it. A missedsuper()call or an exception in a hook desynchronises it.lifecore_ros2removes both failure modes: activation gating is owned by thelibrary, and uncaught hook exceptions are wrapped in
LifecycleHookError, mapped toTransitionCallbackReturn.ERROR, and routed through rclpy’s nativeErrorProcessing(see Error handling contract).Cleanup logic (
destroy_subscription) is duplicated in every node that needs it.There is no reusable unit — the subscriber behavior is coupled to this specific node.
After: lifecore_ros2¶
The same behavior expressed with LifecycleSubscriberComponent:
from lifecore_ros2 import LifecycleComponentNode, LifecycleSubscriberComponent
from std_msgs.msg import String
class ChatterSubscriber(LifecycleSubscriberComponent[String]):
def __init__(self) -> None:
super().__init__(
name="chatter_sub",
topic_name="/chatter",
qos_profile=10,
)
def on_message(self, msg: String) -> None:
self.node.get_logger().info(f"Received: {msg.data}")
class ChatterNode(LifecycleComponentNode):
def __init__(self) -> None:
super().__init__("chatter_node")
self.add_component(ChatterSubscriber())
What changed:
_activeis gone. The library tracks activation state via_is_activeand drops inbound messages automatically when the component is not active._subcreation and destruction are handled byTopicComponent._on_configureand_release_resources. No explicitdestroy_subscriptioncall needed.ChatterSubscriberis reusable. Any node can attach it viaadd_component.The node class contains only composition logic: which components it owns.
What stays the same:
lifecycle transitions are still
configure → activate → deactivate → cleanupros2 lifecycle setcommands work identicallyrclpy controls the lifecycle state machine — lifecore_ros2 only propagates transitions to components
Running the equivalent example¶
The examples/minimal_subscriber.py file in the repository demonstrates this pattern.
Run it with:
source /opt/ros/jazzy/setup.bash
uv run --extra dev python examples/minimal_subscriber.py
See Examples for the full walkthrough.