部分样板代码
MinMaxScaler 的 PyTorch 模板
class MinMaxScaler:
    """Linearly rescale every column of a tensor into ``feature_range``.

    ``fit`` records the per-column minimum and maximum (reducing over
    dim 0); ``transform`` maps that interval onto ``feature_range`` and
    ``inverse_transform`` undoes the mapping.  Columns whose minimum equals
    their maximum get a data range of 1 so no division by zero occurs.
    """

    def __init__(self, feature_range=(0, 1)):
        self.feature_range = feature_range
        self.is_fitted_ = False
        # Statistics below are populated by ``fit``.
        self.min_ = None
        self.max_ = None
        self.data_range_ = None
        self.scale_ = None

    def _require_fitted(self) -> None:
        """Raise ``ValueError`` unless ``fit`` has already been called."""
        if not self.is_fitted_:
            raise ValueError("Scaler is not fitted yet.")

    def fit(self, X: torch.Tensor) -> 'MinMaxScaler':
        """Learn the per-column min / max of ``X`` and return ``self``.

        A 1-D tensor is treated as a single column of samples.
        """
        samples = X.unsqueeze(1) if X.dim() == 1 else X
        self.min_ = samples.min(dim=0).values
        self.max_ = samples.max(dim=0).values
        span = self.max_ - self.min_
        # Constant columns: substitute 1 for the zero span.
        self.data_range_ = torch.where(span == 0, torch.ones_like(span), span)
        target_width = self.feature_range[1] - self.feature_range[0]
        self.scale_ = target_width / self.data_range_
        self.is_fitted_ = True
        return self

    def transform(self, X: torch.Tensor) -> torch.Tensor:
        """Map ``X`` into ``feature_range`` using the fitted statistics."""
        self._require_fitted()
        shifted = X - self.min_
        return shifted * self.scale_ + self.feature_range[0]

    def fit_transform(self, X: torch.Tensor) -> torch.Tensor:
        """Fit on ``X`` and return the scaled ``X``."""
        return self.fit(X).transform(X)

    def inverse_transform(self, X: torch.Tensor) -> torch.Tensor:
        """Map scaled values back to the original data range."""
        self._require_fitted()
        unshifted = X - self.feature_range[0]
        return unshifted / self.scale_ + self.min_
使用方法
# Scale X with a freshly fitted scaler, then map the result back and print it.
scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X)
X_original = scaler.inverse_transform(X_scaled)
print(X_original)
StandardScaler 的 PyTorch 模板
class StandardScaler:
    """Standardise every column of a tensor to zero mean and unit variance.

    ``fit`` stores the per-column mean and the population standard
    deviation (``unbiased=False``), both reduced over dim 0.  Columns with
    zero standard deviation use 1 as their divisor.
    """

    def __init__(self):
        self.is_fitted_ = False
        # Statistics below are populated by ``fit``.
        self.mean_ = None
        self.std_ = None

    def _require_fitted(self) -> None:
        """Raise ``ValueError`` unless ``fit`` has already been called."""
        if not self.is_fitted_:
            raise ValueError("Scaler is not fitted yet.")

    def fit(self, X: torch.Tensor) -> "StandardScaler":
        """Learn the per-column mean / std of ``X`` and return ``self``.

        A 1-D tensor is treated as a single column of samples.
        """
        samples = X.unsqueeze(1) if X.dim() == 1 else X
        self.mean_ = samples.mean(dim=0)
        spread = samples.std(dim=0, unbiased=False)
        # Constant columns: divide by 1 instead of 0.
        self.std_ = torch.where(spread == 0, torch.ones_like(spread), spread)
        self.is_fitted_ = True
        return self

    def transform(self, X: torch.Tensor) -> torch.Tensor:
        """Return ``X`` centred by the fitted mean and divided by the std."""
        self._require_fitted()
        centred = X - self.mean_
        return centred / self.std_

    def fit_transform(self, X: torch.Tensor) -> torch.Tensor:
        """Fit on ``X`` and return the standardised ``X``."""
        return self.fit(X).transform(X)

    def inverse_transform(self, X: torch.Tensor) -> torch.Tensor:
        """Map standardised values back to the original scale."""
        self._require_fitted()
        rescaled = X * self.std_
        return rescaled + self.mean_
RobustScaler 的 PyTorch 模板
class RobustScaler:
    """Scale columns using statistics that are robust to outliers.

    Each column is centred on its median and divided by the spread between
    two quantiles (the inter-quartile range by default).  Columns whose
    quantile spread is zero use 1 as their divisor.

    Args:
        quantile_range: ``(q_min, q_max)`` percentiles in the range 0-100
            used to compute the scale.
    """

    def __init__(self, quantile_range=(25.0, 75.0)):
        self.quantile_range = quantile_range
        # Statistics below are populated by ``fit``.
        self.center_ = None
        self.scale_ = None
        self.is_fitted_ = False

    def _require_fitted(self) -> None:
        """Raise ``ValueError`` unless ``fit`` has already been called."""
        if not self.is_fitted_:
            raise ValueError("Scaler is not fitted yet.")

    def fit(self, X: torch.Tensor) -> "RobustScaler":
        """Learn the per-column median and quantile spread; return ``self``.

        A 1-D tensor is treated as a single column of samples.
        """
        if X.dim() == 1:
            X = X.unsqueeze(1)
        q_min = torch.quantile(X, self.quantile_range[0] / 100.0, dim=0)
        q_max = torch.quantile(X, self.quantile_range[1] / 100.0, dim=0)
        # Use the 0.5 quantile rather than torch.median: for an even number
        # of samples torch.median returns the lower of the two middle values,
        # whereas the interpolated quantile gives the true median and stays
        # consistent with how q_min / q_max are computed.
        self.center_ = torch.quantile(X, 0.5, dim=0)
        spread = q_max - q_min
        # Constant columns: divide by 1 instead of 0.
        self.scale_ = torch.where(spread == 0, torch.ones_like(spread), spread)
        self.is_fitted_ = True
        return self

    def transform(self, X: torch.Tensor) -> torch.Tensor:
        """Return ``X`` centred on the median and divided by the spread."""
        self._require_fitted()
        return (X - self.center_) / self.scale_

    def fit_transform(self, X: torch.Tensor) -> torch.Tensor:
        """Fit on ``X`` and return the scaled ``X``."""
        return self.fit(X).transform(X)

    def inverse_transform(self, X: torch.Tensor) -> torch.Tensor:
        """Map scaled values back to the original scale."""
        self._require_fitted()
        return X * self.scale_ + self.center_
校正决定系数的 PyTorch 模板
类实现
class R2Score:
    """Compute R² and adjusted R² of predictions against fixed targets.

    The targets are bound once at construction so that several prediction
    tensors can be scored against them.

    Args:
        y_true: Ground-truth values; the size of dim 0 is taken as the
            number of samples.
        n_features: Number of predictor variables used by the model
            (needed for the adjusted R² correction).
    """

    def __init__(self, y_true: torch.Tensor, n_features: int):
        self.__n_features = n_features
        self.__n_samples = y_true.size(0)
        self.__y_mean = torch.mean(y_true)
        self.__y_true = y_true
        # The total sum of squares depends only on y_true, so compute it once
        # instead of on every scoring call.
        self.__sst = torch.sum((y_true - self.__y_mean) ** 2)

    def __align(self, y_pred: torch.Tensor) -> torch.Tensor:
        """Give ``y_pred`` the shape of the targets when only the layout differs.

        Without this, subtracting an ``(n,)`` tensor from an ``(n, 1)`` tensor
        broadcasts to ``(n, n)`` and silently produces a wrong score.
        """
        target = self.__y_true
        if y_pred.shape != target.shape and y_pred.numel() == target.numel():
            return y_pred.reshape(target.shape)
        return y_pred

    def __r2(self, y_pred: torch.Tensor) -> torch.Tensor:
        """Return R² as a 0-dim tensor: ``1 - SSR / SST``."""
        residual = self.__y_true - self.__align(y_pred)
        ssr = torch.sum(residual ** 2)
        return 1 - ssr / self.__sst

    def r2(self, y_pred: torch.Tensor) -> float:
        """Return the coefficient of determination for ``y_pred``."""
        return self.__r2(y_pred).item()

    def adjusted_r2(self, y_pred: torch.Tensor) -> float:
        """Return R² corrected for the number of predictors.

        Computed as ``1 - (1 - R²) * (n - 1) / (n - p - 1)`` with ``n``
        samples and ``p`` features.  No check is made that ``n > p + 1``.
        """
        ar2 = 1 - (1 - self.__r2(y_pred)) * (self.__n_samples - 1) / (
            self.__n_samples - self.__n_features - 1
        )
        return ar2.item()
函数实现
def adjusted_r2_score(y_true: torch.Tensor, y_pred: torch.Tensor, n_features: int) -> float:
    """Compute the adjusted coefficient of determination (adjusted R²).

    Args:
        y_true: Ground-truth values; the size of dim 0 is taken as the
            number of samples.
        y_pred: Predicted values.  If the shape differs from ``y_true`` but
            the element count matches (e.g. ``(n, 1)`` vs ``(n,)``), it is
            reshaped to ``y_true``'s shape before scoring.
        n_features: Number of predictor variables used by the model.

    Returns:
        The adjusted R² as a Python float, computed as
        ``1 - (1 - R²) * (n - 1) / (n - p - 1)``.  No check is made that
        ``n > p + 1``.
    """
    # Guard against silent broadcasting: (n,) minus (n, 1) would expand to
    # (n, n) and yield a wrong residual sum without raising any error.
    if y_pred.shape != y_true.shape and y_pred.numel() == y_true.numel():
        y_pred = y_pred.reshape(y_true.shape)

    # Number of samples
    n_samples = y_true.size(0)

    # Mean of the ground-truth values
    y_mean = torch.mean(y_true)

    # Residual and total sums of squares
    ssr = torch.sum((y_true - y_pred) ** 2)
    sst = torch.sum((y_true - y_mean) ** 2)

    # R²
    r2 = 1 - ssr / sst

    # Adjusted R²
    adjusted_r2 = 1 - (1 - r2) * (n_samples - 1) / (n_samples - n_features - 1)

    return adjusted_r2.item()
MLP 的 PyTorch 模板
import torch.nn as nn
class Model(nn.Module):
    """Configurable multi-layer perceptron built as one ``nn.Sequential``.

    Layout: ``Linear(features_dim, hidden_dim)`` + activation, followed by
    ``hidden_layers_num`` repetitions of
    ``[LayerNorm(hidden_dim)] + Linear(hidden_dim, hidden_dim)`` + activation
    (the LayerNorm only when ``layer_norm`` is true), and finally
    ``Linear(hidden_dim, output_dim)``.

    Args:
        features_dim: Size of the input feature vector.
        output_dim: Size of the output vector.
        hidden_dim: Width of every hidden layer.
        hidden_layers_num: Number of repeated hidden blocks.
        nonlinearity: Name of an activation class in ``torch.nn``
            (looked up with ``getattr`` and instantiated without arguments).
        layer_norm: Whether each hidden block starts with ``nn.LayerNorm``.
    """

    def __init__(self, features_dim: int, output_dim: int, hidden_dim: int, hidden_layers_num: int, nonlinearity: str, layer_norm: bool):
        super().__init__()
        make_activation = getattr(nn, nonlinearity)

        def hidden_block():
            """Return the layers of one hidden block, in order."""
            block = [nn.LayerNorm(hidden_dim)] if layer_norm else []
            block.append(nn.Linear(hidden_dim, hidden_dim))
            block.append(make_activation())
            return block

        # The hidden blocks are created before the input and output layers so
        # that layers are instantiated in the same order as they always were.
        hidden = [layer for _ in range(hidden_layers_num) for layer in hidden_block()]
        entry = [nn.Linear(features_dim, hidden_dim), make_activation()]
        head = nn.Linear(hidden_dim, output_dim)
        self.seq = nn.Sequential(*entry, *hidden, head)

    def forward(self, x):
        """Run ``x`` through the sequential stack."""
        return self.seq(x)
训练的 PyTorch 模板
import torchimport numpy as npimport matplotlib.pyplot as plt
# Training data: a noisy sine curve shifted up by 100.
features_dim = 1
x_train = torch.arange(0.1, 10, 0.1).reshape(-1, features_dim)  # training inputs
x_test = torch.arange(0.2, 10, 0.12).reshape(-1, features_dim)  # test inputs
torch.manual_seed(42)
y_train = torch.sin(x_train) + torch.normal(0, 0.1, x_train.shape) + 100  # training targets
torch.manual_seed(32)
y_test = torch.sin(x_test) + torch.normal(0, 0.15, x_test.shape) + 100  # test targets
plt.scatter(x_train, y_train, marker='x', s=20)
plt.show()

# Normalisation: inputs and targets are each scaled into [-1, 1].
input_scaler = MinMaxScaler(feature_range=(-1, 1))
output_scaler = MinMaxScaler(feature_range=(-1, 1))

# The scalers are fitted on the training split only and then reused for the
# test split.
x_train_scaled = input_scaler.fit_transform(x_train)
x_test_scaled = input_scaler.transform(x_test)
y_train_scaled = output_scaler.fit_transform(y_train)
y_test_scaled = output_scaler.transform(y_test)
plt.scatter(x_train_scaled, y_train_scaled, marker='x', s=20)
plt.show()

# Model definition
model = Model(features_dim=1, output_dim=1, hidden_dim=8, hidden_layers_num=2, nonlinearity='Sigmoid', layer_norm=True)

# Loss function and optimiser
loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Training
epochs = 2000  # number of training epochs
losses = np.zeros(epochs)
test_losses = np.zeros(epochs)
model.train()
for epoch in range(epochs):
    optimizer.zero_grad()
    y_pred = model(x_train_scaled)
    loss = loss_fn(y_pred, y_train_scaled)

    # The test loss is only monitored, never back-propagated, so evaluate it
    # without recording an autograd graph.
    with torch.no_grad():
        y_test_pred = model(x_test_scaled)
        test_loss = loss_fn(y_test_pred, y_test_scaled)

    loss_value = loss.item()
    test_loss_value = test_loss.item()
    losses[epoch] = loss_value
    test_losses[epoch] = test_loss_value

    loss.backward()
    optimizer.step()

    if epoch % 100 == 0:
        # Scored on the predictions made before this epoch's optimiser step.
        ar2 = adjusted_r2_score(y_train_scaled, y_pred.detach(), features_dim)
        print(f'Epoch {epoch}|{epochs}, Loss: {loss_value:>.6f}, Test Loss: {test_loss_value:>.6f}, Adjusted R²: {ar2:>.6f}')

plt.plot(losses, label='Train Loss')
plt.plot(test_losses, label='Test Loss')
plt.legend()
plt.show()

# Prediction: run the test inputs through the model and map the outputs back
# to the original target scale before plotting.
model.eval()
with torch.no_grad():
    y_pred_scaled = model(x_test_scaled)
    y_pred = output_scaler.inverse_transform(y_pred_scaled)
    plt.scatter(x_test, y_test, marker='x', s=20, label='Actual')
    plt.plot(x_test, y_pred, label='Pred')
    plt.legend()
    plt.show()
# Wrap the model together with its scalers.
class ModelWrapper(nn.Module):
    """Module that accepts raw inputs and returns predictions in raw units.

    ``forward`` scales the input with ``input_scaler.transform``, runs the
    wrapped model, and maps the result back with
    ``output_scaler.inverse_transform``.  The wrapped model is switched to
    evaluation mode when the wrapper is constructed.
    """

    def __init__(self, base_model: nn.Module, input_scaler: MinMaxScaler, output_scaler: MinMaxScaler):
        super().__init__()
        self.input_scaler = input_scaler
        self.output_scaler = output_scaler
        # ``eval()`` returns the module itself, so it can be stored directly.
        self.base_model = base_model.eval()

    def forward(self, x: torch.Tensor):
        """Return the de-normalised prediction for raw input ``x``."""
        scaled_prediction = self.base_model(self.input_scaler.transform(x))
        return self.output_scaler.inverse_transform(scaled_prediction)
model_wrapper = ModelWrapper(model, input_scaler, output_scaler)

# Export to ONNX.  The model must be in evaluation mode before conversion.
model_wrapper.eval()
example_inputs = (torch.randn(1, features_dim, requires_grad=True),)  # example input used for export
torch.onnx.export(
    model_wrapper,
    example_inputs,
    "model.onnx",             # output file name
    opset_version=11,         # ONNX opset version
    export_params=True,       # store the trained parameters in the file
    input_names=["input"],    # name of the input node
    output_names=["output"],  # name of the output node
)
部署 ONNX 模型
- 安装 ONNX Runtime,Flask:

  pip install onnx
  pip install onnxruntime
  pip install flask

-
创建一个 Flask 应用程序,并加载 ONNX
# Flask application that loads the exported ONNX model and serves predictions.
from flask import Flask, request, jsonify
import numpy as np
import onnxruntime as ort
import onnx

app = Flask(__name__)

model_path = "model.onnx"
model = onnx.load(model_path)
session = ort.InferenceSession(model.SerializeToString(), providers=["CPUExecutionProvider"])


@app.route('/predict', methods=['POST'])
def predict():
    """Run the ONNX model on the posted value and return the prediction.

    The request body is parsed as JSON regardless of its Content-Type.  It
    may be either a bare value (e.g. ``1.5``) or an object of the form
    ``{"input": 1.5}``.  The value is converted to a float32 array with a
    single row before being fed to the session.

    Returns:
        A JSON object with ``code`` (0 on success, -1 on failure), ``msg``
        (``'ok'`` or the error text) and ``output`` (the first element of
        the model output formatted with 8 decimals, or ``None`` on failure).
    """
    try:
        # The first positional parameter of get_json is the ``force`` flag,
        # not a key name; pass it explicitly and pick the field ourselves.
        payload = request.get_json(force=True)
        if isinstance(payload, dict):
            payload = payload['input']
        features = np.asarray(payload, dtype=np.float32).reshape(1, -1)

        input_name = 'input'
        output_names = ['output']
        result = session.run(output_names, {input_name: features})
        return jsonify({
            'code': 0,
            'msg': 'ok',
            'output': f'{result[0][0][0]:>.8f}'
        })
    except Exception as e:
        # Request boundary: report the failure to the client in the usual
        # response shape, but keep the traceback in the server log.
        app.logger.exception("prediction failed")
        return jsonify({
            'code': -1,
            'msg': str(e),
            'output': None
        })


if __name__ == '__main__':
    # NOTE(review): debug=True together with host 0.0.0.0 exposes the
    # interactive debugger to the network; use only for local development.
    app.run(host='0.0.0.0', port=5000, debug=True)