Source code for roslibpy.core

from __future__ import print_function

import json
import logging
import time

# Python 2/3 compatibility import list
try:
    from collections import UserDict
except ImportError:
    from UserDict import UserDict

LOGGER = logging.getLogger("roslibpy")

__all__ = [
    "Header",
    "Message",
    "Param",
    "RosTimeoutError",
    "Service",
    "ServiceRequest",
    "ServiceResponse",
    "Time",
    "Topic",
]


try:

    class RosTimeoutError(TimeoutError):
        pass

except NameError:

    class RosTimeoutError(Exception):
        pass


[docs]class Message(UserDict): """Message objects used for publishing and subscribing to/from topics. A message is fundamentally a dictionary and behaves as one.""" def __init__(self, values=None): self.data = {} if values is not None: self.update(values)
[docs]class Time(UserDict): """Represents ROS time with two integers: seconds since epoch and nanoseconds since seconds.""" def __init__(self, secs, nsecs): self.data = {} self.data["secs"] = self._ensure_int(secs) self.data["nsecs"] = self._ensure_int(nsecs) def _ensure_int(self, n): if isinstance(n, int): return n if isinstance(n, float) and n.is_integer(): return int(n) raise ValueError("argument must be an integer") @property def secs(self): """Seconds since epoch.""" return self.data["secs"] @property def nsecs(self): """Nanoseconds since seconds.""" return self.data["nsecs"]
[docs] def is_zero(self): """Return ``True`` if zero (secs and nsecs) otherwise ``False``.""" return self.data["secs"] == 0 and self.data["nsecs"] == 0
[docs] def to_nsec(self): """Return time as nanoseconds from epoch.""" stamp_secs = self.data["secs"] stamp_nsecs = self.data["nsecs"] return stamp_secs * int(1e9) + stamp_nsecs
[docs] def to_sec(self): """Return time as float seconds representation (same as ``time.time()``).""" return float(self.data["secs"]) + float(self.data["nsecs"]) / int(1e9)
[docs] @staticmethod def from_sec(float_secs): """Create new Time instance from a float seconds representation (e.g. ``time.time()``).""" secs = int(float_secs) nsecs = int((float_secs - secs) * int(1e9)) return Time(secs, nsecs)
[docs] @staticmethod def now(): """Create new Time instance from the current system time (not ROS time).""" return Time.from_sec(time.time())
[docs]class ServiceRequest(UserDict): """Request for a service call.""" def __init__(self, values=None): self.data = {} if values is not None: self.update(values)
[docs]class ServiceResponse(UserDict): """Response returned from a service call.""" def __init__(self, values=None): self.data = {} if values is not None: self.update(values)
class MessageEncoder(json.JSONEncoder): """Internal class to serialize some of the core data types into json.""" def default(self, o): if isinstance(o, Header): return dict(o) if isinstance(o, Time): return dict(o) return super(MessageEncoder, self).default(o)
[docs]class Topic(object): """Publish and/or subscribe to a topic in ROS. Args: ros (:class:`.Ros`): Instance of the ROS connection. name (:obj:`str`): Topic name, e.g. ``/cmd_vel``. message_type (:obj:`str`): Message type, e.g. ``std_msgs/String``. compression (:obj:`str`): Type of compression to use, e.g. `png`. Defaults to `None`. throttle_rate (:obj:`int`): Rate (in ms between messages) at which to throttle the topics. queue_size (:obj:`int`): Queue size created at bridge side for re-publishing webtopics. latch (:obj:`bool`): True to latch the topic when publishing, False otherwise. queue_length (:obj:`int`): Queue length at bridge side used when subscribing. reconnect_on_close (:obj:`bool`): Reconnect topic (both publisher & subscriber) on reconnection. """ SUPPORTED_COMPRESSION_TYPES = ("png", "none") def __init__( self, ros, name, message_type, compression=None, latch=False, throttle_rate=0, queue_size=100, queue_length=0, reconnect_on_close=True, ): self.ros = ros self.name = name self.message_type = message_type self.compression = compression self.latch = latch self.throttle_rate = throttle_rate self.queue_size = queue_size self.queue_length = queue_length self._subscribe_id = None self._advertise_id = None if self.compression is None: self.compression = "none" if self.compression not in self.SUPPORTED_COMPRESSION_TYPES: raise ValueError("Unsupported compression type. Must be one of: " + str(self.SUPPORTED_COMPRESSION_TYPES)) self.reconnect_on_close = reconnect_on_close @property def is_advertised(self): """Indicate if the topic is currently advertised or not. Returns: bool: True if advertised as publisher of this topic, False otherwise. """ return self._advertise_id is not None @property def is_subscribed(self): """Indicate if the topic is currently subscribed or not. Returns: bool: True if subscribed to this topic, False otherwise. """ return self._subscribe_id is not None
[docs] def subscribe(self, callback): """Register a subscription to the topic. Every time a message is published for the given topic, the callback will be called with the message object. Args: callback: Function to be called when messages of this topic are published. """ # Avoid duplicate subscription if self._subscribe_id: return self._subscribe_id = "subscribe:%s:%d" % (self.name, self.ros.id_counter) self.ros.on(self.name, callback) self._connect_topic( Message( { "op": "subscribe", "id": self._subscribe_id, "type": self.message_type, "topic": self.name, "compression": self.compression, "throttle_rate": self.throttle_rate, "queue_length": self.queue_length, } ) )
[docs] def unsubscribe(self): """Unregister from a subscribed the topic.""" if not self._subscribe_id: return # Do not try to reconnect when manually unsubscribing if self.reconnect_on_close: self.ros.off("close", self._reconnect_topic) self.ros.off(self.name) self.ros.send_on_ready(Message({"op": "unsubscribe", "id": self._subscribe_id, "topic": self.name})) self._subscribe_id = None
[docs] def publish(self, message): """Publish a message to the topic. Args: message (:class:`.Message`): ROS Bridge Message to publish. """ if not self.is_advertised: self.advertise() self.ros.send_on_ready( Message( { "op": "publish", "id": "publish:%s:%d" % (self.name, self.ros.id_counter), "topic": self.name, "msg": dict(message), "latch": self.latch, } ) )
[docs] def advertise(self): """Register as a publisher for the topic.""" if self.is_advertised: return self._advertise_id = "advertise:%s:%d" % (self.name, self.ros.id_counter) self._connect_topic( Message( { "op": "advertise", "id": self._advertise_id, "type": self.message_type, "topic": self.name, "latch": self.latch, "queue_size": self.queue_size, } ) ) if not self.reconnect_on_close: self.ros.on("close", self._reset_advertise_id)
def _reset_advertise_id(self, _proto): self._advertise_id = None def _connect_topic(self, message): self._connect_message = message self.ros.send_on_ready(message) if self.reconnect_on_close: self.ros.on("close", self._reconnect_topic) def _reconnect_topic(self, _proto): # Delay a bit the event hookup because # 1) _proto is not yet nullified, and # 2) reconnect anyway takes a few seconds self.ros.call_later(1, lambda: self.ros.send_on_ready(self._connect_message))
[docs] def unadvertise(self): """Unregister as a publisher for the topic.""" if not self.is_advertised: return # Do not try to reconnect when manually unadvertising if self.reconnect_on_close: self.ros.off("close", self._reconnect_topic) self.ros.send_on_ready( Message( { "op": "unadvertise", "id": self._advertise_id, "topic": self.name, } ) ) self._advertise_id = None
class ServiceException(Exception): """Exception that is thrown when a service call fails.""" pass
[docs]class Service(object): """Client/server of ROS services. This class can be used both to consume other ROS services as a client, or to provide ROS services as a server. Args: ros (:class:`.Ros`): Instance of the ROS connection. name (:obj:`str`): Service name, e.g. ``/add_two_ints``. service_type (:obj:`str`): Service type, e.g. ``rospy_tutorials/AddTwoInts``. """ def __init__(self, ros, name, service_type, reconnect_on_close=True): self.ros = ros self.name = name self.service_type = service_type self._service_callback = None self._is_advertised = False self.reconnect_on_close = reconnect_on_close @property def is_advertised(self): """Service servers are registered as advertised on ROS. This class can be used to be a service client or a server. Returns: bool: True if this is a server, False otherwise. """ return self._is_advertised
[docs] def call(self, request, callback=None, errback=None, timeout=None): """Start a service call. Note: The service can be used either as blocking or non-blocking. If the ``callback`` parameter is ``None``, then the call will block until receiving a response. Otherwise, the service response will be returned in the callback. Args: request (:class:`.ServiceRequest`): Service request. callback: Callback invoked on successful execution. errback: Callback invoked on error. timeout: Timeout for the operation, in seconds. Only used if blocking. Returns: object: Service response if used as a blocking call, otherwise ``None``. """ if self.is_advertised: return service_call_id = "call_service:%s:%d" % (self.name, self.ros.id_counter) message = Message( { "op": "call_service", "id": service_call_id, "service": self.name, "args": dict(request), } ) # Non-blocking mode if callback: self.ros.call_async_service(message, callback, errback) return # Blocking mode call_results = self.ros.call_sync_service(message, timeout) if "exception" in call_results: raise ServiceException(call_results["exception"]) return call_results["result"]
[docs] def advertise(self, callback): """Start advertising the service. This turns the instance from a client into a server. The callback will be invoked with every request that is made to the service. If the service is already advertised, this call does nothing. Args: callback: Callback invoked on every service call. It should accept two parameters: `service_request` and `service_response`. It should return `True` if executed correctly, otherwise `False`. """ if self.is_advertised: return if not callable(callback): raise ValueError("Callback is not a valid callable") self._service_callback = callback self.ros.on(self.name, self._service_response_handler) self._connect_service(Message({"op": "advertise_service", "type": self.service_type, "service": self.name})) self._is_advertised = True if not self.reconnect_on_close: self.ros.on("close", self._reset_advertise_id)
def _reset_advertise_id(self, _proto): self._is_advertised = False def _connect_service(self, message): self._connect_message = message self.ros.send_on_ready(message) if self.reconnect_on_close: self.ros.on("close", self._reconnect_service) def _reconnect_service(self, _proto): # Delay a bit the event hookup because # 1) _proto is not yet nullified, and # 2) reconnect anyway takes a few seconds self.ros.call_later(1, lambda: self.ros.send_on_ready(self._connect_message))
[docs] def unadvertise(self): """Unregister as a service server.""" if not self.is_advertised: return # Do not try to reconnect when manually unadvertising if self.reconnect_on_close: self.ros.off("close", self._reconnect_service) self.ros.send_on_ready( Message( { "op": "unadvertise_service", "service": self.name, } ) ) self.ros.off(self.name, self._service_response_handler) self._is_advertised = False
def _service_response_handler(self, request): response = ServiceResponse() success = self._service_callback(request["args"], response) call = Message({"op": "service_response", "service": self.name, "values": dict(response), "result": success}) if "id" in request: call["id"] = request["id"] self.ros.send_on_ready(call)
[docs]class Param(object): """A ROS parameter. Args: ros (:class:`.Ros`): Instance of the ROS connection. name (:obj:`str`): Parameter name, e.g. ``max_vel_x``. """ def __init__(self, ros, name): self.ros = ros self.name = name
[docs] def get(self, callback=None, errback=None, timeout=None): """Fetch the current value of the parameter. Note: This method can be used either as blocking or non-blocking. If the ``callback`` parameter is ``None``, the call will block and return the parameter value. Otherwise, the parameter value will be passed on to the callback. Args: callback: Callable function to be invoked when the operation is completed. errback: Callback invoked on error. timeout: Timeout for the operation, in seconds. Only used if blocking. Returns: object: Parameter value if used as a blocking call, otherwise ``None``. """ client = Service(self.ros, "/rosapi/get_param", "rosapi/GetParam") request = ServiceRequest({"name": self.name}) if not callback: result = client.call(request, timeout=timeout) return json.loads(result["value"]) else: client.call(request, lambda result: callback(json.loads(result["value"])), errback)
[docs] def set(self, value, callback=None, errback=None, timeout=None): """Set a new value to the parameter. Note: This method can be used either as blocking or non-blocking. If the ``callback`` parameter is ``None``, the call will block until completion. Args: callback: Callable function to be invoked when the operation is completed. errback: Callback invoked on error. timeout: Timeout for the operation, in seconds. Only used if blocking. """ client = Service(self.ros, "/rosapi/set_param", "rosapi/SetParam") request = ServiceRequest({"name": self.name, "value": json.dumps(value)}) client.call(request, callback, errback, timeout=timeout)
[docs] def delete(self, callback=None, errback=None, timeout=None): """Delete the parameter. Note: This method can be used either as blocking or non-blocking. If the ``callback`` parameter is ``None``, the call will block until completion. Args: callback: Callable function to be invoked when the operation is completed. errback: Callback invoked on error. timeout: Timeout for the operation, in seconds. Only used if blocking. """ client = Service(self.ros, "/rosapi/delete_param", "rosapi/DeleteParam") request = ServiceRequest({"name": self.name}) client.call(request, callback, errback, timeout=timeout)
if __name__ == "__main__": from . import Ros FORMAT = "%(asctime)-15s [%(levelname)s] %(message)s" logging.basicConfig(level=logging.DEBUG, format=FORMAT) ros_client = Ros("127.0.0.1", 9090) def run_subscriber_example(): listener = Topic(ros_client, "/chatter", "std_msgs/String") listener.subscribe(lambda message: LOGGER.info("Received message on: %s", message["data"])) def run_unsubscriber_example(): listener = Topic(ros_client, "/chatter", "std_msgs/String") def print_message(message): LOGGER.info("Received message on: %s", message["data"]) listener.subscribe(print_message) ros_client.call_later(5, lambda: listener.unsubscribe()) ros_client.call_later(10, lambda: listener.subscribe(print_message)) def run_publisher_example(): publisher = Topic(ros_client, "/chatter", "std_msgs/String", compression="png") def start_sending(): i = 0 while ros_client.is_connected and i < 5: i += 1 message = Message({"data": "test"}) LOGGER.info("Publishing message to /chatter. %s", message) publisher.publish(message) time.sleep(0.75) publisher.unadvertise() ros_client.on_ready(start_sending, run_in_thread=True) def run_service_example(): def h1(x): print("ok", x) def h2(x): print("error", x) service = Service(ros_client, "/turtle1/teleport_relative", "turtlesim/TeleportRelative") service.call(ServiceRequest({"linear": 2, "angular": 2}), h1, h2) def run_turtle_subscriber_example(): listener = Topic(ros_client, "/turtle1/pose", "turtlesim/Pose", throttle_rate=500) def print_message(message): LOGGER.info("Received message on: %s", message) listener.subscribe(print_message) def run_get_example(): param = Param(ros_client, "run_id") param.get(print) def run_set_example(): param = Param(ros_client, "test_param") param.set("test_value") def run_delete_example(): param = Param(ros_client, "test_param") param.delete() def run_server_example(): service = Service(ros_client, "/test_server", "rospy_tutorials/AddTwoInts") def dispose_server(): service.unadvertise() ros_client.call_later(1, service.ros.terminate) def add_two_ints(request, response): response["sum"] = request["a"] + request["b"] if response["sum"] == 42: ros_client.call_later(2, dispose_server) return True service.advertise(add_two_ints) run_server_example() ros_client.run_forever()