from __future__ import print_function
import json
import logging
import time
# Python 2/3 compatibility import list
try:
from collections import UserDict
except ImportError:
from UserDict import UserDict
LOGGER = logging.getLogger("roslibpy")
__all__ = [
"Header",
"Message",
"Param",
"RosTimeoutError",
"Service",
"ServiceRequest",
"ServiceResponse",
"Time",
"Topic",
]
try:
class RosTimeoutError(TimeoutError):
pass
except NameError:
class RosTimeoutError(Exception):
pass
[docs]class Message(UserDict):
"""Message objects used for publishing and subscribing to/from topics.
A message is fundamentally a dictionary and behaves as one."""
def __init__(self, values=None):
self.data = {}
if values is not None:
self.update(values)
[docs]class Time(UserDict):
"""Represents ROS time with two integers: seconds since epoch and nanoseconds since seconds."""
def __init__(self, secs, nsecs):
self.data = {}
self.data["secs"] = self._ensure_int(secs)
self.data["nsecs"] = self._ensure_int(nsecs)
def _ensure_int(self, n):
if isinstance(n, int):
return n
if isinstance(n, float) and n.is_integer():
return int(n)
raise ValueError("argument must be an integer")
@property
def secs(self):
"""Seconds since epoch."""
return self.data["secs"]
@property
def nsecs(self):
"""Nanoseconds since seconds."""
return self.data["nsecs"]
[docs] def is_zero(self):
"""Return ``True`` if zero (secs and nsecs) otherwise ``False``."""
return self.data["secs"] == 0 and self.data["nsecs"] == 0
[docs] def to_nsec(self):
"""Return time as nanoseconds from epoch."""
stamp_secs = self.data["secs"]
stamp_nsecs = self.data["nsecs"]
return stamp_secs * int(1e9) + stamp_nsecs
[docs] def to_sec(self):
"""Return time as float seconds representation (same as ``time.time()``)."""
return float(self.data["secs"]) + float(self.data["nsecs"]) / int(1e9)
[docs] @staticmethod
def from_sec(float_secs):
"""Create new Time instance from a float seconds representation (e.g. ``time.time()``)."""
secs = int(float_secs)
nsecs = int((float_secs - secs) * int(1e9))
return Time(secs, nsecs)
[docs] @staticmethod
def now():
"""Create new Time instance from the current system time (not ROS time)."""
return Time.from_sec(time.time())
[docs]class ServiceRequest(UserDict):
"""Request for a service call."""
def __init__(self, values=None):
self.data = {}
if values is not None:
self.update(values)
[docs]class ServiceResponse(UserDict):
"""Response returned from a service call."""
def __init__(self, values=None):
self.data = {}
if values is not None:
self.update(values)
class MessageEncoder(json.JSONEncoder):
"""Internal class to serialize some of the core data types into json."""
def default(self, o):
if isinstance(o, Header):
return dict(o)
if isinstance(o, Time):
return dict(o)
return super(MessageEncoder, self).default(o)
[docs]class Topic(object):
"""Publish and/or subscribe to a topic in ROS.
Args:
ros (:class:`.Ros`): Instance of the ROS connection.
name (:obj:`str`): Topic name, e.g. ``/cmd_vel``.
message_type (:obj:`str`): Message type, e.g. ``std_msgs/String``.
compression (:obj:`str`): Type of compression to use, e.g. `png`. Defaults to `None`.
throttle_rate (:obj:`int`): Rate (in ms between messages) at which to throttle the topics.
queue_size (:obj:`int`): Queue size created at bridge side for re-publishing webtopics.
latch (:obj:`bool`): True to latch the topic when publishing, False otherwise.
queue_length (:obj:`int`): Queue length at bridge side used when subscribing.
reconnect_on_close (:obj:`bool`): Reconnect topic (both publisher & subscriber) on reconnection.
"""
SUPPORTED_COMPRESSION_TYPES = ("png", "none")
def __init__(
self,
ros,
name,
message_type,
compression=None,
latch=False,
throttle_rate=0,
queue_size=100,
queue_length=0,
reconnect_on_close=True,
):
self.ros = ros
self.name = name
self.message_type = message_type
self.compression = compression
self.latch = latch
self.throttle_rate = throttle_rate
self.queue_size = queue_size
self.queue_length = queue_length
self._subscribe_id = None
self._advertise_id = None
if self.compression is None:
self.compression = "none"
if self.compression not in self.SUPPORTED_COMPRESSION_TYPES:
raise ValueError("Unsupported compression type. Must be one of: " + str(self.SUPPORTED_COMPRESSION_TYPES))
self.reconnect_on_close = reconnect_on_close
@property
def is_advertised(self):
"""Indicate if the topic is currently advertised or not.
Returns:
bool: True if advertised as publisher of this topic, False otherwise.
"""
return self._advertise_id is not None
@property
def is_subscribed(self):
"""Indicate if the topic is currently subscribed or not.
Returns:
bool: True if subscribed to this topic, False otherwise.
"""
return self._subscribe_id is not None
[docs] def subscribe(self, callback):
"""Register a subscription to the topic.
Every time a message is published for the given topic,
the callback will be called with the message object.
Args:
callback: Function to be called when messages of this topic are published.
"""
# Avoid duplicate subscription
if self._subscribe_id:
return
self._subscribe_id = "subscribe:%s:%d" % (self.name, self.ros.id_counter)
self.ros.on(self.name, callback)
self._connect_topic(
Message(
{
"op": "subscribe",
"id": self._subscribe_id,
"type": self.message_type,
"topic": self.name,
"compression": self.compression,
"throttle_rate": self.throttle_rate,
"queue_length": self.queue_length,
}
)
)
[docs] def unsubscribe(self):
"""Unregister from a subscribed the topic."""
if not self._subscribe_id:
return
# Do not try to reconnect when manually unsubscribing
if self.reconnect_on_close:
self.ros.off("close", self._reconnect_topic)
self.ros.off(self.name)
self.ros.send_on_ready(Message({"op": "unsubscribe", "id": self._subscribe_id, "topic": self.name}))
self._subscribe_id = None
[docs] def publish(self, message):
"""Publish a message to the topic.
Args:
message (:class:`.Message`): ROS Bridge Message to publish.
"""
if not self.is_advertised:
self.advertise()
self.ros.send_on_ready(
Message(
{
"op": "publish",
"id": "publish:%s:%d" % (self.name, self.ros.id_counter),
"topic": self.name,
"msg": dict(message),
"latch": self.latch,
}
)
)
[docs] def advertise(self):
"""Register as a publisher for the topic."""
if self.is_advertised:
return
self._advertise_id = "advertise:%s:%d" % (self.name, self.ros.id_counter)
self._connect_topic(
Message(
{
"op": "advertise",
"id": self._advertise_id,
"type": self.message_type,
"topic": self.name,
"latch": self.latch,
"queue_size": self.queue_size,
}
)
)
if not self.reconnect_on_close:
self.ros.on("close", self._reset_advertise_id)
def _reset_advertise_id(self, _proto):
self._advertise_id = None
def _connect_topic(self, message):
self._connect_message = message
self.ros.send_on_ready(message)
if self.reconnect_on_close:
self.ros.on("close", self._reconnect_topic)
def _reconnect_topic(self, _proto):
# Delay a bit the event hookup because
# 1) _proto is not yet nullified, and
# 2) reconnect anyway takes a few seconds
self.ros.call_later(1, lambda: self.ros.send_on_ready(self._connect_message))
[docs] def unadvertise(self):
"""Unregister as a publisher for the topic."""
if not self.is_advertised:
return
# Do not try to reconnect when manually unadvertising
if self.reconnect_on_close:
self.ros.off("close", self._reconnect_topic)
self.ros.send_on_ready(
Message(
{
"op": "unadvertise",
"id": self._advertise_id,
"topic": self.name,
}
)
)
self._advertise_id = None
class ServiceException(Exception):
"""Exception that is thrown when a service call fails."""
pass
[docs]class Service(object):
"""Client/server of ROS services.
This class can be used both to consume other ROS services as a client,
or to provide ROS services as a server.
Args:
ros (:class:`.Ros`): Instance of the ROS connection.
name (:obj:`str`): Service name, e.g. ``/add_two_ints``.
service_type (:obj:`str`): Service type, e.g. ``rospy_tutorials/AddTwoInts``.
"""
def __init__(self, ros, name, service_type, reconnect_on_close=True):
self.ros = ros
self.name = name
self.service_type = service_type
self._service_callback = None
self._is_advertised = False
self.reconnect_on_close = reconnect_on_close
@property
def is_advertised(self):
"""Service servers are registered as advertised on ROS.
This class can be used to be a service client or a server.
Returns:
bool: True if this is a server, False otherwise.
"""
return self._is_advertised
[docs] def call(self, request, callback=None, errback=None, timeout=None):
"""Start a service call.
Note:
The service can be used either as blocking or non-blocking.
If the ``callback`` parameter is ``None``, then the call will
block until receiving a response. Otherwise, the service response
will be returned in the callback.
Args:
request (:class:`.ServiceRequest`): Service request.
callback: Callback invoked on successful execution.
errback: Callback invoked on error.
timeout: Timeout for the operation, in seconds. Only used if blocking.
Returns:
object: Service response if used as a blocking call, otherwise ``None``.
"""
if self.is_advertised:
return
service_call_id = "call_service:%s:%d" % (self.name, self.ros.id_counter)
message = Message(
{
"op": "call_service",
"id": service_call_id,
"service": self.name,
"args": dict(request),
}
)
# Non-blocking mode
if callback:
self.ros.call_async_service(message, callback, errback)
return
# Blocking mode
call_results = self.ros.call_sync_service(message, timeout)
if "exception" in call_results:
raise ServiceException(call_results["exception"])
return call_results["result"]
[docs] def advertise(self, callback):
"""Start advertising the service.
This turns the instance from a client into a server. The callback will be
invoked with every request that is made to the service.
If the service is already advertised, this call does nothing.
Args:
callback: Callback invoked on every service call. It should accept two parameters: `service_request` and
`service_response`. It should return `True` if executed correctly, otherwise `False`.
"""
if self.is_advertised:
return
if not callable(callback):
raise ValueError("Callback is not a valid callable")
self._service_callback = callback
self.ros.on(self.name, self._service_response_handler)
self._connect_service(Message({"op": "advertise_service", "type": self.service_type, "service": self.name}))
self._is_advertised = True
if not self.reconnect_on_close:
self.ros.on("close", self._reset_advertise_id)
def _reset_advertise_id(self, _proto):
self._is_advertised = False
def _connect_service(self, message):
self._connect_message = message
self.ros.send_on_ready(message)
if self.reconnect_on_close:
self.ros.on("close", self._reconnect_service)
def _reconnect_service(self, _proto):
# Delay a bit the event hookup because
# 1) _proto is not yet nullified, and
# 2) reconnect anyway takes a few seconds
self.ros.call_later(1, lambda: self.ros.send_on_ready(self._connect_message))
[docs] def unadvertise(self):
"""Unregister as a service server."""
if not self.is_advertised:
return
# Do not try to reconnect when manually unadvertising
if self.reconnect_on_close:
self.ros.off("close", self._reconnect_service)
self.ros.send_on_ready(
Message(
{
"op": "unadvertise_service",
"service": self.name,
}
)
)
self.ros.off(self.name, self._service_response_handler)
self._is_advertised = False
def _service_response_handler(self, request):
response = ServiceResponse()
success = self._service_callback(request["args"], response)
call = Message({"op": "service_response", "service": self.name, "values": dict(response), "result": success})
if "id" in request:
call["id"] = request["id"]
self.ros.send_on_ready(call)
[docs]class Param(object):
"""A ROS parameter.
Args:
ros (:class:`.Ros`): Instance of the ROS connection.
name (:obj:`str`): Parameter name, e.g. ``max_vel_x``.
"""
def __init__(self, ros, name):
self.ros = ros
self.name = name
[docs] def get(self, callback=None, errback=None, timeout=None):
"""Fetch the current value of the parameter.
Note:
This method can be used either as blocking or non-blocking.
If the ``callback`` parameter is ``None``, the call will
block and return the parameter value. Otherwise, the parameter
value will be passed on to the callback.
Args:
callback: Callable function to be invoked when the operation is completed.
errback: Callback invoked on error.
timeout: Timeout for the operation, in seconds. Only used if blocking.
Returns:
object: Parameter value if used as a blocking call, otherwise ``None``.
"""
client = Service(self.ros, "/rosapi/get_param", "rosapi/GetParam")
request = ServiceRequest({"name": self.name})
if not callback:
result = client.call(request, timeout=timeout)
return json.loads(result["value"])
else:
client.call(request, lambda result: callback(json.loads(result["value"])), errback)
[docs] def set(self, value, callback=None, errback=None, timeout=None):
"""Set a new value to the parameter.
Note:
This method can be used either as blocking or non-blocking.
If the ``callback`` parameter is ``None``, the call will
block until completion.
Args:
callback: Callable function to be invoked when the operation is completed.
errback: Callback invoked on error.
timeout: Timeout for the operation, in seconds. Only used if blocking.
"""
client = Service(self.ros, "/rosapi/set_param", "rosapi/SetParam")
request = ServiceRequest({"name": self.name, "value": json.dumps(value)})
client.call(request, callback, errback, timeout=timeout)
[docs] def delete(self, callback=None, errback=None, timeout=None):
"""Delete the parameter.
Note:
This method can be used either as blocking or non-blocking.
If the ``callback`` parameter is ``None``, the call will
block until completion.
Args:
callback: Callable function to be invoked when the operation is completed.
errback: Callback invoked on error.
timeout: Timeout for the operation, in seconds. Only used if blocking.
"""
client = Service(self.ros, "/rosapi/delete_param", "rosapi/DeleteParam")
request = ServiceRequest({"name": self.name})
client.call(request, callback, errback, timeout=timeout)
if __name__ == "__main__":
from . import Ros
FORMAT = "%(asctime)-15s [%(levelname)s] %(message)s"
logging.basicConfig(level=logging.DEBUG, format=FORMAT)
ros_client = Ros("127.0.0.1", 9090)
def run_subscriber_example():
listener = Topic(ros_client, "/chatter", "std_msgs/String")
listener.subscribe(lambda message: LOGGER.info("Received message on: %s", message["data"]))
def run_unsubscriber_example():
listener = Topic(ros_client, "/chatter", "std_msgs/String")
def print_message(message):
LOGGER.info("Received message on: %s", message["data"])
listener.subscribe(print_message)
ros_client.call_later(5, lambda: listener.unsubscribe())
ros_client.call_later(10, lambda: listener.subscribe(print_message))
def run_publisher_example():
publisher = Topic(ros_client, "/chatter", "std_msgs/String", compression="png")
def start_sending():
i = 0
while ros_client.is_connected and i < 5:
i += 1
message = Message({"data": "test"})
LOGGER.info("Publishing message to /chatter. %s", message)
publisher.publish(message)
time.sleep(0.75)
publisher.unadvertise()
ros_client.on_ready(start_sending, run_in_thread=True)
def run_service_example():
def h1(x):
print("ok", x)
def h2(x):
print("error", x)
service = Service(ros_client, "/turtle1/teleport_relative", "turtlesim/TeleportRelative")
service.call(ServiceRequest({"linear": 2, "angular": 2}), h1, h2)
def run_turtle_subscriber_example():
listener = Topic(ros_client, "/turtle1/pose", "turtlesim/Pose", throttle_rate=500)
def print_message(message):
LOGGER.info("Received message on: %s", message)
listener.subscribe(print_message)
def run_get_example():
param = Param(ros_client, "run_id")
param.get(print)
def run_set_example():
param = Param(ros_client, "test_param")
param.set("test_value")
def run_delete_example():
param = Param(ros_client, "test_param")
param.delete()
def run_server_example():
service = Service(ros_client, "/test_server", "rospy_tutorials/AddTwoInts")
def dispose_server():
service.unadvertise()
ros_client.call_later(1, service.ros.terminate)
def add_two_ints(request, response):
response["sum"] = request["a"] + request["b"]
if response["sum"] == 42:
ros_client.call_later(2, dispose_server)
return True
service.advertise(add_two_ints)
run_server_example()
ros_client.run_forever()