前言
前文已经详细介绍了CRC并行结算的数学原理,基于此原理,本文使用Python实现了任意CRC算法的计算程序。
最终结果与流行的CRC库pycrc进行对比,结果一致,说明了程序的正确性,也验证了CRC并行算法的正确性。
一 编码思路
考虑CRC算法的计算参数:
-
多项式:例如: 0x1021
(0001_0000_0010_0001
) -
初始异或值:例如: 0xFFFF
-
输入按字节反转:是 或者 否 -
输出整体反转:是 或者 否 -
最终异或值:例如: 0x0000
CRC计算步骤如下图所示。虚线框表示可选步骤。
编码就按照计算步骤来计算即可。
二. CRCConfig类与_CRCCalculator类定义
@dataclass(frozen=True)
class CRCConfig:
width: int = 16 # CRC宽度,默认值为 16
poly: int = 0x8005 # CRC生成多项式,默认值为 CRC-16/MODBUS
reflect_in: bool = True # 输入反转标志,默认反转
xor_in: int = 0x0000 # 输入异或值,默认值为 0x0000
reflect_out: bool = True # 输出反转标志,默认反转
xor_out: int = 0x0000 # 输出异或值,默认值为 0x0000
def __post_init__(self):
# 验证配置参数的合法性,确保它们在允许范围内
if not (4 <= self.width <= 64):
raise ValueError(f"width ({self.width}) 必须在 4 到 64 之间")
if self.poly > (1 << self.width):
raise ValueError(
f"多项式 poly 的二进制位宽 ({self.poly.bit_length()}) 超过了指定的位宽 width ({self.width})")
if self.xor_in > (1 << self.width):
raise ValueError(
f"xor_in 的二进制位宽 ({self.xor_in.bit_length()}) 超过了指定的位宽 width ({self.width})")
if self.xor_out > (1 << self.width):
raise ValueError(
f"xor_out 的二进制位宽 ({self.xor_out.bit_length()}) 超过了指定的位宽 width ({self.width})")
def calc_crc(self, input_value: str) -> str:
# 实例化类, 直接计算给定输入的CRC值
crc_calculator = _CRCCalculator(self)
return crc_calculator._calculate_crc(input_value)
CRCConfig类的相关字段名称保持与pycrc完全一致。
还定义了一些流行的CRC算法的固定配置:
CRC_4_ITU = CRCConfig(
width = 4,
poly = 0x03,
reflect_in = True,
xor_in = 0x00,
reflect_out = True,
xor_out = 0x00
)
...
CRC_8_MAXIM = CRCConfig(
width = 8,
poly = 0x31,
reflect_in = True,
xor_in = 0x00,
reflect_out = True,
xor_out = 0x00
)
...
_CRCCalculator类定义:注意这是个内部类。
class _CRCCalculator:
def __init__(self, crc_config: CRCConfig):
# 初始化CRC计算器,传入CRC配置
self.crc_config = crc_config
self.N = self.crc_config.width
self.T = self._build_transfer_matrix(self.crc_config.poly, self.N)
...
三. 输入数据处理
3.1 输入类型转换
输入数据分两种类型,一种为数字,另一种为字符串。
如果是字符串则需要先转为Unicode码,再转为16进制。
注意,选择的字符集不同,则同一字符串对应的输出不同。这里固定字符集为utf-8
。
def _string_to_hex(self, input_value) -> str:
# 将输入值转换为十六进制字符串表示形式
if isinstance(input_value, int):
return hex(input_value)[2:] # 如果输入是数字,直接转换为十六进制
# 如果是字符串,则根据配置进行编码和转换
encoding = 'utf-8'
try:
encoded_bytes = input_value.encode(encoding)
hex_str = ''.join(f'{byte:02x}' for byte in encoded_bytes)
return hex_str
except UnicodeEncodeError as e:
raise ValueError(f"字符集 '{encoding}' 不支持字符串中的某些字符: {e}")
3.2 输入数据正确性检查与标准化
对于非整数字节的16进制输入,则需要补前导0到整数字节。
def _validate_hex_input(self, input_str: str) -> str:
# 清理并验证输入的十六进制字符串是否有效
stripped_input = re.sub(r'\s+', '', input_str)
if stripped_input == "":
raise ValueError("输入不能为空")
if stripped_input.startswith("0x") or stripped_input.startswith("0X"):
stripped_input = stripped_input[2:]
if not re.fullmatch(r"[0-9a-fA-F]+", stripped_input):
raise ValueError("输入不是有效的16进制数")
if len(stripped_input) % 2 != 0:
stripped_input = '0' + stripped_input
return stripped_input
3.3 输入按字节反转
def _reverse_single_byte(self, hex_str: str) -> str:
# 反转每个字节内的半字节顺序(适用于需要反转输入/输出的情况)
reverse_table = {
'0': '0', '1': '8', '2': '4', '3': 'C',
'4': '2', '5': 'A', '6': '6', '7': 'E',
'8': '1', '9': '9', 'A': '5', 'B': 'D',
'C': '3', 'D': 'B', 'E': '7', 'F': 'F',
'a': 'f', 'b': 'd', 'c': '3', 'd': 'b',
'e': '7', 'f': 'f'
}
reversed_hex = ''
for i in range(0, len(hex_str), 2):
byte = hex_str[i:i+2]
high_nibble = byte[0].upper()
low_nibble = byte[1].upper()
reversed_high = reverse_table.get(high_nibble, high_nibble)
reversed_low = reverse_table.get(low_nibble, low_nibble)
reversed_hex += reversed_low + reversed_high
return reversed_hex
3.4 低位补0 + 高位与初始值异或
def _pad0_and_xorin(self, hex_str: str) -> str:
# 对输入的十六进制字符串进行填充零和与初始值xor_in进行按位异或操作
hex_to_bin_table = {
'0': '0000', '1': '0001', '2': '0010', '3': '0011',
'4': '0100', '5': '0101', '6': '0110', '7': '0111',
'8': '1000', '9': '1001', 'A': '1010', 'B': '1011',
'C': '1100', 'D': '1101', 'E': '1110', 'F': '1111',
'a': '1010', 'b': '1011', 'c': '1100', 'd': '1101',
'e': '1110', 'f': '1111'
}
n = self.N
# 使用 format 函数将 xor_in 转换为固定长度的二进制字符串
xor_bin_str = format(self.crc_config.xor_in, f'0{n}b')
# 将输入的十六进制字符串转换为二进制字符串
bin_str = ''.join(hex_to_bin_table[char] for char in hex_str.upper())
bin_str += '0' * n # 在二进制字符串后面直接添加 n 个零
# 对最开始的 n 位进行异或操作
high_n_bits = bin_str[:n]
xor_result_high_bits = ''.join(
'1' if a != b else '0' for a, b in zip(high_n_bits, xor_bin_str))
low_bits = bin_str[n:] # 取剩余的低位
final_bin = xor_result_high_bits + low_bits
return final_bin
四. CRC并行计算
4.1 状态转移矩阵生成函数
def _build_transfer_matrix(self, g_hex: str, n: int) -> np.ndarray:
# 构建基于生成多项式的状态转移矩阵
g_bin = bin(g_hex)[2:].zfill(n)
T = np.zeros((n, n), dtype=int)
for i in range(n):
if i < len(g_bin):
T[0, n - 1 - i] = int(g_bin[-1 - i])
for i in range(1, n):
T[i, i - 1] = 1
return T
4.2 mod2运算
def _get_next_c(self, bin_input: str, T: np.ndarray) -> str:
# 根据状态转移矩阵计算下一个CRC校验值
n = self.N
this_c = np.zeros(n, dtype=int)
C = np.dot(this_c, np.linalg.matrix_power(T, len(bin_input)))
blocks = [bin_input[i:i+n] for i in range(0, len(bin_input), n)]
i = len(blocks[-1]) if blocks else 0
for j, block in enumerate(blocks):
D = np.array([int(bit) for bit in block], dtype=int)
if len(D) < n:
D = np.pad(D, (n - len(D), 0), 'constant')
C += D
break
power = n * (len(blocks)-j-2) + i
T_power = np.linalg.matrix_power(T, power)
C += np.dot(D, T_power)
C = C % 2
C = ''.join(map(str, C))
return C
五. 输出CRC处理
输出要进行整体反转和与结果异或值进行异或的操作:
def _output_reverse_and_xor(self, bin_input: str) -> str:
# 对最终的CRC结果进行可能的反转和与xor_out的按位异或操作
processed_bin = bin_input[::-1] if self.crc_config.reflect_out else bin_input
# 将 xor_out 转换为二进制字符串并确保其长度与 processed_bin 一致
xor_out_bin = format(self.crc_config.xor_out, f'0{len(processed_bin)}b')
# 对 processed_bin 和 xor_out 进行按位异或操作
result_bin = ''.join('1' if a != b else '0' for a, b in zip(processed_bin, xor_out_bin))
return result_bin
六. 最终的CRC计算函数
仍然定义为内部函数,用户需要在CRCConfig类中使用封装函数。
def _calculate_crc(self, input_value: str) -> str:
# 执行完整的CRC计算流程
# 将输入转换为十六进制字符串
hex_str = self._string_to_hex(input_value)
# 根据配置验证并清理十六进制字符串
validated_hex = self._validate_hex_input(hex_str)
# 可选地反转单个字节内的半字节
if self.crc_config.reflect_in:
validated_hex = self._reverse_single_byte(validated_hex)
# 填充与异或操作(这里假设只对高n位进行异或)
bin_str = self._pad0_and_xorin(validated_hex)
# 计算状态转移
C = self._get_next_c(bin_str, self.T)
# 对输出结果进行反转和/或取反
final_result = self._output_reverse_and_xor(C)
return final_result
七. 对比验证
pycrc:流行的开源的CRC计算库,本文的CRCConfig类的字段名称与pycrc完全一致。
仓库:tpircher/pycrc: Free, easy to use Cyclic Redundancy Check (CRC) calculator and source code generator
与pycrc对比,随机输入。
import pycrc.algorithms
import myCRC
import random
# myCRC示例用法
crc_custom = myCRC.CRCConfig(
width = 16 ,
poly = 0x1021,
reflect_in = True ,
xor_in = 0x0000,
reflect_out = True ,
xor_out = 0x0000
)
# 打印CRC配置
print("myCRC配置:")
print(f"{'width':<12}= " + str(crc_custom.width))
print(f"{'poly':<12}= " + f"0x{crc_custom.poly:X}")
print(f"{'reflect_in':<12}= " + str(crc_custom.reflect_in))
print(f"{'xor_in':<12}= " + f"0x{crc_custom.xor_in:X}")
print(f"{'reflect_out':<12}= " + str(crc_custom.reflect_out))
print(f"{'xor_out':<12}= " + f"0x{crc_custom.xor_out:X}")
print('-------------------------------------------------------------')
# 随机选择位宽
bitwidth = random.randint(1, 64)
max_value_for_bitwidth = (1 << bitwidth) - 1
input_data = random.randint(0, max_value_for_bitwidth) # 随机输入
# input_data = 0x12345678 # 手动输入数据
print(f"输入数据: 0x{input_data:X}")
mycrc_result = crc_custom.calc_crc(input_data)
final_crc_hex = hex(int(mycrc_result, 2))[2:].upper()
print(f"myCrc最终CRC值二进制: {mycrc_result}")
print(f"myCrc最终CRC值16进制: 0x{final_crc_hex}")
print('-------------------------------------------------------------')
# pycrc示例用法
crc = pycrc.algorithms.Crc(
width = 16 ,
poly = 0x1021,
reflect_in = True ,
xor_in = 0x0000,
reflect_out = True ,
xor_out = 0x0000
)
# 检查CRC配置是否一致
assert crc.width == crc_custom.width
assert crc.poly == crc_custom.poly
assert crc.reflect_in == crc_custom.reflect_in
assert crc.xor_in == crc_custom.xor_in
assert crc.reflect_out == crc_custom.reflect_out
assert crc.xor_out == crc_custom.xor_out
print("CRC配置一致!")
# 将整数转换为字节序列,假设是大端字节序
input_data_list = input_data.to_bytes((input_data.bit_length() + 7) // 8, byteorder='big') or bytes([0])
pycrc_result = crc.bit_by_bit(input_data_list)
print(f"pyCRC的CRC结果是: 0x{pycrc_result:02X}")
# 判断两个16进制是否一样
if final_crc_hex == hex(pycrc_result)[2:].upper():
print("myCRC和pyCRC的结果一致!")
else:
print("myCRC和pyCRC的结果不一致!")
运行结果:
myCRC配置:
width = 16
poly = 0x1021
reflect_in = True
xor_in = 0x0
reflect_out = True
xor_out = 0x0
-------------------------------------------------------------
输入数据: 0x12E133
myCrc最终CRC值二进制: 1100001101110100
myCrc最终CRC值16进制: 0xC374
-------------------------------------------------------------
CRC配置一致!
pyCRC的CRC结果是: 0xC374
myCRC和pyCRC的结果一致!
与网页对比,结果一致。
八. 源码分享
两处仓库同步。
Gitee:myCRC: 计算CRC的代码
没有回复内容