denoise_RC/tools.py
Re1b00m 18ec90e437 coding
1.complete exp2
2.add some tools
2025-03-11 21:49:50 +08:00

237 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import functools
import time

import numpy as np
from matplotlib import pyplot as plt
from reservoirpy import datasets
def print_exec_time(func):
    """Decorator that prints how long each call to ``func`` takes.

    The wrapped callable keeps the original ``__name__``, ``__doc__`` and
    other metadata (via ``functools.wraps``), so introspection, logging and
    stacked decorators keep seeing the real function instead of ``wrapper``.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that forwards all arguments to ``func``, prints the elapsed
        time in seconds after a successful call, and returns ``func``'s result.
        If ``func`` raises, the exception propagates and nothing is printed.
    """
    # Not every callable has __name__ (e.g. functools.partial objects).
    label = getattr(func, "__name__", repr(func))

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution; time.time() can jump
        # when the system clock is adjusted.
        start = time.perf_counter()
        result = func(*args, **kwargs)
        print(f"Function {label} executed in {time.perf_counter() - start:.4f} seconds.")
        return result

    return wrapper
def _pink_noise(shape, intensity):
    """Return 1/f ("pink") noise of the given shape, shaped along axis 0.

    White complex noise is weighted by ``1 / sqrt(|f|)`` in the frequency
    domain and transformed back. The DC bin is set to zero so the noise has
    zero mean, and every channel is rescaled so that its standard deviation
    equals ``intensity``.
    """
    n_samples = shape[0]
    if n_samples < 2:
        # Only the DC bin exists (or no samples at all): no 1/f content.
        return np.zeros(shape, dtype=float)

    gain = np.zeros(n_samples)
    # Skip index 0 (f == 0): dividing by a tiny stand-in frequency would turn
    # the DC bin into a huge random constant offset that swamps the noise.
    gain[1:] = 1.0 / np.sqrt(np.abs(np.fft.fftfreq(n_samples)[1:]))
    # Make the per-frequency gain broadcast over any trailing axes.
    gain = gain.reshape((n_samples,) + (1,) * (len(shape) - 1))

    spectrum = np.random.randn(*shape) + 1j * np.random.randn(*shape)
    noise = np.fft.ifft(spectrum * gain, axis=0).real

    scale = noise.std(axis=0, keepdims=True)
    scale[scale == 0] = 1.0  # avoid 0/0 for degenerate channels
    return intensity * noise / scale


def add_noise(data, noise_type='gaussian', intensity=0.1, **kwargs):
    """Return a noisy copy of ``data``.

    Axis 0 of ``data`` is treated as time; any trailing axes are treated as
    independent channels. The input is never modified.

    Args:
        data: Array-like clean signal with at least one dimension.
        noise_type: Case-insensitive noise name:
            'gaussian' / 'gaussian white' - white Gaussian noise with
                standard deviation ``intensity``;
            'colored' / '1/f' - pink noise, zero mean, standard deviation
                ``intensity`` per channel;
            'impulse' - sparse spikes drawn uniformly from
                ``[-intensity, intensity)``;
            'sine' - a sine wave of amplitude ``intensity`` added to every
                channel.
        intensity: Noise strength (see ``noise_type`` for its meaning).
        **kwargs: Extra options; unknown keys are ignored.
            prob: for 'impulse', probability of a spike per sample
                (default 0.01).
            freq: for 'sine', number of cycles over the whole record
                (default 5).

    Returns:
        A new array with the same shape as ``data`` holding ``data + noise``.

    Raises:
        ValueError: If ``noise_type`` is not one of the supported names.
    """
    data = np.asarray(data)
    noise_type = noise_type.lower()
    n_samples = len(data)

    if noise_type in ('gaussian', 'gaussian white'):
        return data + intensity * np.random.randn(*data.shape)

    if noise_type in ('colored', '1/f'):
        return data + _pink_noise(data.shape, intensity)

    if noise_type == 'impulse':
        prob = kwargs.get('prob', 0.01)
        mask = np.random.rand(*data.shape) < prob
        impulse = intensity * (2 * np.random.rand(*data.shape) - 1)
        # Build the noise as floats: writing spikes into an array with the
        # dtype of ``data`` would truncate them to 0 for integer inputs.
        return data + np.where(mask, impulse, 0.0)

    if noise_type == 'sine':
        freq = kwargs.get('freq', 5)
        t = np.arange(n_samples)
        sine_wave = intensity * np.sin(2 * np.pi * freq * t / n_samples)
        # Reshape to (n_samples, 1, ..., 1) so the same wave is broadcast to
        # every channel, whatever the number of trailing axes.
        return data + sine_wave.reshape((n_samples,) + (1,) * (data.ndim - 1))

    raise ValueError("未知的噪声类型。可选项:'gaussian', 'colored' (1/f), 'impulse', 'sine'")
def load_data(system='lorenz', init='random', noise=None, intensity=0.1, h=0.01, n_timesteps=10000, transient=1000, normlization=True, **kwargs):
    """Generate a chaotic time series and a noisy copy of it.

    Args:
        system: Chaotic system name (case-insensitive): 'lorenz', 'rossler',
            'multiscroll' or 'kuramoto_sivashinsky'.
        init: Initial state. ``'random'`` draws a uniform random 3-vector for
            the three-dimensional systems and leaves the Kuramoto-Sivashinsky
            solver on its own default initial condition. Anything else is
            converted to a float array and passed to the solver as ``x0``.
        noise: ``None`` (no noise), a noise-type string, or a list/tuple of
            noise-type strings. With several types, each noise is generated
            independently and their average is added. An empty list/tuple
            means no noise. Valid names are those accepted by ``add_noise``:
            'gaussian', 'colored' / '1/f', 'impulse', 'sine'.
        intensity: Noise strength forwarded to ``add_noise``.
        h: Integration time step forwarded to the solver.
        n_timesteps: Number of time steps to generate (before the transient
            is removed).
        transient: Number of leading time steps to discard.
        normlization: If true, z-score each dimension. (The parameter name
            keeps its historical spelling for backward compatibility.)
        **kwargs: System parameters and noise options.
            lorenz: sigma (10.0), rho (28.0), beta (8/3).
            rossler: a (0.2), b (0.2), c (5.7).
            multiscroll: a (40), b (3), c (28).
            kuramoto_sivashinsky: every kwarg except the noise options is
                forwarded to the solver (e.g. warmup, N, M, x0).
            noise options: prob (impulse), freq (sine); see ``add_noise``.

    Returns:
        ``(clean_data, noisy_data)``: two arrays of identical shape with the
        first ``transient`` rows removed.

    Raises:
        ValueError: If ``system`` is unknown, or ``noise`` is neither ``None``,
            a string, nor a list/tuple of strings.
    """
    system = system.lower()

    # Check the type first: comparing an ndarray with == 'random' is an
    # elementwise operation whose truth value is ambiguous.
    if isinstance(init, str) and init == 'random':
        if system in ('lorenz', 'rossler', 'multiscroll'):
            # Random 3-dimensional initial state.
            state = np.random.rand(3)
        elif system == 'kuramoto_sivashinsky':
            # The solver picks its own default initial condition.
            state = None
        else:
            raise ValueError("未知的混沌系统类型。")
    else:
        state = np.array(init, dtype=float)

    if system == 'lorenz':
        # Upstream signature: lorenz(n_timesteps, rho=28, sigma=10, beta=8/3,
        #                            x0=[1, 1, 1], h=0.03, **kwargs)
        sigma = kwargs.get('sigma', 10.0)
        rho = kwargs.get('rho', 28.0)
        beta = kwargs.get('beta', 8/3)
        clean_data = datasets.lorenz(n_timesteps=n_timesteps, h=h, sigma=sigma, rho=rho, beta=beta, x0=state)
    elif system == 'rossler':
        # Upstream signature: rossler(n_timesteps, a=0.2, b=0.2, c=5.7,
        #                             x0=[-0.1, 0, 0.02], h=0.1, **kwargs)
        a = kwargs.get('a', 0.2)
        b = kwargs.get('b', 0.2)
        c = kwargs.get('c', 5.7)
        clean_data = datasets.rossler(n_timesteps=n_timesteps, h=h, a=a, b=b, c=c, x0=state)
    elif system == 'multiscroll':
        # Upstream signature: multiscroll(n_timesteps, a=40, b=3, c=28,
        #                                 x0=[-0.1, 0.5, -0.6], h=0.01, **kwargs)
        a = kwargs.get('a', 40)
        b = kwargs.get('b', 3)
        c = kwargs.get('c', 28)
        clean_data = datasets.multiscroll(n_timesteps=n_timesteps, h=h, x0=state, a=a, b=b, c=c)
    elif system == 'kuramoto_sivashinsky':
        # Upstream signature: kuramoto_sivashinsky(n_timesteps, warmup=0,
        #                                          N=128, M=16, x0=None, h=0.25)
        # The solver takes no **kwargs, so noise-only options must not reach it.
        solver_kwargs = {key: value for key, value in kwargs.items() if key not in ('prob', 'freq')}
        if state is not None:
            # Honour an explicit ``init`` unless the caller also passed x0.
            solver_kwargs.setdefault('x0', state)
        clean_data = datasets.kuramoto_sivashinsky(n_timesteps=n_timesteps, h=h, **solver_kwargs)
    else:
        raise ValueError("未知的混沌系统类型。可选项: 'lorenz', 'rossler', 'multiscroll', 'kuramoto_sivashinsky'")

    if normlization:
        # Z-score normalization. NOTE: the statistics are computed over the
        # full trajectory, i.e. including the transient removed just below.
        clean_data = (clean_data - clean_data.mean(axis=0)) / clean_data.std(axis=0)
    clean_data = clean_data[transient:]

    # Add noise.
    if noise is None:
        noisy_data = clean_data.copy()
    elif isinstance(noise, str):
        noisy_data = add_noise(clean_data, noise_type=noise, intensity=intensity, **kwargs)
    elif isinstance(noise, (list, tuple)):
        if not noise:
            # Nothing to mix; averaging over zero entries would give 0/0 = NaN.
            noisy_data = clean_data.copy()
        else:
            baseline = np.zeros_like(clean_data)
            noise_sum = np.zeros_like(clean_data)
            for noise_name in noise:
                # Noise added to an all-zero signal is the pure noise term.
                noise_sum += add_noise(baseline, noise_type=noise_name, intensity=intensity, **kwargs)
            noisy_data = clean_data + noise_sum / len(noise)
    else:
        raise ValueError("噪声参数 noise 必须为字符串或字符串列表。")

    return clean_data, noisy_data
def visualize_data_diff(clean_data, noisy_data, prediction, warmup=0, title_prefix=""):
    """Plot noisy-vs-clean and prediction-vs-clean traces for the first three dimensions.

    A new 12x6 figure is created with a 3x2 grid: each row is one dimension,
    the left column overlays the noisy and clean signals, and the right column
    overlays the prediction and clean signals. Rows before ``warmup`` are
    skipped. The figure is laid out but not shown or saved.

    Args:
        clean_data: 2-D array indexed as ``[time, dimension]``.
        noisy_data: Noisy series, indexed the same way.
        prediction: Predicted series, indexed the same way.
        warmup: Number of leading time steps to leave out of the plots.
        title_prefix: Text put in front of every subplot title.
    """
    plt.figure(figsize=(12, 6))
    for dim in range(3):
        # (series, legend label, subplot title) for the left and right panels.
        panels = (
            (noisy_data, 'noisy', f"{title_prefix} Dim {dim+1} (Noisy vs Clean)"),
            (prediction, 'prediction', f"{title_prefix} Dim {dim+1} (Prediction vs Clean)"),
        )
        for column, (series, label, heading) in enumerate(panels, start=1):
            plt.subplot(3, 2, 2*dim + column)
            plt.plot(series[warmup:, dim], label=label)
            plt.plot(clean_data[warmup:, dim], label='clean')
            plt.title(heading)
            plt.legend()
    plt.tight_layout()
def visualize_psd_diff(clean_data, noisy_data, prediction, warmup=0, title_prefix=""):
    """Plot power spectral densities of clean, noisy and predicted data.

    A new 12x6 figure is created with three stacked subplots, one for each of
    the first three dimensions. Each subplot overlays the PSD (NFFT=1024) of
    the clean, noisy and predicted series, skipping rows before ``warmup``.
    The figure is laid out but not shown or saved.

    Args:
        clean_data: 2-D array indexed as ``[time, dimension]``.
        noisy_data: Noisy series, indexed the same way.
        prediction: Predicted series, indexed the same way.
        warmup: Number of leading time steps to leave out of the spectra.
        title_prefix: Text put in front of every subplot title.
    """
    plt.figure(figsize=(12, 6))
    # Drawing order matters for the legend and colour cycle: clean, noisy, prediction.
    traces = (
        ('clean', clean_data),
        ('noisy', noisy_data),
        ('prediction', prediction),
    )
    for i in range(3):
        plt.subplot(3, 1, i+1)
        for label, series in traces:
            plt.psd(series[warmup:, i], NFFT=1024, label=label)
        plt.title(f"{title_prefix} Dim {i+1} PSD Comparison")
        plt.legend()
    plt.tight_layout()
if __name__ == "__main__":
    # Smoke test: generate a Lorenz series with mixed Gaussian + colored noise
    # and report the shapes of the clean and noisy arrays.
    demo_settings = dict(
        system='lorenz',
        noise=['gaussian', 'colored'],
        intensity=0.1,
        n_timesteps=10000,
        transient=1000,
    )
    clean_data, noisy_data = load_data(**demo_settings)
    print("Clean Data Shape:", clean_data.shape)
    print("Noisy Data Shape:", noisy_data.shape)