Skip to main content

Create COPActionServer

Overview

The COPActionServer is a ROS node that handles lift operation panel (COP) actions through BLE communication. This action is meant to be executed inside the Lift Cabin before entering the lift. The sequence of operations is as follows:.

  1. Authenticate with the COP.
  2. Request for target floor.
  3. Wait for lift to arrive at target floor.

Prerequisites

Ensure you have finished the following:

Step 1: Add COP_Utils Module

Create a file named cop_utils.py in the lift_svc_action_server package:

touch ~/ros2_ws/src/lift_svc_action_server/lift_svc_action_server/cop_utils.py

Open cop_utils.py and paste the following code:

import asyncio
import json
from bleak import BleakClient, BleakScanner, BleakError

class State:
def __init__(self):
self.is_authenticated = False
self.has_error = False
self.error = None
self.lift_arrived = False


async def find_device(device_address:str):
devices = await BleakScanner.discover()
for device in devices:
if device.address == device_address:
return device
raise Exception(f"Device {device_address} not found")

async def connect_and_perform_actions(
cop_address, cop_char_uuid, otp, target_floor, hold_door_duration, log_handler
):
state = State()
timer = 0

def handle_notification(sender, data):
json_data = json.loads(data.decode('utf-8'))
log_handler(f"Received notification: {json_data}")

if json_data.get('code') == 'ERROR':
state.error = json_data.get('value')
state.has_error = True
log_handler(f"Error: {state.error}")

if json_data.get('code') == 'AUTH_OK':
state.is_authenticated = True
log_handler("Authenticated successfully.")

if json_data.get('code') == 'CURRENT_FLOOR' and json_data.get('value') == target_floor:
state.lift_arrived = True
log_handler(f"Lift arrived: {json_data.get('value')}")

async with BleakClient(cop_address) as client:
log_handler(f"Connected to {cop_address}.")
await client.start_notify(cop_char_uuid, handle_notification)
await asyncio.sleep(1)

message = json.dumps({"code": "AUTH", "otp": otp})
await client.write_gatt_char(cop_char_uuid, message.encode('utf-8'))
await asyncio.sleep(1)

while not state.is_authenticated and not state.has_error:
await asyncio.sleep(1)

if state.has_error:
return False, f'Error: {state.error}'

message = json.dumps({"code": "GO", "floor": target_floor})
await client.write_gatt_char(cop_char_uuid, message.encode('utf-8'))
await asyncio.sleep(1)

message = json.dumps({"code": "MOCK_CURRENT_FLOOR", "floor": target_floor})
await client.write_gatt_char(cop_char_uuid, message.encode('utf-8'))
await asyncio.sleep(1)

while not state.lift_arrived and not state.has_error:
if timer > 30:
message = json.dumps({"code": "KEEP_ALIVE"})
await client.write_gatt_char(cop_char_uuid, message.encode('utf-8'))
timer = 0
await asyncio.sleep(1)

if state.has_error:
return False, f'Error: {state.error}'

message = json.dumps({"code": "OPEN_DOOR", "duration": hold_door_duration})
await client.write_gatt_char(cop_char_uuid, message.encode('utf-8'))
await asyncio.sleep(1)

return True, 'Device action completed successfully'

async def main():
otp = "123456"
cop_address = "CC:7B:5C:3A:F8:3A"
cop_char_uuid = "beb5483e-36e1-4688-b7f5-ea07361b26a8"

device = await find_device(cop_address)

success, message = await connect_and_perform_actions(
cop_address=device.address,
cop_char_uuid=cop_char_uuid,
otp=otp,
target_floor=5,
hold_door_duration=5,
log_handler=print
)

if success:
print("Device action completed successfully")
else:
print(f"Failed to connect: {message}")

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

Step 2: Add COP Action Server

Create the cop_action_server.py file in your package directory. Create a file named cop_action_server.py in the lift_svc_action_server package:

touch ~/ros2_ws/src/lift_svc_action_server/lift_svc_action_server/cop_action_server.py

Add the following code to cop_action_server.py:

import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from lift_svc_interfaces.action import COPAction
import bleak
from bleak import BleakClient, BleakScanner, BleakError
import asyncio
import json
from lift_svc_interfaces.msg import LiftState
import requests
from cop_utils import find_device, connect_and_perform_actions

class COPActionServer(Node):
def __init__(self):
super().__init__(f'cop_action_server')

# Initialize the action server
self.server = ActionServer(self, COPAction, 'cop_action', self.device_action_callback)

def device_action_callback(self, goal_handle):
# Main callback for handling action requests
loop = asyncio.get_event_loop()
future = loop.create_task(self._device_action_callback(goal_handle))
loop.run_until_complete(future)
return future.result()

async def _device_action_callback(self,goal_handle):
otp = goal_handle.request.otp
target_floor = goal_handle.request.target_floor
cop_address = goal_handle.request.cop_address
cop_char_uuid = goal_handle.request.cop_char_uuid

hold_door_duration = 5000

def log_handler(message):
# Log messages to the console
self.get_logger().info(message)
feedback = COPAction.Feedback(message = message)
goal_handle.publish_feedback(feedback)

try:
# Find the BLE device
device = await find_device(cop_address)
log_handler(f"Found device: {device}")

# Connect to the BLE device and perform actions
success, message = await connect_and_perform_actions(
cop_address=device.address,
cop_char_uuid=cop_char_uuid,
otp=otp,
target_floor=5,
hold_door_duration=5,
log_handler=log_handler
)

if not success:
raise Exception(f"Failed: {message}")

goal_handle.succeed()
return COPAction.Result(
message="Action completed successfully",
success=True,
)


except Exception as e:
# Handle exceptions
goal_handle.abort()
return LOPAction.Result(message=str(e), success=False)

def main(args=None):
# Initialize ROS node and start the action server
rclpy.init(args=args)
action_server = COPActionServer()
rclpy.spin(action_server)
action_server.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Step 3: Add COP Action Client

Create the cop_action_client.py file in your package directory.

Add the following code to cop_action_client.py:

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from lift_svc_interfaces.action import COPAction


class COPActionClient(Node):

def __init__(self):
super().__init__('cop_action_client')
self._action_client = ActionClient(self, COPAction, 'cop_action')

def send_goal(self, target_floor:int, otp:str, cop_address:str, char_uuid:str):
goal_msg = COPAction.Goal()
goal_msg.otp = otp
goal_msg.target_floor = target_floor
goal_msg.cop_address = cop_address
goal_msg.cop_char_uuid = char_uuid

self._action_client.wait_for_server()

self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

self._send_goal_future.add_done_callback(self.goal_response_callback)

def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return

self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)

def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.message))
rclpy.shutdown()

def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info('Received feedback: {0}'.format(feedback.message))


def main(args=None):
rclpy.init(args=args)

action_client = COPActionClient()

action_client.send_goal(
otp = "123456",
target_floor = 5,
cop_address = "YOUR_COP_ADDRESS_FROM_SUCCESSFUL_LOP_SERVER",
char_uuid = "YOUR_CHAR_UUID_FROM_SUCCESSFUL_LOP_SERVER"
)

rclpy.spin(action_client)


if __name__ == '__main__':
main()

Make sure to fill in the cop_address and char_uuid fields with the values obtained from the successful execution of the LOP Action Server.

Step 3: Modify package.xml

Ensure the package.xml includes the necessary dependencies:

<exec_depend>rclpy</exec_depend>
<exec_depend>lift_svc_interfaces</exec_depend>
<exec_depend>bleak</exec_depend>
<exec_depend>asyncio</exec_depend>
<exec_depend>json</exec_depend>

Step 4: Modify the Package Configuration

Open the setup.py file in the lift_svc_action_server package and update the entry_points section to include the new node:

entry_points={
'console_scripts': [
'lop_action_server = lift_svc_action_server.lop_action_server:main',
'lop_action_client = lift_svc_action_server.lop_action_client:main',

# Add the following lines
'cop_action_server = lift_svc_action_server.cop_action_server:main',
'cop_action_client = lift_svc_action_server.cop_action_client:main',
],
},

Step 5: Run the Nodes

To run the nodes, use the following commands:

For COP Action Server:

ros2 run <your_package_name> cop_action_server

For COP Action Client:

ros2 run <your_package_name> cop_action_client

This completes the setup for adding the COP Action Server and Client to your ROS 2 package.