ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 라즈베리파이 → STM32 uart를 통한 모터 제어
    Autonomous Lawn Mower/Raspberry pi & STM32 2025. 1. 5. 04:36
    반응형

    25년 4월 27일 update 

    Speed up/down 때문에 다시 업데이트 함. 


    Command Length Data (int8_t) Direction Remarks
    Left_rpm Right_rpm Mower_duty
    Traction + Mower 0x11 3 10 10 20 Go Initial
    +10 (0 ~ 110) +10 (0 ~ 110) 20 (0 ~ 100) Speed up  
    -10 (0 ~ 110) -10 (0 ~ 110) 20 (0 ~ 100) Speed down  
    0x12 3 -10 -10 20 Back Initial
    +10 (-110 ~ 0) +10 (-110 ~ 0) 20 (0 ~ 100) Speed up  
    -10 (-110 ~ 0) -10 (-110 ~ 0) 20 (0 ~ 100) Speed down  
    0x13 3 20 -10 20 Left  
    0x14 3 -10 20 20 Right  
    Stop 0x15 3 - - - Stop  

     

    import threading
    import serial
    import time

    # Configure UART settings
    port_STM32 = '/dev/serial0'  # Use '/dev/ttyS0' or '/dev/serial0'
    port_bluetooth = '/dev/ttyAMA2'

    # Open serial connections
    try:
        bleSerial = serial.Serial(port=port_bluetooth, baudrate=9600, timeout=1.0)
        ser = serial.Serial(port=port_STM32, baudrate=115200, timeout=1.0)
        print("Serial ports opened successfully.")
    except serial.SerialException as e:
        print(f"Error opening serial ports: {e}")
        exit(1)

    gData = ""

    # Constants
    STX = 0xFF
    ETX = 0xFE

    def serial_thread():
        global gData
        while True:
            try:
                data = bleSerial.readline()
                if data:
                    gData = data.decode().strip()
            except Exception as e:
                print(f"Serial thread error: {e}")

    def calculate_checksum(cmd, length, data):
        """Calculate the checksum using XOR."""
        checksum = cmd ^ length
        for byte in data:
            checksum ^= byte
        return checksum

    def twos_complement_8bit(value):
        """Calculate the 8-bit two's complement of an integer."""
        if value < 0:
            value = (1 << 8) + value
        return value & 0xFF

    def limit_value(value, min_value, max_value):
        """Limit positive values to [0, max_value] and negative values to [min_value, 0]."""
        if value > 0:
            return max(0, min(value, max_value))
        elif value < 0:
            return min(0, max(value, min_value))
        else:
            return 0

    def send_uart_command(serial_port, cmd, data):
        """Send a UART command with the specified data structure."""
        length = len(data)
        checksum = calculate_checksum(cmd, length, data)
        packet = bytearray([STX, cmd, length]) + data + bytearray([checksum, ETX])

        try:
            serial_port.write(packet)
            print(f"Sent packet: {[hex(byte) for byte in packet]}")
        except serial.SerialException as e:
            print(f"Error sending UART command: {e}")

    def send_traction_command(serial_port, cmd, left_rpm, right_rpm, mower_duty):
        """Send a traction command with RPM and mower duty."""
        try:
            left_rpm_byte = twos_complement_8bit(left_rpm)
            right_rpm_byte = twos_complement_8bit(right_rpm)
            mower_duty_byte = twos_complement_8bit(mower_duty)
            data = bytes([left_rpm_byte, right_rpm_byte, mower_duty_byte])
            send_uart_command(serial_port, cmd, data)
        except ValueError as e:
            print(f"Error preparing traction command: {e}")

    def main():
        global gData
        left_rpm = 0
        right_rpm = 0
        mower_duty = 50

        last_movement_command = ""

        try:
            while True:
                command = gData.lower()

                if command:
                    movement_command = ""

                    if "go" in command:
                        movement_command = "go"
                    elif "back" in command:
                        movement_command = "back"
                    elif "left" in command:
                        movement_command = "left"
                    elif "right" in command:
                        movement_command = "right"
                    elif "stop" in command:
                        movement_command = "stop"

                    if movement_command and movement_command != last_movement_command:
                        last_movement_command = movement_command
                        print(f"ok {movement_command}")

                        if movement_command == "go":
                            left_rpm = 10
                            right_rpm = 10
                            mower_duty = 20
                        elif movement_command == "back":
                            left_rpm = -10
                            right_rpm = -10
                            mower_duty = 20
                        elif movement_command == "left":
                            left_rpm = -20
                            right_rpm = 10
                            mower_duty = 0
                        elif movement_command == "right":
                            left_rpm = 10
                            right_rpm = -20
                            mower_duty = 0
                        elif movement_command == "stop":
                            left_rpm = 0
                            right_rpm = 0
                            mower_duty = 0

                    if last_movement_command in ["go", "back"]:
                        if "speed up" in command:
                            print("ok speed up")
                            if last_movement_command == "go":
                                left_rpm = min(left_rpm + 10, 110)
                                right_rpm = min(right_rpm + 10, 110)
                            elif last_movement_command == "back":
                                left_rpm = max(left_rpm - 10, -110)
                                right_rpm = max(right_rpm - 10, -110)
                        elif "speed down" in command:
                            print("ok speed down")
                            if last_movement_command == "go":
                                left_rpm = max(left_rpm - 10, 0)
                                right_rpm = max(right_rpm - 10, 0)
                            elif last_movement_command == "back":
                                left_rpm = min(left_rpm + 10, 0)
                                right_rpm = min(right_rpm + 10, 0)

                    # Clear the command after processing
                    gData = ""

                    # Limit RPM and mower duty cycle
                    left_rpm = limit_value(left_rpm, -110, 110)
                    right_rpm = limit_value(right_rpm, -110, 110)
                    mower_duty = limit_value(mower_duty, -100, 100)

                    send_traction_command(ser, 0x15, left_rpm, right_rpm, mower_duty)

                time.sleep(0.05)
        except serial.SerialException as e:
            print(f"Serial Error: {e}")
        except KeyboardInterrupt:
            print("\nProgram terminated by user.")
        finally:
            bleSerial.close()
            ser.close()
            print("Serial ports closed.")

    if __name__ == '__main__':
        task1 = threading.Thread(target=serial_thread, daemon=True)
        task1.start()
        main()

    25년 4월 26일 update 

    로봇이 좌우로 움직이는 상황을 고려해서 코드를 다시 업데이트 했다. 좌우 트랙션 모터의 RPM과 Mower 모터의 회전 방향을 정하기 위해서 라즈베리파이에서 int8_t를 가지는 data를 전송한다. 이때 data는 2의 보수를 이용하여 라즈베리파이에서 전송하고, STM32 마이크로프로세서에서 reverse 2의 보수를 이용하여 데이터 값을 확인한다. data값이 +이면 트랙션 모터는 forward로 이동, -이면 트랙션 모터는 reverse로 이동한다. data값이 +이면 Mower 모터는 오른쪽으로 이동하고, - 값이면 왼쪽으로 이동한다. 

     

    명령 테이블은 아래와 같이 업데이트 되었다. 

      Command Length Data Direction
    Traction + Mower 0x11 3 Left_rpm, Right_rpm, Mower_duty
    - (int8_t) Left_rpm, Right_rpm: -110 to 110
    - (int8_t) mower_duty: -100 to 100
    ※ negative value: reverse, positive value: forward
    Go
    Stop 0x15 3 Left_rpm, Right_rpm, Mower_duty Stop

     

    라즈베리파이의 파이썬 코드는 기존 코드에서 2의 보수 부분이 추가되었다. STM32 코드는 uart.c 내에

    void HAL_UART_RxCpltCallback(UART_HandleTypeDef *huart) 내에 reverse 2의 보수부분이 추가되었다. 

    https://github.com/Raymond-jang/lawn_mower/tree/master/16_uart_motor_encoder_rpm_ultrasonic_adc

     

    import serial
    import time

    # Constants
    STX = 0xFF
    ETX = 0xFE

    def calculate_checksum(cmd, length, data):
        """
        Calculate the checksum using XOR.
        """
        checksum = cmd ^ length
        for byte in data:
            checksum ^= byte
        return checksum

    def twos_complement_8bit(value):
        """ Calculate the 8bit two's complement of an integer. """
        if value < 0:
            value = (1<<8) + value # add 256 to wrap into 8 bits
        return value & 0xFF # ensure it stays 8 bits
        

    def send_uart_command(serial_port, cmd, data):
        """
        Send a UART command with the specified data structure.
        :param serial_port: Opened serial port object
        :param cmd: Command byte
        :param data: List of data bytes (already in the desired format)
        """
        length = len(data)
        checksum = calculate_checksum(cmd, length, data)
        packet = [STX, cmd, length] + list(data) + [checksum, ETX]  # Ensure data is a list of integers

        # Convert packet to bytes and send
        serial_port.write(bytearray(packet))
        print(f"Sent packet: {[hex(byte) for byte in packet]}")

    def send_traction_command(serial_port, cmd, left_rpm, right_rpm, mower_duty):
        """
        Send a traction command with RPM and mower duty as 8-bit two's complement.
        :param serial_port: Serial port object
        :param cmd: Command for traction operation (0x11, 0x12, 0x13)
        :param left_rpm: Desired RPM for the left motor (integer).
        :param right_rpm: Desired RPM for the right motor (integer).
        :param mower_duty: Mower duty cycle value (integer).
        """
        try:
            left_rpm_byte = twos_complement_8bit(left_rpm)
            right_rpm_byte = twos_complement_8bit(right_rpm)
            mower_duty_byte = twos_complement_8bit(mower_duty)  # Assuming mower_duty also needs 2's complement
            data = bytes([left_rpm_byte, right_rpm_byte, mower_duty_byte])
            send_uart_command(serial_port, cmd, data)
        except ValueError as e:
            print(f"Error: {e}")

    def limit_value(value, min_value=-128, max_value=127):
        """Limits a given value to the specified range."""
        return max(min(value, max_value), min_value)


    # Example usage with infinite loop
    if __name__ == "__main__":
        # Configure UART settings
        port_name = '/dev/serial0'  # Use '/dev/ttyS0' or '/dev/serial0'
        baud_rate = 115200

        try:
            # Open serial connection
            with serial.Serial(port_name, baud_rate, timeout=1) as ser:
                print("Serial port opened successfully.")

                # while True:
                left_rpm = 130
                right_rpm = -130
                mower_duty = 50

                # Limiting the values
                left_rpm = limit_value(left_rpm)
                right_rpm = limit_value(right_rpm)
                mower_duty = limit_value(mower_duty)

                send_traction_command(ser, 0x15, left_rpm, right_rpm, mower_duty)  # Example RPM and duty values
                time.sleep(1)  # Adjust the delay as needed for continuous sending

        except serial.SerialException as e:
            print(f"Serial Error: {e}")
        except KeyboardInterrupt:
            print("\nProgram terminated by user.")

     

    Original plan

    통신 개념

    라즈베리파이가 상위 제어기로 주변 환경을 인식하고 STM32를 통해서 모터 제어를 수행하게 된다. 상호간 통신은 UART를 이용한다. UART 통신 구현을 위해서 UART communication protocol을 아래와 같이 정의 하였다. Communication protocol은 데이터를 주고 받는데 있어서의 규칙 또는 약속이다. 통신 보안을 위한 여러가지 방법이 있지만, Hash 같은 기능을 구현하려면 좀 더 고가의 마이컴이 필요하고, UART 방식은 간단하고 저렴해서 개인 프로젝트에서 구현하기에 용이하다.

     

    아래표는 라즈베리파이와 STM32간 데이터를 주고 받는데 있어서의 규칙 또는 약속인 통신 프로토콜이다.

    UART communication protocol

     

    Command table은 아래와 같다.

      Command Length Data Direction
    Traction + Mower 0x11 3 Left, Right, Mower Forward
    0x12 3 Left, Right, Mower Backward
    Traction 0x21 2 Left, Right Forward
    0x22 2 Left, Right Backward
    Stop 0x23 2 stop Stop

     

    파이썬 송신 코드 

    import serial

    # Constants
    STX = 0xFF
    ETX = 0xFE

    def calculate_checksum(cmd, length, data):
        """
        Calculate the checksum using XOR.
        """
        checksum = cmd ^ length
        for byte in data:
            checksum ^= byte
        return checksum

    def send_uart_command(serial_port, cmd, data):
        """
        Send a UART command with the specified data structure.
        
        :param serial_port: Opened serial port object
        :param cmd: Command byte
        :param data: List of data bytes
        """
        length = len(data)
        checksum = calculate_checksum(cmd, length, data)
        packet = [STX, cmd, length] + data + [checksum, ETX]
        
        # Convert packet to bytes and send
        serial_port.write(bytearray(packet))
        print(f"Sent packet: {[hex(byte) for byte in packet]}")

    # Functions for individual commands
    def send_traction_command(serial_port, cmd, data):
        """
        Send a traction command with variable-length data.
        
        :param serial_port: Serial port object
        :param cmd: Command for traction operation (0x11, 0x12, 0x13)
        :param data: List of data bytes
        """
        send_uart_command(serial_port, cmd, data)


    # Example usage
    if __name__ == "__main__":
        # configure UART settings
        port_name = '/dev/serial0' # use '/dev/ttyS0' or '/dev/serial0'
        baud_rate = 115200
        
        try:
            # Open serial connection
            with serial.Serial(port_name, baud_rate, timeout=1) as ser:
                print("Serial port opened successfully.")
                
                # Send individual commands
                send_traction_command(ser, 0x11, [30, 30, 30])          # Traction forward with data value 50
        
        except serial.SerialException as e:
            print(f"Error: {e}")

     

     

    STM32 코드는 아래와 같다. 

    /* USER CODE BEGIN 0 */

    #define STX 0xFF

    #define ETX 0xFE

    #define MAX_PACKET_SIZE 256 // Maximum buffer size

     

     

    uint8_t cmd = 0;

    uint8_t len = 0;

    uint8_t data[MAX_PACKET_SIZE];

    uint8_t packet_buffer[MAX_PACKET_SIZE];

    uint8_t rx_buffer[1];

    uint8_t receiving_packet = 0;

    uint8_t packet_index = 0;

     

    uint8_t uart2_duty = 25; // traction and mower motor speed

     

     

    // uint8_t key_value, n=0;

     

     

    void process_packet(uint8_t *packet, uint8_t length);

    uint8_t calculate_checksum(uint8_t cmd, uint8_t len, uint8_t *data);

     

    uint8_t calculate_checksum(uint8_t cmd, uint8_t len, uint8_t *data) {

    uint8_t checksum = cmd ^ len;

    for (uint8_t i = 0; i < len; i++) {

    checksum ^= data[i];

    }

    return checksum;

    }

     

    void HAL_UART_RxCpltCallback(UART_HandleTypeDef *huart) {

     

    if (huart->Instance == USART1) { // Check if UART1 triggered the callback

    uint8_t received_byte = rx_buffer[0];

     

    if (!receiving_packet) {

    if (received_byte == STX) {

    // Start of a new packet

    receiving_packet = 1;

    packet_index = 0;

    packet_buffer[packet_index++] = received_byte;

    }

    } else {

    packet_buffer[packet_index++] = received_byte;

     

    // Check for the end of the packet

    if (received_byte == ETX && packet_index > 5) {

    // Extract global variables

    cmd = packet_buffer[1];

    len = packet_buffer[2];

    memcpy(data, &packet_buffer[3], len);

     

    uint8_t received_checksum = packet_buffer[3 + len];

    uint8_t calculated_checksum = calculate_checksum(cmd, len, data);

     

    // Print the packet_buffet if it receives a valid packet

    if (received_checksum == calculated_checksum) {

    process_packet(packet_buffer, packet_index);

    }

    // Reset for the next packet

    receiving_packet = 0;

    packet_index = 0;

    }

     

    // Prevent buffer overflow

    if (packet_index >= MAX_PACKET_SIZE) {

    receiving_packet = 0;

    packet_index = 0;

    }

    }

     

    // Restart UART1 reception

    HAL_UART_Receive_IT(&huart1, rx_buffer, 1);

    }

    }

     

    // Print the packet data to UART2 for printing

    void process_packet(uint8_t *packet, uint8_t length) {

     

    UART_Transmit("Packet Received: ");

     

    for (uint8_t i = 0; i < length; i++) {

    char buffer[10];

    sprintf(buffer, "0x%02X ", packet[i]);

    UART_Transmit(buffer);

    }

    UART_Transmit("\r\n");

    }

    /* USER CODE END 0 */

     

    /**

    * @brief The application entry point.

    * @retval int

    */

    int main(void)

    {

     

    /* USER CODE BEGIN 1 */

     

    /* USER CODE END 1 */

     

    /* MCU Configuration--------------------------------------------------------*/

     

    /* Reset of all peripherals, Initializes the Flash interface and the Systick. */

    HAL_Init();

     

    /* USER CODE BEGIN Init */

     

    /* USER CODE END Init */

     

    /* Configure the system clock */

    SystemClock_Config();

     

    /* USER CODE BEGIN SysInit */

     

    /* USER CODE END SysInit */

     

    /* Initialize all configured peripherals */

    MX_GPIO_Init();

    MX_USART2_UART_Init();

    MX_TIM1_Init();

    MX_USART1_UART_Init();

    /* USER CODE BEGIN 2 */

    HAL_UART_Receive_IT(&huart1, rx_buffer, 1);

     

    HAL_TIM_PWM_Start(&htim1, TIM_CHANNEL_1);

    // HAL_TIMEx_PWMN_Start(&htim1, TIM_CHANNEL_1); // turn on complementary channel

    HAL_TIM_PWM_Start(&htim1, TIM_CHANNEL_2);

    // HAL_TIMEx_PWMN_Start(&htim1, TIM_CHANNEL_2); // turn on complementary channe2

    HAL_TIM_PWM_Start(&htim1, TIM_CHANNEL_3);

    // HAL_TIMEx_PWMN_Start(&htim1, TIM_CHANNEL_3); // turn on complementary channe3

    /* USER CODE END 2 */

     

    /* Infinite loop */

    /* USER CODE BEGIN WHILE */

     

    motor_control_init();

     

    while (1)

    {

    uint8_t direction = 1; // forward = 1, reverse = 0

     

    HAL_GPIO_TogglePin(GPIOA, GPIO_PIN_5);

    HAL_Delay(1000);

     

    motor_control_process(cmd, len, data);

     

    // Uart input

    #if 0

    // Usart input for motor speed

    UART_Transmit("Enter a number between 0 and 100:\r\n");

     

    int number = UART_ReceiveNumber();

     

    if (number != -1) {

    char message[50];

    snprintf(message, sizeof(message), "You entered: %d\r\n", number);

    UART_Transmit(message);

    UART_Transmit("Enter another number between 0 and 100:\r\n");

    }

    uart2_duty = number;

    #endif

    /* USER CODE END WHILE */

     

    /* USER CODE BEGIN 3 */

    }

    /* USER CODE END 3 */

    }

    while (1)

    {

    HAL_GPIO_TogglePin(GPIOA, GPIO_PIN_5);

    HAL_Delay(1000);

     

    // process_packet(packet_buffer, packet_index);

     

    /* USER CODE END WHILE */

     

    /* USER CODE BEGIN 3 */

    }

    /* USER CODE END 3 */

    }

     

     

     

     

    ※ 참고자료

    https://www.youtube.com/watch?v=0LhKIVuH6nY

    반응형
Designed by Tistory.