Lifecycle interface

Core lifecycle for modular robotics systems

Configure · Activate · Run · Transition · Shutdown

Core API

class lifecore_ros2.core.lifecycle_component_node.LifecycleComponentNode(*args, **kwargs)

Bases: LifecycleNode

Base node for composing LifecycleComponent instances as managed entities.

Owns:
  • The registry of attached LifecycleComponent instances.

  • Component registration gate: open before first transition, closed after.

  • Thread-safe access to the component registry and registration gate.

  • Propagation of ROS 2 lifecycle transitions to registered components in dependency-resolved order via _resolved_order.

Does not own:
  • Component-level resource allocation or release.

  • Component activation state.

  • Application-level business logic.

Override points:
  • Override on_configure, on_activate, on_deactivate, on_cleanup, or on_shutdown to add node-level behavior. Always call super() to preserve component propagation and the registration gate.

  • Do not override add_component, add_components, or _close_registration.

Note

Transition order is resolved at the first lifecycle transition via _resolved_order, applying Kahn’s topological sort on declared dependencies with priority as the primary tiebreaker and registration order as the stable fallback. Metadata may be declared on the component constructor or at add_component(...). add_component raises RegistrationClosedError once the first lifecycle transition has started.
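The ordering rule in this note can be illustrated with a small standalone sketch. It is not the library's _resolved_order implementation: the resolve_order function and the (name, dependencies, priority) tuple shape are hypothetical, and plain ValueError stands in for the framework's dependency errors. It assumes, as documented for add_component, that higher priority values are resolved earlier.

```python
import heapq


def resolve_order(components):
    """Order components with Kahn's algorithm (hypothetical helper).

    `components` is a list of (name, dependencies, priority) tuples given
    in registration order.
    """
    index = {name: i for i, (name, _, _) in enumerate(components)}
    priority = {name: prio for name, _, prio in components}
    dependents = {name: [] for name in index}
    pending = {}  # name -> number of dependencies not yet resolved
    for name, deps, _ in components:
        unique_deps = list(dict.fromkeys(deps))
        for dep in unique_deps:
            if dep not in index:
                raise ValueError(f"{name!r} depends on unknown component {dep!r}")
            dependents[dep].append(name)
        pending[name] = len(unique_deps)

    # Ready heap keyed by (-priority, registration index): higher priority
    # first, then earlier registration as the stable fallback.
    ready = [(-priority[n], index[n], n) for n in index if pending[n] == 0]
    heapq.heapify(ready)
    order = []
    while ready:
        _, _, name = heapq.heappop(ready)
        order.append(name)
        for child in dependents[name]:
            pending[child] -= 1
            if pending[child] == 0:
                heapq.heappush(ready, (-priority[child], index[child], child))

    if len(order) != len(components):
        raise ValueError("dependency cycle detected")
    return order
```

Under these assumptions, a component with no dependencies and a higher priority is placed ahead of earlier-registered peers, while a declared dependency always wins over priority.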

Parameters:
  • node_name (str)

  • namespace (str | None)

  • kwargs (Any)

property components: tuple[LifecycleComponent, ...]
add_component(component, *, dependencies=None, priority=None)

Register a component as a managed entity.

Ordering metadata (dependencies, priority) may be declared here instead of in the component’s constructor. Declaring a non-default value for the same field in both places raises TypeError.

Parameters:
  • component (LifecycleComponent) – The component to register.

  • dependencies (Sequence[str] | None) – Names of other components that must be transitioned before this one. None leaves the component’s constructor value unchanged.

  • priority (int | None) – Tie-breaking value for ordering when dependencies do not impose a strict order; higher values are resolved earlier. None leaves the component’s constructor value unchanged.

Raises:
  • RegistrationClosedError – If lifecycle transitions have already started.

  • DuplicateComponentError – If a component with the same name is already registered.

  • TypeError – If both the component and the registration site declare a non-default value for the same metadata field.

Return type:

None
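The rule for combining constructor and registration-site metadata can be sketched as a single function. This is an illustration only: merge_metadata is a hypothetical helper, and the treatment of a registration-site value that equals the default is an assumption, since the documentation only specifies the None case and the double-declaration error.

```python
def merge_metadata(field, constructor_value, site_value, default):
    """Combine one ordering field declared in two possible places.

    Hypothetical helper illustrating the documented rule; not library code.
    """
    if site_value is None or site_value == default:
        # Nothing meaningful declared at the registration site:
        # keep whatever the constructor declared (assumption for == default).
        return constructor_value
    if constructor_value != default:
        # Both places declare a non-default value for the same field.
        raise TypeError(
            f"{field!r} is declared on both the component and add_component()"
        )
    return site_value
```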

add_components(components)

Register a batch of components.

This convenience method accepts bare components only. When different components need registration-site ordering metadata, call add_component(...) for each item so the metadata stays explicit at the assembly site.

Parameters:

components (Iterable[LifecycleComponent])

Return type:

None

get_component(name)
Parameters:

name (str)

Return type:

LifecycleComponent

remove_component(name)

Unregister a component before lifecycle transitions begin.

Marks the component as withdrawn and detaches it from this node. The component remains in rclpy’s managed entity list as a silent no-op: all subsequent transition callbacks return SUCCESS immediately without executing hooks, allocating resources, or modifying component state.

Allowed states:

Only callable before the first lifecycle transition (while registration is still open). Raises RegistrationClosedError after transitions have started, because components may have already allocated resources and must be cleaned up via the normal lifecycle path.

Parameters:

name (str) – The name of the component to remove.

Raises:
  • RegistrationClosedError – If lifecycle transitions have already started.

  • KeyError – If no component with the given name is registered.

Return type:

None
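The registration gate that add_component and remove_component share can be pictured with a toy registry. This is a sketch under stated assumptions, not the node's actual code: Registry, RegistrationClosed, and the close method are hypothetical names, and the real node additionally keeps removed components in rclpy's managed entity list as no-ops.

```python
import threading


class RegistrationClosed(RuntimeError):
    """Stand-in for RegistrationClosedError in this sketch."""


class Registry:
    """Toy registry whose gate closes when the first transition starts."""

    def __init__(self):
        self._lock = threading.Lock()
        self._components = {}
        self._open = True

    def add(self, name, component):
        with self._lock:
            if not self._open:
                raise RegistrationClosed("lifecycle transitions have started")
            if name in self._components:
                raise ValueError(f"duplicate component {name!r}")
            self._components[name] = component

    def remove(self, name):
        with self._lock:
            if not self._open:
                raise RegistrationClosed("lifecycle transitions have started")
            del self._components[name]  # KeyError for unknown names

    def close(self):
        # Called when the first lifecycle transition begins.
        with self._lock:
            self._open = False

    def names(self):
        with self._lock:
            return tuple(self._components)
```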

on_configure(state)

Close the registration gate and propagate configure to all components.

Override in application nodes to add node-level configure behavior. Always call super().on_configure(state) first.

Raises:

ConcurrentTransitionError – If called concurrently with an in-progress transition.

Parameters:

state (rclpy.lifecycle.node.LifecycleState)

Return type:

rclpy.lifecycle.TransitionCallbackReturn

on_activate(state)

Propagate activate to all components.

Override in application nodes to add node-level activate behavior. Always call super().on_activate(state).

Raises:

ConcurrentTransitionError – If called concurrently with an in-progress transition.

Parameters:

state (rclpy.lifecycle.node.LifecycleState)

Return type:

rclpy.lifecycle.TransitionCallbackReturn

on_deactivate(state)

Propagate deactivate to all components.

Override in application nodes to add node-level deactivate behavior. Always call super().on_deactivate(state).

Raises:

ConcurrentTransitionError – If called concurrently with an in-progress transition.

Parameters:

state (rclpy.lifecycle.node.LifecycleState)

Return type:

rclpy.lifecycle.TransitionCallbackReturn

on_cleanup(state)

Propagate cleanup to all components.

Override in application nodes to add node-level cleanup behavior. Always call super().on_cleanup(state).

Raises:

ConcurrentTransitionError – If called concurrently with an in-progress transition.

Parameters:

state (rclpy.lifecycle.node.LifecycleState)

Return type:

rclpy.lifecycle.TransitionCallbackReturn

on_shutdown(state)

Close the registration gate and propagate shutdown to all components.

Override in application nodes to add node-level shutdown behavior. Always call super().on_shutdown(state).

Raises:

ConcurrentTransitionError – If called concurrently with an in-progress transition.

Parameters:

state (rclpy.lifecycle.node.LifecycleState)

Return type:

rclpy.lifecycle.TransitionCallbackReturn

on_error(state)

Propagate error to all components in reverse order.

Not guarded by _begin_transition — error recovery must remain reachable during active transitions.

Parameters:

state (rclpy.lifecycle.node.LifecycleState)

Return type:

rclpy.lifecycle.TransitionCallbackReturn
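The difference between the guarded callbacks and on_error can be sketched with a non-blocking lock. The names TransitionGuard, begin, and ConcurrentTransition are hypothetical, and the real _begin_transition may work differently; the sketch only shows why an unguarded error path stays reachable while another transition holds the guard.

```python
import threading
from contextlib import contextmanager


class ConcurrentTransition(RuntimeError):
    """Stand-in for ConcurrentTransitionError in this sketch."""


class TransitionGuard:
    def __init__(self):
        self._lock = threading.Lock()

    @contextmanager
    def begin(self, name):
        # Non-blocking acquire: a second transition fails fast instead of waiting.
        if not self._lock.acquire(blocking=False):
            raise ConcurrentTransition(f"{name} attempted during another transition")
        try:
            yield
        finally:
            self._lock.release()


guard = TransitionGuard()
events = []


def on_error():
    # Deliberately unguarded so recovery stays reachable mid-transition.
    events.append("error handled")


def on_activate():
    with guard.begin("activate"):
        events.append("activate started")
        on_error()  # still runs while the guard is held
```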

trigger_configure()
Return type:

rclpy.lifecycle.TransitionCallbackReturn

trigger_activate()
Return type:

rclpy.lifecycle.TransitionCallbackReturn

trigger_deactivate()
Return type:

rclpy.lifecycle.TransitionCallbackReturn

trigger_cleanup()
Return type:

rclpy.lifecycle.TransitionCallbackReturn

trigger_shutdown()
Return type:

rclpy.lifecycle.TransitionCallbackReturn

lifecore_ros2.core.lifecycle_component.when_active(wrapped: F, /) → F
lifecore_ros2.core.lifecycle_component.when_active(*, when_not_active: Callable[[], Any] | None = <object object>) → Callable[[F], F]

Gate a method so it only executes when the component is active.

Default behavior raises RuntimeError via the same shared primitive as LifecycleComponent.require_active(). Pass a callable to when_not_active to customise, or None for a silent no-op.

Usage:

@when_active
def publish(self, msg): ...

@when_active(when_not_active=None)
def _on_message_wrapper(self, msg): ...

@when_active(when_not_active=lambda: logger.warning("dropped"))
def do_something(self): ...
Parameters:
  • wrapped (F | None)

  • when_not_active (Callable[[], Any] | object)

Return type:

F | Callable[[F], F]
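A decorator that supports both the bare and the keyword form can be built around a sentinel default, which is what the <object object> default in the signature suggests. The following is a minimal standalone sketch, not the library's implementation: the _RAISE sentinel, the error message, and the Talker class are hypothetical, and the gate reads a plain is_active attribute rather than going through the shared primitive.

```python
import functools

_RAISE = object()  # sentinel meaning "no handler given: raise RuntimeError"


def when_active(wrapped=None, /, *, when_not_active=_RAISE):
    """Sketch of a gate usable as @when_active or @when_active(...)."""

    def decorate(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if self.is_active:
                return func(self, *args, **kwargs)
            if when_not_active is _RAISE:
                raise RuntimeError(f"{func.__name__} called while inactive")
            if when_not_active is not None:
                when_not_active()  # custom inactive handler
            return None  # None handler: silent no-op

        return wrapper

    # Bare usage passes the function positionally; keyword usage returns
    # the decorator itself.
    return decorate(wrapped) if wrapped is not None else decorate


class Talker:
    def __init__(self):
        self.is_active = False
        self.sent = []

    @when_active
    def publish(self, msg):
        self.sent.append(msg)

    @when_active(when_not_active=None)
    def publish_quietly(self, msg):
        self.sent.append(msg)
```

The sentinel is what lets None keep its documented meaning of "silent no-op" while still distinguishing it from "argument not supplied".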

class lifecore_ros2.core.lifecycle_component.LifecycleComponent(*args, **kwargs)

Bases: ManagedEntity, ABC

Base managed entity for composable lifecycle behavior inside a LifecycleComponentNode.

Each subclass encapsulates a single concern (e.g. a publisher, subscriber, or timer) and integrates natively into the ROS 2 lifecycle via the ManagedEntity protocol.

Framework guarantees:
  • _is_active is set to True after a successful _on_activate hook and cleared to False after a successful _on_deactivate hook. For _on_cleanup, _on_shutdown, and _on_error, _is_active is cleared unconditionally before the hook runs. Subclasses never manage this flag.

  • _release_resources() is called automatically after _on_cleanup, _on_shutdown, and _on_error. Subclasses do not need to call it.

  • _needs_cleanup is cleared after each cleanup, shutdown, or error release attempt, even if resource release reports ERROR.

  • Exceptions in hooks are caught and converted to TransitionCallbackReturn.ERROR.

  • Invalid return values from hooks are caught and converted to ERROR.

  • Ordering metadata may be declared either here or at LifecycleComponentNode.add_component(...). Prefer the registration site when you want composition intent visible where the node assembles components.

Subclass extension points (override these):
  • _on_configure: allocate ROS resources. Default returns SUCCESS.

  • _on_activate: enable runtime behavior. Default returns SUCCESS.

  • _on_deactivate: disable runtime behavior. Default returns SUCCESS.

  • _on_cleanup: custom cleanup before automatic resource release. Default returns SUCCESS.

  • _on_shutdown: custom shutdown logic. Default returns SUCCESS.

  • _on_error: custom error handling. Default returns SUCCESS.

  • _release_resources: destroy ROS objects (publishers, subscriptions, timers). Must be idempotent. Call super()._release_resources() last when overriding.

Do not override:
  • on_configure, on_activate, on_deactivate, on_cleanup, on_shutdown, on_error — framework-controlled entry points that manage lifecycle invariants and delegate to the _on_* hooks.
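The split between framework-controlled entry points and _on_* hooks can be sketched without ROS. The code below is an assumption-laden illustration, not the library's code: Result stands in for TransitionCallbackReturn, and only the activate and cleanup paths are shown.

```python
from enum import Enum


class Result(Enum):
    """Stand-in for TransitionCallbackReturn in this sketch."""

    SUCCESS = "success"
    FAILURE = "failure"
    ERROR = "error"


class Component:
    def __init__(self):
        self._is_active = False
        self.released = 0

    def _run_hook(self, hook, state):
        # Exceptions and invalid return values both become ERROR.
        try:
            result = hook(state)
        except Exception:
            return Result.ERROR
        return result if isinstance(result, Result) else Result.ERROR

    def on_activate(self, state):
        result = self._run_hook(self._on_activate, state)
        if result is Result.SUCCESS:
            self._is_active = True  # set only after a successful hook
        return result

    def on_cleanup(self, state):
        self._is_active = False  # cleared before the hook runs
        result = self._run_hook(self._on_cleanup, state)
        self._release_resources()  # always called after the hook
        return result

    def _on_activate(self, state):
        return Result.SUCCESS

    def _on_cleanup(self, state):
        return Result.SUCCESS

    def _release_resources(self):
        self.released += 1


class Broken(Component):
    def _on_activate(self, state):
        raise OSError("device missing")


class Sloppy(Component):
    def _on_activate(self, state):
        return "ok"  # not a Result member
```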

Parameters:
  • name (str)

  • callback_group (CallbackGroup | None)

  • dependencies (Sequence[str])

  • priority (int)

property name: str
property is_active: bool

Whether the component is in the active state.

require_active()

Raise RuntimeError if this component is not active.

Façade over the shared activation_gating.require_active primitive. Use this in component extension points or try/except handlers that need component-specific inactive behavior.

Raises:

RuntimeError – If the component is not active.

Return type:

None

property callback_group: CallbackGroup | None

The callback group borrowed from the application, or None for the node default.

property node: LifecycleComponentNode
get_logger()
Return type:

_LoggerLike

get_parent_name()
Return type:

str

get_parent_namespace()
Return type:

str

final on_configure(state)
Parameters:

state (rclpy.lifecycle.node.LifecycleState)

Return type:

rclpy.lifecycle.TransitionCallbackReturn

final on_activate(state)
Parameters:

state (rclpy.lifecycle.node.LifecycleState)

Return type:

rclpy.lifecycle.TransitionCallbackReturn

final on_deactivate(state)
Parameters:

state (rclpy.lifecycle.node.LifecycleState)

Return type:

rclpy.lifecycle.TransitionCallbackReturn

final on_cleanup(state)
Parameters:

state (rclpy.lifecycle.node.LifecycleState)

Return type:

rclpy.lifecycle.TransitionCallbackReturn

final on_shutdown(state)
Parameters:

state (rclpy.lifecycle.node.LifecycleState)

Return type:

rclpy.lifecycle.TransitionCallbackReturn

final on_error(state)
Parameters:

state (rclpy.lifecycle.node.LifecycleState)

Return type:

rclpy.lifecycle.TransitionCallbackReturn

_on_configure(state)

Extension point. Override to allocate ROS resources.

Called during the configure transition. Do not enable runtime behavior here.

Returns:

SUCCESS, FAILURE, or ERROR.

Return type:

TransitionCallbackReturn

Parameters:

state (rclpy.lifecycle.node.LifecycleState)

_on_activate(state)

Extension point. Override to enable runtime behavior.

The framework sets _is_active = True after this hook returns SUCCESS. Do not set _is_active manually.

Returns:

SUCCESS, FAILURE, or ERROR.

Return type:

TransitionCallbackReturn

Parameters:

state (rclpy.lifecycle.node.LifecycleState)

_on_deactivate(state)

Extension point. Override to disable runtime behavior.

The framework clears _is_active only after this hook returns SUCCESS. All @when_active-gated methods stop executing after SUCCESS is returned.

Returns:

SUCCESS, FAILURE, or ERROR.

Return type:

TransitionCallbackReturn

Parameters:

state (rclpy.lifecycle.node.LifecycleState)

_on_cleanup(state)

Extension point. Override for custom cleanup before automatic resource release.

The framework calls _release_resources() automatically after this hook.

Returns:

SUCCESS, FAILURE, or ERROR.

Return type:

TransitionCallbackReturn

Parameters:

state (rclpy.lifecycle.node.LifecycleState)

_on_shutdown(state)

Extension point. Override for custom shutdown logic before automatic resource release.

The framework calls _release_resources() automatically after this hook.

Returns:

SUCCESS, FAILURE, or ERROR.

Return type:

TransitionCallbackReturn

Parameters:

state (rclpy.lifecycle.node.LifecycleState)

_on_error(state)

Extension point. Override for custom error handling before automatic resource release.

The framework calls _release_resources() automatically after this hook.

Returns:

SUCCESS, FAILURE, or ERROR.

Return type:

TransitionCallbackReturn

Parameters:

state (rclpy.lifecycle.node.LifecycleState)

_release_resources()

Extension point. Override to release ROS resources owned by this component.

Called automatically by the framework during cleanup, shutdown, and error transitions. Destroy publishers, subscriptions, timers, etc. here. Do not mutate borrowed handles such as callback_group here.

Implementations must be idempotent. Call super()._release_resources() last when overriding in a subclass chain.

Return type:

None
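One way to satisfy the idempotency requirement is to detach each handle before destroying it, so a second call finds nothing to release. This sketch uses hypothetical stand-ins (BaseComponent, FakeTimer, TimerComponent) instead of real ROS objects.

```python
class BaseComponent:
    def _release_resources(self):
        # Nothing to release at the base level in this sketch.
        pass


class FakeTimer:
    """Stand-in for a ROS timer; counts how often it is destroyed."""

    def __init__(self):
        self.destroyed = 0

    def destroy(self):
        self.destroyed += 1


class TimerComponent(BaseComponent):
    def __init__(self):
        self._timer = FakeTimer()

    def _release_resources(self):
        # Detach first so a repeated call sees None and does nothing.
        timer, self._timer = self._timer, None
        if timer is not None:
            timer.destroy()
        # Call the parent last, as the contract above requires.
        super()._release_resources()
```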

Framework-raised exception types for lifecore_ros2.

All framework boundary violations raise a subclass of LifecoreError, which derives directly from Exception. Every concrete subclass also inherits from the corresponding standard Python exception, so existing except RuntimeError and except ValueError handlers keep working.

Hierarchy:

LifecoreError (Exception)
├── RegistrationClosedError   (RuntimeError)
├── DuplicateComponentError   (ValueError)
├── ComponentNotAttachedError (RuntimeError)
├── ComponentNotConfiguredError (RuntimeError)
├── InvalidLifecycleTransitionError (RuntimeError)
├── ConcurrentTransitionError (RuntimeError)
├── ComponentDependencyError  (ValueError)
│   ├── UnknownDependencyError
│   └── CyclicDependencyError
├── LifecycleHookError        (RuntimeError)
└── _InterfaceTypeNotResolvedError (TypeError)  [internal — not re-exported]
exception lifecore_ros2.core.exceptions.LifecoreError

Bases: Exception

Base class for all boundary-violation exceptions raised by lifecore_ros2.

Catch LifecoreError to handle any framework misuse error in one place. Concrete subclasses also inherit from standard Python exceptions for backward-compatibility with except RuntimeError / except ValueError.
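The effect of the dual inheritance can be checked with locally defined stand-ins that mirror the documented bases. The classes below are redefined here for illustration only and are not imported from lifecore_ros2; classify is a hypothetical helper.

```python
class LifecoreError(Exception):
    pass


class RegistrationClosedError(LifecoreError, RuntimeError):
    pass


class ComponentDependencyError(LifecoreError, ValueError):
    pass


class CyclicDependencyError(ComponentDependencyError):
    pass


def classify(exc):
    """Report which legacy handler would catch the exception."""
    try:
        raise exc
    except RuntimeError:
        return "runtime"
    except ValueError:
        return "value"
    except LifecoreError:
        return "lifecore"
```

Code written before the hierarchy existed keeps catching these errors through the standard base, while new code can catch LifecoreError to handle every framework error in one clause.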

exception lifecore_ros2.core.exceptions.RegistrationClosedError

Bases: LifecoreError, RuntimeError

Raised when add_component or remove_component is called after lifecycle transitions have started.

exception lifecore_ros2.core.exceptions.DuplicateComponentError

Bases: LifecoreError, ValueError

Raised when a component with the same name is already registered on the node.

exception lifecore_ros2.core.exceptions.ComponentNotAttachedError

Bases: LifecoreError, RuntimeError

Raised when .node is accessed on a component that is not attached to a node.

exception lifecore_ros2.core.exceptions.ComponentNotConfiguredError

Bases: LifecoreError, RuntimeError

Raised when publish() is called on a publisher that has not been configured yet.

exception lifecore_ros2.core.exceptions.InvalidLifecycleTransitionError

Bases: LifecoreError, RuntimeError

Raised when a LifecycleComponent hook is called in an invalid lifecycle order.

exception lifecore_ros2.core.exceptions.ConcurrentTransitionError

Bases: LifecoreError, RuntimeError

Raised when a lifecycle transition is attempted concurrently with an in-progress transition.

exception lifecore_ros2.core.exceptions.ComponentDependencyError

Bases: LifecoreError, ValueError

Base class for dependency-resolution errors during component ordering.

exception lifecore_ros2.core.exceptions.UnknownDependencyError

Bases: ComponentDependencyError

Raised when a component declares a dependency on a name that is not registered.

exception lifecore_ros2.core.exceptions.CyclicDependencyError

Bases: ComponentDependencyError

Raised when the declared component dependencies contain a cycle.

exception lifecore_ros2.core.exceptions.LifecycleHookError

Bases: LifecoreError, RuntimeError

Raised internally when a lifecycle hook raises an unexpected exception.

Wraps the original exception as __cause__ for diagnosis. Never propagated to the caller of trigger_*; used only for logging and internal aggregation.
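Wrapping with __cause__ corresponds to Python's raise ... from ... form. The sketch below shows the pattern with a hypothetical HookError stand-in and a plain list as the log. How the framework actually logs and aggregates these errors is not shown in this reference.

```python
class HookError(RuntimeError):
    """Stand-in for LifecycleHookError in this sketch."""


def call_wrapped(hook):
    try:
        hook()
    except Exception as exc:
        # "from exc" stores the original exception as __cause__.
        raise HookError(f"{hook.__name__} failed") from exc


def run_hook(hook, log):
    """Run a hook, record any wrapped failure, and report a status string."""
    try:
        call_wrapped(hook)
    except HookError as err:
        log.append(err)  # kept for diagnosis, not re-raised to the caller
        return "ERROR"
    return "SUCCESS"
```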

Activation-gating primitive

Shared activation-gating primitive for lifecore_ros2 components.

This module contains the single authoritative check for component activation state. All framework gating paths delegate here — @when_active, LifecycleComponent.require_active(), and component-specific inactive handlers — so the raise contract is defined in exactly one place.

lifecore_ros2.core.activation_gating.require_active(is_active, *, component_name)

Raise RuntimeError if the component is not active.

This is the shared primitive used by all activation-gating paths in the framework. Most component code should prefer LifecycleComponent.require_active(); the standalone function exists so @when_active and custom inactive handlers share the same raise contract.

Parameters:
  • is_active (bool) – Current activation state of the component.

  • component_name (str) – Name used in the error message for diagnosis.

Raises:

RuntimeError – If is_active is False.

Return type:

None
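The relationship between the standalone primitive and the component façade can be sketched as follows. The function signature follows the one documented above; the error message text and the Component class are assumptions made for illustration.

```python
def require_active(is_active, *, component_name):
    """Raise RuntimeError if the component is not active (sketch)."""
    if not is_active:
        # Message wording is an assumption, not the library's text.
        raise RuntimeError(f"component {component_name!r} is not active")


class Component:
    def __init__(self, name):
        self.name = name
        self.is_active = False

    def require_active(self):
        # Façade: delegates to the shared primitive so the raise
        # contract lives in one place.
        require_active(self.is_active, component_name=self.name)
```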