Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unexpected Data Output from MCP2515 on Raspberry Pi 4 with Bookworm Lite (64-bit) #27

Open
fly-robin-fly opened this issue Aug 18, 2024 · 4 comments

Comments

@fly-robin-fly
Copy link

Issue Description:

I'm encountering an issue while executing the simple test code from the documentation. The output I'm receiving is not what I expected.

Setup:

Hardware: Raspberry Pi 4 with Bookworm Lite (64-bit)
Connection: MCP2515 connected via a logic level converter (also tested without the converter, same result)

Expected Output:

Data: "adafruit"
Message ID: 0x1234ABCD
Actual Output:

actual output

Send success: True
1 message available
Message from  0x121f1f4d
message data: bytearray(b'\xa3\xf5\xc7\x9e?\xef\xf9L')

Send success: True
1 message available
Message from  0x1205624d
message data: bytearray(b'\xd4v\xf7\\?\xe6\xb5L')

(repeats several times with the same unexpected data)

Additionally, when I connect another CAN device to the MCP2515 (without loopback mode), I encounter the same result as described in this GitHub issue: Issue #770.

Has anyone else faced a similar issue or have any insights on what might be going wrong? Any help would be greatly appreciated. Thanks in advance!

@tbengani
Copy link

tbengani commented Dec 3, 2024

Have you been able to fix this issue? I've been having this problem as well

@fly-robin-fly
Copy link
Author

@tbengani I resolved the issue by developing my own Python driver for the MCP2515, which is now working well.

    import spidev  # type: ignore
    import RPi.GPIO as GPIO  # type: ignore
    # pylint: enable=import-error

    GPIO.setwarnings(False)  # Disable GPIO warnings

    # MCP2515 Registers and Commands
    MCP2515_RESET = 0xC0
    MCP2515_READ = 0x03
    MCP2515_WRITE = 0x02
    MCP2515_BIT_MODIFY = 0x05
    MCP2515_READ_STATUS = 0xA0

    # MCP2515 CANCTRL Register Values
    MODE_NORMAL = 0x00
    MODE_LOOPBACK = 0x40
    MODE_CONFIG = 0x80

    class MCP2515:
        def __init__(self, spi_channel=0, spi_speed=100000, cs_pin=5, retries=3, timeout=1.0):
            self.spi = spidev.SpiDev()
            self.cs_pin = cs_pin
            self.retries = retries
            self.timeout = timeout

            try:
                self.spi.open(0, spi_channel)
                self.spi.max_speed_hz = spi_speed
            except Exception as e:
                raise RuntimeError(f"Failed to initialize SPI: {e}") from e

            GPIO.setmode(GPIO.BCM)
            GPIO.setup(cs_pin, GPIO.OUT)
            GPIO.output(cs_pin, GPIO.HIGH)
            self.reset()
            self.configure_baud_rate()

        def configure_baud_rate(self):
            # Set the baud rate to 250000 in configuration registers
            self.write_register(0x2A, 0x00)
            self.write_register(0x29, 0xB1)
            self.write_register(0x28, 0x05)

        def retry_operation(self, operation, *args, **kwargs):
            last_exception = None
            start_time = time.time()
            for _ in range(self.retries):
                try:
                    return operation(*args, **kwargs)
                except Exception as e:  # pylint: disable=broad-exception-caught
                    last_exception = e
                    if time.time() - start_time > self.timeout:
                        break
                    time.sleep(0.1)  # Small delay before retrying
            raise RuntimeError(f"""Operation failed after {
                               self.retries} retries: {last_exception}""")

        def reset(self):
            def _reset():
                GPIO.output(self.cs_pin, GPIO.LOW)
                self.spi.xfer([MCP2515_RESET])
                GPIO.output(self.cs_pin, GPIO.HIGH)
                time.sleep(0.1)  # Wait for the reset to complete

            self.retry_operation(_reset)

        def read_register(self, address):
            def _read_register(address):
                GPIO.output(self.cs_pin, GPIO.LOW)
                self.spi.xfer([MCP2515_READ, address])
                result = self.spi.xfer([0x00])[0]
                GPIO.output(self.cs_pin, GPIO.HIGH)
                return result

            return self.retry_operation(_read_register, address)

        def write_register(self, address, value):
            def _write_register(address, value):
                GPIO.output(self.cs_pin, GPIO.LOW)
                self.spi.xfer([MCP2515_WRITE, address, value])
                GPIO.output(self.cs_pin, GPIO.HIGH)

            self.retry_operation(_write_register, address, value)

        def bit_modify(self, address, mask, value):
            def _bit_modify(address, mask, value):
                GPIO.output(self.cs_pin, GPIO.LOW)
                self.spi.xfer([MCP2515_BIT_MODIFY, address, mask, value])
                GPIO.output(self.cs_pin, GPIO.HIGH)

            self.retry_operation(_bit_modify, address, mask, value)

        def set_mode(self, mode):
            def _set_mode(mode):
                self.write_register(0x0F, mode)
                # Verify the mode was set correctly
                current_mode = self.read_register(0x0F) & 0xE0
                if current_mode != mode:
                    raise RuntimeError(
                        f"Failed to set MCP2515 mode to {hex(mode)}")

            self.retry_operation(_set_mode, mode)

        def set_normal_mode(self):
            self.set_mode(MODE_NORMAL)

        def set_loopback_mode(self):
            self.set_mode(MODE_LOOPBACK)

        def read_status(self):
            def _read_status():
                GPIO.output(self.cs_pin, GPIO.LOW)
                status = self.spi.xfer([MCP2515_READ_STATUS, 0x00])[1]
                GPIO.output(self.cs_pin, GPIO.HIGH)
                return status

            return self.retry_operation(_read_status)

        def send_message(self, can_id, data, timeout=0.2):
            if not 0 <= can_id <= 0x7FF:
                raise ValueError("CAN ID must be 11 bits (0x000 to 0x7FF)")
            if len(data) > 8:
                raise ValueError("CAN data length must be 8 bytes or less")

            def _send_message(can_id, data):
                # Load TX buffer with CAN ID (11 bits)
                sid_high = (can_id >> 3) & 0xFF  # Higher 8 bits of CAN ID
                sid_low = (can_id << 5) & 0xE0   # Lower 3 bits of CAN ID

                # Set the standard identifier (SID) registers
                self.write_register(0x31, sid_high)  # TXB0SIDH
                self.write_register(0x32, sid_low)   # TXB0SIDL

                # Set the Data Length Code (DLC)
                dlc = len(data)
                self.write_register(0x35, dlc)  # TXB0DLC

                # Load data into the TX buffer
                for i, value in enumerate(data):
                    self.write_register(0x36 + i, value)

                # Request to send
                # Set TXREQ to start transmission
                self.bit_modify(0x30, 0x08, 0x08)

            self.retry_operation(_send_message, can_id, data)

            # Wait for transmission to complete
            start_time = time.time()
            while self.read_register(0x30) & 0x08:  # TXB0CTRL - TXREQ bit
                if time.time() - start_time > timeout:
                    # Abort the transmission if it's taking too long
                    # ABAT bit in CANCTRL register
                    self.bit_modify(0x0F, 0x10, 0x10)
                    raise RuntimeError("Transmission failed.")
                time.sleep(0.01)  # Small delay to prevent busy-waiting

            # Clear TX interrupt flag
            self.bit_modify(0x2C, 0x1C, 0x00)  # Clear TXnIF flags

        def read_message(self, timeout=1.0):
            start_time = time.time()
            while time.time() - start_time < timeout:
                status = self.read_status()
                if status & 0x01:  # Check if RX0IF is set
                    try:
                        id_high = self.read_register(0x61)
                        id_low = self.read_register(0x62)
                        length = self.read_register(0x65) & 0x0F
                        data = []
                        for i in range(length):
                            data.append(self.read_register(0x66 + i))
                        self.bit_modify(0x2C, 0x01, 0x00)  # Clear RX0IF
                        return (id_high << 3) | (id_low >> 5), data
                    except Exception as e:
                        raise RuntimeError(
                            f"Failed to read CAN message: {e}") from e
            return None, None  # Return None if no message is received within timeout

        def shutdown(self):
            def _shutdown():
                # Set MCP2515 to configuration mode before shutdown
                self.set_mode(MODE_CONFIG)
                self.spi.close()  # Close SPI interface
                GPIO.cleanup(self.cs_pin)  # Cleanup GPIO
            self.retry_operation(_shutdown)

@tbengani
Copy link

tbengani commented Dec 3, 2024

thanks so much! ill test it soon. for my own understanding, did you figure out why the library wasn't working on the Pi 4?

@fly-robin-fly
Copy link
Author

Unfortunately, no. I bought an oscilloscope and identified some major issues on the bus. Developing my own driver and gaining a better understanding of what was happening turned out to be the best solution for me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants