Skip to main content

Create LOPActionServer

Overview

The LOPActionServer is a ROS node that handles lift operation panel (LOP) actions through BLE communication. This action is meant to be executed at the Lift Landing before entering the lift. The sequence of operations is as follows:

  1. Request OTP and LOP info from the cloud.
  2. Authenticate with the LOP.
  3. Summon the lift.

Prerequisites

Before proceeding with the step by step guide, ensure you have finished the following:

Also ensure you have the following packages installed:

  • rclpy
  • lift_svc_interfaces
  • bleak
  • asyncio
  • json

Once you have fulfilled these prerequisites, you can proceed with the step-by-step guide.

Step 1: Create the ROS 2 Package

Navigate to your ROS 2 workspace and create a new package:

cd ~/ros2_ws/src
ros2 pkg create --build-type ament_cmake lift_svc_action_server --dependencies rclpy action_msgs std_msgs bleak

Step 2: Create LOP_Utils Module

Create a file named lop_utils.py in the lift_svc_action_server package:

touch ~/ros2_ws/src/lift_svc_action_server/lift_svc_action_server/lop_utils.py

Open lop_utils.py and paste the following code:

 import asyncio
import json
import requests
from bleak import BleakClient, BleakScanner, BleakError

async def get_lift_info(lift_id, api_key):
# API call to get lift information
url = f"https://api-av5vhokaxq-uc.a.run.app/api/lifts/{lift_id}/info"
headers = {
"Content-Type": "application/json",
"Authorization": api_key
}
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()

async def get_otp(lift_id, api_key):
# API call to get one-time password
url = f"https://api-av5vhokaxq-uc.a.run.app/api/lifts/{lift_id}/otp"
headers = {
"Content-Type": "application/json",
"Authorization": api_key
}
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json().get('otp')

async def find_device(device_address:str):
devices = await BleakScanner.discover()
for device in devices:
if device.address == device_address:
return device
raise Exception(f"Device {device_address} not found")

class State:
def __init__(self):
self.is_authenticated = False
self.has_error = False
self.error = None
self.lift_arrived = False

import json
from bleak import BleakClient

async def connect_and_perform_actions(
lop_address, lop_char_uuid, otp, current_floor, target_floor, log_handler
):
state = State()
direction = None

# Keep-alive timer
timer = 0
HOLD_DOOR_DURATION = 5

if current_floor < target_floor:
direction = "UP"
else:
direction = "DOWN"

def handle_notification(sender, data):
# Handle notifications from the BLE device
json_data = json.loads(data.decode('utf-8'))
print(f"Received notification: {json_data}")

# Handle different types of notifications
if json_data.get('code') == 'ERROR':
state.error = json_data.get('value')
state.has_error = True
log_handler(f"Error: {state.error}")

if json_data.get('code') == 'AUTH_OK':
state.is_authenticated = True
log_handler("Authenticated Successfuly.")

if json_data.get('code') == f'ARRIVAL_UP' and direction == 'UP':
state.lift_arrived = True
log_handler(f"Lift arrived: {json_data.get('value')}")

if json_data.get('code') == f'ARRIVAL_DOWN' and direction == 'DOWN':
state.lift_arrived = True
log_handler(f"Lift arrived: {json_data.get('value')}")

async with BleakClient(lop_address) as client:
# Connect to the device
log_handler(f"Connected to {lop_address}.")
await client.start_notify(lop_char_uuid, handle_notification)
await asyncio.sleep(1) # Wait for notification subscription to stabilize

# Authenticate with the device
message = json.dumps({"code": "AUTH", "otp": otp})
await client.write_gatt_char(lop_char_uuid, message.encode('utf-8'))
await asyncio.sleep(1)

# Wait for authentication confirmation
while not state.is_authenticated and not state.has_error:
await asyncio.sleep(1)

if state.has_error:
return False, f'Error: {state.error}'

# Send lift movement command
message = json.dumps({"code": direction, "hold_door_duration": HOLD_DOOR_DURATION})
await client.write_gatt_char(lop_char_uuid, message.encode('utf-8'))
await asyncio.sleep(1)

# Mock lift arrival (for testing purposes)
message = json.dumps({"code": "DEV_START"})
await client.write_gatt_char(lop_char_uuid, message.encode('utf-8'))
await asyncio.sleep(1)

message = json.dumps({"code": f"MOCK_ARRIVAL_{direction}"})
await client.write_gatt_char(lop_char_uuid, message.encode('utf-8'))
await asyncio.sleep(1)


# Wait for lift arrival confirmation. This might take awhile.
while not state.lift_arrived and not state.has_error:
if timer > 30:
message = json.dumps({"code": "KEEP_ALIVE"})
await client.write_gatt_char(lop_char_uuid, message.encode('utf-8'))
timer = 0
await asyncio.sleep(1)

if state.has_error:
return False, f'Error: {state.error}'

return True, 'Device action completed successfully'


async def main():
lift_id = "12345"
api_key = "12345"

# Get lift information
lift_info = await get_lift_info(lift_id, api_key)

# Get one-time password
otp = await get_otp(lift_id, api_key)
# otp = "123456"

lop = lift_info.get('lop_devices')[0]
lop_address = lop.get('address')
lop_char_uuid = lop.get('char_uuid')

# Find the BLE device
device = await find_device(lop_address)

# Connect to the device and perform actions
success, message = await connect_and_perform_actions(
lop_address = device.address,
lop_char_uuid = lop_char_uuid,
otp = otp,
current_floor = 0,
target_floor = 5,
log_handler = print
)

if success:
print("Device action completed successfully")
else:
print(f"Failed to connect: {message}")

if __name__ == '__main__':
# get event loop and run
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

Step 3: Create the Action Server

Create a file named lop_action_server.py in the lift_svc_action_server package:

Create a file named lop_action_server.py in the lift_svc_action_server package:

touch ~/ros2_ws/src/lift_svc_action_server/lift_svc_action_server/lop_action_server.py

Step 4: Implement the Action Server Code

Open lop_action_server.py and paste the following code:

import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from lift_svc_interface.action import LOPActionInterface


import bleak
from bleak import BleakClient, BleakScanner, BleakError
import asyncio
import json

from lift_svc_interfaces.msg import LiftState
import requests
from lop_utils import get_lift_info, get_otp, find_device, connect_and_perform_actions

class LOPActionServer(Node):
def __init__(self):
super().__init__(f'lop_action_server')

# Initialize the action server
self.server = ActionServer(self, LOPAction, 'lop_action', self.device_action_callback)

def device_action_callback(self, goal_handle):
# Main callback for handling action requests
loop = asyncio.get_event_loop()
future = loop.create_task(self._device_action_callback(goal_handle))
loop.run_until_complete(future)
return future.result()

async def _device_action_callback(self,goal_handle):
lift_id = goal_handle.request.lift_id
api_key = goal_handle.request.api_key
current_floor = goal_handle.request.current_floor
target_floor = goal_handle.request.target_floor
hold_door_duration = goal_handle.request.hold_door_duration

def log_handler(message):
# Log messages to the console
self.get_logger().info(message)
feedback = LOPAction.Feedback(message = message)
goal_handle.publish_feedback(feedback)

try:
# Get lift information
lift_info = await get_lift_info(lift_id, api_key)
log_handler(f"Got lift information: {lift_info}")

# Get one-time password
otp = await get_otp(lift_id, api_key)
log_handler(f"Got OTP: {otp}")
otp = "123456"

lop = lift_info.get('lop_devices')[current_floor]
lop_address = lop.get('address')
lop_char_uuid = lop.get('char_uuid')

cop = lift_info.get('cop_device')
cop_address = cop.get('address')
cop_char_uuid = cop.get('char_uuid')

# Find the BLE device
device = await find_device(lop_address)
log_handler(f"Found device: {device}")

# Connect to the BLE device and perform actions
success, message = await connect_and_perform_actions(
otp = otp,
current_floor = current_floor,
target_floor = target_floor,
lop_address = device.address,
lop_char_uuid = lop_char_uuid,
hold_door_duration = hold_door_duration,
log_handler = log_handler
)

if not success:
raise Exception(f"Failed: {message}")

goal_handle.succeed()
return LOPAction.Result(
message="Action completed successfully",
success=True,
otp = otp,
cop_address = cop_address,
cop_char_uuid = cop_char_uuid
)


except Exception as e:
# Handle exceptions
goal_handle.abort()
return LOPAction.Result(message=str(e), success=False)

def feedback(self, goal_handle, message):
# Publish feedback to the action client
feedback = LOPAction.Feedback()
feedback.message = message
goal_handle.publish_feedback(feedback)

def main(args=None):
# Initialize ROS node and start the action server
rclpy.init(args=args)
action_server = LOPActionServer()
rclpy.spin(action_server)
action_server.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()


### Step 5: Create Action Client Node

**Create a file** named `lop_action_client.py` in the `lift_svc_action_server` package:

```bash
touch ~/ros2_ws/src/lift_svc_action_server/lift_svc_action_server/lop_action_client.py

Step 6: Implement Action Client Code

Open lop_action_client.py and paste the following code:

    import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from lift_svc_interfaces.action import LOPAction


class LOPActionClient(Node):

def __init__(self):
super().__init__('lop_action_client')
self._action_client = ActionClient(self, LOPAction, 'lop_action')

def send_goal(self, lift_id:str,current_floor: int, target_floor: int , hold_door_duration:int, api_key:str):
goal_msg = LOPAction.Goal()
goal_msg.hold_door_duration = hold_door_duration
goal_msg.lift_id = lift_id
goal_msg.api_key = api_key
goal_msg.current_floor = current_floor
goal_msg.target_floor = target_floor

self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)

def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return

self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)

def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result))
rclpy.shutdown()

def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info('Received feedback: {0}'.format(feedback.message))

# Call to sample lift
action_client.send_goal(lift_id="12345", current_floor=0, target_floor=1, hold_door_duration=10000, api_key="12345")

rclpy.spin(action_client)

def main(args=None):
rclpy.init(args=args)
action_client = LOPActionClient()
# Call to sample lift
action_client.send_goal(lift_id="12345", current_floor=0, target_floor=1, hold_door_duration=10000, api_key="12345")
rclpy.spin(action_client)

if __name__ == '__main__':
main()

Step 7: Add Dependencies to package.xml

Ensure the package.xml includes the necessary dependencies:

<exec_depend>rclpy</exec_depend>
<exec_depend>lift_svc_interfaces</exec_depend>
<exec_depend>bleak</exec_depend>
<exec_depend>asyncio</exec_depend>
<exec_depend>json</exec_depend>

Step 8: Modify the Package Configuration

Open the setup.py file in the lift_svc_action_server package and update the entry_points section to include the new node:

entry_points={
'console_scripts': [
'lop_action_server = lift_svc_action_server.lop_action_server:main',
'lop_action_client = lift_svc_action_server.lop_action_client:main',
],
},

Step 9: Build the Package

Build your workspace to compile the new node and action interface:

cd ~/ros2_ws
colcon build

Step 10: Run the Action Server

Source your workspace and run the new action server node:

source ~/ros2_ws/install/setup.bash
ros2 run lift_svc_action_server lop_action_server

Explanation of the Code

  • Node Initialization: The LOPActionServer class is a ROS 2 node that initializes an action server and a publisher for lift state updates.
  • API Calls: The get_lop and get_otp methods make API calls to obtain lift information and one-time passwords for authentication.
  • Action Callback: The device_action_callback method handles incoming action goals, performing BLE communication to authenticate and send commands to the lift operating panel.
  • Notification Handling: The handle_notification method processes notifications from the BLE device, updating the state and publishing messages to a ROS topic.

Summary

By following these steps, you will have a working ROS action server that communicates with lift operating panels via BLE. This server can be integrated into a larger ROS system to control lift operations based on received action goals.

What's Next

Now you are ready to add the COPActionServer to the package.