import io import time import unittest import numpy as np from urh.dev.native.HackRF import HackRF from urh.dev.native.lib import hackrf class TestHackRF(unittest.TestCase): def callback_fun(self, buffer): out = [] print(buffer) for i in range(0, len(buffer), 4): try: r = np.fromstring(buffer[i:i + 2], dtype=np.float16) / 32767.5 i = np.fromstring(buffer[i + 2:i + 4], dtype=np.float16) / 32767.5 except ValueError: continue if r and i: print(r, i) # out.append(complex(float(buffer[i:i+1])/32767.5, float(buffer[i+2:i+3])/32767.5)) return 0 def test_set_rx(self): hackrf.setup() hackrf.set_freq(433.92e6) print(hackrf.is_streaming()) hackrf.start_rx_mode(self.callback_fun) time.sleep(1) hackrf.stop_rx_mode() def test_fromstring(self): buffer = b'\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfd\xff\xfd\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfd\xfe\xfd\xfe\xff\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfd\xfe' r = np.empty(len(buffer) // 2, dtype=np.float32) i = np.empty(len(buffer) // 2, dtype=np.float32) c = np.empty(len(buffer) // 2, dtype=np.complex64) # dtype = unpacked = np.frombuffer(buffer, dtype=[('r', np.uint8), ('i', np.uint8)]) ru = unpacked['r'] / 128.0 iu = unpacked['i'] / 128.0 # for j in range(0, len(buffer)-1, 2): # r[j//2] = np.frombuffer(buffer[j:j + 1], dtype=np.int8) / 128.0 # i[j//2] = np.frombuffer(buffer[j + 1:j + 2], dtype=np.int8) / 128.0 # r2 = np.fromstring(buffer[], dtype=np.float16) / 32767.5 c.real = ru c.imag = iu print(c) # x,y = np.frombuffer(buffer, dtype=[('x', np.float16), ('y', np.float16)]) def test_fromstring2(self): buffer = b'\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfd\xff\xfd\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfd\xfe\xfd\xfe\xff\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfd\xfe' c = np.empty(len(buffer) // 2, dtype=np.complex64) # dtype = unpacked = np.frombuffer(buffer, dtype="> 8)) * 1 / 128 imag = float(np.uint8(i & 0xff)) * 1 / 128 lookup[i] = complex(real, imag) buffer = b"\x00\x01" unpacked = np.frombuffer(buffer, dtype=[('r', np.uint8), ('i', np.uint8)]) ru = unpacked['r'] / 128.0 iu = unpacked['i'] / 128.0 # seems to be the same # Convert floated again??? # https://github.com/osmocom/gr-osmosdr/blob/master/lib/osmosdr/osmosdr_src_c.cc#L235 print(lookup[0x0001]) print(ru, iu) def test_pack_complex(self): hkrf = HackRF(1,1,1,1) print(hkrf.pack_complex(np.array([complex(0.1, 0.1), complex(0.5, 0.1)], dtype=np.complex64))) def test_buffer_io(self): b = io.BytesIO(b"\x00\x01\x02\x03\x04\x05\x06") br = io.BufferedReader(b) # Buffered Reader is thread safe https://docs.python.org/3/library/io.html#multi-threading print(bool(br.peek())) print(br.read(2)) print(br.tell()) print(br.read(2)) print(br.read(2)) print(br.read(2)) br.seek(0) print(br.read1(2)) print(br.tell()) print(bool(br.peek())) br.close() b.close() br.close()