diff --git a/src/urh/dev/native/Device.py b/src/urh/dev/native/Device.py index 217e20df..9697a441 100644 --- a/src/urh/dev/native/Device.py +++ b/src/urh/dev/native/Device.py @@ -9,6 +9,7 @@ import time import psutil from PyQt5.QtCore import QObject, pyqtSignal +from multiprocessing import Pipe from urh.util.Formatter import Formatter from urh.util.Logger import logger @@ -40,7 +41,7 @@ class Device(QObject): self.error_codes = {} self.errors = set() - self.queue = Queue() + self.parent_conn, self.child_conn = Pipe() self.send_buffer = None self.send_buffer_reader = None @@ -66,10 +67,10 @@ class Device(QObject): self.sendbuffer_thread.daemon = True self.sendbuffer_thread.start() - def _start_readqueue_thread(self): - self.read_queue_thread = threading.Thread(target=self.read_receiving_queue) - self.read_queue_thread.daemon = True - self.read_queue_thread.start() + def _start_read_rcv_buffer_thread(self): + self.read_recv_buffer_thread = threading.Thread(target=self.read_receiving_queue) + self.read_recv_buffer_thread.daemon = True + self.read_recv_buffer_thread.start() def init_recv_buffer(self): if self.receive_buffer is None: @@ -97,7 +98,6 @@ class Device(QObject): self.errors.add(err) logger.error(err) - @property def received_data(self): return self.receive_buffer[:self.current_recv_index] @@ -245,33 +245,34 @@ class Device(QObject): def read_receiving_queue(self): while self.is_receiving: try: - # TODO: Do not use queue, but ctx in callback and just recv from Pipe? - while not self.queue.empty(): - byte_buffer = self.queue.get() + byte_buffer = self.parent_conn.recv_bytes() - nsamples = len(byte_buffer) // self.BYTES_PER_SAMPLE - if nsamples > 0: - if self.current_recv_index + nsamples >= len(self.receive_buffer): - if self.is_ringbuffer: - self.current_recv_index = 0 - if nsamples >= len(self.receive_buffer): - logger.warning("Receive buffer too small, skipping {0:d} samples".format(nsamples-len(self.receive_buffer))) - nsamples = len(self.receive_buffer) - 1 + nsamples = len(byte_buffer) // self.BYTES_PER_SAMPLE + if nsamples > 0: + if self.current_recv_index + nsamples >= len(self.receive_buffer): + if self.is_ringbuffer: + self.current_recv_index = 0 + if nsamples >= len(self.receive_buffer): + logger.warning("Receive buffer too small, skipping {0:d} samples".format(nsamples-len(self.receive_buffer))) + nsamples = len(self.receive_buffer) - 1 - else: - self.stop_rx_mode("Receiving buffer is full {0}/{1}".format(self.current_recv_index + nsamples, len(self.receive_buffer))) - return + else: + self.stop_rx_mode("Receiving buffer is full {0}/{1}".format(self.current_recv_index + nsamples, len(self.receive_buffer))) + return - end = nsamples*self.BYTES_PER_SAMPLE - self.receive_buffer[self.current_recv_index:self.current_recv_index + nsamples] = \ - self.unpack_complex(byte_buffer[:end], nsamples) + end = nsamples*self.BYTES_PER_SAMPLE + self.receive_buffer[self.current_recv_index:self.current_recv_index + nsamples] = \ + self.unpack_complex(byte_buffer[:end], nsamples) - old_index = self.current_recv_index - self.current_recv_index += nsamples + old_index = self.current_recv_index + self.current_recv_index += nsamples - self.rcv_index_changed.emit(old_index, self.current_recv_index) + self.rcv_index_changed.emit(old_index, self.current_recv_index) except BrokenPipeError: pass + except EOFError: + logger.info("EOF Error: Ending receive thread") + break time.sleep(0.01) @@ -332,10 +333,9 @@ class Device(QObject): else: logger.info("Skipped {0:d} samples in sending".format(len(self.samples_to_send) - self.current_sent_sample)) - def callback_recv(self, buffer): try: - self.queue.put(buffer) + self.child_conn.send_bytes(buffer) except BrokenPipeError: pass return 0 diff --git a/src/urh/dev/native/HackRF.py b/src/urh/dev/native/HackRF.py index fbaa7dbb..e23dcf3e 100644 --- a/src/urh/dev/native/HackRF.py +++ b/src/urh/dev/native/HackRF.py @@ -77,7 +77,7 @@ class HackRF(Device): if self.is_receiving: logger.info("HackRF: Starting receiving thread") - self._start_readqueue_thread() + self._start_read_rcv_buffer_thread() self.log_retcode(ret, "start_rx_mode") else: @@ -88,9 +88,9 @@ class HackRF(Device): logger.info("HackRF: Stopping RX Mode: " + msg) - if hasattr(self, "read_queue_thread") and self.read_queue_thread.is_alive(): + if hasattr(self, "read_queue_thread") and self.read_recv_buffer_thread.is_alive(): try: - self.read_queue_thread.join(0.001) + self.read_recv_buffer_thread.join(0.001) logger.info("HackRF: Joined read_queue_thread") except RuntimeError: logger.error("HackRF: Could not join read_queue_thread") diff --git a/src/urh/dev/native/RTLSDR.py b/src/urh/dev/native/RTLSDR.py index f7e4d7da..befa2705 100644 --- a/src/urh/dev/native/RTLSDR.py +++ b/src/urh/dev/native/RTLSDR.py @@ -35,7 +35,7 @@ class RTLSDR(Device): """ - self.bandwidth_is_adjustable = False + self.bandwidth_is_adjustable = False # e.g. not in Manjaro Linux / Ubuntu 14.04 self._max_frequency = 6e9 self._max_sample_rate = 3200000 self._max_frequency = 6e9 @@ -76,7 +76,7 @@ class RTLSDR(Device): if self.is_receiving: logger.info("RTLSDR: Starting receiving thread") - self._start_readqueue_thread() + self._start_read_rcv_buffer_thread() else: self.log_retcode(self.error_not_open, "start_rx_mode") @@ -87,9 +87,9 @@ class RTLSDR(Device): logger.info("RTLSDR: Stopping RX Mode: " + msg) - if hasattr(self, "read_queue_thread") and self.read_queue_thread.is_alive(): + if hasattr(self, "read_queue_thread") and self.read_recv_buffer_thread.is_alive(): try: - self.read_queue_thread.join(0.001) + self.read_recv_buffer_thread.join(0.001) logger.info("RTLSDR: Joined read_queue_thread") except RuntimeError: logger.error("RTLSDR: Could not join read_queue_thread")