做简历的网站都有哪些,百度海南分公司,塑胶制品塘厦东莞网站建设,销售网站建设怎么样1. Abstract
本文介绍一个工具 PreDataLoader#xff0c;它包装 torch.utils.data.DataLoader#xff0c;接收该类的一个实例 loader#xff0c;启动一个线程 t#xff0c;创建一个队列 q#xff0c;t 将 loader 中的数据预加载到队列 q 中, 以在模型计算时也能启动启动数…1. Abstract
本文介绍一个工具 PreDataLoader它包装 torch.utils.data.DataLoader接收该类的一个实例 loader启动一个线程 t创建一个队列 qt 将 loader 中的数据预加载到队列 q 中, 以在模型计算时也能启动启动数据加载程序, 节省数据加载时间。代码
class PreDataLoader(object):Author: Yuwei from https://www.zhihu.com/people/aewil-zheng, with few changes** 包装 torch.utils.data.DataLoader, 接收该类的一个实例 loader, 启动一个线程 t, 创建一个队列 qt 将 loader 中的数据预加载到队列 q 中, 以在模型计算时也能启动启动数据加载程序, 节省数据加载时间** 若提供了 cuda device, 数据将直接被加载到 GPU 上def __init__(self, loader, deviceNone, queue_size2)::param loader: torch.utils.data.DataLoader:param device: torch.device(cuda or cpu), to use cpu, set None:param queue_size: the number of samples to be preloadedself.__loader loaderself.__device deviceself.__queue_size queue_sizeself.__load_stream torch.cuda.Stream(devicedevice) \if str(device).startswith(cuda) else None # 如果提供了 cuda device, 则创建 cuda 流self.__queue Queue(maxsizeself.__queue_size)self.__idx 0self.__worker Thread(targetself._load_loop)self.__worker.setDaemon(True)self.__worker.start()def _load_loop(self): 不断的将数据加载到队列里 if str(self.__device).startswith(cuda):logging.info(f data will be preloaded into device \{self.__device}\)logging.info(f this may cost more GPU memory!!!)# The loop that will load into the queue in the backgroundtorch.cuda.set_device(self.__device)while True:for sample in self.__loader:self.__queue.put(self._load_instance(sample))else:while True:for sample in self.__loader:self.__queue.put(sample)def _load_instance(self, sample): 将 batch 数据从 CPU 加载到 GPU 中 if torch.is_tensor(sample):with torch.cuda.stream(self.__load_stream):return sample.to(self.__device, non_blockingTrue)elif sample is None or type(sample) str:return sampleelif isinstance(sample, dict):return {k: self._load_instance(v) for k, v in sample.items()}else:return [self._load_instance(s) for s in sample]def __iter__(self):self.__idx 0return selfdef __next__(self):# 加载线程挂了if not self.__worker.is_alive() and self.__queue.empty():self.__idx 0self.__queue.join()self.__worker.join()raise StopIteration# 一个 epoch 加载完了elif self.__idx len(self.__loader):self.__idx 0raise StopIteration# 下一个 batchelse:out self.__queue.get()self.__queue.task_done()self.__idx 1return outdef next(self):return self.__next__()def __len__(self):return len(self.__loader)propertydef sampler(self):return self.__loader.samplerpropertydef dataset(self):return self.__loader.dataset如果你对实现技术细节不感兴趣也可直接拿来用。后面我将对相关细节展开讨论包括
python 中的并发与并行cuda 流torch.cuda.Stream(devicedevice)
2. python 中的并发与并行
总所周知由于 Global Interpreter Lock (GIL) 的存在Python 语言中任何时间点只有一个线程在执行即便在多核 CPU 上Python 的多线程也无法实现真正的并行计算。 GIL 的原因在于 Python 的内存管理并不是线程安全的。为了防止多个线程同时操作一个对象造成数据混乱的问题Python 设定了 GIL 来限制多线程的并发执行。因此尽管你可以在 Python 中创建多线程并且看起来他们是同时运行的但实质上在任一时刻只有一个线程在执行。 既然如此上面代码使用多线程是如何提高程序的效率的再看 然而如果你的程序是 IO 密集型的例如大量的网络请求或文件读写操作那么使用多线程还是能显著提高程序的效率的因为在等待 IO 的过程中其他线程还可以继续执行。 数据的预加载应该算是 IO 吧那模型计算和数据加载能并行吗
2.1 Numpy 和 PyTorch 底层计算是多线程并行的
Numpy 的底层实现是 C 语言计算速度和并发性远胜于 Python当我们使用 numpy 进行计算时特别是复杂的矩阵运算Python 程序会把这个任务抛给底层的 C 语言进行计算从而能够使用 CPU 多核。验证
import time
import numpy as npdef dot():start time.time()a np.random.randn(10000, 10000)b np.random.randn(10000, 10000)np.dot(a, b)end time.time()print(end - start)dot()验证代码用 numpy.dot() 计算两个 10000 10000 10000 维的矩阵乘法观察 CPU 的使用效率(i5-10400,6核心12线程)发现 CPU 使用率很快从不足 20% 提升至 80% 左右。计算时间约为 15s。 为了确定是否真的使用了多核再设计一个 Python 计算程序
import timedef add():cnt 1start time.time()for i in range(500000000): # 累加cnt 1end time.time()print(end - start)add()五亿次加法运算耗时约 20sCPU 使用率全程维持在 20% 以下。如此说来numpy 确实是在使用多核并行计算。 下面看一看 Python 多线程能不能使它们并行计算
import threading
import time
import numpy as npdef dot():start time.time()a np.random.randn(10000, 10000)b np.random.randn(10000, 10000)np.dot(a, b)end time.time()print(end - start)def add():cnt 1start time.time()for i in range(500000000):cnt 1end time.time()print(end - start)t threading.Thread(targetdot)s time.time()
add()
t.start()
t1.join()
e time.time()
print(e - s)输出
15.057043313980103
23.129913806915283
23.13091516494751如果说整个程序只能同时使用一个 CPU 核那么整体计算时间应该是两部分计算时间的和 35s 左右但这里只用了 23s可见 numpy 底层并行计算是实锤了。而且这两个函数的计算是并行的即 np.dot() 在计算的时候add() 也在计算。为什么 add 计算相比其单独运行时多了 3s而 np.dot() 计算时间基本没变
可以排除 CPU 资源不够的可能否则的话np.dot() 的计算时间也要加长再者我观察了 CPU 利用率全程未达到 100%。我觉得这是线程切换的开销add() 可能不是一直在运行的多个 Python 线程还是只能使用一个 CPU 核线程之间交替执行只不过 np.dot() 线程在离开后底层运行还在继续而 add() 线程离开后其不再运行。即有那么 3s 时间add() 没运行“单核 CPU” 转向了线程 np.dot() 检查计算结果是否已返回。
再增加一个 numpy 计算任务线程
...
t1 threading.Thread(targetdot)
t2 threading.Thread(targetdot)s time.time()
add()
t1.start()
t2.start()
t1.join()
t2.join()
e time.time()
print(e - s)输出
25.624603986740112
27.81219220161438
30.751672983169556
30.752644538879395时间增加了不少基本快赶上计算一次 dot() 时间的两倍了。这大概是由于 CPU 的计算达到了极限 CPU 利用率长时间维持在 100%。
以上验证对于 PyTorch 也是一样的。
结论numpy 和 pytorch 的计算不受 GIL 的限制可以使用 CPU 多核一个线程中numpy 和 pytorch 将计算丢给底层的 C/C 语言后“等待计算结果”类似于 IO会释放 GIL 锁而计算还在继续其他 python 线程可以得到执行。 推论使用 GPU 计算是同样的道理python 程序将计算丢给 GPU 后等待计算结果当前线程阻塞释放 GIL 锁其他 python 线程得以执行从而提高计算效率。
3. torch.cuda.Stream(devicedevice) torch.cuda.Stream 是 PyTorch 库中的一个类用于管理 GPU 上的异步操作。
在 GPU 上执行计算任务时通常可以使用多个流stream来并行执行不同的操作。每个流都有自己的命令队列可以独立地执行操作从而提高计算效率。torch.cuda.Stream 就是用来创建和管理这些流的。
使用 torch.cuda.Stream可以将一系列 GPU 操作放入一个流中并且可以通过调用流的 synchronize() 方法来等待流中所有操作完成。这对于需要处理多个 GPU 操作的情况非常有用。
以下是一个使用 torch.cuda.Stream 的示例代码
import torchstream torch.cuda.Stream() # 创建流对象with torch.cuda.stream(stream): # 在流中执行操作# 执行GPU操作# ...stream.synchronize() # 等待流中操作完成在上述示例中我们首先创建了一个 torch.cuda.Stream 对象 stream。然后我们使用 with 语句块将一些 GPU 操作放入流中执行。最后我们调用 stream.synchronize() 来等待流中的操作完成。
通过使用 torch.cuda.Stream我们可以更灵活地控制 GPU 操作的执行顺序和并行性以优化计算性能。 以上是 GPT3.5 给出的关于 torch.cuda.Stream 的简介。另外还可参考教程《如何在 Pytorch 中使用 CUDA 流(CUDA stream)》 讲的不错。我现在将其搬过来 什么是 CUDA 流(CUDA stream)
CUDA 流是一种在 GPU 上并行执行操作的机制。在默认情况下PyTorch 会在默认的流上执行所有的操作即在主流(default stream)上进行。但是当我们有一些可以并行执行的操作时通过将这些操作分配到不同的流上我们可以在 GPU 上更有效地利用计算资源。 第一句就强调并行执行操作的机制。 如何创建 CUDA 流
可以通过 torch.cuda.Stream() 函数来创建 CUDA 流
stream torch.cuda.Stream()使用 torch.cuda.Stream() 函数创建了一个名为 stream 的 CUDA 流。
如何使用 CUDA 流
通过 with 上下文管理操作并使用 stream.synchronize() 方法等待操作完成
import torch# 创建两个CUDA流
stream1 torch.cuda.Stream()
stream2 torch.cuda.Stream()# 分别将操作记录到两个流上
with torch.cuda.stream(stream1):# 执行操作1# ...with torch.cuda.stream(stream2):# 执行操作2# ...# 等待两个流上的操作完成
torch.cuda.synchronize(stream1)
torch.cuda.synchronize(stream2)我们创建了两个 CUDA 流 stream1 和 stream2。然后在两个流上分别记录操作并使用torch.cuda.synchronize() 方法等待这些操作完成。
如何利用 CUDA 流提高性能
一种常见的用法是将计算和数据传输操作分配到不同的流上从而实现计算和数据传输的并行执行。 3.1 对 PreDataLoader 中 CUDA 流的解释
with torch.cuda.stream(self.__load_stream):return sample.to(self.__device, non_blockingTrue)这一句 sample.to(self.__device, non_blockingTrue) 算是数据传输吧它处在一个数据预加载线程中想要与模型计算并行。那么按照上面的教程一个 CUDA 流中的操作是顺序执行的模型计算使用的是默认流(default stream)平时我们的代码 sample.to(device) 也使用了默认流这意味着数据的传输和模型计算是串行的。
所以PreDataLoader 中定义了一个新的 CUDA 流把 sample.to(self.__device, non_blockingTrue) 放入这个新 CUDA 流就可以和模型计算并行了。
4. property
property 是一个装饰器用于将类的方法转换为属性。通过使用 property您可以定义一个方法并将其作为实例的属性来访问而不需要使用函数调用的语法。
下面是一个示例说明如何使用 property 装饰器
class Circle:def __init__(self, radius):self.radius radiuspropertydef diameter(self):return 2 * self.radiusdiameter.setterdef diameter(self, value):self.radius value / 2# 创建 Circle 对象
circle Circle(5)# 访问 diameter 属性实际上是调用了 diameter 方法
print(circle.diameter) # 输出10# 设置 diameter 属性实际上是调用了 diameter.setter 方法
circle.diameter 14
print(circle.radius) # 输出7在上面的示例中Circle 类定义了一个 radius 实例变量和一个 diameter 方法被 property 装饰。当我们像访问属性一样访问 circle.diameter 时实际上是调用了 diameter 方法并返回其结果。
此外我们还可以使用 property 创建一个 setter 方法用于设置属性的值。在示例中diameter 属性的 setter 方法名为 diameter.setter它接受一个参数 value我们可以在 setter 方法中对 self.radius 进行更新。
总结使用 property 装饰器可以将一个方法定义为属性并提供更加方便和易读的方式来访问和设置属性。
既然担心 Python 线程的 GIL 问题为何不直接用多进程
答多进程没那么好用进程是重量级的有独立的内存管理共享内存是比较麻烦的
import multiprocessingclass Int(object):def __init__(self, i):self.__int idef add(self):self.__int 1def print(self):print(self.__int)def add(integer: Int):integer.add()integer.print()print(id(integer))if __name__ __main__:a_integer Int(0)p1 multiprocessing.Process(targetadd, args(a_integer,))p2 multiprocessing.Process(targetadd, args(a_integer,))p3 multiprocessing.Process(targetadd, args(a_integer,))p1.start()p2.start()p3.start()add(a_integer)a_integer.print()输出
1
1839132811024
1
1
2091010788944
1
1721319788112
1
2095109213776可见各进程操作的 Int 对象不是同一个即创建子进程时传入参数会是参数的一份拷贝。
如果将 multiprocessing.Process 换成 threading.Thread则输出
1
2691328945888
2
2691328945888
3
2691328945888
4
2691328945888
4创建线程时传入参数会是参数对象本身。
此外子进程不能访问主线程的变量如果
def add(integer: Int):integer.add()integer.print()b_integer.add() # 加一个主进程中的变量print(id(integer))则会报错。而线程则可以。
可以看到PreDataLoader 中的线程是访问了主程序的数据了的如果用进程一是编程比较麻烦二是效率也未必就高。