跳到主要内容

工作流

工作流: 可以编排一系列的步骤,然后让它们按照一定的顺序和依赖关系运行;如训练 -> 构建 -> 部署。

创建工作流

https://portal.ailines.cn 中,选择菜单工作流,点击 创建 按钮,如图所示:

workflow list

填写工作流基本信息,并进入 下一步

workflow create

提示

工作流名称:工作流名称,用于便于于其他工作流区分
工作空间:工作流运行中需要使用的工作空间,用于工作步骤中存储读取数据等
开启定时任务?:选择是否需要开始定时执行工作流
Crontab:定时执行表达式,可根据提醒设置定时执行规则
描述:对工作流的描述,用于记录区分工作流

设置步骤和相关属性并 点击右上按钮 保存工作流

workflow step create

提示

名称:需要唯一,且是以字母开头的数字字母组合
镜像:可选择内置或者用户指定仓库里的镜像
数据集:为选填项,也可以选择多个数据集
CPU GPU 内存:选择步骤运行所需要的资源
输入:运行所需要的输入参数,可在命令行中使用{{inputs.parameters.参数名}}格式读取,也可以通过选择提示的值来使用前置步骤的输出作为输入参数
输出:输出结果以便接下来的步骤使用,将运行结果写入到临时文件中(临时文件完整路径可通过环境变量 OUTPUTPARAM输出参数名 获取),并作为输出参数
运行命令:对应需要运行的命令,如运行shell脚本或者python文件等

面板收起时,会自动保存步骤属性

运行工作流

点击列表页的 运行 可以直接运行工作流

workflow run

编辑工作流

点击 编辑 按钮

workflow edit

修改定时任务

点击 修改定时任务 按钮,可以关闭或者修改工作流的定时配置

workflow cron

查看运行历史

点击 运行历史 按钮

workflow history

运行历史界面可以查看此工作流的所有历史

workflow history list

查看日志

在工作流列表页,点击 最近日志 按钮,查看最近运行记录的日志

workflow log

或者在运行历史页,点击 日志,查看历史对应的日志

workflow log

查看监控

在工作流列表页,点击 监控 按钮,查看最近运行记录的监控

workflow monitor

或者在运行历史页,点击 监控,查看历史对应的监控

workflow monitor

删除工作流

在工作流列表页,点击 删除 按钮

workflow delete

内置步骤的使用

构建

使用内置的 构建 步骤

workflow package

提示

名称:需要唯一,且是以字母开头的数字字母组合
模型配置-打包文件:模型文件的完整路径,需要是pickle格式
模型配置-文件类型:pickle格式
服务-名称:构建之后的服务名称
服务-版本前缀:构建使用的版本前缀,构建后会在版本上自动添加时间戳
输入-参数名:调用时输入的参数名称
输入-输入类型:调用时输入的参数类型
输出-参数名:调用时输出的参数名称
输出-输出类型:调用时输出的参数类型
Framework:使用的框架
依赖库:代码依赖的库

面板收起时,会自动保存步骤属性

部署

workflow package workflow package

提示

名称:需要唯一,且是以字母开头的数字字母组合;当部署名称不发生改变时,多次运行将重新部署对应服务
部署来源-来源方式:来源方式可选前置节点已构建服务;选择前置节点,则需要保证该步骤前置为构建服务;选择已构建服务则需要选择已构建完成的服务和版本号
部署模版:选择一个对应的部署模版进行部署服务,部署模版的使用请参考部署模版部分

分布式训练

workflow distributed training

提示

名称:选择已有的分布式训练,分布式训练的使用请参考创建分布式训练

训练 -> 构建 -> 部署 流程示例

创建名称为mnist的数据集,结构如下图

workflow distributed training

提示

数据集的使用,请参照创建数据集
数据来源于

准备训练代码

import argparse
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
import pickle


class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)

def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output

def train(args, model, device, train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
100. * batch_idx / len(train_loader), loss.item()))
if args.dry_run:
break


def test(model, device, test_loader):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()

test_loss /= len(test_loader.dataset)

print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
test_loss, correct, len(test_loader.dataset),
100. * correct / len(test_loader.dataset)))


def main():
data_root = os.getenv("DATASET_mnist", ".")
workspace = os.getenv("WORKSPACE", ".")
print('training begin')
print(data_root)
# Training settings
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=1, metavar='N',
help='number of epochs to train (default: 14)')
parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
help='learning rate (default: 1.0)')
parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
help='Learning rate step gamma (default: 0.7)')
parser.add_argument('--dry-run', action='store_true', default=False,
help='quickly check a single pass')
parser.add_argument('--seed', type=int, default=1, metavar='S',
help='random seed (default: 1)')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
help='how many batches to wait before logging training status')
parser.add_argument('--dataset-folder', type=str, default=data_root, metavar='N',
help='input batch size for training (default: 64)')
parser.add_argument('--save-model', action='store_true', default=False,
help='For Saving the current Model')
parser.add_argument('--no-cuda', action='store_true', default=False,
help='disables CUDA training')
args = parser.parse_args()
# 如果存在GPU则使用GPU
use_cuda = not args.no_cuda and torch.cuda.is_available()

torch.manual_seed(args.seed)

if use_cuda:
print('running on GPU')
device = torch.device("cuda")
else:
device = torch.device("cpu")

train_kwargs = {'batch_size': args.batch_size}
test_kwargs = {'batch_size': args.test_batch_size}
if use_cuda:
cuda_kwargs = {'num_workers': 1,
'pin_memory': True,
'shuffle': True}
train_kwargs.update(cuda_kwargs)
test_kwargs.update(cuda_kwargs)

dataset1 = datasets.MNIST(args.dataset_folder, train=True, download=True,
transform=transforms.ToTensor())
dataset2 = datasets.MNIST(args.dataset_folder, train=False,
transform=transforms.ToTensor())
train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)

model = Net().to(device)
optimizer = optim.Adadelta(model.parameters(), lr=args.lr)

scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
for epoch in range(1, args.epochs + 1):
train(args, model, device, train_loader, optimizer, epoch)
test(model, device, test_loader)
scheduler.step()

if args.save_model:
print('save model')
torch.save(model, workspace + "/mnist/model/mnist_cnn.pth")


if __name__ == '__main__':
main()
提示

data_root = os.getenv("DATASET_mnist", ".")
workspace = os.getenv("WORKSPACE", ".")

以上代码使用了工作流步骤中内置的环境变量,工作流步骤中内置的环境变量请参照内置环境变量

可使用Notebook将代码保存至对应的工作空间下,路径如下图

workflow sample code

提示

model文件夹:用于保存训练后的模型,且其中要包含Net定义文件

workflow sample code

提示

Net自定义文件:Net定义文件

比如如下代码

import torch
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)

def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output

使用 自定义步骤 创建训练步骤,并设置属性如下

workflow sample training step

提示

镜像:选择系统内置的对应框架镜像或者使用用户指定的镜像(用户指定需进行镜像配置)
数据集:选择mnist名称的数据集
运行命令:运行工作空间下对应的python文件

创建 构建 步骤,并设置打包参数

workflow sample build step

提示

Framework:模型所使用的框架
模型路径: 模型打包文件存储路径, 上例模型存储路径为mnist/model/mnist_cnn.pth
服务名称: 模型打包文件对应的服务名称
版本前缀: 服务的版本前缀(工作流将自动添加一个时间戳作为后缀)
输入输出: 模型数输入和输出名称和类型,目前输出仅支持NumpyNdarray类型
输入前置处理函数: 如果模型的输入需要根据自身业务进行一定的处理,则请将处理内容加入到函数中。如果您不提供相应的处理,系统将提供默认行为。

参数具体配置也可参考构建相关文档
此例子为PyTorch框架,其他框架的构建使用
PyTorch
Sklearn
Tensorflow

创建 部署 步骤,并设置属性

workflow sample deploy step

提示

部署来源:前置节点选择构建步骤的名称
部署模版:选择已有的部署模版

点击右上的 保存工作流 进行工作流的保存后,可以选择在列表页进行运行或者设置定时运行

输入与输出参数的使用简单示例

创建带有输出参数的步骤

workflow sample output step

提示

输出:输出变量的参数名称
运行命令:使用命令或者代码将需要输出的内容写入到 $OUTPUT_PARAM_输出参数名(环境变量名) 文件中,以便后置步骤可以使用输入参数来接收

环境变量 OUTPUT_PARAM_输出参数名 为完全的输出参数对应的文件名称

创建带有输入参数的步骤,并使用前置步骤的输出

workflow sample output step

提示

输入:填写 参数名称参数值 从提醒项中选择step1的 输出参数名
运行命令:命令中使用 {{inputs.parameters.输入参数名}} 获取对应的输入参数值,或者利用命令行传入到运行的代码中,如 python main.py {{inputs.parameters.inputval}}

保存工作流,并在列表页面执行运行操作,得到step2的结果为step1的输出参数值"hello"

workflow sample output step

内置环境变量

环境变量名说明
WORKSPACE对应的工作空间路径
DATASET_数据集名对应挂载的数据集路径
OUTPUT_PARAM_输出变量名输出参数对应的文件全路径,使用命令或者代码将需要输出参数的值直接写入此文件