diff --git a/adafruit_mcp2515/__init__.py b/adafruit_mcp2515/__init__.py index b6a54ed..9d7e6f4 100644 --- a/adafruit_mcp2515/__init__.py +++ b/adafruit_mcp2515/__init__.py @@ -25,10 +25,12 @@ """ from collections import namedtuple -from struct import unpack_from, pack_into +from struct import pack_into, unpack_from from time import sleep -from micropython import const + from adafruit_bus_device import spi_device +from micropython import const + from .canio import * from .timer import Timer @@ -99,7 +101,6 @@ # Filters & Masks _RXM0SIDH = const(0x20) _RXM1SIDH = const(0x24) -MASKS = [_RXM0SIDH, _RXM1SIDH] _RXF0SIDH = const(0x00) _RXF1SIDH = const(0x04) @@ -107,7 +108,7 @@ _RXF3SIDH = const(0x10) _RXF4SIDH = const(0x14) _RXF5SIDH = const(0x18) -FILTERS = [[_RXF0SIDH, _RXF1SIDH], [_RXF2SIDH, _RXF3SIDH, _RXF4SIDH, _RXF5SIDH]] + # bits/flags _RX0IF = const(0x01) _RX1IF = const(0x02) @@ -309,14 +310,24 @@ def __init__( self._tx_buffers = [] self._rx0_overflow = False self._rx1_overflow = False - self._masks_in_use = [] - self._filters_in_use = [[], []] self._mode = None self._bus_state = BusState.ERROR_ACTIVE self._baudrate = baudrate self._crystal_freq = crystal_freq self._loopback = loopback self._silent = silent + self._masks_filters = { + _RXM0SIDH: [None, { + _RXF0SIDH: None, + _RXF1SIDH: None + }], + _RXM1SIDH: [None, { + _RXF2SIDH: None, + _RXF3SIDH: None, + _RXF4SIDH: None, + _RXF5SIDH: None + }] + } self._init_buffers() self.initialize() @@ -475,7 +486,6 @@ def _read_from_rx_buffers(self): self._read_rx_buffer(_READ_RX1) def _write_message(self, tx_buffer, message_obj): - if tx_buffer is None: raise RuntimeError("No transmit buffer available to send") if isinstance(message_obj, RemoteTransmissionRequest): @@ -526,7 +536,6 @@ def _write_message(self, tx_buffer, message_obj): # TODO: Priority def _start_transmit(self, tx_buffer): - # self._buffer[0] = tx_buffer.SEND_CMD with self._bus_device_obj as spi: spi.write_readinto( @@ -538,13 +547,8 @@ def _start_transmit(self, tx_buffer): in_end=1, ) - def _set_filter_register(self, filter_index, mask, extended): - filter_reg_addr = FILTERS[filter_index] - self._write_id_to_register(filter_reg_addr, mask, extended) - - def _set_mask_register(self, mask_index, mask, extended): - mask_reg_addr = MASKS[mask_index] - self._write_id_to_register(mask_reg_addr, mask, extended) + def _set_acceptance_register(self, register, value, is_extended): + self._write_id_to_register(register, value, is_extended) @staticmethod def _unload_ids(raw_ids): @@ -774,45 +778,72 @@ def _get_bus_status(self): else: self._bus_state = BusState.ERROR_ACTIVE - def _create_mask(self, match): - mask = match.mask - if mask == 0: - if match.extended: - mask = EXTID_BOTTOM_29_MASK - else: - mask = STDID_BOTTOM_11_MASK - - masks_used = len(self._masks_in_use) - if masks_used < len(MASKS): - next_mask_index = masks_used - - self._set_mask_register(next_mask_index, mask, match.extended) - self._masks_in_use.append(MASKS[next_mask_index]) - return next_mask_index + def _find_mask_and_filter(self, match_mask: int): + """Finds the optimal mask and filter registers for the given mask. - raise RuntimeError("No Masks Available") + Returns a tuple of (mask, filter) registers. + Returns `None` if no mask and filter are available. - def _create_filter(self, match, mask_index): - - next_filter_index = len(self._filters_in_use[mask_index]) - if next_filter_index == len(FILTERS[mask_index]): - raise RuntimeError("No Filters Available") - - filter_register = FILTERS[mask_index][next_filter_index] + Args: + mask (int): The mask value to fit. + """ - self._write_id_to_register(filter_register, match.address, match.extended) - self._filters_in_use[mask_index].append(filter_register) + mask: int = None + filt: int = None + + # First try to find a matching mask with a free filter + for mask_reg, [mask_val, filts] in self._masks_filters.items(): + if mask_val == match_mask: + # Matching mask, look for a free filter + for filt_reg, filt_val in filts.items(): + if not filt_val: + # Free filter + mask = mask_reg + filt = filt_reg + break + + if mask: + # We're done here + break + + if mask is None: + # Then try to find a free mask + for mask_reg, [mask_val, filts] in self._masks_filters.items(): + if mask_val is None: + mask = mask_reg + filt = next(iter(filts.keys())) + break + + return (mask, filt) if mask is not None else None + + def _create_mask_and_filter(self, match: Match): + actual_mask = match.mask + if match.mask == 0: + actual_mask = EXTID_BOTTOM_29_MASK if match.extended \ + else STDID_BOTTOM_11_MASK + + result = self._find_mask_and_filter(actual_mask) + if not result: + raise Exception( + "No mask and filter is available for " + f"mask 0x{actual_mask:03x}, addr 0x{match.address:03x}" + ) + + (mask, filt) = result + self._set_acceptance_register(mask, actual_mask, match.extended) + self._set_acceptance_register(filt, match.address, match.extended) + self._masks_filters[mask][0] = actual_mask + self._masks_filters[mask][1][filt] = match.address def deinit_filtering_registers(self): """Clears the Receive Mask and Filter Registers""" - - for mask_index, mask_reg in enumerate(MASKS): + for mask_reg, mask_data in self._masks_filters.items(): self._set_register(mask_reg, 0) + mask_data[0] = None # Mask value + for filt_reg in mask_data[1]: + self._set_register(filt_reg, 0) + mask_data[1][filt_reg] = None - for filter_reg in FILTERS[mask_index]: - self._set_register(filter_reg, 0) - self._masks_in_use = [] - self._filters_in_use = [[], []] ######## CANIO API METHODS ############# @property @@ -903,19 +934,16 @@ def listen(self, matches=None, *, timeout: float = 10): `silent`==`True` and `loopback` == `False`" ) + match = None for match in matches: self._dbg("match:", match) - mask_index_used = self._create_mask(match) - self._create_filter(match, mask_index=mask_index_used) - - used_masks = len(self._masks_in_use) - # if matches were made and there are unused masks - # set the unused masks to prevent them from leaking packets - if len(matches) > 0 and used_masks < len(MASKS): - next_mask_index = used_masks - for idx in range(next_mask_index, len(MASKS)): - print("using unused mask index:", idx) - self._create_mask(matches[-1]) + self._create_mask_and_filter(match) + + for mask_reg, [mask_val, _] in self._masks_filters.items(): + if mask_val is None and match is not None: + # Mask unused, set it to a match to prevent leakage + self._set_acceptance_register(mask_reg, match.mask, match.extended) + return Listener(self, timeout)