PyTorch卷积の实现(CUDA)
参考链接:知乎
搭建项目
项目地址:Github
Windows环境
提前安装:
- NVIDIA GPU Computing Toolkit
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu117
编译
运行python setup.py develop,就能一键编译和安装。如果运行后没有报编译错误,就可以把实现的卷积用起来了

python setup.py develop编译
把源文件加入cpp_src里。之后,把CppExtension改成CUDAExtension。
1 | from setuptools import setup |
CUDA实现
和CPU实现基本一样 自定义算子:复现CPU版本的二维卷积
但是在CUDA版本中,要实现my_conv_im2col_cuda这个函数
my_conv_cuda.cu代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78// Modify from https://github.com/open-mmlab/mmcv/blob/my_conv/mmcv/ops/csrc/common/cuda/deform_conv_cuda_kernel.cuh
// Copyright (c) OpenMMLab. All rights reserved.
template <typename T>
__global__ void my_conv_im2col_gpu_kernel(
const int n, const T *data_im, const int height,
const int width, const int kernel_h, const int kernel_w, const int pad_h,
const int pad_w, const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int batch_size,
const int num_channels, const int height_col,
const int width_col, T *data_col)
{
CUDA_1D_KERNEL_LOOP(index, n)
{
// index index of output matrix
const int w_col = index % width_col;
const int h_col = (index / width_col) % height_col;
const int b_col = (index / width_col / height_col) % batch_size;
const int c_im = (index / width_col / height_col) / batch_size;
const int c_col = c_im * kernel_h * kernel_w;
const int h_in = h_col * stride_h - pad_h;
const int w_in = w_col * stride_w - pad_w;
T *data_col_ptr =
data_col +
((c_col * batch_size + b_col) * height_col + h_col) * width_col + w_col;
const T *data_im_ptr =
data_im + (b_col * num_channels + c_im) * height * width;
for (int i = 0; i < kernel_h; ++i)
{
for (int j = 0; j < kernel_w; ++j)
{
T val = static_cast<T>(0);
const int h_im = h_in + i * dilation_h;
const int w_im = w_in + j * dilation_w;
if (h_im > -1 && w_im > -1 && h_im < height && w_im < width)
{
val = data_im_ptr[h_im * width + w_im];
}
*data_col_ptr = val;
data_col_ptr += batch_size * height_col * width_col;
}
}
}
}
void my_conv_im2col_cuda(Tensor data_im,
const int channels, const int height,
const int width, const int ksize_h,
const int ksize_w, const int pad_h, const int pad_w,
const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int parallel_imgs, Tensor data_col)
{
int height_col =
(height + 2 * pad_h - (dilation_h * (ksize_h - 1) + 1)) / stride_h + 1;
int width_col =
(width + 2 * pad_w - (dilation_w * (ksize_w - 1) + 1)) / stride_w + 1;
int num_kernels = channels * height_col * width_col * parallel_imgs;
AT_DISPATCH_FLOATING_TYPES_AND_HALF(
data_im.scalar_type(), "my_conv_im2col_gpu", [&]
{ my_conv_im2col_gpu_kernel<scalar_t><<<GET_BLOCKS(num_kernels),
THREADS_PER_BLOCK, 0,
at::cuda::getCurrentCUDAStream()>>>(
num_kernels, data_im.data_ptr<scalar_t>(),
height, width, ksize_h, ksize_w,
pad_h, pad_w, stride_h, stride_w, dilation_h, dilation_w,
parallel_imgs, channels,
height_col, width_col, data_col.data_ptr<scalar_t>()); });
AT_CUDA_CHECK(cudaGetLastError());
}
测试
把device_name改成cuda:01
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40import torch
import torch.nn as nn
from panoflow_cuda.core.op.my_conv import MyConv2d
inc = 3
outc = 4
img_shaspe = (50, 50)
device_name = 'cuda:0'
# device_name = 'cpu'
open_bias = True
def test_one():
ts = torch.ones([1, 1, 3, 3]).to(device_name)
layer = nn.Conv2d(1, 1, 3, 1, 1, bias=open_bias).to(device_name)
gt = layer(ts)
my_layer = MyConv2d(1, 1, 3, 1, 1).to(device_name)
my_layer.load_state_dict(layer.state_dict(), strict=False)
res = my_layer(ts)
res = res.to('cpu')
gt = gt.to('cpu')
assert torch.allclose(res, gt, 1e-3, 1e-5)
def test_two():
ts = torch.rand([1, inc, *img_shaspe]).to(device_name)
layer = nn.Conv2d(inc, outc, 3, 1, 1, bias=open_bias).to(device_name)
gt = layer(ts)
my_layer = MyConv2d(inc, outc, 3, 1, 1).to(device_name)
my_layer.load_state_dict(layer.state_dict(), strict=False)
res = my_layer(ts)
res = res.to('cpu')
gt = gt.to('cpu')
assert torch.allclose(res, gt, 1e-3, 1e-5)
if __name__ == '__main__':
test_one()
test_two()
没报错即可