diff --git a/adi/rx_tx.py b/adi/rx_tx.py index 36109dcc5..7a3585177 100644 --- a/adi/rx_tx.py +++ b/adi/rx_tx.py @@ -223,6 +223,29 @@ def _rx_init_channels(self): for m in self.rx_enabled_channels: v = self._rxadc.find_channel(self._rx_channel_names[m]) v.enabled = True + + # Make sure data fits into alignment requirement + if "length_align_bytes" in self._rxadc.buffer_attrs: + lab = int(self._rxadc.buffer_attrs["length_align_bytes"].value) + cbits = [ + chan.data_format.length for chan in self._rxadc.channels if chan.enabled + ] + bytes_per_sample = ( + max(cbits) // 8 + ) # Assume all channels have the same sample size + rx_data_size = ( + self.__rx_buffer_size * self._num_rx_channels_enabled * bytes_per_sample + ) + if rx_data_size % lab != 0: + print("Warning: RX buffer size does not fit into alignment requirement") + samples = lab / bytes_per_sample + if self._complex_data: + samples = samples / 2 + print( + f"rx_buffer_size must be a multiple of {samples} samples " + "for optimal performance" + ) + self.__rxbuf = iio.Buffer(self._rxadc, self.__rx_buffer_size, False) def __rx_unbuffered_data(self): @@ -451,6 +474,24 @@ def _tx_init_channels(self): self._txdac, self._tx_buffer_size, self.__tx_cyclic_buffer ) + def _check_alignment(self, data): + lab = int(self._txdac.buffer_attrs["length_align_bytes"].value) + cbits = [ + chan.data_format.length for chan in self._txdac.channels if chan.enabled + ] + bytes_per_sample = ( + max(cbits) // 8 + ) # Assume all channels have the same sample size + if len(bytearray(data)) % lab != 0: + print("Warning: Data length does not fit into alignment requirement") + samples = lab / bytes_per_sample + if self._complex_data: + samples = samples / 2 + print( + f"Data must be a multiple of {samples} samples " + "for optimal performance" + ) + def tx(self, data_np=None): """Transmit data to hardware buffers for each channel index in tx_enabled_channels. @@ -514,6 +555,10 @@ def tx(self, data_np=None): self._tx_buffer_size = len(data) // stride self._tx_init_channels() + # Make sure data fits into alignment requirement + if "length_align_bytes" in self._txdac.buffer_attrs: + self._check_alignment(data) + if len(data) // stride != self._tx_buffer_size: raise Exception( "Buffer length different than data length. "