跳转到内容

部分样板代码

MinMaxScaler 的 PyTorch 模板

class MinMaxScaler:
def __init__(self, feature_range=(0, 1)):
self.feature_range = feature_range
self.min_ = None
self.max_= None
self.data_range_= None
self.scale_ = None
self.is_fitted_ = False
def fit(self, X: torch.Tensor) -> 'MinMaxScaler':
if X.dim() == 1:
X = X.unsqueeze(1) # 将 1D 转换为 2D
self.min_ = X.min(dim=0).values
self.max_ = X.max(dim=0).values
self.data_range_ = self.max_ - self.min_
zero_range_mask = self.data_range_ == 0
self.data_range_[zero_range_mask] = 1
self.scale_ = (self.feature_range[1] - self.feature_range[0]) / self.data_range_
self.is_fitted_ = True
return self
def transform(self, X: torch.Tensor) -> torch.Tensor:
if not self.is_fitted_:
raise ValueError("Scaler is not fitted yet.")
return (X - self.min_) * self.scale_ + self.feature_range[0]
def fit_transform(self, X: torch.Tensor) -> torch.Tensor:
self.fit(X)
return self.transform(X)
def inverse_transform(self, X: torch.Tensor) -> torch.Tensor:
if not self.is_fitted_:
raise ValueError("Scaler is not fitted yet.")
return (X - self.feature_range[0]) / self.scale_ + self.min_

使用方法

scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X)
X_original = scaler.inverse_transform(X_scaled)
print(X_original)

校正决定系数的 PyTorch 模板

def adjusted_r2_score(y_true: torch.Tensor, y_pred: torch.Tensor, n_features: int) -> float:
"""
计算校正决定系数 (Adjusted R²)
参数:
y_true (Tensor): 实际值,形状为 (n_samples,)
y_pred (Tensor): 预测值,形状为 (n_samples,)
n_features (int): 预测变量的数量(模型的自变量数量)
返回:
float: 校正决定系数
"""
# 样本数量
n_samples = y_true.size(0)
# 平均实际值
y_mean = torch.mean(y_true)
# 计算 SSR 和 SST
ssr = torch.sum((y_true - y_pred) ** 2)
sst = torch.sum((y_true - y_mean) ** 2)
# 计算 R²
r2 = 1 - ssr / sst
# 计算 Adjusted R²
adjusted_r2 = 1 - (1 - r2) * (n_samples - 1) / (n_samples - n_features - 1)
return adjusted_r2.item()

MLP 的 PyTorch 模板

import torch.nn as nn
class Model(nn.Module):
def __init__(self, features_dim: int, output_dim: int, hidden_dim: int, hidden_layers_num: int, nonlinearity: str, layer_norm: bool):
super(Model, self).__init__()
hidden_layers = []
for _ in range(hidden_layers_num):
if layer_norm:
hidden_layers.append(nn.LayerNorm(hidden_dim))
hidden_layers.append(nn.Linear(hidden_dim, hidden_dim))
hidden_layers.append(getattr(nn, nonlinearity)())
self.seq = nn.Sequential(
nn.Linear(features_dim, hidden_dim),
getattr(nn, nonlinearity)(),
*hidden_layers,
nn.Linear(hidden_dim, output_dim)
)
def forward(self, x):
return self.seq(x)

训练的 PyTorch 模板

import torch
import numpy as np
import matplotlib.pyplot as plt
# 训练数据
features_dim = 1
x_train = torch.arange(0.1, 10, 0.1).reshape(-1, features_dim) # 训练数据
x_test = torch.arange(0.2, 10, 0.12).reshape(-1, features_dim) # 测试数据
torch.manual_seed(42)
y_train = torch.sin(x_train) + torch.normal(0, 0.1, x_train.shape) + 100 # 训练标签
torch.manual_seed(32)
y_test = torch.sin(x_test) + torch.normal(0, 0.15, x_test.shape) + 100 # 测试标签
plt.scatter(x_train, y_train, marker='x', s=20)
plt.show()
# 归一化
input_scaler = MinMaxScaler(feature_range=(-1, 1))
output_scaler = MinMaxScaler(feature_range=(-1, 1))
x_train_scaled = input_scaler.fit_transform(x_train)
x_test_scaled = input_scaler.transform(x_test)
y_train_scaled = output_scaler.fit_transform(y_train)
y_test_scaled = output_scaler.transform(y_test)
plt.scatter(x_train_scaled, y_train_scaled, marker='x', s=20)
plt.show()
# 定义模型
model = Model(features_dim=1, output_dim=1, hidden_dim=8, hidden_layers_num=2, nonlinearity='Sigmoid', layer_norm=True)
# 损失函数和优化器
loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# 训练
epochs = 2000 # 训练轮数
losses = np.zeros(epochs)
test_losses = np.zeros(epochs)
model.train()
for epoch in range(epochs):
y_pred = model(x_train_scaled)
loss = loss_fn(y_pred, y_train_scaled)
y_test_pred = model(x_test_scaled)
test_loss = loss_fn(y_test_pred, y_test_scaled)
optimizer.zero_grad()
loss_value = loss.item()
test_loss_value = test_loss.item()
losses[epoch] = loss_value
test_losses[epoch] = test_loss_value
loss.backward()
optimizer.step()
if epoch % 100 == 0:
ar2 = adjusted_r2_score(y_train_scaled, y_pred, features_dim)
print(f'Epoch {epoch}|{epochs}, Loss: {loss_value:>.6f}, Test Loss: {test_loss_value:>.6f}, Adjusted R²: {ar2:>.6f}')
plt.plot(losses, label='Train Loss')
plt.plot(test_losses, label='Test Loss')
plt.legend()
plt.show()
# 预测
model.eval()
with torch.no_grad():
y_pred_scaled = model(x_test_scaled)
y_pred = output_scaler.inverse_transform(y_pred_scaled)
plt.scatter(x_test, y_test, marker='x', s=20, label='Actual')
plt.plot(x_test, y_pred, label='Pred')
plt.legend()
plt.show()
# 封装模型
class ModelWrapper(nn.Module):
def __init__(self, base_model: nn.Module, input_scaler: MinMaxScaler, output_scaler: MinMaxScaler):
super(ModelWrapper, self).__init__()
self.base_model = base_model
self.base_model.eval()
self.input_scaler = input_scaler
self.output_scaler = output_scaler
def forward(self, x: torch.Tensor):
x_scaled = self.input_scaler.transform(x)
y_pred_scaled = self.base_model(x_scaled)
y_pred = self.output_scaler.inverse_transform(y_pred_scaled)
return y_pred
model_wrapper = ModelWrapper(model, input_scaler, output_scaler)
# 转换为 ONNX
model_wrapper.eval() # 转换为 ONNX 格式前,需要将模型设置为评估模式
torch.onnx.export(
model_wrapper,
(torch.randn(1, features_dim, requires_grad=True), ), # 输入数据
"model.onnx", # 输出文件名
opset_version=11, # ONNX 版本
export_params=True, # 是否导出参数
input_names=["input"], # 输入节点名称
output_names=["output"], # 输出节点名称
)

部署 ONNX 模型

  1. 安装 ONNX Runtime, Flask:

    Terminal window
    pip install onnx
    pip install onnxruntime
    pip install flask
  2. 创建一个 Flask 应用程序,并加载 ONNX 模型:

    from flask import Flask, request, jsonify
    import onnxruntime as ort
    import onnx
    app = Flask(__name__)
    model_path = "model.onnx"
    model = onnx.load(model_path)
    session = ort.InferenceSession(model.SerializeToString(), providers=["CPUExecutionProvider"])
    @app.route('/predict', methods=['POST'])
    def predict():
    try:
    data = request.get_json('input')
    input_name = 'input'
    output_names = ['output']
    result = session.run(output_names, {input_name: [[data]]})
    return jsonify({
    'code': 0,
    'msg': 'ok',
    'output': f'{result[0][0][0]:>.8f}'
    })
    except Exception as e:
    return jsonify({
    'code': -1,
    'msg': str(e),
    'output': None
    })
    if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)