Source code for pyrobot.robots.locobot.base

# Copyright (c) Facebook, Inc. and its affiliates.

# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from __future__ import print_function

import copy
import os
from functools import partial
from os.path import expanduser

import numpy as np
import rospy
import tf
import tf.transformations
from kobuki_msgs.msg import BumperEvent, CliffEvent, WheelDropEvent
from nav_msgs.msg import Odometry

try:
    from orb_slam2_ros.vslam import VisualSLAM

    USE_ORB_SLAM2 = True
except:
    USE_ORB_SLAM2 = False

from pyrobot.robots.base import Base
from std_msgs.msg import Empty

from pyrobot.robots.locobot.base_control_utils import (
    LocalActionStatus,
    LocalActionServer,
)

from pyrobot.utils.util import append_namespace

import actionlib
from control_msgs.msg import (
    FollowJointTrajectoryAction,
    FollowJointTrajectoryGoal,
)
from std_msgs.msg import Empty

from actionlib_msgs.msg import GoalStatus


class BaseSafetyCallbacks(object):
    """
    This class encapsulates and provides interface to bumper, cliff and
    wheeldrop sensors on the base.  It all also trigers necessary callbacks
    when these sensors get tripped.
    """

    def __init__(self, base, ns):
        self.ns = ns
        self.should_stop = False
        self.bumper = False
        self.cliff = False
        self.wheel_drop = False
        self.subscribers = []

        s = rospy.Subscriber(
            append_namespace(self.ns, self.configs.ROSTOPIC_BUMPER),
            BumperEvent,
            self.bumper_callback_kobuki,
        )
        self.subscribers.append(s)
        s = rospy.Subscriber(
            append_namespace(self.ns, self.configs.ROSTOPIC_CLIFF), 
            CliffEvent, 
            self.cliff_callback
        )
        self.subscribers.append(s)
        s = rospy.Subscriber(
            append_namespace(self.ns, self.configs.ROSTOPIC_WHEELDROP),
            WheelDropEvent,
            self.wheeldrop_callback,
        )
        self.subscribers.append(s)

    def bumper_callback_create(self, data):
        bumped = data.is_left_pressed or data.is_right_pressed
        to_bump = (
            data.is_light_left
            or data.is_light_center_left
            or data.is_light_center_right
            or data.is_light_front_right
            or data.is_light_right
        )
        if bumped or to_bump:
            if self.bumper is False:
                rospy.loginfo("Bumper Detected")
            self.should_stop = True
            self.bumper = True
        else:
            self.bumper = False

    def cliff_callback(self, data):
        rospy.loginfo("Cliff Detected")
        self.should_stop = True
        self.cliff = True

    def wheeldrop_callback(self, data):
        rospy.loginfo("Drop the contact")
        self.should_stop = True
        self.wheel_drop = True

    def bumper_callback_kobuki(self, date):
        rospy.loginfo("Bumper Detected")
        self.bumper = True
        self.should_stop = True

    def __del__(self):
        # Delete callback on deletion of object.
        for s in self.subscribers:
            s.unregister()

class BaseState(BaseSafetyCallbacks):
    def __init__(self, base, configs, ns):
        # Set up SLAM, if requested.
        self.configs = configs
        BaseSafetyCallbacks.__init__(self, base, ns)

    def __del__(self):
        BaseSafetyCallbacks.__del__(self)


[docs]class LoCoBotBase(Base): """ This is a common base class for the locobot and locobot-lite base. """
[docs] def __init__( self, configs, ns="" ): """ The constructor for LoCoBotBase class. :param configs: configurations read from config file :type configs: YACS CfgNode """ super(LoCoBotBase, self).__init__(configs=configs, ns=ns) use_base = rospy.get_param("use_base", False) or rospy.get_param( "use_sim", False ) if not use_base: rospy.logwarn( "Neither use_base, nor use_sim, is not set to True " "when the LoCoBot driver is launched. " "You may not be able to command the base " "correctly using PyRobot!" ) return base = configs.BASE_TYPE assert base in [ "kobuki", "create", ], "BASE should be one of kobuki, create but is {:s}".format(base) self.base_state = BaseState(base, configs, ns) rospy.on_shutdown(self.clean_shutdown)
def clean_shutdown(self): rospy.loginfo("Stopping LoCoBot Base. Waiting for base thread to finish") self.stop()