# Python source file (201 lines, 7.8 KiB).
from reservoirpy import datasets
|
||
import numpy as np
|
||
|
||
def add_noise(data, noise_type='gaussian', intensity=0.1, **kwargs):
    """Return ``data`` plus additive noise of the requested kind.

    Axis 0 of ``data`` is treated as the time axis; any trailing axes are
    treated as independent channels. The input is never modified.

    Args:
        data: Array-like signal (1-D or N-D) to corrupt.
        noise_type: Case-insensitive noise family:
            ``'gaussian'`` / ``'gaussian white'`` - white Gaussian noise whose
            standard deviation is ``intensity``;
            ``'colored'`` / ``'1/f'`` - zero-mean noise whose power spectrum
            falls off as 1/f, scaled so each channel has standard deviation
            ``intensity``;
            ``'impulse'`` - sparse spikes drawn uniformly from
            ``[-intensity, intensity)``;
            ``'sine'`` - a deterministic sine wave of amplitude ``intensity``.
        intensity: Noise strength (see ``noise_type`` for its exact meaning).
        **kwargs: Per-type options. ``prob`` (default 0.01) is the per-element
            spike probability for ``'impulse'``; ``freq`` (default 5) is the
            number of sine cycles over the whole record for ``'sine'``.
            Unrecognised keys are ignored.

    Returns:
        A new array with the same shape as ``data``.

    Raises:
        ValueError: If ``noise_type`` is not one of the supported names.
    """
    noise_type = noise_type.lower()
    data = np.asarray(data)
    n_samples = len(data)
    # Shape that lets a per-timestep vector broadcast over all channel axes.
    column_shape = (n_samples,) + (1,) * (data.ndim - 1)

    if noise_type in ('gaussian', 'gaussian white'):
        noise = intensity * np.random.randn(*data.shape)

    elif noise_type in ('colored', '1/f'):
        if n_samples < 2:
            # Only the DC bin exists, so there is no 1/f content to generate.
            noise = np.zeros(data.shape)
        else:
            freqs = np.abs(np.fft.fftfreq(n_samples))
            # Amplitude ~ 1/sqrt(f) gives power ~ 1/f. The DC bin is zeroed
            # rather than divided by a tiny epsilon, which would otherwise
            # swamp the result with a huge constant offset.
            shaping = np.zeros(n_samples)
            shaping[1:] = 1.0 / np.sqrt(freqs[1:])
            spectrum = (np.random.randn(*data.shape)
                        + 1j * np.random.randn(*data.shape))
            noise = np.fft.ifft(spectrum * shaping.reshape(column_shape), axis=0).real
            # Rescale every channel so that `intensity` is its standard
            # deviation, independent of the record length.
            spread = noise.std(axis=0, keepdims=True)
            noise = intensity * noise / np.where(spread > 0, spread, 1.0)

    elif noise_type == 'impulse':
        prob = kwargs.get('prob', 0.01)
        mask = np.random.rand(*data.shape) < prob
        impulse = intensity * (2 * np.random.rand(*data.shape) - 1)
        # Build the noise as floats so integer inputs do not truncate spikes.
        noise = np.where(mask, impulse, 0.0)

    elif noise_type == 'sine':
        freq = kwargs.get('freq', 5)
        t = np.arange(n_samples)
        wave = intensity * np.sin(2 * np.pi * freq * t / n_samples)
        # The same wave is added to every channel via broadcasting.
        noise = wave.reshape(column_shape)

    else:
        raise ValueError("未知的噪声类型。可选项:'gaussian', 'colored' (1/f), 'impulse', 'sine'")

    return data + noise
|
||
|
||
def load_data(system='lorenz', init='random', noise=None, intensity=0.1, h=0.01, n_timesteps=10000, transient=1000, normlization=True, **kwargs):
    """Generate a chaotic time series and an optionally noise-corrupted copy.

    Args:
        system: Chaotic system name (case-insensitive): ``'lorenz'``,
            ``'rossler'``, ``'multiscroll'`` or ``'kuramoto_sivashinsky'``.
        init: ``'random'`` for a random 3-D initial state (Lorenz, Rossler,
            multiscroll) or the generator's own default (Kuramoto-Sivashinsky);
            otherwise an array-like initial state passed to the generator
            as ``x0``.
        noise: ``None`` (no noise), a noise-type string, or a list/tuple of
            noise-type strings. For a sequence, one noise realisation per
            entry is generated and their average is added to the clean data.
            Supported types: ``'gaussian'``, ``'colored'`` / ``'1/f'``,
            ``'impulse'``, ``'sine'``.
        intensity: Noise strength forwarded to ``add_noise``.
        h: Integration time step forwarded to the generator. Note that this
            function's default (0.01) is used for every system, not the
            generator-specific defaults.
        n_timesteps: Number of time steps to generate (before trimming).
        transient: Number of leading time steps to discard.
        normlization: If true, z-score each channel of the generated series.
        **kwargs: System parameters (``sigma``/``rho``/``beta`` for Lorenz,
            ``a``/``b``/``c`` for Rossler and multiscroll, generator keywords
            for Kuramoto-Sivashinsky) and noise options (``prob``, ``freq``).

    Returns:
        ``(clean_data, noisy_data)``: two arrays of identical shape holding the
        clean series and the series with noise added. When ``noise`` is
        ``None`` the second array is a copy of the first.

    Raises:
        ValueError: If ``system`` is unknown, or ``noise`` is neither ``None``,
            a string, nor a non-empty list/tuple of strings.
    """
    system = system.lower()

    # Only compare against the 'random' sentinel when `init` is a string:
    # `ndarray == 'random'` may be evaluated elementwise by NumPy, which makes
    # the truth test ambiguous and raises for multi-element arrays.
    use_random_init = isinstance(init, str) and init == 'random'

    if use_random_init:
        if system in ('lorenz', 'rossler', 'multiscroll'):
            # Default to a random 3-D initial state.
            state = np.random.rand(3)
        elif system == 'kuramoto_sivashinsky':
            # This draw is never used as an initial state; it is kept only so
            # that seeded runs consume the same random stream as before.
            np.random.rand(1)
            state = None
        else:
            raise ValueError("未知的混沌系统类型。")
    else:
        state = np.array(init, dtype=float)

    if system == 'lorenz':
        # Generator parameters: rho, sigma, beta, x0, h.
        sigma = kwargs.get('sigma', 10.0)
        rho = kwargs.get('rho', 28.0)
        beta = kwargs.get('beta', 8/3)
        clean_data = datasets.lorenz(n_timesteps=n_timesteps, h=h, sigma=sigma, rho=rho, beta=beta, x0=state)

    elif system == 'rossler':
        # Generator parameters: a, b, c, x0, h.
        a = kwargs.get('a', 0.2)
        b = kwargs.get('b', 0.2)
        c = kwargs.get('c', 5.7)
        clean_data = datasets.rossler(n_timesteps=n_timesteps, h=h, a=a, b=b, c=c, x0=state)

    elif system == 'multiscroll':
        # Generator parameters: a, b, c, x0, h.
        a = kwargs.get('a', 40)
        b = kwargs.get('b', 3)
        c = kwargs.get('c', 28)
        clean_data = datasets.multiscroll(n_timesteps=n_timesteps, h=h, x0=state, a=a, b=b, c=c)

    elif system == 'kuramoto_sivashinsky':
        # Generator parameters: warmup, N, M, x0, h.
        # Noise-only options must not reach the generator, which would reject
        # them as unexpected keyword arguments.
        ks_kwargs = {key: value for key, value in kwargs.items() if key not in ('prob', 'freq')}
        if state is not None:
            # Honour an explicit `init` unless the caller also passed `x0`.
            ks_kwargs.setdefault('x0', state)
        clean_data = datasets.kuramoto_sivashinsky(n_timesteps=n_timesteps, h=h, **ks_kwargs)

    else:
        raise ValueError("未知的混沌系统类型。可选项: 'lorenz', 'rossler', 'multiscroll', 'kuramoto_sivashinsky'")

    if normlization:
        # Z-score normalization per channel.
        # NOTE(review): the statistics are computed before the transient is
        # dropped, so the returned series is only approximately standardized.
        clean_data = (clean_data - clean_data.mean(axis=0)) / clean_data.std(axis=0)
    clean_data = clean_data[transient:]

    # Add noise.
    if noise is None:
        noisy_data = clean_data.copy()
    elif isinstance(noise, str):
        noisy_data = add_noise(clean_data, noise_type=noise, intensity=intensity, **kwargs)
    elif isinstance(noise, (list, tuple)):
        if not noise:
            # An empty sequence would otherwise divide by zero and yield NaNs.
            raise ValueError("噪声参数 noise 必须为字符串或字符串列表。")
        # Generate each noise type on a zero signal and average them.
        noise_sum = np.zeros_like(clean_data)
        for nt in noise:
            noise_sum += add_noise(np.zeros_like(clean_data), noise_type=nt, intensity=intensity, **kwargs)
        noisy_data = clean_data + noise_sum / len(noise)
    else:
        raise ValueError("噪声参数 noise 必须为字符串或字符串列表。")

    return clean_data, noisy_data
|
||
|
||
if __name__ == "__main__":
    # Smoke test: build a Lorenz series, corrupt it with an equal mix of
    # Gaussian and colored noise, and report the shape of both arrays.
    demo_settings = dict(
        system='lorenz',
        noise=['gaussian', 'colored'],
        intensity=0.1,
        n_timesteps=10000,
        transient=1000,
    )
    clean_data, noisy_data = load_data(**demo_settings)
    for label, series in (("Clean Data Shape:", clean_data), ("Noisy Data Shape:", noisy_data)):
        print(label, series.shape)