reformat code

This commit is contained in:
jopohl
2016-12-03 17:32:09 +01:00
parent d18df1f3cf
commit 14fed5bda7

View File

@@ -8,6 +8,7 @@ class Encoder(object):
"""
Full featured encoding/decoding of protocols.
"""
class ErrorState:
SUCCESS = "success"
WRONG_CRC = "wrong crc"
@@ -30,17 +31,17 @@ class Encoder(object):
self.src = [] # [[True, True], [True, False], [False, True], [False, False]]
self.dst = [] # [[False, False], [False, True], [True, False], [True, True]]
self.carrier = "1_"
self.cutmark = [True,False]
self.cutmode = 0 # 0 = before, 1 = after, 2 = before_pos, 3 = after_pos
self.cutmark = [True, False]
self.cutmode = 0 # 0 = before, 1 = after, 2 = before_pos, 3 = after_pos
self.__symbol_len = 1
# Configure CC1101 Date Whitening
polynomial = [False, False, True, False, False, False, False, True] # x^5+x^0
polynomial = [False, False, True, False, False, False, False, True] # x^5+x^0
sync_bytes = [True, True, True, False, True, False, False, True, True, True, False, False,
True, False, True, False, True, True, True, False, True, False, False, True,
True, True, False, False, True, False, True, False] # "e9cae9ca"
#sync_bytes = self.str2bit("01100111011010000110011101101000") # "67686768" (RWE Default)
#sync_bytes = self.str2bit("01101001111101100110100111110111") # "69f669f7" (Special RWE)
# sync_bytes = self.str2bit("01100111011010000110011101101000") # "67686768" (RWE Default)
# sync_bytes = self.str2bit("01101001111101100110100111110111") # "69f669f7" (Special RWE)
self.data_whitening_polynomial = polynomial # Set polynomial
self.data_whitening_sync = sync_bytes # Sync Bytes
@@ -48,7 +49,7 @@ class Encoder(object):
self.data_whitening_preamble = [True, False] * 16 # 010101...
self.lfsr_state = []
self.data_whitening_apply_crc = True # Apply CRC with XOR
self.data_whitening_apply_crc = True # Apply CRC with XOR
self.data_whitening_preamble_rm = True # Remove Preamble
self.data_whitening_sync_rm = True # Remove Sync Bytes
self.data_whitening_crc_rm = False # Remove CRC
@@ -109,7 +110,7 @@ class Encoder(object):
if i < len(names):
self.chain.append(names[i])
else:
self.chain.append("0xe9cae9ca;0x21;0x8") # Default Sync Bytes
self.chain.append("0xe9cae9ca;0x21;0x8") # Default Sync Bytes
elif constants.DECODING_CARRIER in names[i]:
self.chain.append(self.code_carrier)
i += 1
@@ -250,11 +251,11 @@ class Encoder(object):
elif self.code_data_whitening == operation:
if self.chain[i + 1].count(';') == 2:
self.data_whitening_sync, self.data_whitening_polynomial, opt = self.chain[i + 1].split(";")
if(len(self.data_whitening_sync) > 0 and len(self.data_whitening_polynomial) > 0) and len(opt) > 0:
if (len(self.data_whitening_sync) > 0 and len(self.data_whitening_polynomial) > 0) and len(opt) > 0:
self.data_whitening_sync = self.hex2bit(self.data_whitening_sync)
self.data_whitening_polynomial = self.hex2bit(self.data_whitening_polynomial)
opt = self.hex2bit(opt)
if len(opt)>=4:
if len(opt) >= 4:
self.data_whitening_apply_crc = opt[0]
self.data_whitening_preamble_rm = opt[1]
self.data_whitening_sync_rm = opt[2]
@@ -333,7 +334,7 @@ class Encoder(object):
# inpt empty, polynomial or syncbytes are zero! (Shouldn't happen)
if inpt_to < 1 or len_polynomial < 1 or len_sync < 1:
return inpt[inpt_from:inpt_to], 0, self.ErrorState.MISC # Misc Error
return inpt[inpt_from:inpt_to], 0, self.ErrorState.MISC # Misc Error
# Force inpt to contain bool values; overwrite everything else with True
for i in range(0, inpt_to):
@@ -346,12 +347,12 @@ class Encoder(object):
while i < (inpt_to - len_sync):
equalbits = 0
for j in range(0, len_sync):
if inpt[i+j] == self.data_whitening_sync[j]:
if inpt[i + j] == self.data_whitening_sync[j]:
equalbits += 1
else:
continue
if len_sync == equalbits:
whitening_start_pos = i+j+1
whitening_start_pos = i + j + 1
break
else:
i += 1
@@ -371,8 +372,8 @@ class Encoder(object):
keystream.extend(self.lfsr(8))
# If data whitening polynomial is wrong, keystream can be less than needed. Check and exit.
if len(keystream) < inpt_to-whitening_start_pos:
return inpt[inpt_from:inpt_to], 0, self.ErrorState.MISC # Error 31338
if len(keystream) < inpt_to - whitening_start_pos:
return inpt[inpt_from:inpt_to], 0, self.ErrorState.MISC # Error 31338
# Apply keystream (xor) - Decoding
if decoding:
@@ -440,12 +441,12 @@ class Encoder(object):
if len(self.carrier) > 0:
for x in range(0, len(inpt)):
tmp = self.carrier[x % len(self.carrier)]
if tmp not in ("0", "1", "*"): # Data!
if tmp not in ("0", "1", "*"): # Data!
output.append(inpt[x])
else: # Carrier -> 0, 1, *
else: # Carrier -> 0, 1, *
if tmp in ("0", "1"):
if (inpt[x] and tmp != "1") or (not inpt[x] and tmp != "0"):
#print("Pos", x, self.carrier[x % cl], inpt[x])
# print("Pos", x, self.carrier[x % cl], inpt[x])
errors += 1
else:
# Add carrier if encoding
@@ -457,7 +458,8 @@ class Encoder(object):
output.append(i)
x += 1
while self.carrier[x % len(self.carrier)] in ("0", "1", "*"):
output.append(False if self.carrier[x % len(self.carrier)] in ("0", "*") else True) # Add 0 when there is a wildcard (*) in carrier description
output.append(False if self.carrier[x % len(self.carrier)] in (
"0", "*") else True) # Add 0 when there is a wildcard (*) in carrier description
x += 1
return output, errors, self.ErrorState.SUCCESS
@@ -465,7 +467,6 @@ class Encoder(object):
# XOR Data Whitening
return self.apply_data_whitening(decoding, inpt)
def code_lsb_first(self, decoding, inpt):
output = inpt.copy()
errors = len(inpt) % 8
@@ -625,8 +626,8 @@ class Encoder(object):
# Cutmark is not valid
return inpt, 0, self.ErrorState.INVALID_CUTMARK
for i in range(0, len(inpt)-len_cutmark):
if all(inpt[i+j] == self.cutmark[j] for j in range(len_cutmark)):
for i in range(0, len(inpt) - len_cutmark):
if all(inpt[i + j] == self.cutmark[j] for j in range(len_cutmark)):
pos = i
break
else:
@@ -637,7 +638,7 @@ class Encoder(object):
if self.cutmode == 0 or self.cutmode == 2:
output.extend(inpt[pos:])
else:
# Delete after
# Delete after
if self.cutmode == 1:
pos += len(self.cutmark)
else:
@@ -658,17 +659,17 @@ class Encoder(object):
val = inpt.copy()
val[-4:] = [False, False, False, False]
for i in range(0, len(val), 8):
hash += int("".join(map(str,map(int, val[i:i+8]))),2)
hash = (((hash & 0xf0)>>4) + (hash & 0x0f)) & 0x0f
hash += int("".join(map(str, map(int, val[i:i + 8]))), 2)
hash = (((hash & 0xf0) >> 4) + (hash & 0x0f)) & 0x0f
return list(map(bool, map(int, "{0:04b}".format(hash))))
@staticmethod
def enocean_checksum8(inpt):
hash = 0
print(inpt[-8:], inpt)
for i in range(0, len(inpt)-8, 8):
hash += int("".join(map(str,map(int, inpt[i:i+8]))),2)
#hash += int("".join(map(str, map(int, val[i+8:i:-1]))), 2)
for i in range(0, len(inpt) - 8, 8):
hash += int("".join(map(str, map(int, inpt[i:i + 8]))), 2)
# hash += int("".join(map(str, map(int, val[i+8:i:-1]))), 2)
return hash % 256
@staticmethod
@@ -684,7 +685,7 @@ class Encoder(object):
eof = [True, False, True, True]
if decoding:
inpt, _, _ = self.code_invert(True, inpt) # Invert
inpt, _, _ = self.code_invert(True, inpt) # Invert
# Insert a leading 1, to ensure protocol starts with 1
# The first 1 (inverted) of EnOcean is so weak, that it often drowns in noise
inpt.insert(0, True)
@@ -715,23 +716,26 @@ class Encoder(object):
state = self.ErrorState.SUCCESS
if decoding:
for n in range (start, end, 12):
for n in range(start, end, 12):
errors += sum([inpt[n + 2] == inpt[n + 3], inpt[n + 6] == inpt[n + 7]])
errors += sum([inpt[n+10] != False, inpt[n+11] != True]) if n < end - 11 else 0
output.extend([inpt[n], inpt[n+1], inpt[n+2], inpt[n+4], inpt[n+5], inpt[n+6], inpt[n+8], inpt[n+9]])
errors += sum([inpt[n + 10] != False, inpt[n + 11] != True]) if n < end - 11 else 0
output.extend([inpt[n], inpt[n + 1], inpt[n + 2], inpt[n + 4], inpt[n + 5], inpt[n + 6], inpt[n + 8],
inpt[n + 9]])
if output[-4:] != self.enocean_checksum4(output[12:]):
state = self.ErrorState.WRONG_CRC
# Finalize output
output.extend(inpt[end:end+4])
output.extend(inpt[end:end + 4])
else:
# Calculate hash
inpt[end-4:end] = self.enocean_checksum4(inpt[start:end])
inpt[end - 4:end] = self.enocean_checksum4(inpt[start:end])
for n in range(start, end, 8):
output.extend([inpt[n], inpt[n+1], inpt[n+2], not inpt[n+2], inpt[n+3], inpt[n+4], inpt[n+5], not inpt[n+5], inpt[n+6], inpt[n+7]])
output.extend(
[inpt[n], inpt[n + 1], inpt[n + 2], not inpt[n + 2], inpt[n + 3], inpt[n + 4], inpt[n + 5],
not inpt[n + 5], inpt[n + 6], inpt[n + 7]])
if n < len(inpt) - 15:
output.extend([False, True])
@@ -754,7 +758,6 @@ class Encoder(object):
bit_errors, state = self.analyze(msg)
return bit_errors == 0 and state == self.ErrorState.SUCCESS
def analyze(self, inpt):
"""
return number of bit errors and state
@@ -812,6 +815,5 @@ class Encoder(object):
bitstring = bin(int(inpt, base=16))[2:]
return "0" * (4 * len(inpt.lstrip('0x')) - len(bitstring)) + bitstring
def __eq__(self, other):
return self.get_chain() == other.get_chain()
return self.get_chain() == other.get_chain()