# Source code for roslibpy.comm
"""Transport selection for the ROS bridge connection.

Three transports are available:

* ``twisted``: default. Built on autobahn + twisted. The historical
  implementation.
* ``asyncio``: opt-in (2.1+). Built on Autobahn's asyncio integration (the
  same Autobahn WebSocket stack as ``twisted``, running on an asyncio event
  loop). Cleaner per-test loop isolation; no extra dependencies.
* ``cli``: IronPython only (auto-selected on ``sys.platform == "cli"``).

Selection precedence (highest → lowest):

1. ``transport=`` kwarg on ``Ros`` (per-instance).
2. ``ROSLIBPY_TRANSPORT`` environment variable.
3. Module-level default set via ``roslibpy.set_default_transport(...)``.
4. Platform default: ``cli`` on IronPython, ``twisted`` elsewhere.
"""
import os
import sys
from .comm import RosBridgeException, RosBridgeProtocol
# Names exported by ``from <package> import *``.
# NOTE: ``RosBridgeClientFactory`` is listed here but is not assigned at
# module level in this file; it is produced on attribute access by the
# module-level ``__getattr__`` below, which calls ``select_factory()``.
__all__ = [
"RosBridgeException",
"RosBridgeProtocol",
"RosBridgeClientFactory",
"select_factory",
"set_default_transport",
"TRANSPORT_TWISTED",
"TRANSPORT_ASYNCIO",
"TRANSPORT_CLI",
]
# Canonical transport identifiers; these are the only values accepted wherever
# a transport name is validated in this module.
TRANSPORT_TWISTED = "twisted"
TRANSPORT_ASYNCIO = "asyncio"
TRANSPORT_CLI = "cli"

# Tuple (not set) so the order shown in error messages is stable.
_VALID_TRANSPORTS = (TRANSPORT_TWISTED, TRANSPORT_ASYNCIO, TRANSPORT_CLI)

# IronPython reports ``sys.platform == "cli"``; every other interpreter
# starts out on the twisted transport.
if sys.platform == "cli":
    _PLATFORM_DEFAULT = TRANSPORT_CLI
else:
    _PLATFORM_DEFAULT = TRANSPORT_TWISTED

# Process-wide default; rebound by ``set_default_transport``.
_DEFAULT_TRANSPORT = _PLATFORM_DEFAULT
# [docs]
def set_default_transport(name):
    """Set the process-wide default transport.

    The value is stored in the module-level ``_DEFAULT_TRANSPORT`` and is only
    consulted when neither an explicit transport nor the
    ``ROSLIBPY_TRANSPORT`` environment variable selects one.

    Args:
        name (str): One of ``"twisted"``, ``"asyncio"``, ``"cli"``.

    Raises:
        ValueError: If ``name`` is not a known transport. The current default
            is left untouched in that case.
    """
    global _DEFAULT_TRANSPORT

    is_known = name in _VALID_TRANSPORTS
    if not is_known:
        message = "Unknown transport %r; expected one of %s" % (name, _VALID_TRANSPORTS)
        raise ValueError(message)

    _DEFAULT_TRANSPORT = name
def _resolve_transport(explicit=None):
    """Apply the precedence rules to land on a single transport name.

    Precedence, highest first: the ``explicit`` argument, the
    ``ROSLIBPY_TRANSPORT`` environment variable, then the module-level
    default (``_DEFAULT_TRANSPORT``).

    The environment value is matched after stripping surrounding whitespace
    and lower-casing it, so ``"asyncio "`` or ``"Twisted"`` select the
    intended transport instead of being rejected. The ``explicit`` argument
    is still matched exactly.

    Args:
        explicit (str, optional): Transport requested by the caller. ``None``
            means "no explicit request".

    Returns:
        str: One of the names in ``_VALID_TRANSPORTS``.

    Raises:
        ValueError: If ``explicit`` or the environment variable names an
            unknown transport.
    """
    if explicit is not None:
        if explicit not in _VALID_TRANSPORTS:
            raise ValueError("Unknown transport %r; expected one of %s" % (explicit, _VALID_TRANSPORTS))
        return explicit

    env_choice = os.environ.get("ROSLIBPY_TRANSPORT")
    # An unset or empty variable means "not specified", not "invalid".
    if env_choice:
        # Environment values are set by hand (shells, .env files, CI config)
        # and easily pick up stray whitespace or different casing.
        normalized = env_choice.strip().lower()
        if normalized not in _VALID_TRANSPORTS:
            # Report the raw value so the user sees exactly what was set.
            raise ValueError("Unknown ROSLIBPY_TRANSPORT=%r; expected one of %s" % (env_choice, _VALID_TRANSPORTS))
        return normalized

    return _DEFAULT_TRANSPORT
def _transport_conflict_error(requested, cause):
    """Build a clear error for the txaio single-framework-per-process limit.

    The ``twisted`` and ``asyncio`` transports both build on Autobahn, whose
    ``txaio`` layer binds a single async framework (Twisted *or* asyncio) per
    Python process. Selecting one after the other has already been activated
    surfaces as a ``RuntimeError`` from deep inside ``txaio``; this helper
    turns it into actionable guidance.

    Args:
        requested (str): The transport that was being activated.
        cause (Exception): The original error; its text is appended to the
            message.

    Returns:
        RuntimeError: A new exception instance. It is returned, not raised.
    """
    # The transport blamed as "already in use" is simply the opposite
    # Autobahn-based one; anything other than twisted maps to twisted.
    if requested == TRANSPORT_TWISTED:
        active = TRANSPORT_ASYNCIO
    else:
        active = TRANSPORT_TWISTED

    template = (
        "Cannot activate the %r transport: the %r transport is already in use "
        "in this process. Both build on Autobahn, whose txaio layer binds a "
        "single async framework per process, so they are mutually exclusive. "
        "Select one transport per process (e.g. via the ROSLIBPY_TRANSPORT "
        "environment variable or roslibpy.set_default_transport()), and use "
        "separate processes if you need both. Original error: %s"
    )
    return RuntimeError(template % (requested, active, cause))
def select_factory(transport=None):
    """Return the factory class for the requested (or resolved) transport.

    Each transport module is imported only inside the branch that needs it, so
    a process that never uses the asyncio transport never imports
    ``autobahn.asyncio``, and a process that never uses the twisted transport
    never imports ``twisted``. This laziness also matters because the two
    Autobahn-based transports cannot coexist in a single process (see
    :func:`_transport_conflict_error`).

    Args:
        transport (str, optional): One of ``"twisted"``, ``"asyncio"``,
            ``"cli"``. If ``None``, the name is resolved through
            ``_resolve_transport`` (environment variable, then module
            default).

    Returns:
        The factory class to use for new ``Ros`` instances.

    Raises:
        ValueError: Propagated from ``_resolve_transport`` for an unknown
            transport name.
        RuntimeError: If importing the requested Autobahn-based transport
            raises ``RuntimeError``, which is reported as a conflict with the
            other Autobahn-based transport.
    """
    chosen = _resolve_transport(transport)

    if chosen == TRANSPORT_CLI:
        from .comm_cli import CliRosBridgeClientFactory as factory
    elif chosen == TRANSPORT_ASYNCIO:
        try:
            from .comm_asyncio import AsyncioRosBridgeClientFactory as factory
        except RuntimeError as err:  # txaio already bound to twisted
            raise _transport_conflict_error(TRANSPORT_ASYNCIO, err)
    else:
        # Anything that is neither cli nor asyncio is the twisted default.
        try:
            from .comm_autobahn import AutobahnRosBridgeClientFactory as factory
        except RuntimeError as err:  # txaio already bound to asyncio
            raise _transport_conflict_error(TRANSPORT_TWISTED, err)

    return factory
def __getattr__(name):
    """Resolve ``RosBridgeClientFactory`` lazily on module attribute access.

    ``RosBridgeClientFactory`` stays available as a module attribute for
    back-compatibility, but it is looked up through PEP 562 instead of being
    bound at import time. That way merely importing roslibpy does not import
    a transport and, through Autobahn's txaio, irreversibly lock the process
    to a single async framework before the user has had a chance to choose.

    Raises:
        AttributeError: For every name other than ``RosBridgeClientFactory``.
    """
    if name != "RosBridgeClientFactory":
        raise AttributeError("module %r has no attribute %r" % (__name__, name))

    # Not cached: each access re-applies the current selection rules.
    return select_factory()