-
라즈베리파이 → STM32 uart를 통한 모터 제어Autonomous Lawn Mower/Raspberry pi & STM32 2025. 1. 5. 04:36반응형
25년 4월 27일 update
Speed up/down 때문에 다시 업데이트 함.
Command Length Data (int8_t) Direction Remarks Left_rpm Right_rpm Mower_duty Traction + Mower 0x11 3 10 10 20 Go Initial +10 (0 ~ 110) +10 (0 ~ 110) 20 (0 ~ 100) Speed up -10 (0 ~ 110) -10 (0 ~ 110) 20 (0 ~ 100) Speed down 0x12 3 -10 -10 20 Back Initial +10 (-110 ~ 0) +10 (-110 ~ 0) 20 (0 ~ 100) Speed up -10 (-110 ~ 0) -10 (-110 ~ 0) 20 (0 ~ 100) Speed down 0x13 3 20 -10 20 Left 0x14 3 -10 20 20 Right Stop 0x15 3 - - - Stop import threading
import serial
import time
# Configure UART settings
port_STM32 = '/dev/serial0' # Use '/dev/ttyS0' or '/dev/serial0'
port_bluetooth = '/dev/ttyAMA2'
# Open serial connections
try:
bleSerial = serial.Serial(port=port_bluetooth, baudrate=9600, timeout=1.0)
ser = serial.Serial(port=port_STM32, baudrate=115200, timeout=1.0)
print("Serial ports opened successfully.")
except serial.SerialException as e:
print(f"Error opening serial ports: {e}")
exit(1)
gData = ""
# Constants
STX = 0xFF
ETX = 0xFE
def serial_thread():
global gData
while True:
try:
data = bleSerial.readline()
if data:
gData = data.decode().strip()
except Exception as e:
print(f"Serial thread error: {e}")
def calculate_checksum(cmd, length, data):
"""Calculate the checksum using XOR."""
checksum = cmd ^ length
for byte in data:
checksum ^= byte
return checksum
def twos_complement_8bit(value):
"""Calculate the 8-bit two's complement of an integer."""
if value < 0:
value = (1 << 8) + value
return value & 0xFF
def limit_value(value, min_value, max_value):
"""Limit positive values to [0, max_value] and negative values to [min_value, 0]."""
if value > 0:
return max(0, min(value, max_value))
elif value < 0:
return min(0, max(value, min_value))
else:
return 0
def send_uart_command(serial_port, cmd, data):
"""Send a UART command with the specified data structure."""
length = len(data)
checksum = calculate_checksum(cmd, length, data)
packet = bytearray([STX, cmd, length]) + data + bytearray([checksum, ETX])
try:
serial_port.write(packet)
print(f"Sent packet: {[hex(byte) for byte in packet]}")
except serial.SerialException as e:
print(f"Error sending UART command: {e}")
def send_traction_command(serial_port, cmd, left_rpm, right_rpm, mower_duty):
"""Send a traction command with RPM and mower duty."""
try:
left_rpm_byte = twos_complement_8bit(left_rpm)
right_rpm_byte = twos_complement_8bit(right_rpm)
mower_duty_byte = twos_complement_8bit(mower_duty)
data = bytes([left_rpm_byte, right_rpm_byte, mower_duty_byte])
send_uart_command(serial_port, cmd, data)
except ValueError as e:
print(f"Error preparing traction command: {e}")
def main():
global gData
left_rpm = 0
right_rpm = 0
mower_duty = 50
last_movement_command = ""
try:
while True:
command = gData.lower()
if command:
movement_command = ""
if "go" in command:
movement_command = "go"
elif "back" in command:
movement_command = "back"
elif "left" in command:
movement_command = "left"
elif "right" in command:
movement_command = "right"
elif "stop" in command:
movement_command = "stop"
if movement_command and movement_command != last_movement_command:
last_movement_command = movement_command
print(f"ok {movement_command}")
if movement_command == "go":
left_rpm = 10
right_rpm = 10
mower_duty = 20
elif movement_command == "back":
left_rpm = -10
right_rpm = -10
mower_duty = 20
elif movement_command == "left":
left_rpm = -20
right_rpm = 10
mower_duty = 0
elif movement_command == "right":
left_rpm = 10
right_rpm = -20
mower_duty = 0
elif movement_command == "stop":
left_rpm = 0
right_rpm = 0
mower_duty = 0
if last_movement_command in ["go", "back"]:
if "speed up" in command:
print("ok speed up")
if last_movement_command == "go":
left_rpm = min(left_rpm + 10, 110)
right_rpm = min(right_rpm + 10, 110)
elif last_movement_command == "back":
left_rpm = max(left_rpm - 10, -110)
right_rpm = max(right_rpm - 10, -110)
elif "speed down" in command:
print("ok speed down")
if last_movement_command == "go":
left_rpm = max(left_rpm - 10, 0)
right_rpm = max(right_rpm - 10, 0)
elif last_movement_command == "back":
left_rpm = min(left_rpm + 10, 0)
right_rpm = min(right_rpm + 10, 0)
# Clear the command after processing
gData = ""
# Limit RPM and mower duty cycle
left_rpm = limit_value(left_rpm, -110, 110)
right_rpm = limit_value(right_rpm, -110, 110)
mower_duty = limit_value(mower_duty, -100, 100)
send_traction_command(ser, 0x15, left_rpm, right_rpm, mower_duty)
time.sleep(0.05)
except serial.SerialException as e:
print(f"Serial Error: {e}")
except KeyboardInterrupt:
print("\nProgram terminated by user.")
finally:
bleSerial.close()
ser.close()
print("Serial ports closed.")
if __name__ == '__main__':
task1 = threading.Thread(target=serial_thread, daemon=True)
task1.start()
main()25년 4월 26일 update
로봇이 좌우로 움직이는 상황을 고려해서 코드를 다시 업데이트 했다. 좌우 트랙션 모터의 RPM과 Mower 모터의 회전 방향을 정하기 위해서 라즈베리파이에서 int8_t를 가지는 data를 전송한다. 이때 data는 2의 보수를 이용하여 라즈베리파이에서 전송하고, STM32 마이크로프로세서에서 reverse 2의 보수를 이용하여 데이터 값을 확인한다. data값이 +이면 트랙션 모터는 forward로 이동, -이면 트랙션 모터는 reverse로 이동한다. data값이 +이면 Mower 모터는 오른쪽으로 이동하고, - 값이면 왼쪽으로 이동한다.

명령 테이블은 아래와 같이 업데이트 되었다.
Command Length Data Direction Traction + Mower 0x11 3 Left_rpm, Right_rpm, Mower_duty
- (int8_t) Left_rpm, Right_rpm: -110 to 110
- (int8_t) mower_duty: -100 to 100
※ negative value: reverse, positive value: forwardGo Stop 0x15 3 Left_rpm, Right_rpm, Mower_duty Stop 라즈베리파이의 파이썬 코드는 기존 코드에서 2의 보수 부분이 추가되었다. STM32 코드는 uart.c 내에
void HAL_UART_RxCpltCallback(UART_HandleTypeDef *huart) 내에 reverse 2의 보수부분이 추가되었다.
https://github.com/Raymond-jang/lawn_mower/tree/master/16_uart_motor_encoder_rpm_ultrasonic_adc
import serial
import time
# Constants
STX = 0xFF
ETX = 0xFE
def calculate_checksum(cmd, length, data):
"""
Calculate the checksum using XOR.
"""
checksum = cmd ^ length
for byte in data:
checksum ^= byte
return checksum
def twos_complement_8bit(value):
""" Calculate the 8bit two's complement of an integer. """
if value < 0:
value = (1<<8) + value # add 256 to wrap into 8 bits
return value & 0xFF # ensure it stays 8 bits
def send_uart_command(serial_port, cmd, data):
"""
Send a UART command with the specified data structure.
:param serial_port: Opened serial port object
:param cmd: Command byte
:param data: List of data bytes (already in the desired format)
"""
length = len(data)
checksum = calculate_checksum(cmd, length, data)
packet = [STX, cmd, length] + list(data) + [checksum, ETX] # Ensure data is a list of integers
# Convert packet to bytes and send
serial_port.write(bytearray(packet))
print(f"Sent packet: {[hex(byte) for byte in packet]}")
def send_traction_command(serial_port, cmd, left_rpm, right_rpm, mower_duty):
"""
Send a traction command with RPM and mower duty as 8-bit two's complement.
:param serial_port: Serial port object
:param cmd: Command for traction operation (0x11, 0x12, 0x13)
:param left_rpm: Desired RPM for the left motor (integer).
:param right_rpm: Desired RPM for the right motor (integer).
:param mower_duty: Mower duty cycle value (integer).
"""
try:
left_rpm_byte = twos_complement_8bit(left_rpm)
right_rpm_byte = twos_complement_8bit(right_rpm)
mower_duty_byte = twos_complement_8bit(mower_duty) # Assuming mower_duty also needs 2's complement
data = bytes([left_rpm_byte, right_rpm_byte, mower_duty_byte])
send_uart_command(serial_port, cmd, data)
except ValueError as e:
print(f"Error: {e}")
def limit_value(value, min_value=-128, max_value=127):
"""Limits a given value to the specified range."""
return max(min(value, max_value), min_value)
# Example usage with infinite loop
if __name__ == "__main__":
# Configure UART settings
port_name = '/dev/serial0' # Use '/dev/ttyS0' or '/dev/serial0'
baud_rate = 115200
try:
# Open serial connection
with serial.Serial(port_name, baud_rate, timeout=1) as ser:
print("Serial port opened successfully.")
# while True:
left_rpm = 130
right_rpm = -130
mower_duty = 50
# Limiting the values
left_rpm = limit_value(left_rpm)
right_rpm = limit_value(right_rpm)
mower_duty = limit_value(mower_duty)
send_traction_command(ser, 0x15, left_rpm, right_rpm, mower_duty) # Example RPM and duty values
time.sleep(1) # Adjust the delay as needed for continuous sending
except serial.SerialException as e:
print(f"Serial Error: {e}")
except KeyboardInterrupt:
print("\nProgram terminated by user.")Original plan

통신 개념 라즈베리파이가 상위 제어기로 주변 환경을 인식하고 STM32를 통해서 모터 제어를 수행하게 된다. 상호간 통신은 UART를 이용한다. UART 통신 구현을 위해서 UART communication protocol을 아래와 같이 정의 하였다. Communication protocol은 데이터를 주고 받는데 있어서의 규칙 또는 약속이다. 통신 보안을 위한 여러가지 방법이 있지만, Hash 같은 기능을 구현하려면 좀 더 고가의 마이컴이 필요하고, UART 방식은 간단하고 저렴해서 개인 프로젝트에서 구현하기에 용이하다.
아래표는 라즈베리파이와 STM32간 데이터를 주고 받는데 있어서의 규칙 또는 약속인 통신 프로토콜이다.

UART communication protocol Command table은 아래와 같다.
Command Length Data Direction Traction + Mower 0x11 3 Left, Right, Mower Forward 0x12 3 Left, Right, Mower Backward Traction 0x21 2 Left, Right Forward 0x22 2 Left, Right Backward Stop 0x23 2 stop Stop 파이썬 송신 코드
import serial
# Constants
STX = 0xFF
ETX = 0xFE
def calculate_checksum(cmd, length, data):
"""
Calculate the checksum using XOR.
"""
checksum = cmd ^ length
for byte in data:
checksum ^= byte
return checksum
def send_uart_command(serial_port, cmd, data):
"""
Send a UART command with the specified data structure.
:param serial_port: Opened serial port object
:param cmd: Command byte
:param data: List of data bytes
"""
length = len(data)
checksum = calculate_checksum(cmd, length, data)
packet = [STX, cmd, length] + data + [checksum, ETX]
# Convert packet to bytes and send
serial_port.write(bytearray(packet))
print(f"Sent packet: {[hex(byte) for byte in packet]}")
# Functions for individual commands
def send_traction_command(serial_port, cmd, data):
"""
Send a traction command with variable-length data.
:param serial_port: Serial port object
:param cmd: Command for traction operation (0x11, 0x12, 0x13)
:param data: List of data bytes
"""
send_uart_command(serial_port, cmd, data)
# Example usage
if __name__ == "__main__":
# configure UART settings
port_name = '/dev/serial0' # use '/dev/ttyS0' or '/dev/serial0'
baud_rate = 115200
try:
# Open serial connection
with serial.Serial(port_name, baud_rate, timeout=1) as ser:
print("Serial port opened successfully.")
# Send individual commands
send_traction_command(ser, 0x11, [30, 30, 30]) # Traction forward with data value 50
except serial.SerialException as e:
print(f"Error: {e}")STM32 코드는 아래와 같다.
/* USER CODE BEGIN 0 */
#define STX 0xFF
#define ETX 0xFE
#define MAX_PACKET_SIZE 256 // Maximum buffer size
uint8_t cmd = 0;
uint8_t len = 0;
uint8_t data[MAX_PACKET_SIZE];
uint8_t packet_buffer[MAX_PACKET_SIZE];
uint8_t rx_buffer[1];
uint8_t receiving_packet = 0;
uint8_t packet_index = 0;
uint8_t uart2_duty = 25; // traction and mower motor speed
// uint8_t key_value, n=0;
void process_packet(uint8_t *packet, uint8_t length);
uint8_t calculate_checksum(uint8_t cmd, uint8_t len, uint8_t *data);
uint8_t calculate_checksum(uint8_t cmd, uint8_t len, uint8_t *data) {
uint8_t checksum = cmd ^ len;
for (uint8_t i = 0; i < len; i++) {
checksum ^= data[i];
}
return checksum;
}
void HAL_UART_RxCpltCallback(UART_HandleTypeDef *huart) {
if (huart->Instance == USART1) { // Check if UART1 triggered the callback
uint8_t received_byte = rx_buffer[0];
if (!receiving_packet) {
if (received_byte == STX) {
// Start of a new packet
receiving_packet = 1;
packet_index = 0;
packet_buffer[packet_index++] = received_byte;
}
} else {
packet_buffer[packet_index++] = received_byte;
// Check for the end of the packet
if (received_byte == ETX && packet_index > 5) {
// Extract global variables
cmd = packet_buffer[1];
len = packet_buffer[2];
memcpy(data, &packet_buffer[3], len);
uint8_t received_checksum = packet_buffer[3 + len];
uint8_t calculated_checksum = calculate_checksum(cmd, len, data);
// Print the packet_buffet if it receives a valid packet
if (received_checksum == calculated_checksum) {
process_packet(packet_buffer, packet_index);
}
// Reset for the next packet
receiving_packet = 0;
packet_index = 0;
}
// Prevent buffer overflow
if (packet_index >= MAX_PACKET_SIZE) {
receiving_packet = 0;
packet_index = 0;
}
}
// Restart UART1 reception
HAL_UART_Receive_IT(&huart1, rx_buffer, 1);
}
}
// Print the packet data to UART2 for printing
void process_packet(uint8_t *packet, uint8_t length) {
UART_Transmit("Packet Received: ");
for (uint8_t i = 0; i < length; i++) {
char buffer[10];
sprintf(buffer, "0x%02X ", packet[i]);
UART_Transmit(buffer);
}
UART_Transmit("\r\n");
}
/* USER CODE END 0 */
/**
* @brief The application entry point.
* @retval int
*/
int main(void)
{
/* USER CODE BEGIN 1 */
/* USER CODE END 1 */
/* MCU Configuration--------------------------------------------------------*/
/* Reset of all peripherals, Initializes the Flash interface and the Systick. */
HAL_Init();
/* USER CODE BEGIN Init */
/* USER CODE END Init */
/* Configure the system clock */
SystemClock_Config();
/* USER CODE BEGIN SysInit */
/* USER CODE END SysInit */
/* Initialize all configured peripherals */
MX_GPIO_Init();
MX_USART2_UART_Init();
MX_TIM1_Init();
MX_USART1_UART_Init();
/* USER CODE BEGIN 2 */
HAL_UART_Receive_IT(&huart1, rx_buffer, 1);
HAL_TIM_PWM_Start(&htim1, TIM_CHANNEL_1);
// HAL_TIMEx_PWMN_Start(&htim1, TIM_CHANNEL_1); // turn on complementary channel
HAL_TIM_PWM_Start(&htim1, TIM_CHANNEL_2);
// HAL_TIMEx_PWMN_Start(&htim1, TIM_CHANNEL_2); // turn on complementary channe2
HAL_TIM_PWM_Start(&htim1, TIM_CHANNEL_3);
// HAL_TIMEx_PWMN_Start(&htim1, TIM_CHANNEL_3); // turn on complementary channe3
/* USER CODE END 2 */
/* Infinite loop */
/* USER CODE BEGIN WHILE */
motor_control_init();
while (1)
{
uint8_t direction = 1; // forward = 1, reverse = 0
HAL_GPIO_TogglePin(GPIOA, GPIO_PIN_5);
HAL_Delay(1000);
motor_control_process(cmd, len, data);
// Uart input
#if 0
// Usart input for motor speed
UART_Transmit("Enter a number between 0 and 100:\r\n");
int number = UART_ReceiveNumber();
if (number != -1) {
char message[50];
snprintf(message, sizeof(message), "You entered: %d\r\n", number);
UART_Transmit(message);
UART_Transmit("Enter another number between 0 and 100:\r\n");
}
uart2_duty = number;
#endif
/* USER CODE END WHILE */
/* USER CODE BEGIN 3 */
}
/* USER CODE END 3 */
}
while (1)
{
HAL_GPIO_TogglePin(GPIOA, GPIO_PIN_5);
HAL_Delay(1000);
// process_packet(packet_buffer, packet_index);
/* USER CODE END WHILE */
/* USER CODE BEGIN 3 */
}
/* USER CODE END 3 */
}
※ 참고자료
반응형'Autonomous Lawn Mower > Raspberry pi & STM32' 카테고리의 다른 글
IMU sensor (MPU-6500) python code with thonny (0) 2025.02.15 DMA를 이용한 2ch ADC (0) 2025.02.08 Systick callback function (1) 2025.02.01 라즈베리파이 RealVNC로 직접 연결 (2) 2025.01.11 라즈베리파이 연결방법 (2) 2024.12.24