Source code for roslibpy.ros

from __future__ import print_function

import logging
import threading

from . import Message, Param, Service, ServiceRequest, Time
from .comm import RosBridgeClientFactory
from .core import RosTimeoutError

__all__ = ["Ros", "set_rosapi_timeout"]

LOGGER = logging.getLogger("roslibpy")
CONNECTION_TIMEOUT = 10
ROSAPI_TIMEOUT = 3


[docs]def set_rosapi_timeout(timeout): """Set the default number of seconds to wait for a response before raising an exception. Args: timeout (:obj:`int`): Duration in seconds. """ global ROSAPI_TIMEOUT ROSAPI_TIMEOUT = timeout
[docs]class Ros(object): """Connection manager to ROS server. Args: host (:obj:`str`): Name or IP address of the ROS bridge host, e.g. ``127.0.0.1``. port (:obj:`int`): ROS bridge port, e.g. ``9090``. is_secure (:obj:`bool`): ``True`` to use a secure web sockets connection, otherwise ``False``. """ def __init__(self, host, port=None, is_secure=False): self._id_counter = 0 url = RosBridgeClientFactory.create_url(host, port, is_secure) self.factory = RosBridgeClientFactory(url) self.is_connecting = False self.connect() @property def id_counter(self): """Generate an auto-incremental ID starting from 1. Returns: int: An auto-incremented ID. """ self._id_counter += 1 return self._id_counter @property def is_connected(self): """Indicate if the ROS connection is open or not. Returns: bool: True if connected to ROS, False otherwise. """ return self.factory.is_connected
[docs] def connect(self): """Connect to ROS.""" # Don't try to reconnect if already connected. if self.is_connected or self.is_connecting: return self.is_connecting = True def _unset_connecting_flag(*args): self.is_connecting = False self.factory.on_ready(_unset_connecting_flag) self.factory.connect()
[docs] def close(self, timeout=CONNECTION_TIMEOUT): """Disconnect from ROS.""" if self.is_connected: wait_disconnect = threading.Event() def _wrapper_callback(proto): self.emit("closing") proto.send_close() wait_disconnect.set() return proto self.factory.on_ready(_wrapper_callback) if not wait_disconnect.wait(timeout): raise RosTimeoutError("Failed to disconnect to ROS")
[docs] def run(self, timeout=CONNECTION_TIMEOUT): """Kick-starts a non-blocking event loop. Args: timeout: Timeout to wait until connection is ready. """ self.factory.manager.run() wait_connect = threading.Event() self.factory.on_ready(lambda _: wait_connect.set()) if not wait_connect.wait(timeout): raise RosTimeoutError("Failed to connect to ROS")
[docs] def run_forever(self): """Kick-starts a blocking loop to wait for events. Depending on the implementations, and the client applications, running this might be required or not. """ self.factory.manager.run_forever()
def run_event_loop(self): LOGGER.warn("Deprecation warning: use run_forever instead of run_event_loop ") self.run_forever()
[docs] def call_in_thread(self, callback): """Call the given function in a thread. The threading implementation is deferred to the factory. Args: callback (:obj:`callable`): Callable function to be invoked. """ self.factory.manager.call_in_thread(callback)
[docs] def call_later(self, delay, callback): """Call the given function after a certain period of time has passed. Args: delay (:obj:`int`): Number of seconds to wait before invoking the callback. callback (:obj:`callable`): Callable function to be invoked when ROS connection is ready. """ self.factory.manager.call_later(delay, callback)
[docs] def terminate(self): """Signals the termination of the main event loop.""" if self.is_connected: self.close() self.factory.manager.terminate()
[docs] def on(self, event_name, callback): """Add a callback to an arbitrary named event. Args: event_name (:obj:`str`): Name of the event to which to subscribe. callback: Callable function to be executed when the event is triggered. """ self.factory.on(event_name, callback)
[docs] def off(self, event_name, callback=None): """Remove a callback from an arbitrary named event. Args: event_name (:obj:`str`): Name of the event from which to unsubscribe. callback: Callable function. If ``None``, all callbacks of the event will be removed. """ if callback: self.factory.off(event_name, callback) else: self.factory.remove_all_listeners(event_name)
[docs] def emit(self, event_name, *args): """Trigger a named event.""" self.factory.emit(event_name, *args)
[docs] def on_ready(self, callback, run_in_thread=True): """Add a callback to be executed when the connection is established. If a connection to ROS is already available, the callback is executed immediately. Args: callback: Callable function to be invoked when ROS connection is ready. run_in_thread (:obj:`bool`): True to run the callback in a separate thread, False otherwise. """ def _wrapper_callback(proto): if run_in_thread: self.factory.manager.call_in_thread(callback) else: callback() return proto self.factory.on_ready(_wrapper_callback)
[docs] def send_on_ready(self, message): """Send message to ROS once the connection is established. If a connection to ROS is already available, the message is sent immediately. Args: message (:class:`.Message`): ROS Bridge Message to send. """ def _send_internal(proto): proto.send_ros_message(message) return proto self.factory.on_ready(_send_internal)
[docs] def blocking_call_from_thread(self, callback, timeout): """Call the given function from a thread, and wait for the result synchronously for as long as the timeout will allow. Args: callback: Callable function to be invoked from the thread. timeout (:obj:`int`): Number of seconds to wait for the response before raising an exception. Returns: The results from the callback, or a timeout exception. """ return self.factory.manager.blocking_call_from_thread(callback, timeout)
[docs] def get_service_request_callback(self, message): """Get the callback which, when called, sends the service request. Args: message (:class:`.Message`): ROS Bridge Message containing the request. Returns: A callable which makes the service request. """ def get_call_results(result_placeholder): inner_callback = self.factory.manager.get_inner_callback(result_placeholder) inner_errback = self.factory.manager.get_inner_errback(result_placeholder) self.call_async_service(message, inner_callback, inner_errback) return result_placeholder return get_call_results
[docs] def call_sync_service(self, message, timeout): """Send a blocking service request to ROS once the connection is established, waiting for the result to be return. If a connection to ROS is already available, the request is sent immediately. Args: message (:class:`.Message`): ROS Bridge Message containing the request. timeout (:obj:`int`): Number of seconds to wait for the response before raising an exception. Returns: Either returns the service request results or raises a timeout exception. """ get_call_results = self.get_service_request_callback(message) return self.blocking_call_from_thread(get_call_results, timeout)
[docs] def call_async_service(self, message, callback, errback): """Send a service request to ROS once the connection is established. If a connection to ROS is already available, the request is sent immediately. Args: message (:class:`.Message`): ROS Bridge Message containing the request. callback: Callback invoked on successful execution. errback: Callback invoked on error. """ def _send_internal(proto): proto.send_ros_service_request(message, callback, errback) return proto self.factory.on_ready(_send_internal)
def set_status_level(self, level, identifier): level_message = Message({"op": "set_level", "level": level, "id": identifier}) self.send_on_ready(level_message)
[docs] def get_time(self, callback=None, errback=None): """Retrieve the current ROS time. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . Returns: :class:`.Time`: An instance of ROS Time. """ service = Service(self, "/rosapi/get_time", "rosapi/GetTime") result = service.call(ServiceRequest(), callback, errback, timeout=ROSAPI_TIMEOUT) if callback: return assert "time" in result return Time(result["time"]["secs"], result["time"]["nsecs"])
[docs] def get_topics(self, callback=None, errback=None): """Retrieve list of topics in ROS. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . Returns: list: List of topics if blocking, otherwise ``None``. """ service = Service(self, "/rosapi/topics", "rosapi/Topics") result = service.call(ServiceRequest(), callback, errback, timeout=ROSAPI_TIMEOUT) if callback: return assert "topics" in result return result["topics"]
[docs] def get_topic_type(self, topic, callback=None, errback=None): """Retrieve the type of a topic in ROS. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . Returns: str: Topic type if blocking, otherwise ``None``. """ service = Service(self, "/rosapi/topic_type", "rosapi/TopicType") result = service.call(ServiceRequest({"topic": topic}), callback, errback, timeout=ROSAPI_TIMEOUT) if callback: return assert "type" in result return result["type"]
[docs] def get_topics_for_type(self, topic_type, callback=None, errback=None): """Retrieve list of topics in ROS matching the specified type. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . Returns: list: List of topics matching the specified type if blocking, otherwise ``None``. """ service = Service(self, "/rosapi/topics_for_type", "rosapi/TopicsForType") result = service.call(ServiceRequest({"type": topic_type}), callback, errback, timeout=ROSAPI_TIMEOUT) if callback: return assert "topics" in result return result["topics"]
[docs] def get_services(self, callback=None, errback=None): """Retrieve list of active service names in ROS. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . Returns: list: List of services if blocking, otherwise ``None``. """ service = Service(self, "/rosapi/services", "rosapi/Services") result = service.call(ServiceRequest(), callback, errback, timeout=ROSAPI_TIMEOUT) if callback: return assert "services" in result return result["services"]
[docs] def get_service_type(self, service_name, callback=None, errback=None): """Retrieve the type of a service in ROS. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . Returns: str: Service type if blocking, otherwise ``None``. """ service = Service(self, "/rosapi/service_type", "rosapi/ServiceType") result = service.call(ServiceRequest({"service": service_name}), callback, errback, timeout=ROSAPI_TIMEOUT) if callback: return assert "type" in result return result["type"]
[docs] def get_services_for_type(self, service_type, callback=None, errback=None): """Retrieve list of services in ROS matching the specified type. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . Returns: list: List of services matching the specified type if blocking, otherwise ``None``. """ service = Service(self, "/rosapi/services_for_type", "rosapi/ServicesForType") result = service.call(ServiceRequest({"type": service_type}), callback, errback, timeout=ROSAPI_TIMEOUT) if callback: return assert "services" in result return result["services"]
[docs] def get_service_request_details(self, type, callback=None, errback=None): """Retrieve details of a ROS Service Request. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . Returns: Service Request details if blocking, otherwise ``None``. """ service = Service(self, "/rosapi/service_request_details", "rosapi/ServiceRequestDetails") result = service.call(ServiceRequest({"type": type}), callback, errback, timeout=ROSAPI_TIMEOUT) if callback: return return result
[docs] def get_service_response_details(self, type, callback=None, errback=None): """Retrieve details of a ROS Service Response. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . Returns: Service Response details if blocking, otherwise ``None``. """ service = Service(self, "/rosapi/service_response_details", "rosapi/ServiceResponseDetails") result = service.call(ServiceRequest({"type": type}), callback, errback, timeout=ROSAPI_TIMEOUT) if callback: return return result
[docs] def get_message_details(self, message_type, callback=None, errback=None): """Retrieve details of a message type in ROS. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . Returns: Message type details if blocking, otherwise ``None``. """ service = Service(self, "/rosapi/message_details", "rosapi/MessageDetails") result = service.call(ServiceRequest({"type": message_type}), callback, errback, timeout=ROSAPI_TIMEOUT) if callback: return return result
[docs] def get_params(self, callback=None, errback=None): """Retrieve list of param names from the ROS Parameter Server. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . Returns: list: List of parameters if blocking, otherwise ``None``. """ service = Service(self, "/rosapi/get_param_names", "rosapi/GetParamNames") result = service.call(ServiceRequest(), callback, errback, timeout=ROSAPI_TIMEOUT) if callback: return assert "names" in result return result["names"]
[docs] def get_param(self, name, callback=None, errback=None): """Get the value of a parameter from the ROS Parameter Server. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . Returns: Parameter value if blocking, otherwise ``None``. """ param = Param(self, name) return param.get(callback, errback, timeout=ROSAPI_TIMEOUT)
[docs] def set_param(self, name, value, callback=None, errback=None): """Set the value of a parameter from the ROS Parameter Server. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . """ param = Param(self, name) param.set(value, callback, errback, timeout=ROSAPI_TIMEOUT)
[docs] def delete_param(self, name, callback=None, errback=None): """Delete parameter from the ROS Parameter Server. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . """ param = Param(self, name) param.delete(callback, errback, timeout=ROSAPI_TIMEOUT)
[docs] def get_action_servers(self, callback, errback=None): """Retrieve list of action servers in ROS.""" service = Service(self, "/rosapi/action_servers", "rosapi/GetActionServers") service.call(ServiceRequest(), callback, errback)
[docs] def get_nodes(self, callback=None, errback=None): """Retrieve list of active node names in ROS. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . """ service = Service(self, "/rosapi/nodes", "rosapi/Nodes") result = service.call(ServiceRequest(), callback, errback) if callback: return assert "nodes" in result return result["nodes"]
[docs] def get_node_details(self, node, callback=None, errback=None): """Retrieve list subscribed topics, publishing topics and services of a specific node name. Note: To make this a blocking call, pass ``None`` to the ``callback`` parameter . """ service = Service(self, "/rosapi/node_details", "rosapi/NodeDetails") result = service.call(ServiceRequest({"node": node}), callback, errback) if callback: return output = { "services": result["services"], "subscribing": result["subscribing"], "publishing": result["publishing"], } return output
if __name__ == "__main__": FORMAT = "%(asctime)-15s [%(levelname)s] %(message)s" logging.basicConfig(level=logging.DEBUG, format=FORMAT) ros_client = Ros("127.0.0.1", 9090) ros_client.on_ready(lambda: ros_client.get_topics(print)) ros_client.call_later(3, ros_client.close) ros_client.call_later(5, ros_client.terminate) ros_client.run_forever()