diff --git a/exp1_basic.py b/exp1_basic.py index 06c7e9d..49bf475 100644 --- a/exp1_basic.py +++ b/exp1_basic.py @@ -9,31 +9,115 @@ - RC 模型: 使用 ESN 等 RC 模型,调优储备池大小、谱半径等参数。 - 训练方法: 监督学习 (例如 Ridge 回归)。 评估指标: - - 降噪指标: SNR 提升、均方根误差 (RMSE)。 + - 降噪指标: SNR 提升、均方根误差 (RMSE)、相关系数、峰值信噪比。 - 动力学特性恢复指标: 吸引子重构质量、Lyapunov 指数估计误差、分形维数估计误差、预测精度。 结果展示: - 可视化时间序列、吸引子重构、PSD 对比图和表格展示指标结果。 """ -def load_data(): - # TODO: 生成并加载带噪及干净混沌系统数据 - pass +import numpy as np +import matplotlib.pyplot as plt -def build_esn_model(): - # TODO: 构建 RC/ESN 模型并设置关键参数 - pass +import reservoirpy as rpy +import reservoirpy.nodes as rpn +rpy.verbosity(0) +rpy.set_seed(42) -def train_model(): - # TODO: 利用训练集进行训练,使用监督学习方法 - pass +from tools import load_data -def evaluate_results(): - # TODO: 计算 SNR, RMSE, 吸引子重构、Lyapunov 指数估计误差等指标 - pass +def build_esn_model(units, lr, sr, reg, output_dim, reservoir_node=None): + + if reservoir_node is None: + reservoir_node = rpn.Reservoir(units=units, lr=lr, sr=sr) + output_node = rpn.Ridge(output_dim=output_dim, ridge=reg) + + model = reservoir_node >> output_node + return model, reservoir_node +def train_model(model, clean_data, noisy_data, warmup=1000): + model.fit(noisy_data, clean_data, warmup=warmup) + prediction = model.run(noisy_data) + return model, prediction + +def evaluate_results(model, clean_data, noisy_data, warmup=0, vis_data=True, vis_psd=False): + # 预测 + prediction = model.run(noisy_data) + # 计算各种指标 + snr = 10 * np.log10(np.mean(clean_data**2) / np.mean((clean_data - prediction)**2)) + rmse = np.sqrt(np.mean((clean_data - prediction)**2)) + psnr = 20 * np.log10(np.max(clean_data) / rmse) + corr_coef = np.corrcoef(clean_data, prediction)[0, 1] + + # 3个维度分别绘制 noisy vs clean | prediction vs clean + if vis_data: + plt.figure(figsize=(12, 6)) + for i in range(3): + plt.subplot(3, 2, 2*i+1) + plt.plot(clean_data[warmup:, i], label='clean') + plt.plot(noisy_data[warmup:, i], label='noisy') + plt.title(f"Dim {i+1} (Noisy vs Clean)") + plt.legend() + plt.subplot(3, 2, 2*i+2) + plt.plot(clean_data[warmup:, i], label='clean') + plt.plot(prediction[warmup:, i], label='prediction') + plt.title(f"Dim {i+1} (Prediction vs Clean)") + plt.legend() + # PSD 对比 + if vis_psd: + plt.figure(figsize=(12, 6)) + for i in range(3): + plt.subplot(3, 1, i+1) + plt.psd(clean_data[warmup:, i], NFFT=1024, label='clean') + plt.psd(noisy_data[warmup:, i], NFFT=1024, label='noisy') + plt.psd(prediction[warmup:, i], NFFT=1024, label='prediction') + plt.title(f"Dim {i+1} PSD Comparison") + plt.legend() + + return snr, rmse, psnr, corr_coef, prediction + +def run(): + # 数据加载与预处理 + clean_data, noisy_data = load_data(system='lorenz', noise='gaussian', + intensity=0.5, init=[1, 1, 1], + n_timesteps=40000, transient=10000, h=0.01) + # 分为训练集和测试集 按百分比 + train_size = int(len(clean_data) * 0.8) + train_clean_data = clean_data[:train_size] + train_noisy_data = noisy_data[:train_size] + test_clean_data = clean_data[train_size:] + test_noisy_data = noisy_data[train_size:] + + # LV1 + model_lv1, reservoir_node = build_esn_model(units=1000, lr=0.1, sr=0.9, reg=1e-5, output_dim=3) + model_lv1, lv1_train_pred = train_model(model_lv1, train_clean_data, train_noisy_data) + snr, rmse, psnr, corr_coef, lv1_pred = evaluate_results(model_lv1, test_clean_data, test_noisy_data) + print(f"SNR: {snr:.6f} RMSE: {rmse:.6f} PSNR: {psnr:.6f} CorrCoef: {corr_coef:.6f}") + + # LV2 + model_lv2, _ = build_esn_model(units=1000, lr=0.2, sr=0.9, reg=1e-5, output_dim=3, reservoir_node=reservoir_node) + model_lv2, lv2_train_pred = train_model(model_lv2, train_clean_data, lv1_train_pred) + snr, rmse, psnr, corr_coef, lv2_pred = evaluate_results(model_lv2, test_clean_data, lv1_pred) + print(f"SNR: {snr:.6f} RMSE: {rmse:.6f} PSNR: {psnr:.6f} CorrCoef: {corr_coef:.6f}") + + # LV3 + model_lv3, _ = build_esn_model(units=1000, lr=0.3, sr=0.9, reg=1e-5, output_dim=3, reservoir_node=reservoir_node) + model_lv3, lv3_train_pred = train_model(model_lv3, train_clean_data, lv2_train_pred) + snr, rmse, psnr, corr_coef, lv3_pred = evaluate_results(model_lv3, test_clean_data, lv2_pred) + print(f"SNR: {snr:.6f} RMSE: {rmse:.6f} PSNR: {psnr:.6f} CorrCoef: {corr_coef:.6f}") + + # LV4 + model_lv4, _ = build_esn_model(units=1000, lr=0.4, sr=0.9, reg=1e-4, output_dim=3, reservoir_node=reservoir_node) + model_lv4, lv4_train_pred = train_model(model_lv4, train_clean_data, lv3_train_pred) + snr, rmse, psnr, corr_coef, lv4_pred = evaluate_results(model_lv4, test_clean_data, lv3_pred) + print(f"SNR: {snr:.6f} RMSE: {rmse:.6f} PSNR: {psnr:.6f} CorrCoef: {corr_coef:.6f}") + + # LV5 + model_lv5, _ = build_esn_model(units=1000, lr=0.5, sr=0.9, reg=1e-3, output_dim=3, reservoir_node=reservoir_node) + model_lv5, lv5_train_pred = train_model(model_lv5, train_clean_data, lv4_train_pred) + snr, rmse, psnr, corr_coef, lv5_pred = evaluate_results(model_lv5, test_clean_data, lv4_pred) + print(f"SNR: {snr:.6f} RMSE: {rmse:.6f} PSNR: {psnr:.6f} CorrCoef: {corr_coef:.6f}") + + plt.show() + if __name__ == "__main__": - # 主实验流程 - load_data() - build_esn_model() - train_model() - evaluate_results() + run() \ No newline at end of file diff --git a/exp4_noise.py b/exp4_noise.py index 5ab9ef4..c349bf3 100644 --- a/exp4_noise.py +++ b/exp4_noise.py @@ -1,7 +1,7 @@ """ 实验4: 不同噪声类型实验 (鲁棒性分析,可选但加分) 目标: - 评估 RC 降噪方法在不同噪声类型下的表现,包括高斯白噪声、色噪声 (1/f噪声)、脉冲噪声和混合噪声, + 评估 RC 降噪方法在不同噪声类型下的表现,包括高斯白噪声、色噪声 (1/f噪声)、脉冲噪声和正弦噪声, 分析不同噪声条件下的降噪效果及动力学特性恢复能力。 实验设置: - 保持混沌系统、数据划分等设置不变,仅改变噪声类型。 diff --git a/tools.py b/tools.py index e69de29..0783e99 100644 --- a/tools.py +++ b/tools.py @@ -0,0 +1,201 @@ +from reservoirpy import datasets +import numpy as np + +def add_noise(data, noise_type='gaussian', intensity=0.1, **kwargs): + """ + 为输入数据添加噪声。 + + 参数: + data: numpy 数组, 原始数据. + noise_type: str, 噪声类型,可选 'gaussian'(高斯白噪声)、'colored' 或 '1/f'(色噪声)、'impulse'(脉冲噪声)、'sine'(正弦噪声)。 + intensity: float, 噪声强度. + kwargs: 针对某些噪声类型的额外参数,例如: + - 对于 'impulse': prob (脉冲发生概率, 默认 0.01) + - 对于 'sine' : freq (正弦信号频率, 默认 5) + 返回: + 添加噪声后的数据. + """ + noise_type = noise_type.lower() + + # 确保噪声与数据形状匹配,处理多维数据 + if data.ndim > 1: + n_samples, n_dimensions = data.shape + else: + n_samples = len(data) + n_dimensions = 1 + + if noise_type in ['gaussian', 'gaussian white']: + noise = intensity * np.random.randn(*data.shape) + return data + noise + + elif noise_type in ['colored', '1/f']: + # 对于多维数据,为每个维度单独生成1/f噪声 + if data.ndim > 1: + noise = np.zeros_like(data) + for dim in range(n_dimensions): + f = np.fft.fftfreq(n_samples) + f[0] = 1e-10 + noise_complex = np.random.randn(n_samples) + 1j * np.random.randn(n_samples) + factor = intensity / np.sqrt(np.abs(f)) + noise[:, dim] = np.fft.ifft(noise_complex * factor).real + else: + f = np.fft.fftfreq(n_samples) + f[0] = 1e-10 + noise_complex = np.random.randn(n_samples) + 1j * np.random.randn(n_samples) + factor = intensity / np.sqrt(np.abs(f)) + noise = np.fft.ifft(noise_complex * factor).real + return data + noise + + elif noise_type == 'impulse': + noise = np.zeros_like(data) + prob = kwargs.get('prob', 0.01) + mask = np.random.rand(*data.shape) < prob + impulse = intensity * (2 * np.random.rand(*data.shape) - 1) + noise[mask] = impulse[mask] + return data + noise + + elif noise_type == 'sine': + t = np.arange(n_samples) + freq = kwargs.get('freq', 5) + + # 对于多维数据,确保正弦波形状正确 + if data.ndim > 1: + sine_wave = intensity * np.sin(2 * np.pi * freq * t / n_samples).reshape(-1, 1) + # 扩展到与数据相同的维度 + sine_wave = np.tile(sine_wave, (1, n_dimensions)) + else: + sine_wave = intensity * np.sin(2 * np.pi * freq * t / n_samples) + return data + sine_wave + + else: + raise ValueError("未知的噪声类型。可选项:'gaussian', 'colored' (1/f), 'impulse', 'sine'") + +def load_data(system='lorenz', init='random', noise=None, intensity=0.1, h=0.01, n_timesteps=10000, transient=1000, normlization=True, **kwargs): + """ + 加载混沌系统数据. + + 参数: + system: str, 混沌系统类型, 可选 'lorenz', 'rossler', 'multiscroll', 'kuramoto_sivashinsky'。 + init: 初始值. 如果为 'random' 则随机初始化,否则期望传入数组形式的初始值. + noise: None, str 或 str 列表, 指定要添加的噪声类型. 如果是列表,则混合各噪声 (取平均) 作为混合噪声. + -- 可选 'gaussian'(高斯白噪声)、'colored' 或 '1/f'(色噪声)、'impulse'(脉冲噪声)、'sine'(正弦噪声) + intensity: float, 噪声强度. + h: float, 时间步长 (TimeDelta). + n_timesteps: int, 时间步数. + transient: int, 需丢弃的时间步数. + normlization: bool, 是否对数据进行归一化处理. + kwargs: 其他系统参数. + 返回: + (clean_data, noisy_data): 两个 numpy 数组, 分别为干净数据和添加噪声后的数据. + + 各系统默认值: + - lorenz: rho=28, sigma=10, beta=8/3, h=0.03, x0=[1, 1, 1] + - rossler: a=0.2, b=0.2, c=5.7, h=0.1, x0=[-0.1, 0, 0.02] + - multiscroll: a=40, b=3, c=28, h=0.01, x0=[-0.1, 0.5, -0.6] + - kuramoto_sivashinsky: N=128, M=16, h=0.25, x0=None + """ + system = system.lower() + if init == 'random': + if system in ['lorenz', 'rossler', 'multiscroll']: + # 默认3维初始值 + state = np.random.rand(3) + elif system in ['kuramoto_sivashinsky']: + state = np.random.rand(1) + else: + raise ValueError("未知的混沌系统类型。") + else: + state = np.array(init, dtype=float) + + if system == 'lorenz': + ''' + (function) def lorenz( + n_timesteps: int, + rho: float = 28, + sigma: float = 10, + beta: float = 8 / 3, + x0: list | ndarray = [1, 1, 1], + h: float = 0.03, + **kwargs: Any + ) -> ndarray + ''' + sigma = kwargs.get('sigma', 10.0) + rho = kwargs.get('rho', 28.0) + beta = kwargs.get('beta', 8/3) + clean_data = datasets.lorenz(n_timesteps=n_timesteps, h=h, sigma=sigma, rho=rho, beta=beta, x0=state) + + elif system == 'rossler': + ''' + (function) def rossler( + n_timesteps: int, + a: float = 0.2, + b: float = 0.2, + c: float = 5.7, + x0: list | ndarray = [-0.1, 0, 0.02], + h: float = 0.1, + **kwargs: Any + ) -> ndarray + ''' + a = kwargs.get('a', 0.2) + b = kwargs.get('b', 0.2) + c = kwargs.get('c', 5.7) + clean_data = datasets.rossler(n_timesteps=n_timesteps, h=h, a=a, b=b, c=c, x0=state) + + elif system == 'multiscroll': + ''' + (function) def multiscroll( + n_timesteps: int, + a: float = 40, + b: float = 3, + c: float = 28, + x0: list | ndarray = [-0.1, 0.5, -0.6], + h: float = 0.01, + **kwargs: Any + ) -> ndarray + ''' + a = kwargs.get('a', 40) + b = kwargs.get('b', 3) + c = kwargs.get('c', 28) + clean_data = datasets.multiscroll(n_timesteps=n_timesteps, h=h, x0=state, a=a, b=b, c=c) + + elif system == 'kuramoto_sivashinsky': + ''' + (function) def kuramoto_sivashinsky( + n_timesteps: int, + warmup: int = 0, + N: int = 128, + M: float = 16, + x0: list | ndarray = None, + h: float = 0.25 + ) -> ndarray + ''' + clean_data = datasets.kuramoto_sivashinsky(n_timesteps=n_timesteps, h=h, **kwargs) + + else: + raise ValueError("未知的混沌系统类型。可选项: 'lorenz', 'rossler', 'multiscroll', 'kuramoto_sivashinsky'") + + if normlization: + clean_data = (clean_data - clean_data.mean(axis=0)) / clean_data.std(axis=0) # Z-score 归一化 + clean_data = clean_data[transient:] + + # 添加噪声 + if noise is not None: + if isinstance(noise, list): + noise_sum = np.zeros_like(clean_data) + for nt in noise: + noise_sum += add_noise(np.zeros_like(clean_data), noise_type=nt, intensity=intensity, **kwargs) + mixed_noise = noise_sum / len(noise) + noisy_data = clean_data + mixed_noise + elif isinstance(noise, str): + noisy_data = add_noise(clean_data, noise_type=noise, intensity=intensity, **kwargs) + else: + raise ValueError("噪声参数 noise 必须为字符串或字符串列表。") + else: + noisy_data = clean_data.copy() + + return clean_data, noisy_data + +if __name__ == "__main__": + # 测试数据加载和噪声添加 + clean_data, noisy_data = load_data(system='lorenz', noise=['gaussian', 'colored'], intensity=0.1, n_timesteps=10000, transient=1000) + print("Clean Data Shape:", clean_data.shape) + print("Noisy Data Shape:", noisy_data.shape) \ No newline at end of file diff --git a/well_pattern_config.py b/well_pattern_config.py new file mode 100644 index 0000000..e69de29