Source code for roslibpy.core

from __future__ import print_function

import json
import logging
import time

# Python 2/3 compatibility import list
try:
    from collections import UserDict
except ImportError:
    from UserDict import UserDict

LOGGER = logging.getLogger('roslibpy')

__all__ = ['Message',
           'ServiceRequest',
           'ServiceResponse',
           'Topic',
           'Service',
           'Param']


[docs]class Message(UserDict): """Message objects used for publishing and subscribing to/from topics. A message is fundamentally a dictionary and behaves as one.""" def __init__(self, values=None): self.data = {} if values is not None: self.update(values)
[docs]class Time(UserDict): """Represents ROS time with two integers: seconds since epoch and nanoseconds since seconds.""" def __init__(self, secs, nsecs): self.data = {} self.data['secs'] = secs self.data['nsecs'] = nsecs @property def secs(self): """Seconds since epoch.""" return self.data['secs'] @property def nsecs(self): """Nanoseconds since seconds.""" return self.data['nsecs']
[docs] def is_zero(self): """Return ``True`` if zero (secs and nsecs) otherwise ``False``.""" return self.data['secs'] == 0 and self.data['nsecs'] == 0
[docs] def to_nsec(self): """Return time as nanoseconds from epoch.""" stamp_secs = self.data['secs'] stamp_nsecs = self.data['nsecs'] return stamp_secs * int(1e9) + stamp_nsecs
[docs] def to_sec(self): """Return time as float seconds representation (same as ``time.time()``).""" return float(self.data['secs']) + float(self.data['nsecs']) / int(1e9)
[docs] @staticmethod def from_sec(float_secs): """Create new Time instance from a float seconds representation (e.g. ``time.time()``).""" secs = int(float_secs) nsecs = int((float_secs - secs) * int(1e9)) return Time(secs, nsecs)
[docs] @staticmethod def now(): """Create new Time instance from the current system time (not ROS time).""" return Time.from_sec(time.time())
[docs]class ServiceRequest(UserDict): """Request for a service call.""" def __init__(self, values=None): self.data = {} if values is not None: self.update(values)
[docs]class ServiceResponse(UserDict): """Response returned from a service call.""" def __init__(self, values=None): self.data = {} if values is not None: self.update(values)
class MessageEncoder(json.JSONEncoder): """Internal class to serialize some of the core data types into json.""" def default(self, o): if isinstance(o, Header): return dict(o) if isinstance(o, Time): return dict(o) return super(MessageEncoder, self).default(o)
[docs]class Topic(object): """Publish and/or subscribe to a topic in ROS. Args: ros (:class:`.Ros`): Instance of the ROS connection. name (:obj:`str`): Topic name, e.g. ``/cmd_vel``. message_type (:obj:`str`): Message type, e.g. ``std_msgs/String``. compression (:obj:`str`): Type of compression to use, e.g. `png`. Defaults to `None`. throttle_rate (:obj:`int`): Rate (in ms between messages) at which to throttle the topics. queue_size (:obj:`int`): Queue size created at bridge side for re-publishing webtopics. latch (:obj:`bool`): True to latch the topic when publishing, False otherwise. queue_length (:obj:`int`): Queue length at bridge side used when subscribing. reconnect_on_close (:obj:`bool`): Reconnect the topic (both for publisher and subscribers) if a reconnection is detected. """ SUPPORTED_COMPRESSION_TYPES = ('png', 'none') def __init__(self, ros, name, message_type, compression=None, latch=False, throttle_rate=0, queue_size=100, queue_length=0, reconnect_on_close=True): self.ros = ros self.name = name self.message_type = message_type self.compression = compression self.latch = latch self.throttle_rate = throttle_rate self.queue_size = queue_size self.queue_length = queue_length self._subscribe_id = None self._advertise_id = None if self.compression is None: self.compression = 'none' if self.compression not in self.SUPPORTED_COMPRESSION_TYPES: raise ValueError( 'Unsupported compression type. Must be one of: ' + str(self.SUPPORTED_COMPRESSION_TYPES)) self.reconnect_on_close = reconnect_on_close @property def is_advertised(self): """Indicate if the topic is currently advertised or not. Returns: bool: True if advertised as publisher of this topic, False otherwise. """ return self._advertise_id is not None @property def is_subscribed(self): """Indicate if the topic is currently subscribed or not. Returns: bool: True if subscribed to this topic, False otherwise. """ return self._subscribe_id is not None
[docs] def subscribe(self, callback): """Register a subscription to the topic. Every time a message is published for the given topic, the callback will be called with the message object. Args: callback: Function to be called when messages of this topic are published. """ # Avoid duplicate subscription if self._subscribe_id: return self._subscribe_id = 'subscribe:%s:%d' % ( self.name, self.ros.id_counter) self.ros.on(self.name, callback) self._connect_topic(Message({ 'op': 'subscribe', 'id': self._subscribe_id, 'type': self.message_type, 'topic': self.name, 'compression': self.compression, 'throttle_rate': self.throttle_rate, 'queue_length': self.queue_length }))
[docs] def unsubscribe(self): """Unregister from a subscribed the topic.""" if not self._subscribe_id: return # Do not try to reconnect when manually unsubscribing if self.reconnect_on_close: self.ros.off('close', self._reconnect_topic) self.ros.off(self.name) self.ros.send_on_ready(Message({ 'op': 'unsubscribe', 'id': self._subscribe_id, 'topic': self.name })) self._subscribe_id = None
[docs] def publish(self, message): """Publish a message to the topic. Args: message (:class:`.Message`): ROS Bridge Message to publish. """ if not self.is_advertised: self.advertise() self.ros.send_on_ready(Message({ 'op': 'publish', 'id': 'publish:%s:%d' % (self.name, self.ros.id_counter), 'topic': self.name, 'msg': dict(message), 'latch': self.latch }))
[docs] def advertise(self): """Register as a publisher for the topic.""" if self.is_advertised: return self._advertise_id = 'advertise:%s:%d' % ( self.name, self.ros.id_counter) self._connect_topic(Message({ 'op': 'advertise', 'id': self._advertise_id, 'type': self.message_type, 'topic': self.name, 'latch': self.latch, 'queue_size': self.queue_size })) if not self.reconnect_on_close: self.ros.on('close', self._reset_advertise_id)
def _reset_advertise_id(self, _proto): self._advertise_id = None def _connect_topic(self, message): self._connect_message = message self.ros.send_on_ready(message) if self.reconnect_on_close: self.ros.on('close', self._reconnect_topic) def _reconnect_topic(self, _proto): # Delay a bit the event hookup because # 1) _proto is not yet nullified, and # 2) reconnect anyway takes a few seconds self.ros.call_later(1, lambda: self.ros.send_on_ready(self._connect_message))
[docs] def unadvertise(self): """Unregister as a publisher for the topic.""" if not self.is_advertised: return # Do not try to reconnect when manually unadvertising if self.reconnect_on_close: self.ros.off('close', self._reconnect_topic) self.ros.send_on_ready(Message({ 'op': 'unadvertise', 'id': self._advertise_id, 'topic': self.name, })) self._advertise_id = None
[docs]class Service(object): """Client/server of ROS services. This class can be used both to consume other ROS services as a client, or to provide ROS services as a server. Args: ros (:class:`.Ros`): Instance of the ROS connection. name (:obj:`str`): Service name, e.g. ``/add_two_ints``. service_type (:obj:`str`): Service type, e.g. ``rospy_tutorials/AddTwoInts``. """ def __init__(self, ros, name, service_type, reconnect_on_close=True): self.ros = ros self.name = name self.service_type = service_type self._service_callback = None self._is_advertised = False self.reconnect_on_close = reconnect_on_close @property def is_advertised(self): """Service servers are registered as advertised on ROS. This class can be used to be a service client or a server. Returns: bool: True if this is a server, False otherwise. """ return self._is_advertised
[docs] def call(self, request, callback=None, errback=None, timeout=None): """Start a service call. Note: The service can be used either as blocking or non-blocking. If the ``callback`` parameter is ``None``, then the call will block until receiving a response. Otherwise, the service response will be returned in the callback. Args: request (:class:`.ServiceRequest`): Service request. callback: Callback invoked on successful execution. errback: Callback invoked on error. timeout: Timeout for the operation, in seconds. Only used if blocking. Returns: object: Service response if used as a blocking call, otherwise ``None``. """ if self.is_advertised: return service_call_id = 'call_service:%s:%d' % ( self.name, self.ros.id_counter) message = Message({ 'op': 'call_service', 'id': service_call_id, 'service': self.name, 'args': dict(request), }) # Non-blocking mode if callback: self.ros.call_async_service(message, callback, errback) return # Blocking mode call_results = self.ros.call_sync_service(message, timeout) if 'exception' in call_results: raise Exception(call_results['exception']) return call_results['result']
[docs] def advertise(self, callback): """Start advertising the service. This turns the instance from a client into a server. The callback will be invoked with every request that is made to the service. If the service is already advertised, this call does nothing. Args: callback: Callback invoked on every service call. It should accept two parameters: `service_request` and `service_response`. It should return `True` if executed correctly, otherwise `False`. """ if self.is_advertised: return if not callable(callback): raise ValueError('Callback is not a valid callable') self._service_callback = callback self.ros.on(self.name, self._service_response_handler) self._connect_service(Message({ 'op': 'advertise_service', 'type': self.service_type, 'service': self.name })) self._is_advertised = True if not self.reconnect_on_close: self.ros.on('close', self._reset_advertise_id)
def _reset_advertise_id(self, _proto): self._is_advertised = False def _connect_service(self, message): self._connect_message = message self.ros.send_on_ready(message) if self.reconnect_on_close: self.ros.on('close', self._reconnect_service) def _reconnect_service(self, _proto): # Delay a bit the event hookup because # 1) _proto is not yet nullified, and # 2) reconnect anyway takes a few seconds self.ros.call_later(1, lambda: self.ros.send_on_ready(self._connect_message))
[docs] def unadvertise(self): """Unregister as a service server.""" if not self.is_advertised: return # Do not try to reconnect when manually unadvertising if self.reconnect_on_close: self.ros.off('close', self._reconnect_service) self.ros.send_on_ready(Message({ 'op': 'unadvertise_service', 'service': self.name, })) self.ros.off(self.name, self._service_response_handler) self._is_advertised = False
def _service_response_handler(self, request): response = ServiceResponse() success = self._service_callback(request['args'], response) call = Message({'op': 'service_response', 'service': self.name, 'values': dict(response), 'result': success }) if 'id' in request: call['id'] = request['id'] self.ros.send_on_ready(call)
[docs]class Param(object): """A ROS parameter. Args: ros (:class:`.Ros`): Instance of the ROS connection. name (:obj:`str`): Parameter name, e.g. ``max_vel_x``. """ def __init__(self, ros, name): self.ros = ros self.name = name
[docs] def get(self, callback=None, errback=None, timeout=None): """Fetch the current value of the parameter. Note: This method can be used either as blocking or non-blocking. If the ``callback`` parameter is ``None``, the call will block and return the parameter value. Otherwise, the parameter value will be passed on to the callback. Args: callback: Callable function to be invoked when the operation is completed. errback: Callback invoked on error. timeout: Timeout for the operation, in seconds. Only used if blocking. Returns: object: Parameter value if used as a blocking call, otherwise ``None``. """ client = Service(self.ros, '/rosapi/get_param', 'rosapi/GetParam') request = ServiceRequest({'name': self.name}) if not callback: result = client.call(request, timeout=timeout) return json.loads(result['value']) else: client.call(request, lambda result: callback( json.loads(result['value'])), errback)
[docs] def set(self, value, callback=None, errback=None, timeout=None): """Set a new value to the parameter. Note: This method can be used either as blocking or non-blocking. If the ``callback`` parameter is ``None``, the call will block until completion. Args: callback: Callable function to be invoked when the operation is completed. errback: Callback invoked on error. timeout: Timeout for the operation, in seconds. Only used if blocking. """ client = Service(self.ros, '/rosapi/set_param', 'rosapi/SetParam') request = ServiceRequest( {'name': self.name, 'value': json.dumps(value)}) client.call(request, callback, errback, timeout=timeout)
[docs] def delete(self, callback=None, errback=None, timeout=None): """Delete the parameter. Note: This method can be used either as blocking or non-blocking. If the ``callback`` parameter is ``None``, the call will block until completion. Args: callback: Callable function to be invoked when the operation is completed. errback: Callback invoked on error. timeout: Timeout for the operation, in seconds. Only used if blocking. """ client = Service(self.ros, '/rosapi/delete_param', 'rosapi/DeleteParam') request = ServiceRequest({'name': self.name}) client.call(request, callback, errback, timeout=timeout)
if __name__ == '__main__': from . import Ros FORMAT = '%(asctime)-15s [%(levelname)s] %(message)s' logging.basicConfig(level=logging.DEBUG, format=FORMAT) ros_client = Ros('127.0.0.1', 9090) def run_subscriber_example(): listener = Topic(ros_client, '/chatter', 'std_msgs/String') listener.subscribe(lambda message: LOGGER.info( 'Received message on: %s', message['data'])) def run_unsubscriber_example(): listener = Topic(ros_client, '/chatter', 'std_msgs/String') def print_message(message): LOGGER.info('Received message on: %s', message['data']) listener.subscribe(print_message) ros_client.call_later(5, lambda: listener.unsubscribe()) ros_client.call_later(10, lambda: listener.subscribe(print_message)) def run_publisher_example(): publisher = Topic(ros_client, '/chatter', 'std_msgs/String', compression='png') def start_sending(): i = 0 while ros_client.is_connected and i < 5: i += 1 message = Message({'data': 'test'}) LOGGER.info('Publishing message to /chatter. %s', message) publisher.publish(message) time.sleep(0.75) publisher.unadvertise() ros_client.on_ready(start_sending, run_in_thread=True) def run_service_example(): def h1(x): print('ok', x) def h2(x): print('error', x) service = Service(ros_client, '/turtle1/teleport_relative', 'turtlesim/TeleportRelative') service.call(ServiceRequest({'linear': 2, 'angular': 2}), h1, h2) def run_turtle_subscriber_example(): listener = Topic(ros_client, '/turtle1/pose', 'turtlesim/Pose', throttle_rate=500) def print_message(message): LOGGER.info('Received message on: %s', message) listener.subscribe(print_message) def run_get_example(): param = Param(ros_client, 'run_id') param.get(print) def run_set_example(): param = Param(ros_client, 'test_param') param.set('test_value') def run_delete_example(): param = Param(ros_client, 'test_param') param.delete() def run_server_example(): service = Service(ros_client, '/test_server', 'rospy_tutorials/AddTwoInts') def dispose_server(): service.unadvertise() ros_client.call_later(1, service.ros.terminate) def add_two_ints(request, response): response['sum'] = request['a'] + request['b'] if response['sum'] == 42: ros_client.call_later(2, dispose_server) return True service.advertise(add_two_ints) run_server_example() ros_client.run_forever()