AI Red Teamer Series 27 – AI Applications in Information Security – Data Transformation
1. Data Transformation Overview
Data transformation converts raw data into a format suitable for machine learning algorithms. It mainly covers:
- Categorical feature encoding: converting text categories into numbers
- Numerical feature transformation: handling skewed distributions and outliers
- Feature scaling: unifying value ranges
- Data splitting: dividing data into training/validation/test sets
2. Categorical Feature Encoding
2.1 One-Hot Encoding
One-Hot encoding creates one binary feature per category, avoiding the introduction of an artificial ordering.
from sklearn.preprocessing import OneHotEncoder
import pandas as pd
# Example data
df = pd.DataFrame({
    'protocol': ['TCP', 'UDP', 'HTTP', 'TCP', 'HTTPS'],
    'threat_level': [0, 1, 0, 2, 0]
})
# One-Hot encoding (sparse_output requires scikit-learn >= 1.2)
encoder = OneHotEncoder(sparse_output=False)
encoded = encoder.fit_transform(df[['protocol']])
encoded_df = pd.DataFrame(encoded, columns=encoder.get_feature_names_out(['protocol']))
# Merge back into the original data
df_final = pd.concat([df.drop('protocol', axis=1), encoded_df], axis=1)
print(df_final.head())
2.2 Label Encoding
Suitable for ordinal categorical variables:
# A manual mapping preserves the intended order (sklearn's LabelEncoder
# assigns codes alphabetically, which is wrong for ordinal data)
severity_order = ['low', 'medium', 'high', 'critical']
severity_mapping = {cat: idx for idx, cat in enumerate(severity_order)}
df['severity'] = ['low', 'high', 'medium', 'critical', 'low']
df['severity_encoded'] = df['severity'].map(severity_mapping)
2.3 Target Encoding
Encodes each category using statistics of the target variable; useful for high-cardinality categorical variables:
# Mean of the target variable for each category
target_means = df.groupby('protocol')['threat_level'].mean()
df['protocol_target_encoded'] = df['protocol'].map(target_means)
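Plain per-category means leak target information for rare categories. A hedged sketch of smoothed target encoding, where rare categories shrink toward the global mean (the smoothing strength `m` is an illustrative choice, not a recommended value):

```python
import pandas as pd

df = pd.DataFrame({
    'protocol': ['TCP', 'UDP', 'HTTP', 'TCP', 'HTTPS'],
    'threat_level': [0, 1, 0, 2, 0],
})

# Smoothed target encoding: blend category mean with the global mean,
# weighted by the category's sample count
m = 10                                   # smoothing strength (illustrative)
global_mean = df['threat_level'].mean()
stats = df.groupby('protocol')['threat_level'].agg(['mean', 'count'])
smoothed = (stats['count'] * stats['mean'] + m * global_mean) / (stats['count'] + m)
df['protocol_te'] = df['protocol'].map(smoothed)
```

For stronger leakage protection, the encoding can additionally be computed out-of-fold during cross-validation.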
3. Handling Skewed Distributions
Numerical features in security data (byte counts, connection counts, etc.) often follow heavily skewed distributions and benefit from transformation.
3.1 Log Transformation
import numpy as np
from scipy import stats
# Create an example of skewed data
np.random.seed(42)
skewed_data = np.random.lognormal(mean=5, sigma=2, size=1000)
# Check the skewness
original_skew = stats.skew(skewed_data)
print(f"Original skewness: {original_skew:.3f}")
# Log transformation
log_transformed = np.log1p(skewed_data)  # log(x+1) avoids log(0)
new_skew = stats.skew(log_transformed)
print(f"Skewness after transformation: {new_skew:.3f}")
3.2 Box-Cox Transformation
from scipy.stats import boxcox
# Only valid for strictly positive data
if skewed_data.min() > 0:
    boxcox_data, lambda_param = boxcox(skewed_data)
    boxcox_skew = stats.skew(boxcox_data)
    print(f"Skewness after Box-Cox: {boxcox_skew:.3f}")
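Because Box-Cox requires strictly positive inputs, data containing zeros or negatives needs a different transform. A sketch using scikit-learn's `PowerTransformer` with the Yeo-Johnson method, which accepts any real values (the shifted lognormal sample is synthetic, constructed to contain negatives):

```python
import numpy as np
from sklearn.preprocessing import PowerTransformer

rng = np.random.default_rng(42)
data = rng.lognormal(mean=5, sigma=2, size=1000) - 50  # contains negative values

# Yeo-Johnson works on any real values; Box-Cox would raise an error here.
# standardize=True additionally rescales the result to mean 0, std 1.
pt = PowerTransformer(method='yeo-johnson', standardize=True)
transformed = pt.fit_transform(data.reshape(-1, 1))
print(f"mean={transformed.mean():.3f}, std={transformed.std():.3f}")
```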
4. Feature Scaling
When features differ greatly in numeric range, scaling is needed so that large-valued features do not dominate.
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
# Example data
data = np.array([[1, 2000, 0.1],
                 [2, 5000, 0.3],
                 [1, 1500, 0.2]])
# Standardization (mean 0, standard deviation 1)
scaler = StandardScaler()
standardized = scaler.fit_transform(data)
# Min-max normalization (range 0-1)
minmax_scaler = MinMaxScaler()
normalized = minmax_scaler.fit_transform(data)
# Robust scaling (less sensitive to outliers)
robust_scaler = RobustScaler()
robust_scaled = robust_scaler.fit_transform(data)
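Whichever scaler is used, its statistics must be learned from the training split only; fitting on the full dataset leaks test-set information into preprocessing. A minimal sketch with synthetic data:

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
X_train = rng.normal(loc=100, scale=20, size=(80, 3))
X_test = rng.normal(loc=100, scale=20, size=(20, 3))

scaler = StandardScaler()
X_train_s = scaler.fit_transform(X_train)   # learn mean/std on train only
X_test_s = scaler.transform(X_test)         # reuse the train statistics
```

The test set is transformed with the training statistics, so its scaled mean is close to, but not exactly, zero.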
5. Feature Engineering
5.1 Security-Related Feature Combinations
def create_security_features(df):
    """Create security-related features."""
    df_new = df.copy()
    # Time features
    if 'timestamp' in df.columns:
        df_new['hour'] = pd.to_datetime(df['timestamp']).dt.hour
        df_new['is_night'] = (df_new['hour'] < 8) | (df_new['hour'] > 20)
        df_new['is_weekend'] = pd.to_datetime(df['timestamp']).dt.dayofweek >= 5
    # Network features (+1 in the denominator avoids division by zero)
    if 'bytes_transferred' in df.columns and 'packet_count' in df.columns:
        df_new['bytes_per_packet'] = df['bytes_transferred'] / (df['packet_count'] + 1)
    # Port risk score
    if 'destination_port' in df.columns:
        risky_ports = {23, 135, 139, 445, 1433, 3389}
        df_new['port_risk'] = df['destination_port'].isin(risky_ports).astype(int)
    return df_new
5.2 Polynomial Features
from sklearn.preprocessing import PolynomialFeatures
# Create degree-2 polynomial features
poly = PolynomialFeatures(degree=2, include_bias=False)
numerical_cols = ['bytes_transferred', 'packet_count']
poly_features = poly.fit_transform(df[numerical_cols])
# Get the feature names
feature_names = poly.get_feature_names_out(numerical_cols)
poly_df = pd.DataFrame(poly_features, columns=feature_names)
6. Feature Selection
6.1 Statistical Feature Selection
from sklearn.feature_selection import SelectKBest, f_classif
# Select the best k features (k must not exceed the number of columns)
X = df.select_dtypes(include=[np.number]).drop('threat_level', axis=1)
y = df['threat_level']
selector = SelectKBest(score_func=f_classif, k=10)
X_selected = selector.fit_transform(X, y)
# Get the selected features
selected_features = X.columns[selector.get_support()]
print(f"Selected features: {list(selected_features)}")
6.2 Importance-Based Selection
from sklearn.ensemble import RandomForestClassifier
# Use a random forest to obtain feature importances
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X, y)
# Keep features whose importance exceeds a threshold
importance_threshold = 0.01
important_features = X.columns[rf.feature_importances_ > importance_threshold]
print(f"Important features: {list(important_features)}")
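The thresholding above can also be delegated to scikit-learn's `SelectFromModel`, which wraps the estimator and the cut-off in one step. A sketch on synthetic data (generated with `make_classification` for self-containment; `threshold='mean'` is one common choice):

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectFromModel

X, y = make_classification(n_samples=200, n_features=10, n_informative=3,
                           random_state=42)

# Keep only features whose importance exceeds the mean importance
selector = SelectFromModel(
    RandomForestClassifier(n_estimators=100, random_state=42),
    threshold='mean')
X_reduced = selector.fit_transform(X, y)
print(X_reduced.shape)
```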
7. Data Splitting
7.1 Basic Split
from sklearn.model_selection import train_test_split
# Prepare features and target
X = df.drop('threat_level', axis=1)
y = df['threat_level']
# Split: 60% train, 20% validation, 20% test
X_temp, X_test, y_temp, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
# 0.25 of the remaining 80% yields the 20% validation set
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.25, random_state=42, stratify=y_temp
)
print(f"Train: {len(X_train)}, Validation: {len(X_val)}, Test: {len(X_test)}")
7.2 Time-Series Split
For time-ordered data such as security logs, split chronologically so the model is never trained on events that occur after the ones it is evaluated on:
# Sort by time
df_sorted = df.sort_values('timestamp')
# Split by proportion
n = len(df_sorted)
train_end = int(n * 0.6)
val_end = int(n * 0.8)
train_data = df_sorted.iloc[:train_end]
val_data = df_sorted.iloc[train_end:val_end]
test_data = df_sorted.iloc[val_end:]
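For cross-validation on time-ordered data, scikit-learn's `TimeSeriesSplit` generalizes the single chronological split above: every training fold lies strictly before its validation fold. A minimal sketch:

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

X = np.arange(20).reshape(-1, 1)   # rows assumed already sorted by time

tscv = TimeSeriesSplit(n_splits=4)
for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
    # Every training index precedes every validation index
    print(f"fold {fold}: train ends at {train_idx.max()}, "
          f"val covers {val_idx.min()}-{val_idx.max()}")
```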
8. Summary
Key points of data transformation:
- Encoding choice: pick the encoding method that matches the nature of the variable
- Distribution transformation: correcting skewed distributions can improve model performance
- Feature scaling: unify value ranges so that no single feature dominates
- Feature engineering: apply domain knowledge to create valuable new features
- Data splitting: split sensibly to ensure reliable model evaluation
Once transformation is complete, the data is ready for training machine learning models.