uses pipe instead of queue in native backend

This commit is contained in:
Johannes Pohl
2017-02-27 14:53:13 +01:00
parent ccd757848a
commit bd336d85f7
3 changed files with 35 additions and 35 deletions

View File

@@ -9,6 +9,7 @@ import time
import psutil
from PyQt5.QtCore import QObject, pyqtSignal
from multiprocessing import Pipe
from urh.util.Formatter import Formatter
from urh.util.Logger import logger
@@ -40,7 +41,7 @@ class Device(QObject):
self.error_codes = {}
self.errors = set()
self.queue = Queue()
self.parent_conn, self.child_conn = Pipe()
self.send_buffer = None
self.send_buffer_reader = None
@@ -66,10 +67,10 @@ class Device(QObject):
self.sendbuffer_thread.daemon = True
self.sendbuffer_thread.start()
def _start_readqueue_thread(self):
self.read_queue_thread = threading.Thread(target=self.read_receiving_queue)
self.read_queue_thread.daemon = True
self.read_queue_thread.start()
def _start_read_rcv_buffer_thread(self):
self.read_recv_buffer_thread = threading.Thread(target=self.read_receiving_queue)
self.read_recv_buffer_thread.daemon = True
self.read_recv_buffer_thread.start()
def init_recv_buffer(self):
if self.receive_buffer is None:
@@ -97,7 +98,6 @@ class Device(QObject):
self.errors.add(err)
logger.error(err)
@property
def received_data(self):
return self.receive_buffer[:self.current_recv_index]
@@ -245,33 +245,34 @@ class Device(QObject):
def read_receiving_queue(self):
while self.is_receiving:
try:
# TODO: Do not use queue, but ctx in callback and just recv from Pipe?
while not self.queue.empty():
byte_buffer = self.queue.get()
byte_buffer = self.parent_conn.recv_bytes()
nsamples = len(byte_buffer) // self.BYTES_PER_SAMPLE
if nsamples > 0:
if self.current_recv_index + nsamples >= len(self.receive_buffer):
if self.is_ringbuffer:
self.current_recv_index = 0
if nsamples >= len(self.receive_buffer):
logger.warning("Receive buffer too small, skipping {0:d} samples".format(nsamples-len(self.receive_buffer)))
nsamples = len(self.receive_buffer) - 1
nsamples = len(byte_buffer) // self.BYTES_PER_SAMPLE
if nsamples > 0:
if self.current_recv_index + nsamples >= len(self.receive_buffer):
if self.is_ringbuffer:
self.current_recv_index = 0
if nsamples >= len(self.receive_buffer):
logger.warning("Receive buffer too small, skipping {0:d} samples".format(nsamples-len(self.receive_buffer)))
nsamples = len(self.receive_buffer) - 1
else:
self.stop_rx_mode("Receiving buffer is full {0}/{1}".format(self.current_recv_index + nsamples, len(self.receive_buffer)))
return
else:
self.stop_rx_mode("Receiving buffer is full {0}/{1}".format(self.current_recv_index + nsamples, len(self.receive_buffer)))
return
end = nsamples*self.BYTES_PER_SAMPLE
self.receive_buffer[self.current_recv_index:self.current_recv_index + nsamples] = \
self.unpack_complex(byte_buffer[:end], nsamples)
end = nsamples*self.BYTES_PER_SAMPLE
self.receive_buffer[self.current_recv_index:self.current_recv_index + nsamples] = \
self.unpack_complex(byte_buffer[:end], nsamples)
old_index = self.current_recv_index
self.current_recv_index += nsamples
old_index = self.current_recv_index
self.current_recv_index += nsamples
self.rcv_index_changed.emit(old_index, self.current_recv_index)
self.rcv_index_changed.emit(old_index, self.current_recv_index)
except BrokenPipeError:
pass
except EOFError:
logger.info("EOF Error: Ending receive thread")
break
time.sleep(0.01)
@@ -332,10 +333,9 @@ class Device(QObject):
else:
logger.info("Skipped {0:d} samples in sending".format(len(self.samples_to_send) - self.current_sent_sample))
def callback_recv(self, buffer):
try:
self.queue.put(buffer)
self.child_conn.send_bytes(buffer)
except BrokenPipeError:
pass
return 0

View File

@@ -77,7 +77,7 @@ class HackRF(Device):
if self.is_receiving:
logger.info("HackRF: Starting receiving thread")
self._start_readqueue_thread()
self._start_read_rcv_buffer_thread()
self.log_retcode(ret, "start_rx_mode")
else:
@@ -88,9 +88,9 @@ class HackRF(Device):
logger.info("HackRF: Stopping RX Mode: " + msg)
if hasattr(self, "read_queue_thread") and self.read_queue_thread.is_alive():
if hasattr(self, "read_queue_thread") and self.read_recv_buffer_thread.is_alive():
try:
self.read_queue_thread.join(0.001)
self.read_recv_buffer_thread.join(0.001)
logger.info("HackRF: Joined read_queue_thread")
except RuntimeError:
logger.error("HackRF: Could not join read_queue_thread")

View File

@@ -35,7 +35,7 @@ class RTLSDR(Device):
"""
self.bandwidth_is_adjustable = False
self.bandwidth_is_adjustable = False # e.g. not in Manjaro Linux / Ubuntu 14.04
self._max_frequency = 6e9
self._max_sample_rate = 3200000
self._max_frequency = 6e9
@@ -76,7 +76,7 @@ class RTLSDR(Device):
if self.is_receiving:
logger.info("RTLSDR: Starting receiving thread")
self._start_readqueue_thread()
self._start_read_rcv_buffer_thread()
else:
self.log_retcode(self.error_not_open, "start_rx_mode")
@@ -87,9 +87,9 @@ class RTLSDR(Device):
logger.info("RTLSDR: Stopping RX Mode: " + msg)
if hasattr(self, "read_queue_thread") and self.read_queue_thread.is_alive():
if hasattr(self, "read_queue_thread") and self.read_recv_buffer_thread.is_alive():
try:
self.read_queue_thread.join(0.001)
self.read_recv_buffer_thread.join(0.001)
logger.info("RTLSDR: Joined read_queue_thread")
except RuntimeError:
logger.error("RTLSDR: Could not join read_queue_thread")