Python 相关指令
yield 的一些特殊用法
- https://blog.csdn.net/mieleizhi0522/article/details/82142856
- 带 yield 的函数是一个生成器,而不是一个函数了,这个生成器有一个函数就是 next 函数,next 就相当于“下一步”生成哪个数,这一次的 next 开始的地方是接着上一次的 next 停止的地方执行的,所以调用 next 的时候,生成器并不会从 foo 函数的开始执行,只是接着上一步停止的地方开始,然后遇到 yield 后,return 出要生成的数,此步就结束。
def foo():
print("starting...")
while True:
res = yield 4
print("res:",res)
g = foo()
print(next(g))
print("*"*20)
print(next(g))
# starting...
# 4
# ********************
# res: None (这里是因为最后一步为yield 4, 返回了之后下次进入函数就没有右值了!)
# 4
- 这里有一个 B 站的专栏学习,可以参考:
python import 踩坑指南
- 参考文章:https://blog.csdn.net/karmayh/article/details/108697835
-
问题就来自于 sys.path 的路径设置问题
- 详细的教程如下: http://www.coolpython.net/python_senior/module_concept/modify-sys-path.html
- sys.path 是一个列表,存放的是 python 搜索模块时可以搜索的路径,启动 python 脚本时,会将执行当前命令所在的目录添加到这个列表中,而且是在列表的最前面,正是因为这个操作,你才能在自己的项目里引用自己编写的模块,当模块名称与第三方模块或系统模块冲突时,优先引用项目里的模块。通常,sys.python 里的内容如下所示:
- 执行打印 sys.path 的操作结果如下:
>>> import sys
>>> sys.path
['', '/home/linchangxiao/anaconda3/envs/py36tf112/lib/python36.zip', '/home/linchangxiao/anaconda3/envs/py36tf112/lib/python3.6', '/home/linchangxiao/anaconda3/envs/py36tf112/lib/python3.6/lib-dynload', '/home/linchangxiao/anaconda3/envs/py36tf112/lib/python3.6/site-packages']
虚拟环境与包管理器的相关问题
Conda 安装与设置相关操作
wget https://mirrors.bfsu.edu.cn/anaconda/archive/[Anaconda3-2023.07-2-Linux-x86_64.sh](https://repo.anaconda.com/archive/Anaconda3-2023.07-2-Linux-x86_64.sh) --no-check-certificate
bash Anaconda3-2023.07-2-Linux-x86_64.sh
source ~/.bashrc
Conda 和 Pip 源 - 建议直接安装 conda 后直接修改
vim ~/.condarc
channels:
- defaults
show_channel_urls: true
default_channels:
- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/r
- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/msys2
custom_channels:
conda-forge: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
msys2: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
bioconda: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
menpo: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
pytorch: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
pytorch-lts: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
simpleitk: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
deepmodeling: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/
pip 包管理器的完整执行流程
Pipenv
- 有一些仓库会出现 Pipfile 和 Pipfile.lock,故使用这个会比较好一些?
- 参考文献:Pipenv 的基本使用 — pipenv 2020.8.13.dev0 文档 (pipenv-zh.readthedocs.io)
-
但是突然发现这个 Pipfile 根本没有任何的进度条提示,非常不方便,所以想转换为 requirements.txt:
- pipenv requirements > requirements.txt && pipenv requirements –dev-only > requirements-dev.txt
pip 生成 requirements.txt
pip freeze > requirements.txt
从源码在 conda 虚拟环境中安装一个 pip 包的流程
python -m pip install -e .
是一个用于安装当前 Python 项目的命令,其中 -e
表示使用可编辑模式(editable mode)。这种模式下,安装的包实际上是在当前项目的源代码目录上创建的符号链接,而不是将包复制到 Python 安装目录。
以下是这个命令的执行流程:
-
python -m pip
:使用 Python 解释器的pip
模块进行包管理。 -
install
:指示pip
安装包。 -
-e .
:使用可编辑模式安装当前目录的包。-
-e
或--editable
表示使用编辑模式。 -
.
表示当前目录,即安装当前项目。
-
在执行这个命令时,发生的一些关键步骤包括:
-
pip
会在当前目录查找setup.py
文件,该文件描述了项目的元信息和依赖关系。 -
pip
将项目安装到一个虚拟环境中(如果存在),或者全局 Python 环境中。 - 使用可编辑模式安装时,
pip
不会将包复制到 Python 安装目录。相反,它会在当前项目的虚拟环境或全局环境中创建一个符号链接,指向项目的源代码目录。这样,对项目源代码的修改会立即反映在安装的包中,无需重新安装。
这种可编辑模式通常用于开发过程中,允许开发人员实时调试和修改项目代码,而无需反复安装包。在项目开发过程中,经常会使用这种模式来快速测试和验证代码修改。
import os
from pathlib import Path
from setuptools import find_packages
from skbuild import setup
import legate.install_info as lg_install_info
legate_dir = Path(lg_install_info.libpath).parent.as_posix()
cmake_flags = [
f"-Dlegate_core_ROOT:STRING={legate_dir}",
]
env_cmake_args = os.environ.get("CMAKE_ARGS")
if env_cmake_args is not None:
cmake_flags.append(env_cmake_args)
os.environ["CMAKE_ARGS"] = " ".join(cmake_flags)
setup(
name="Legate Hello",
version="0.1",
description="A Hello World for Legate",
author="NVIDIA Corporation",
license="Apache 2.0",
classifiers=[
"Intended Audience :: Developers",
"Topic :: Database",
"Topic :: Scientific/Engineering",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
packages=find_packages(
where=".",
include=["hello", "hello.*"],
),
include_package_data=True,
zip_safe=False,
)
伟大的 mamba
conda install mamba -c conda-forge
conda 删除 env
Vscode 的相关操作
搭建 conda 环境、搭建 pylint
远程连接 notebook
- 请直接用下面连接的方法即可完成
远程安装 vscode 插件非常慢的解决方案
- 参考文献:vscode 下链接远程服务器安装插件失败、速度慢等解决方法 (361shipin.com)
- 从 vsix 安装,从官网上下载,历史版本比较难下载到,需要注意。
Debug 问题
Invalid value encountered in long_scalars
- [Python]记一次 Debug 经历
- 自己也遇到了这个问题,抛出错误接不住
Vscode 的 python 插件不支持 python 3.6 版本的问题
- 参考文献:【vscode】python F5 调试 没反应 一闪而过 【已解决】-IT Blog (itcn.blog)
- 最近(2022 年 7 月 11 日)在使用 vscode 调试 python 时出现按 F5 无法进入调试的情况,经过一番查找,根本原因是 vscode 的 python 扩展插件的最新版本(2022.10.0)不再支持 python3.6 了
VScode 调试带参数
- 直接创建.vscode/launch.json 即可
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": true,
"args": [
"--cluster_job_log", "trace/cluster_job_log",
"--analyze_trace",
]
},
]
}
条件断点
debug 中出现的 open()相对路径找不到的解决
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": true,
"args": [
],
// "env": {"PYTHONPATH": "${workspaceFolder}/yafs${pathSeparator}${env:PYTHONPATH}"}
"cwd": "${fileDirname}",
},
]
}
##
Python 原生语法
装饰器学习
-
建议通过廖雪峰学习
-
@register_model 的介绍
Python 遍历删除
- 参考文章:https://segmentfault.com/a/1190000007214571
-
具体的实现方式推荐:
- 列表解析
lst = [x for x in lst if x != 0]
print lst
with 关键字
-
这个关键字一直在用,但是之前没有很清晰地去理解含义
-
其实就是一个 Try-catch 结构的修改版
- 支持场景:1、文件操作。2、进程线程之间互斥对象。3、支持上下文其他对象。
-
原理:
- with 语句实质是上下文管理。
- 1、上下文管理协议。包含方法__enter__() 和 exit(),支持该协议对象要实现这两个方法。
- 2、上下文管理器,定义执行 with 语句时要建立的运行时上下文,负责执行 with 语句块上下文中的进入与退出操作。
- 3、进入上下文的时候执行__enter__方法,如果设置 as var 语句,var 变量接受__enter__()方法返回值。
- 4、如果运行时发生了异常,就退出上下文管理器。调用管理器__exit__方法
*号 与 **号
- 后面会增加和 zip 绑定在一起的操作,该操作经常出现在很多源码中。
- 用于乘法计算(*) 和幂乘(**)
-
用于元组打包与解包
- 打包:将传递给函数的任意多个(也可以是 0 个)非关键字参数/关键字参数打包成一个元组(*号)/字典(**号)【元组只能接收非关键字参数,字典只能接收关键字参数】
- 解包:就是传递给函数的列表、元组、字典拆分成独立的多个元素然后赋值给函数中参变量(包括普通的位置参数、关键字参数、非关键字参数)。【解出来传给函数的只有键值(.key)】
zip()和 zip(*)
- 参考文献 1: https://blog.nowcoder.net/n/f1caca2e5e2a4943b1697133f084bf6b
- zip() 函数用于将可迭代的对象作为参数,将对象中对应的元素打包成一个个元组,然后返回由这些元组组成的列表。
- 如果各个迭代器的元素个数不一致,则返回列表长度与最短的对象相同。利用 * 号操作符,可以将元组解压为列表。zip(*)解压函数,将对象中的每一个元素/元祖/字符串的相同位置的子元素组合成一个新的元祖,并将所有组成的元祖以列表的形式进行展示。
>>> a = [1,2,3]
>>> b = [4,5,6]
>>> c = [4,5,6,7,8]
>>> zipped = zip(a,b)
# 打包为元组的列表 [(1, 4), (2, 5), (3, 6)]
>>> zip(a,c)
# 元素个数与最短的列表一致 [(1, 4), (2, 5), (3, 6)]
>>> zip(*zipped)
# 与 zip 相反,可理解为解压,返回二维矩阵式 [(1, 2, 3), (4, 5, 6)]
l = ['flower', 'flow', 'flight']
print(zip(l))
print(zip(*l))
print(list(zip(l))) #转换为list显示
print(list(zip(*l)))
print(tuple(zip(l)))
print(tuple(zip(*l)))
# <zip object at 0x0000018B0BBD4F08>
# <zip object at 0x0000018B0BBD4F08>
# [('flower',), ('flow',), ('flight',)]
# [('f', 'f', 'f'), ('l', 'l', 'l'), ('o', 'o', 'i'), ('w', 'w', 'g')]
# (('flower',), ('flow',), ('flight',))
# (('f', 'f', 'f'), ('l', 'l', 'l'), ('o', 'o', 'i'), ('w', 'w', 'g'))
l = ['flower', 'flow', 'flight']
m = ['a', 'b', 'c']
k = zip(l, m)
print k
print zip(*k)
# [('flower', 'a'), ('flow', 'b'), ('flight', 'c')]
# [('flower', 'flow', 'flight'), ('a', 'b', 'c')]
- 参考文献 2:https://developer.aliyun.com/article/373387
- 应用场景 1: 矩阵转置
- zip(*a):将 list 看成 tuple 解压,恰好得到我们“行列互换”的效果;
- map(list,zip(*a)):再通过对每个元素应用 list()函数,将 tuple 转换为 list;
- zip 函数接受任意多个序列作为参数,将所有序列按相同的索引组合成一个元素是各个序列合并成的 tuple 的新序列,新的序列的长度以参数中最短的序列为准。
- 另外(*)操作符与 zip 函数配合可以实现与 zip 相反的功能,即将合并的序列拆成多个 tuple。
- 应用场景 1: 矩阵转置
>>> a = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
>>> zip(*a)
[(1, 4, 7), (2, 5, 8), (3, 6, 9)]
>>> map(list,zip(*a))
[[1, 4, 7], [2, 5, 8], [3, 6, 9]]
###
List
获取二维及以上维度的 List 的最大最小值语法糖
print(min(min(row) for row in simUsers), max(max(row) for row in simUsers))
Map
获取 map 中 value 为特定值的 key
Tuple
nametuple
- 参考文献:
- namedtuple 是继承自 tuple 的子类。namedtuple 创建一个和 tuple 类似的对象,而且对象拥有可访问的属性。
from collections import namedtuple
# 定义一个namedtuple类型User,并包含name,sex和age属性。
User = namedtuple('User', ['name', 'sex', 'age'])
# 创建一个User对象
user = User(name='kongxx', sex='male', age=21)
# 也可以通过一个list来创建一个User对象,这里注意需要使用"_make"方法
user = User._make(['kongxx', 'male', 21])
print user
# User(name='user1', sex='male', age=21)
# 获取用户的属性
print user.name
print user.sex
print user.age
# 修改对象属性,注意要使用"_replace"方法
user = user._replace(age=22)
print user
# User(name='user1', sex='male', age=21)
# 将User对象转换成字典,注意要使用"_asdict"
print user._asdict()
# OrderedDict([('name', 'kongxx'), ('sex', 'male'), ('age', 22)])
JSON 操作
JSON 格式转化
json 读取和写入
Collection 语法
获取 List 的分布/频数
- 使用:
from collections import Counter
origin_label_distribution = Counter(train_all_dataset.get_subset_targets())
print(origin_label_distribution)
global()与 local()
- globals()函数和 locals()函数都是 python 的内置函数,他们的作用分别是以 dict 数据类型返回 python 程序的全局变量和某个局部的变量。globals()可读可写,而 locals()是只读!
>>> globals()
{'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': , '__spec__': None, '__annotations__': {}, '__builtins__': }
>>>
>>> a = 1
>>> b = 2
>>> import os
>>> import sys
>>> globals()
{'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': , '__spec__': None, '__annotations__': {}, '__builtins__': , 'a': 1, 'b': 2, 'os': , 'sys': }
>>>
文件操作
判断路径存在与创建新文件夹
os.path.exists(path)
列出文件夹目录下的所有文件名
Sorted
-
key 传参的用法
-
reverse 的用法
- True: 从大到小排列
-
将 index 和 value 一起排序
# 按列表a中元素的值进行排序,并返回元素对应索引序列
a = [1, 3, 4, 5, 2, 7, 9]
print('a:', a)
sorted_id = sorted(range(len(a)), key=lambda k: a[k], reverse=True)
print('元素索引序列:', sorted_id)
# 结果
# a: [1, 3, 4, 5, 2, 7, 9]
# 元素索引序列: [6, 5, 3, 2, 1, 4, 0]
datetime 语法中的问题
-
匹配’2021-05-30T21:17:08+00:00’现在不可以了,但是可以匹配’2021-05-30T21:17:08+0000’,因此需要删除最后一个’:’
- str_new = str[::-1].replace(‘:’,’‘,1)[::-1]
-
参考文献:datetime - Python: time data does not match format - Stack Overflow
多进程与多线程相关主题
Multiprocess
multiprocessing.SimpleQueue
-
multiprocessing.SimpleQueue
主要用于多进程编程中,允许多个进程之间安全地传递数据。它是一种轻量级的队列实现,适用于简单的生产者-消费者模型,其中一个或多个进程生产数据,而其他进程消费数据。
import multiprocessing
def producer(queue, data):
for item in data:
queue.put(item)
def consumer(queue):
while True:
item = queue.get()
if item is None:
break
print(f"Consumer got: {item}")
if __name__ == "__main__":
data_to_process = [1, 2, 3, 4, 5]
# 创建一个 SimpleQueue
queue = multiprocessing.SimpleQueue()
# 创建生产者和消费者进程
producer_process = multiprocessing.Process(target=producer, args=(queue, data_to_process))
consumer_process = multiprocessing.Process(target=consumer, args=(queue))
producer_process.start()
consumer_process.start()
producer_process.join()
# 发送一个终止信号给消费者
queue.put(None)
consumer_process.join()
- 缺点:
- 功能有限:相对于其他队列实现(如
multiprocessing.Queue
),multiprocessing.SimpleQueue
的功能有限。它不支持进程等待或超时,不允许设置队列的最大大小等高级特性。 - 仅适用于简单场景:由于其功能有限,
multiprocessing.SimpleQueue
适用于相对简单的生产者-消费者场景,对于复杂的需求可能不够灵活。 - 不支持阻塞操作:它没有提供阻塞式的
get
操作,因此在没有数据的情况下,调用get
会立即返回。
- 功能有限:相对于其他队列实现(如
multiprocessing.JoinableQueue
multiprocess.JoinableQueue
是 Python multiprocessing
模块中的一个队列类型,它的主要作用是在多进程编程中实现进程之间的通信和同步。与普通队列不同,JoinableQueue
具有一些额外的特性,最重要的是可以跟踪队列中的任务并知道何时队列为空,以及何时可以安全地终止进程。
以下是 multiprocess.JoinableQueue
的主要作用和示例:
- 进程间通信:
JoinableQueue
允许多个进程之间安全地传递数据。一个进程可以将数据放入队列,而另一个进程可以从队列中获取数据,从而实现进程之间的通信。
from multiprocessing import Process, JoinableQueue
def worker(q):
data = q.get()
print(f"Worker got data: {data}")
if __name__ == "__main__":
queue = JoinableQueue()
p = Process(target=worker, args=(queue,))
p.start()
queue.put("Hello from main process")
queue.join()
p.join()
在上面的示例中,主进程向队列中放入了数据,并调用 queue.join()
来等待队列中的任务全部完成,以确保工作进程执行完毕。
- 跟踪任务完成:
JoinableQueue
允许主进程跟踪子进程完成的任务数量。当子进程完成任务时,它会通过调用queue.task_done()
来通知队列,从而使主进程知道何时所有任务都已完成。
from multiprocessing import Process, JoinableQueue
def worker(q):
data = q.get()
print(f"Worker got data: {data}")
q.task_done()
if __name__ == "__main__":
queue = JoinableQueue()
p = Process(target=worker, args=(queue,))
p.start()
queue.put("Hello from main process")
queue.join()
p.join()
在这个示例中,工作进程在完成任务后调用了 q.task_done()
,主进程通过 q.join()
来等待任务完成。
总之,JoinableQueue
是多进程编程中用于进程间通信和同步的一种重要工具,它允许主进程与子进程之间安全地传递数据和等待任务完成,从而更好地管理和控制多进程应用程序。
自定义 multi parallel queue
-
解决缓冲区大小限制问题:
- 在多进程编程中,使用队列来在进程之间传递数据非常常见。但是,某些队列实现(例如
multiprocessing.SimpleQueue
)使用系统管道(pipe),而这些管道的缓冲区大小是有限的,通常很小。这就意味着当队列中的数据达到管道缓冲区的上限时,进程可能会阻塞等待队列中的空间。 -
ParallelQueue
的主要目的是通过使用多个队列,以轮询的方式从中选择一个队列来解决这个问题。每个队列都有自己的缓冲区,因此总体上可以容纳更多的数据。这可以帮助避免进程之间的阻塞,提高并行性能。
- 在多进程编程中,使用队列来在进程之间传递数据非常常见。但是,某些队列实现(例如
-
轮询多个队列:
-
ParallelQueue
维护一个队列列表_queues
,并在每次调用put
或get
方法时,根据轮询的方式选择一个队列来执行操作。这确保了每个队列都有机会被使用,从而均衡了数据分布。
-
-
提高管道缓冲区大小:
- 在初始化过程中,
ParallelQueue
使用fcntl
模块来增加每个队列的管道缓冲区大小,将其设置为最大的管道大小(PIPE_MAX_SIZE
)。这可以帮助提高每个队列的容量,从而减少阻塞的可能性。
- 在初始化过程中,
-
可关闭:
-
ParallelQueue
提供了close()
方法,可以用于关闭所有包含的队列。
-
import fcntl
F_SETPIPE_SZ = 1031
F_GETPIPE_SZ = 1032
PIPE_MAX_SIZE = 1048576
class ParallelQueue(object):
'''
ParallelQueue is a wrapper around multiple queues which are scheduled in
round robin fashion. This is done to overcome the issue of buffer size limit
per queue. For example, mp.SimpleQueue uses system pipe and it's not straighforward
to increase the Pipe's buffer size and also the max size limit.
Therefore, using multiple parallel queues might overcome the issue of size limit.
'''
def __init__(self, QueueType, count=1):
self._cur_queue_idx = 0
self._total_queues = count
self._queues = [QueueType() for _ in range(count)]
self._init_queues()
def _init_queues(self):
'''
We increase the default pipe size to max pipe size
'''
for queue in self._queues:
r_fd = queue._reader.fileno()
w_fd = queue._writer.fileno()
fcntl.fcntl(r_fd, F_SETPIPE_SZ, PIPE_MAX_SIZE)
fcntl.fcntl(w_fd, F_SETPIPE_SZ, PIPE_MAX_SIZE)
def close(self):
[queue.close() for queue in self._queues]
def put(self, msg): # 每次都会选择当前队列的下一个
idx = self._cur_queue_idx
self._cur_queue_idx = (self._cur_queue_idx + 1) % self._total_queues
return self._queues[idx].put(msg)
def empty(self):
idx = self._cur_queue_idx
return self._queues[idx].empty()
def get(self):
idx = self._cur_queue_idx
self._cur_queue_idx = (self._cur_queue_idx + 1) % self._total_queues
return self._queues[idx].get()
multiprocess 中多种 queue 的选型问题
-
multiprocessing.SimpleQueue:
- 功能简单:
SimpleQueue
是最简单的队列实现,它提供了基本的队列操作,包括put
和get
。 - 无法阻塞:它不支持阻塞式的
get
操作,因此在没有数据的情况下,调用get
会立即返回。 - 不支持超时:与阻塞操作相关,它也不支持设置超时等待。
- 适用范围:适用于简单的生产者-消费者模型,对于复杂的场景功能有限。
- 功能简单:
-
multiprocessing.Queue:
- 功能丰富:
Queue
提供了比SimpleQueue
更丰富的功能,包括阻塞式的get
操作,支持设置队列的最大大小,允许设置超时等待。 - 阻塞操作:可以使用
get
方法进行阻塞等待,直到队列中有数据可用。 - 最大大小:可以通过参数设置队列的最大大小,以控制队列中的元素数量。
- 适用范围:适用于大多数多进程通信需求,功能相对完备。
- 功能丰富:
-
multiprocessing.JoinableQueue:
- 功能更强大:
JoinableQueue
是Queue
的一个子类,除了Queue
的功能外,还具备了一些额外的特性。 - 支持任务追踪:
JoinableQueue
允许跟踪进程的任务完成情况,通过task_done
方法标记任务已完成,并通过join
方法等待所有任务完成。 - 适用范围:适用于需要追踪任务完成状态的场景,例如,当所有任务完成后,执行某个特定操作。
- 功能更强大:
总之,选择合适的队列实现取决于你的多进程通信需求。如果只需要基本的队列操作,可以使用 SimpleQueue
。如果需要更多功能,如阻塞操作、设置队列大小等,可以使用 Queue
。如果需要追踪任务完成状态,可以使用 JoinableQueue
。根据具体场景的要求,选择合适的队列可以提高多进程编程的效率和可维护性。
multiprocess.event 的作用和示例
import multiprocessing,time,random
def restaurant_handle(event): #餐厅的处理进程
print("1、【餐厅】为食客安排座位,并在一旁等待食客点餐。。。")
time.sleep(random.randint(1,3))
event.set()#解除阻塞状态
event.clear()#清除已有的状态
event.wait()#等待食客后续处理
print("3、【餐厅】厨师接到菜单,开始烹饪美食。。。")
time.sleep(random.randint(1,3))
event.set() #解除阻塞状态
event.clear() # 之前的状态清空
event.wait()
print("5、【餐厅】收银台算正在算账。。。")
time.sleep(random.randint(1,3))
event.set() # 解除阻塞状态
event.clear() # 之前的状态清空
event.wait()
print("7、【餐厅】收银台收到钱。。。")
time.sleep(random.randint(1,3))
event.set()
event.clear()
event.wait()
def diners_hangle(event):#食客的处理进程
event.wait() #等待之前的第一步完成 两个进程所以先阻塞,让另一个执行
print("2、【食客】食客看完菜单,选好了自己心仪的美食。。。")
time.sleep(random.randint(1,3))
event.set() #解除阻塞状态
event.clear()#之前的状态清空
event.wait()#继续等待后续的处理步骤
print("4、【食客】享用丰盛的美食。。。")
time.sleep(random.randint(1,3))
event.set()
event.clear()
event.wait()
print("6、【食客】食客吃晚餐走向收银台付款。。。")
time.sleep(random.randint(1,3))
event.set()
event.clear()
event.wait()
print("8、【食客】食客离开")
event.set()
def main():
event = multiprocessing.Event()#定义一个event同步处理
restaurant_process = multiprocessing.Process(target=restaurant_handle,args=(event,),name="餐厅服务进程")
diners_process = multiprocessing.Process(target=diners_hangle,args=(event,),name="食客进程")
restaurant_process.start()
diners_process.start()
if __name__ == '__main__':
main()
Thread
RPC 专题
gRPC
- 为 gRPC 注册服务、函数和处理方式
-
grpc.stream_stream_rpc_method_handler
:- 用于定义双向流式 RPC 方法的处理器。
- 双向流式 RPC 是一种 RPC 类型,它允许客户端和服务器之间的双向通信,可以发送和接收多个消息。
- 在 gRPC 中,这种方法的处理器需要指定请求消息的反序列化器(
request_deserializer
)和响应消息的序列化器(response_serializer
)。
-
grpc.unary_stream_rpc_method_handler
:- 用于定义客户端流式 RPC 方法的处理器。
- 客户端流式 RPC 允许客户端向服务器流式发送多个消息,但服务器只能返回单个响应消息。
- 这种方法的处理器需要指定请求消息的反序列化器(
request_deserializer
)和响应消息的序列化器(response_serializer
)。
-
grpc.unary_unary_rpc_method_handler
:- 用于定义一元 RPC 方法的处理器。
- 一元 RPC 是最简单的 RPC 类型,客户端发送一个请求消息,服务器处理后返回一个响应消息。
- 这种方法的处理器需要指定请求消息的反序列化器(
request_deserializer
)和响应消息的序列化器(response_serializer
)。
-
def add_ModelServingServicer_to_server(servicer, server):
rpc_method_handlers = {
'predict': grpc.stream_stream_rpc_method_handler(
servicer.predict,
request_deserializer=predict__pb2.PredictRequest.FromString,
response_serializer=predict__pb2.PredictResponse.SerializeToString,
),
'predict_ack': grpc.unary_stream_rpc_method_handler(
servicer.predict_ack,
request_deserializer=predict__pb2.ClientToken.FromString,
response_serializer=predict__pb2.PredictRequestAck.SerializeToString,
),
'register': grpc.unary_unary_rpc_method_handler(
servicer.register,
request_deserializer=predict__pb2.RegisterRequest.FromString,
response_serializer=predict__pb2.RegisterResponse.SerializeToString,
),
'unregister': grpc.unary_unary_rpc_method_handler(
servicer.unregister,
request_deserializer=predict__pb2.ClientToken.FromString,
response_serializer=predict__pb2.Empty.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'model_serving.ModelServing', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
argparse
–store_true
- 默认不传的时候就是设置为 False
-
True/False 参数无法设置的问题
- 参考文献:Cannot pass BooleanParameter in as argument from commandline · Issue #193 · spotify/luigi (github.com)
- 解决方案:直接避免 True 和 False 的出现即可
打印所有的传入参数
-
Numpy
初始化一个特定的 Tensor
Argwhere
- 返回:非 0 的数组元素的索引,其中 a 是待索引数组的条件
import numpy as np
x = np.arange(6).reshape(2,3)
print(x)
# x=[[0 1 2]
# [3 4 5]]
y = np.argwhere(x>1) #x所有元素中大于1的元素的索引,分别是第0行第2列...以此类推
print(y)
# y=[[0 2]
# [1 0]
# [1 1]
# [1 2]]
Where
- 满足条件(condition),输出 x,不满足输出 y。
>>> aa = np.arange(10)
>>> np.where(aa,1,-1)
array([-1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) # 0为False,所以第一个输出-1
>>> np.where(aa > 5,1,-1)
array([-1, -1, -1, -1, -1, -1, 1, 1, 1, 1])
>>> np.where([[True,False], [True,True]], # 官网上的例子
[[1,2], [3,4]],
[[9,8], [7,6]])
array([[1, 8],
[3, 4]])
求和
拼接 Tensor
- 参考文档:https://zhuanlan.zhihu.com/p/130260017
- 列拼接:column_stack((a, b))
对应 Item 操作
- 就是我们简单的单元操作符: + - * /
数组排序与翻转数组
- 排序后得到 index 的数组:
#!/usr/bin/python3
#code-python(3.6)
import numpy as np
a = np.array([4,2,5,7])
b = a.argsort() #将数组升序排列,但不改变数组,且返回对应的索引
print(a) #[4 2 5 7],其索引是[0,1,2,3]
print(b) #升序后的索引是[1 0 2 3],对应元素[2,4,5,7]
#!/usr/bin/python3
#code-python(3.6)
import numpy as np
a = np.array([4,2,5,7])
b = a.argsort()[::-1] #将数组降序排列,但不改变数组,且返回对应的索引
print(a) #[4 2 5 7],其索引是[0,1,2,3]
print(b) #降序后的索引是[3 2 0 1],对应元素[7,5,4,2]
- 翻转数组:
[::-1]
data = data[data[:,2].argsort()] <em>#按照第3列对行排序</em>
- 按行排序
data = data[:,data[2].argsort()]
- 获取数组的排序 index
计数操作
- 参考文献:https://www.itranslater.com/qa/details/2119130387891880960
- 用于计算某个值在 numpy 的 narray 中出现的次数
>>> import numpy as np
>>> y = np.array([1, 2, 2, 2, 2, 0, 2, 3, 3, 3, 0, 0, 2, 2, 0])
>>> np.count_nonzero(y == 1)
1
>>> np.count_nonzero(y == 2)
7
>>> np.count_nonzero(y == 3)
3
升高维度和降低维度
-
增加一列
保存/读取 csv 文件
提取子矩阵
x=np.arange(16).reshape((4,4))
x[range(1,3), :][:,range(1,3)]
获取唯一的元素
Pandas 一些语法
获取子表格
- 参考文档 1【一个报错记录】:https://stackoverflow.com/questions/36921951/truth-value-of-a-series-is-ambiguous-use-a-empty-a-bool-a-item-a-any-o
# 训练集
train_df = UIT_header[UIT_header['day'] <= 18]
print(len(train_df))
val_df = UIT_header[(UIT_header['day'] > 18) & (UIT_header['day'] <= 24)]
print(len(val_df))
test_df = UIT_header[UIT_header['day'] > 24]
print(len(test_df))
- 根据一堆 index 获取子表格的方法:每次遍历得到的 c 都是一个混乱的 index 组合
for ind, c in enumerate(client_idcs):
sub_labels = labels[c]
sub_test_df = test_df.iloc[c, :]
all_split_df.append(sub_test_df)
行遍历与行遍历赋值
- 用于直接对 DataFrame 进行遍历的方法,目前只能用于 get【参考文档:https://zhuanlan.zhihu.com/p/339744795 】
for idx,data in df.iterrows():
print("[{}]: {}".format(idx,data))
- 行遍历赋值
import pandas as pd
# 创建一个示例 DataFrame
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
# 定义一个函数,在每行 DataFrame 中添加一个新的 'C' 字段
def add_new_column(row):
row['C'] = row['A'] + row['B']
return row
# 使用 apply 方法对每行 DataFrame 调用 add_new_column 函数
df = df.apply(add_new_column, axis=1)
# 输出添加新字段后的 DataFrame
print(df)
nan 元素处理
- 判断某个元素是否为 Nan
pd.isnull("A")
> False
pd.isnull(3)
> False
pd.isnull(np.nan)
> True
pd.isnull(None)
> True
- fillna 空值处理
#fill all Nan value with zero
df = df.fillna(0)
df = df.fillna(method='ffill', axis=0) # 用上一行的值替换空值
df = df.fillna(method='ffill', axis=1) # 用上一列的值替换空值
拼表操作(join/merge)
- 参考文献:https://blog.csdn.net/brucewong0516/article/details/82707492
-
主要涉及几个操作
- on:拼接列
- Suffixes:若出现相同的列时,如何重新命名
- How:如何拼接的问题
- Left: 只保留左表的完整数据
- inner:取交集:比如 left:[‘A’,‘B’,‘C’];right[’’A,‘C’,‘D’];inner 取交集的话,left 中出现的 A 会和 right 中出现的买一个 A 进行匹配拼接,如果没有是 B,在 right 中没有匹配到,则会丢失。
- outer:取并集。比如 left:[‘A’,‘B’,‘C’];right[’’A,‘C’,‘D’];’outer’取并集,出现的 A 会进行一一匹配,没有同时出现的会将缺失的部分添加缺失值。
# bchain_df大,gold_df小,则会按大表来,没有的内容填充空值
all_df = bchain_df.merge(gold_df, how='outer')
loc 操作 / iloc 操作
- loc 操作参考文献:https://blog.csdn.net/brucewong0516/article/details/82494090
- 根据标签(label)索引
dfa.loc[dfa.start_time==0, 'start_time'] = np.nan # 取'start_time'中为0的项目,赋值为np.nan
stock.loc[0:5,['股票代码','股票名称','当前价','涨跌额','涨跌幅','年初至今','成交量','成交额']]
- iloc 参考文献:https://zhuanlan.zhihu.com/p/129898162
- 根据行标签的位置索引。iloc 就是 integer loc 。可以理解为是在 loc 的基础上,用整数索引。即 iloc 是基于位置索引的,也就是我们常说的几行几列。
stock.iloc[0:6,0:8]
保存文件
保存为 CSV 文件[效率很慢]
- 参考文献:https://blog.csdn.net/tz_zs/article/details/81137998
- 参考文献 2:https://blog.csdn.net/qq_38268886/article/details/80744721
- 正常来说,应该是会自动填充表头的。
df.to_csv("tzzs_data2.csv", index_label="index_label")
- 直接将 numpy 保存为 csv
for index in trange(len(UIT_header)):
i_size[UIT_header.loc[index, 'i']] = UIT_header.loc[index, 'video_size']
print("get content size finished!")
columns = ['content_size']
i_size_pd = pd.DataFrame(columns=columns, data=i_size)
i_size_pd.to_csv('./dataset/i_size_pd.csv',encoding='utf-8')
- 不保存那个新的 Unnamed: 0
直接保存为 npy 文件
保存为特殊的二进制格式(pkl) [效率很快]
先随便用 numpy 生成一个二位数组 a = np.random.rand(500000, 50)
,查看大小约 200MB print( a._sizeof_())
,执行时间 236ms(内存里当然快) 转为一个 df df = pd.DataFrame(a)
- test1: 直接用
df.to_csv('test1.csv')
,保存时间 35.7 秒,文件大小 474MB - test2: 用
df.to_pickle('test2.pkl')
,保存时间 402ms,几乎是瞬间完成,文件大小 195MB - test3: 用 df.to_hdf(‘test3.h5’, ‘table’),用时 1.72 秒,文件大小 199MB
增加操作
data['d'] = 0
- 按条件赋值的列
df['x1'] = df.apply(lambda x: x.amount if x.name != "" else 0, axis=1)
删除操作
df_clear = df.drop(df[df['x']<0.01].index)
<em># 也可以使用多个条件
</em>df_clear = df.drop(df[(df['x']<0.01) | (df['x']>10)].index) <em>#删除x小于0.01或大于10的行</em>
- 删除特定列
small_df = df.drop(columns=['Unnamed: 0', 'index', 'job_name', 'task_name',
'group', 'start_time_j', 'end_time_j', 'start_time_i',
'end_time_i', 'run_time_i', 'duration_min', 'wait_time',
'start_date'])
统计 value 操作
df['收入'].value_counts(ascending=True) # 统计整一列中的每个特定值的数量
df['收入'].counts("123") # 计算一列中特定值的数量
向 DataFrame 中增加数据
- 参考文献:https://blog.csdn.net/goodbye_earth/article/details/85210631
- 很坑爹的语法:df = df.append(data)
整列操作
- 【风险:可能会引发链式操作,导致一些奇怪的 bug 的产生】map 操作:传入一个 lambda 函数进行执行即可:
small_df['plan_gpu'] = small_df['plan_gpu'].map(lambda x: x / 100.0)
- 修改方案:直接使用 loc 进行更改比较好!
sub_test_df.loc[sub_test_df['label'] == 1, 'label'] = 0
sub_test_df.loc[sub_test_df['label'] == 2, 'label'] = 1
Groupby 操作
-
参考文献:https://zhuanlan.zhihu.com/p/101284491
- 标准的 groupby 操作
-
相同值映射为 index
list_drop_items = small_df['job_id'].drop_duplicates().values.tolist()
job_map = {}
idx = 0
for item in list_drop_items:
job_map[item] = idx
idx += 1
list_drop_items = []
copy_small_df = small_df.copy()
small_df['job_id'] = small_df['job_id'].map(lambda x: job_map[x])
print(small_df)
job_map = {}
修改列顺序
表头信息与表头重命名
- 获取表头的方法:
df.columns # 返回表头的信息
df.rename(columns={'a': 'beijing_count'})
- 读取文件后给表头赋值
排序操作
df.sort_value(by=["???"])
采样操作
privacy_origin_df.sample(n=100000, replace=False)
链式操作的 bug 解决 [SettingwithCopy 解决]
- 写的非常详细的一个文档:Pandas SettingwithCopy 警告解决方案 - 知乎 (zhihu.com)
TensorFlow 一些语法
TensorFlow 的版本依赖关系
- 从源代码构建 TensorFlow (google.cn)
-
一般来说,tf 需要安装对应版本的 cuda 和 cudnn 【且要直接安装在全局路径 /usr/local 下,建议采用多版本切换的方法,同时还要将/usr/local/cuda/lib64 写入到 LD_LIBRARY_PATH 中!】
- 安装 cudnn 的教程:ubuntu 安装 cuda10.2 以及 cudnn7.6.5 - 简书 (jianshu.com)
静态图获取变量
- get_collection()
Session
- 参考文档:https://zhuanlan.zhihu.com/p/32869210
-
session 是客户端与整个 TensorFlow 系统交互的接口
- session 的创建:session 拥有和管理物理资源 CPU和GPU、网络连接的功能,它最典型的使用方式是作为上下文管理器使用
# 创建本地 session
with tf.Session() as sess:
# ...
# 创建远程 session
with tf.Session("grpc://example.org:2222"):
# ...
- session 的参数:可以见下一节 配置 Config
- session 的运行:tf.Session.run 是运行 OP 和获取 tensor 的值的主要方式,可以一次性传入多个 OP 和 tensor 给它,然后 TensorFlow 会自动执行所有需要的 OP 来得到结果
x = tf.constant([[37.0, -23.0], [1.0, 4.0]])
w = tf.Variable(tf.random_uniform([2, 2]))
y = tf.matmul(x, w)
output = tf.nn.softmax(y)
init_op = w.initializer
with tf.Session() as sess:
# Run the initializer on `w`.
sess.run(init_op)
# 虽然只计算图了 output,但是所有相关的x,w,y 都会被执行,最后返回 NumPy 数组
print(sess.run(output))
# 这里既执行了 y,又执行了output,可能有人会问,y 是不是会被执行两次,实际并不是,
# y 只会执行一次,然后作为 output 的输入
y_val, output_val = sess.run([y, output])
配置 Config
- 参考文档:https://zhuanlan.zhihu.com/p/32869210
-
tf.ConfigProto(): session 运行配置
- allow_soft_placement 如果指定为 TRUE,那么 session 就会自动把不适合在 GPU 上运行的 OP 全部放到 CPU 上运行。
-
gpu_options.allow_growth 设置会使得程序在开始时候逐步的增长 GPU 显存使用量,而不是一开始就最大化的使用所有显存。
初始化
全局变量初始化
- tf.global_variables_initializer()
- 添加节点用于初始化全局变量(GraphKeys.GLOBAL_VARIABLES)。返回一个初始化所有全局变量的操作(Op)。在你构建完整个模型并在会话中加载模型后,运行这个节点。
- 能够将所有的变量一步到位的初始化,非常的方便。通过 feed_dict, 你也可以将指定的列表传递给它,只初始化列表中的变量。
sess.run(tf.global_variables_initializer(),
feed_dict= {
learning_rate_dis: learning_rate_val_dis,
adam_beta1_d_tf: adam_beta1_d,
learning_rate_proj: learning_rate_val_proj,
lambda_ratio_tf: lambda_ratio,
lambda_l2_tf: lambda_l2,
lambda_latent_tf: lambda_latent,
lambda_img_tf: lambda_img,
lambda_de_tf: lambda_de,
adam_beta1_g_tf: adam_beta1_g,
}
)
# learning_rate_dis为设置的变量,learning_rate_val_dis为我设置的具体的值。后续同理
Summary 和 Tensorboard
- 参考文档:https://zhuanlan.zhihu.com/p/102776848
- tf.summary()提供了各类方法(支持各种多种格式)用于保存训练过程中产生的数据(比如 loss_value、accuracy、整个 variable),这些数据以日志文件的形式保存到指定的文件夹中。
tensorboard --logdir=./ # 默认localhost和6006
tensorboard --logdir=./ --host 服务器ip --port 23445
Saver
模型保存,先要创建一个 Saver 对象:
saver=tf.train.Saver()
保存操作
saver.save(sess,'ckpt/mnist.ckpt',global_step=step)
在创建这个 Saver 对象的时候,有一个参数我们经常会用到,就是 max_to_keep 参数,这个是用来设置保存模型的个数,默认为 5,即 max_to_keep=5,保存最近的 5 个模型。如果你想每训练一代(epoch)就想保存一次模型,则可以将 max_to_keep 设置为 None 或者 0。
恢复操作
model_file=tf.train.latest_checkpoint('ckpt/')
saver.restore(sess,model_file)
Multinomial 操作
- 参考文献:https://blog.csdn.net/qq1483661204/article/details/78962940
- 对一批次的数组,其 values 代表采样的概率,最后返回的是一批次的采样结果
tf.GradientTape
- 参考文献: https://blog.csdn.net/guanxs/article/details/102471843
- 一个求导的上下文管理器,可以用于求导。
with tf.GradientTape() as tape:
predictions = model(images)
loss = loss_object(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
tf.losses 函数
sparse_softmax_cross_entropy
Coordinator
- 知乎上的一些教程:[TensorFlow] 利用 tensorflow 中的队列和多线程读取数据来加快模型训练速度
- 官网文档:https://www.tensorflow.org/api_docs/python/tf/train/Coordinator
###
Torch 一些语法
Torch sparse 安装失败原因
- 参考文献: 如何安装 torch-sparse
- 官方安装教程:https://pytorch-geometric.readthedocs.io/en/latest/notes/installation.html
torch 实现图神经网络
- 参考文献:https://zhuanlan.zhihu.com/p/94491664
-
PyG(PyTorch Geometric Library)
- 基于 PyTorch 的用于处理不规则数据(比如图)的库,或者说是一个用于在图等数据上快速实现表征学习的框架。它的运行速度很快,训练模型速度可以达到 DGL(Deep Graph Library )v0.2 的 40 倍(数据来自论文)。
- 集成了很多论文中提出的方法(GCN,SGC,GAT,SAGE 等等)和常用数据集。
-
Data 数据结构
- 节点的属性/特征
x = torch.tensor([[2,1],[5,6],[3,7],[12,0]], dtype=torch.float) # 特征
y = torch.tensor([0,1,0,1], dtype=torch.float) # label
- 邻接/边连接信息:图的节点连接信息要以 COO 格式进行存储。在 COO 格式中,COO list 是一个 2*E 维的 list。第一个维度的节点是源节点(source nodes),第二个维度中是目标节点(target nodes),连接方式是由源节点指向目标节点。对于无向图来说,存贮的 source nodes 和 target node 是成对存在的。
edge_index = torch.tensor([[0,1,2,0,3],
[1,0,1,3,2]],dtype=torch,long)
model.train()和 model.eval()
- 参考文献:https://blog.csdn.net/qq_38410428/article/details/101102075
- 如果模型中有 BN 层(Batch Normalization)和 Dropout,需要在训练时添加 model.train()。model.train()是保证 BN 层能够用到每一批数据的均值和方差。对于 Dropout,model.train()是随机取一部分网络连接来训练更新参数。
- 如果模型中有 BN 层(Batch Normalization)和 Dropout,在测试时添加 model.eval()。model.eval()是保证 BN 层能够用全部训练数据的均值和方差,即测试过程中要保证 BN 层的均值和方差不变。对于 Dropout,model.eval()是利用到了所有网络连接,即不进行随机舍弃神经元。
- 训练完 train 样本后,生成的模型 model 要用来测试样本。在 model(test)之前,需要加上
model.eval()
,否则的话,有输入数据,即使不训练,它也会改变权值。这是 model 中含有 BN 层和 Dropout 所带来的的性质。 - 如果不在意显存大小和计算时间的话,仅仅使用
model.eval()
已足够得到正确的 validation/test 的结果;而with torch.no_grad()
则是更进一步加速和节省 gpu 空间(因为不用计算和存储梯度),从而可以更快计算,也可以跑更大的 batch 来测试。
学习率 lr_scheduler
保存模型与加载模型
- 参考文献:保存和加载模型 - PyTorch 官方教程中文版
- 可以保存模型和优化器的参数
# 加载模型
seq2seq_model.load_state_dict(torch.load(cfg.lstm_model_path))
Categorical
- 参考文献:https://www.cnblogs.com/pprp/p/14285062.html
- distributions 包主要是实现了参数化的概率分布和采样函数。参数化是为了让模型能够具有反向传播的能力,这样才可以用随机梯度下降的方法来进行优化。
Detach 函数
- 参考文献:https://zhuanlan.zhihu.com/p/410199046
- 作用:detach()就是返回一个新的 tensor,并且这个 tensor 是从当前的计算图中分离出来的。但是返回的 tensor 和原来的 tensor 是共享内存空间的。
import torch
a = torch.tensor([1.0, 2.0, 3.0], requires_grad = True)
a = a.detach() # 会将requires_grad 属性设置为False
print(a.requires_grad)
问题修复: 同时 backward 两个网络
- 参考文献:https://blog.csdn.net/weixin_44058333/article/details/99701876
-
retain_graph 这个参数在平常中我们是用不到的,但是在特殊的情况下我们会用到它:
- 一个网络有两个 output 分别执行 backward 进行回传的时候: output1.backward(), output2.backward().
- 一个网络有两个 loss 需要分别执行 backward 进行回传的时候: loss1.backward(), loss1.backward().
#两个网络的情况需要分别为两个网络分别定义optimizer
optimizer1= torch.optim.SGD(net1.parameters(), learning_rate, momentum,weight_decay)
optimizer2= torch.optim.SGD(net2.parameters(), learning_rate, momentum,weight_decay)
#train 部分的loss回传处理
loss1 = loss()
loss2 = loss()
optimizer1.zero_grad() #set the grade to zero
loss1.backward(retain_graph=True) #保留backward后的中间参数。
optimizer1.step()
optimizer2.zero_grad() #set the grade to zero
loss2.backward()
optimizer2.step()
ge(num)函数
- 参考文献:https://zhuanlan.zhihu.com/p/348035584
- torch.Tensor 可以调用该函数
- 获取 bool 掩码,当 item>num 的时候,设置为 True,否则为 False
chunk()函数
- 参考文献:https://cloud.tencent.com/developer/article/1794326
-
函数会将输入张量(input)沿着指定维度(dim)均匀的分割成特定数量的张量块(chunks),并返回元素为张量块的元组(tuple)。torch.chunk 函数有三个参数:
- input(Tensor)- 待分割的输入张量
- chunks(int)- 均匀分割张量块的数量
- dim(int)- 进行分割的维度
import torch
A = torch.randint(0, 255, (128, 3, 32, 32))
result = torch.chunk(input=A,
chunks=14,
dim=0)
print(len(result))
# 13
print(result[0].size())
# torch.Size([10, 3, 32, 32])
print(result[-1].size())
# torch.Size([8, 3, 32, 32])
- 注意,最后如果要重新变成 Tensor,可以执行这个函数
B = torch.randn((1185024, 2))
res = torch.stack(B.chunk(64,dim=0))
print(res.shape) # torch.Size([64, 18516, 2])
masked_select
- 参考文献:https://zhuanlan.zhihu.com/p/348035584
-
torch.masked_select(input, mask, out=None) 函数返回一个根据布尔掩码 (boolean mask) 索引输入张量的 1D 张量,其中布尔掩码和输入张量就是 torch.masked_select(input, mask, out = None) 函数的两个关键参数,函数的参数有:
- input(Tensor) - 需要进行索引操作的输入张量;
- mask(BoolTensor) - 要进行索引的布尔掩码;
- out(Tensor, optional) - 指定输出的张量。比如执行 torch.zeros([2, 2], out = tensor_a),相当于执行 tensor_a = torch.zeros([2, 2]);
import torch
x = torch.randn([3, 4])
print(x)
# 将x中的每一个元素与0.5进行比较
# 当元素大于等于0.5返回True,否则返回False
mask = x.ge(0.5)
print(mask)
print(torch.masked_select(x, mask))
'''
tensor([[ 1.2001, 1.2968, -0.6657, -0.6907],
[-2.0099, 0.6249, -0.5382, 1.4458],
[ 0.0684, 0.4118, 0.1011, -0.5684]])
tensor([[ True, True, False, False],
[False, True, False, True],
[False, False, False, False]])
tensor([1.2001, 1.2968, 0.6249, 1.4458])
'''
损失函数 Loss Function
Squeeze 和 Unsqueeze 函数
- 参考文献: https://blog.csdn.net/xiexu911/article/details/80820028
-
torch.squeeze(): 这个函数主要对数据的维度进行压缩,去掉维数为 1 的的维度,比如是一行或者一列这种,一个一行三列(1,3)的数去掉第一个维数为一的维度之后就变成(3)行。
- a.squeeze(N) 就是在 a 中指定位置 N 加上一个维数为 1 的维度。还有一种形式就是 b=torch.squeeze(a,N) a 就是在 a 中指定位置 N 加上一个维数为 1 的维度。
- torch.unsqueeze(): 这个函数主要是对数据维度进行扩充。给指定位置加上维数为一的维度,比如原本有个三行的数据(3),在 0 的位置加了一维就变成一行三列(1,3)。
拼接 Tensor
- 参考文献:https://blog.csdn.net/weixin_44613063/article/details/89576810
- 这种做法比 Numpy 感觉还要好,不会凭空多出来一个维度
torch.stack(sequence, dim=0)
判断 Tensor/Model 是否在外部设备上
import torch
import torchvision.models as models
model=models.vgg11(pretrained=False)
print(next(model.parameters()).is_cuda) #False
data=torch.ones((5,10))
print(data.device) #cpu
只打印数值,不打印设备信息
print('batch_loss: {:.3f} batch acc: {:.3f}'.format(loss.data, acc.data))
梯度传播因为更改的时候 RuntimeError
class Actor(nn.Module):
def __init__(self, state_dim, action_dim):
super(Actor, self).__init__()
self.actor = nn.Sequential(
nn.Linear(state_dim, action_dim),
nn.Tanh(),
)
def forward(self, state):
raw_action = self.actor(state)
raw_action[:, 0:3] = F.softmax(raw_action[:, 0:3], dim=1) # 会报错!!!
return raw_action
gather()
- 参考文献:https://zhuanlan.zhihu.com/p/352877584
- 讲的非常有用,终于看懂了一次
- 具体的应用场景在这里:/home/linchangxiao/labInDiWu/easy-rl/codes/DQN/task0.py
获得向量中特定值所在的 index
DataLoader 和 Subset 和自定义 Dataset
- 参考文献:How to load a list of numpy arrays to pytorch dataset loader?
-
参考文献[自定义 Dataset]:Pytorch:自定义 Subset/Dataset 类完成数据集拆分 - orion-orion - 博客园 (cnblogs.com)
- Subset 对象虽然仍然还存有 data 属性,但是内置的 target 和 classes 属性已经不复存在。
- 注意:DataLoader 只是给数据集封装一个类,为了更少的使用空间,一般只会在内存中维护一个 dataset 实例,而 DataLoader 只是确定如何 load 这个 dataset 而已。
import torch
import numpy as np
from torch.utils.data import TensorDataset, DataLoader
my_x = [np.array([[1.0,2],[3,4]]),np.array([[5.,6],[7,8]])] # a list of numpy arrays
my_y = [np.array([4.]), np.array([2.])] # another list of numpy arrays (targets)
tensor_x = torch.Tensor(my_x) # transform to torch tensor
tensor_y = torch.Tensor(my_y)
my_dataset = TensorDataset(tensor_x,tensor_y) # create your datset
my_dataloader = DataLoader(my_dataset) # create your dataloader
cuda 操作的异步性和手动同步方案
在执行 torch.matmul(x, y)
这样的 CUDA 操作后需要进行同步的原因是为了确保操作的完成。CUDA 操作通常是异步的,这意味着当你调用一个 CUDA 操作时,它可能会立即返回并开始在 GPU 上执行,而不会等待操作的实际完成。这是为了充分利用 GPU 的并行性能。
但在某些情况下,你可能需要确保 CUDA 操作已经完成,才能继续执行后续的操作,例如:
- 测量操作执行时间:如果你想测量某个 CUDA 操作的执行时间,那么在执行操作后立即同步可以确保计时的准确性。如果不同步,计时可能会在操作实际完成之前结束。
- 操作顺序的依赖性:如果你的后续操作依赖于前面的 CUDA 操作的结果,那么同步可以确保前面的操作已经完成,从而避免数据依赖性错误。
- 结果一致性:在某些情况下,你可能需要确保 CUDA 操作的结果在继续执行后续操作之前已经准备好,以避免数据不一致性。
因此,同步 CUDA 设备是一种控制操作执行顺序和确保操作已经完成的方式。你可以使用 torch.cuda.synchronize()
函数来实现同步,确保在继续执行后续操作之前等待 CUDA 操作的完成。这对于需要精确控制和管理 CUDA 操作的应用非常重要。
import torch
# 创建两个张量并将它们移到 CUDA 设备上
x = torch.randn(1000, 1000).cuda()
y = torch.randn(1000, 1000).cuda()
# 执行一些 CUDA 操作
result = torch.matmul(x, y)
# 同步 CUDA 设备
torch.cuda.synchronize()
# 执行其他操作
z = torch.randn(1000, 1000).cuda()
torch.cuda.Stream()
- Steam 是 CUDA 命令线性执行的抽象形式,分配给设备的 CUDA 命令按照入队序列的顺序执行。每个设备都有一个默认的 Steam,也可以通过 torch.cuda.Stream()创建新的 Stream。如果不同 Stream 中的命令交互执行,那么就不能保证命令绝对按顺序执行。下面的这个例子不同的 Stream 就可能会产生错误。
cuda = torch.device("cuda")
# 创建默认的stream,A就是使用的默认stream
s = torch.cuda.Stream()
A = torch.randn((1,10), device=cuda)
for i in range(100):
# 在新的stream上对默认的stream上创建的tensor进行求和
with torch.cuda.stream(s):
# 存在的问题是:torch.sum()可能会在torch.randn()之前执行
B = torch.sum(A)
print(B)
- 这个例子存在的问题是 torch.sum()可能会在 torch.randn()之前就执行。为了保证 Stream 中的命令绝对按顺序执行,接下来使用 Synchronize 同步方法解决上面例子的问题:
cuda = torch.device("cuda")
s = torch.cuda.Stream()
A = torch.randn((1,10), device=cuda)
default_stream = torch.cuda.current_stream()
print("Default Stream: {}".format(default_stream))
# 等待创建A的stream执行完毕
torch.cuda.Stream.synchronize(default_stream)
for i in range(100):
# 在新的stream上对默认的stream上创建的tensor进行求和
with torch.cuda.stream(s):
print("current stream: {}".format(torch.cuda.current_stream()))
B = torch.sum(A)
print(B)
固定缓冲区与 pin_memory
- 缓存就是当计算机内存不足的时候,就会把内存中的数据存储到硬盘上。固定缓冲区就是说常驻内存,不能把这部分数据缓存到硬盘上。可以直接使用 pin_memory 方法或在 Tensor 上直接调用 pin_memory 方法将 Tensor 复制到固定缓冲区。为什么要做固定缓冲区呢?目的只有一个,就是把 CPU 上的固定缓冲区拷贝到 GPU 上时速度快。Tensor 上的 is_pinned 方法可以查看该 Tensor 是否加载到固定缓冲区中。
from torch.utils.data._utils.pin_memory import pin_memory
x = torch.Tensor([[1,2,4], [5, 7, 9], [3, 7, 10]])
# 通过pin_memory()方法将x复制到固定缓冲区
y = pin_memory(x)
# 在tensor上直接调用pin_memory()方法将tensor复制到固定缓冲区
z = x.pin_memory()
# id()方法返回tensor的内存地址,pin_memory()返回tensor对象的拷贝,因此内存地址是不同的
print("id: {}".format(id(x)))
print("id: {}".format(id(y)))
print("id: {}".format(id(z)))
# 当tensor放入固定缓冲区后,就可以异步将数据复制到gpu设备上了
a = z.cuda(non_blocking=True)
print(a)
print("is_pinned: {}/{}".format(x.is_pinned(), z.is_pinned()))
# id: 1605289350472
# id: 1605969660408
# id: 1605969660248
# tensor([[ 1., 2., 4.],
# [ 5., 7., 9.],
# [ 3., 7., 10.]], device='cuda:0')
# is_pinned: False/True
Pytorch 原理 - 如何把数据从 CPU 传到 GPU 中(dispatcher, cuda, 调度)
cuda、cudnn、cuda toolkit 的区别
- 参考文献:【精选】一文讲清楚 CUDA、CUDA toolkit、CUDNN、NVCC 关系_健 0000 的博客-CSDN 博客
-
区分
- CUDA:为“GPU 通用计算”构建的运算平台。
- cudnn:为深度学习计算设计的软件库。
- CUDA Toolkit (nvidia): CUDA 完整的工具安装包,其中提供了 Nvidia 驱动程序、开发 CUDA 程序相关的开发工具包等可供安装的选项。包括 CUDA 程序的编译器、IDE、调试器等,CUDA 程序所对应的各式库文件以及它们的头文件。
- CUDA Toolkit (Pytorch): CUDA 不完整的工具安装包,其主要包含在使用 CUDA 相关的功能时所依赖的动态链接库。不会安装驱动程序。
- NVCC 是 CUDA 的编译器,只是 CUDA Toolkit 中的一部分
- 注:CUDA Toolkit 完整和不完整的区别:在安装了 CUDA Toolkit (Pytorch)后,只要系统上存在与当前的 cudatoolkit 所兼容的 Nvidia 驱动,则已经编译好的 CUDA 相关的程序就可以直接运行,不需要重新进行编译过程。如需要为 Pytorch 框架添加 CUDA 相关的拓展时(Custom C++ and CUDA Extensions),需要对编写的 CUDA 相关的程序进行编译等操作,则需安装完整的 Nvidia 官方提供的 CUDA Toolkit。
- pytorch 运行时的 CUDA 版本
# Pytorch 实际使用的运行时的 cuda 目录
import torch.utils.cpp_extension
torch.utils.cpp_extension.CUDA_HOME
# '/usr/local/cuda-11.6'
# 编译该 Pytorch release 版本时使用的 cuda 版本
import torch
torch.version.cuda
# '10.2'
- Pytorch 寻找 cuda 的流程
- 环境变量 CUDA_HOME 或 CUDA_PATH
- /usr/local/cuda
- which nvcc 的上级上级目录(which nvcc 会在环境变量 PATH 中找)
- 如果上述都不存在,则 torch.utils.cpp_extension.CUDA_HOME 为 None,会使用 conda 安装的 cudatoolkit,其路径为 cudart 库文件目录的上级目录(此时可能是通过 conda 安装的 cudatoolkit,一般直接用 conda install cudatoolkit,就是在这里搜索到 cuda 库的)。
进度条库 tqdm
- 参考文献:python 进度条库 tqdm 详解
import time
from tqdm import tqdm, trange
#trange(i)是tqdm(range(i))的一种简单写法
for i in trange(100):
time.sleep(0.05)
for i in tqdm(range(100), desc='Processing'):
time.sleep(0.05)
dic = ['a', 'b', 'c', 'd', 'e']
pbar = tqdm(dic)
for i in pbar:
pbar.set_description('Processing '+i)
time.sleep(0.2)
100%|██████████| 100/100 [00:06<00:00, 16.04it/s]
Processing: 100%|██████████| 100/100 [00:06<00:00, 16.05it/s]
Processing e: 100%|██████████| 5/5 [00:01<00:00, 4.69it/s]
Label 的 bug
- 参考文献:Pytorch 报错 CUDA error: device-side assert triggered_Dr. 卷心菜的博客-CSDN 博客
- 场景:在进行文本分类的时候,我遇到了一个问题,当 label 不是从 0 到 n 的数字时,直接就报错了!
opencv2 的一些实用
将视频中某时间段内的所有帧提取出来
import cv2
# 定义视频文件路径
video_path = "testdownload.mp4"
# 定义提取的时间范围(单位:秒)
start_time = 1 * 60 + 17 # 1分17秒
end_time = 1 * 60 + 19 # 1分19秒
# 打开视频文件
cap = cv2.VideoCapture(video_path)
# 获取视频的帧率
fps = cap.get(cv2.CAP_PROP_FPS)
# 计算要提取的帧的起始和结束帧索引
start_frame = int(start_time * fps)
end_frame = int(end_time * fps)
# 设置当前帧为起始帧
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
# 循环读取视频帧并保存
frame_count = start_frame
while frame_count <= end_frame:
# 读取视频帧
ret, frame = cap.read()
# 如果读取成功,则保存帧图像
if ret:
frame_file_path = f"frame_{frame_count}.jpg"
cv2.imwrite(frame_file_path, frame)
print(f"Saved frame {frame_count}")
frame_count += 1
# 如果到达结束帧,则跳出循环
if frame_count > end_frame:
break
# 关闭视频文件
cap.release()
超分辨率
- Waifu2x:主要用于动漫图片的 SR,但是很多高质量的 SR 需要花钱
- Image Upscaler[AI Image Upscaler - Upscale Photo, Cartoons in Batch Free (imgupscaler.com)]: 有几次免费的机会,不知道 SR 的配置
- BigJPG[Bigjpg - AI 人工智能图片无损放大 - 使用人工智能深度卷积神经网络(CNN)无损放大图片]:好像是免费的,但是一次居然要排队 40 多分钟…
强化学习
A2C 的一个很好的例子
loss 函数的问题
-
参考文献:https://www.cnblogs.com/yifanrensheng/p/13381934.html
- 需要手动将 Loss 函数进行一次允许求导,不然会报错
- 这个是因为没有将 requires_grad 设为 True,l=LOSS(out,label)中的 l 默认是 requires_grad 为 false,这个 l 其实也是一个张量 Tensor 类型,将其的 requires_grad 改为 True 后,使用 backward 函数就可以得到 requires_grad 为 True 的所有参数的梯度。
-
参考文献:PyTorch 学习笔记(六):PyTorch 的十八个损失函数
- 这篇文章总结了所有可能需要的用到损失函数
概率与采样
- 参考文献:https://zhuanlan.zhihu.com/p/343271979
- 这是一篇很好的介绍策略学习算法的具体实现方式的文章,概率采样的内容在这里
probs = self.policy_net(state)
m = Bernoulli(probs)
# 公式6的实现:本质上是个分类问题,负号是为了将梯度上升转为梯度下降
loss = -m.log_prob(action) * reward # Negtive score function x reward
- 参考文献:https://zhuanlan.zhihu.com/p/138117543
- 这是一篇介绍多标签问题的解决方案。
- 从 n 选 1 推进到 n 选 k
策略网络的训练问题
- 参考文献 1:https://zhuanlan.zhihu.com/p/343271979
- 参考文献 2:
matplot 画图
log 坐标轴与刻度
import matplotlib.pyplot as plt
x = [5,10,15,20,25,30]
y = [1,2,3,4,5,6]
fig=plt.figure()
ax = fig.add_subplot(111)
ax.plot(x, y, marker='v', color='r', label='example')
ax.legend( loc=(1.36/5,0.05/5),ncol=3) #fontsize=10,loc是图例左下角所在的点的x和y值,ncol是图例分几列显示
ax.grid() # 网格
ax.set_xlabel('x', fontsize=13) # 坐标轴标签
ax.set_xlim(5,30) # x轴只显示5到30区间
ax.set_xticks(x) # x轴上的刻度显示,可以把x改成字符串列表
ax.set_ylabel('y', fontsize=13)
ax.set_ylim(0.25,52) # y 轴只显示0.25到52区间
ax.set_yscale("log",base=4,subs=[2,3]) # y 轴上以4为底数呈对数显示,2、3表示会标记出2倍、3倍的位置
# ax.set_yticks([0,2,8,32]) # 用了log就不能用这个
ax.set_yticklabels(['0','0.25','1', '4', '16']) # 使y轴上刻度显示为这几个数,其中第一个数必须是0
ax.set_zorder(0)
legend 相关问题
- 调整图例间距:https://blog.csdn.net/qq_41645987/article/details/109147034
- 水平 legend: ncol
- legend 的手动调整与微移动:https://www.cnblogs.com/IvyWong/p/9916791.html
File 读写文件
mmap 内存映射文件
Mmap 的原理和底层 C 函数介绍
利用 mmap 写一个布隆过滤器
-
参考文献
- 比较好理解的版本:冷饭新炒:理解布隆过滤器算法的实现原理 - throwable - 博客园 (cnblogs.com)
- 布隆过滤器针对缓存场景:【为什么用布隆过滤器解决缓存问题】如何应对缓存问题 — G.Fukang’s Blog (gongfukangee.github.io)
- 布隆过滤器针对缓存场景:【如何利用布隆过滤器解决缓存问题】MeetingFilm/布隆过滤器解决缓存穿透.md at master · daydreamdev/MeetingFilm · GitHub
- 不是很好理解的版本:用 Python 从零开始实现一个 Bloomfilter_烟火君的博客-CSDN 博客
-
github 地址(一个很好的学习资料):GitHub - preytaren/fastbloom: A simple but fast bloomfilter written in Python
Logging
import logging
def get_logger(logging_file, enable_multiprocess, showing_stdout_level=logging.INFO):
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
file_ch = logging.FileHandler(logging_file)
file_ch.setLevel(logging.DEBUG)
if enable_multiprocess:
file_formatter = logging.Formatter('%(asctime)s [%(levelname)s] at [process_id: %(process)d] %(filename)s,%(lineno)d: %(message)s',
datefmt='%Y-%m-%d(%a)%H:%M:%S')
else:
file_formatter = logging.Formatter('%(asctime)s [%(levelname)s] at %(filename)s,%(lineno)d: %(message)s',
datefmt='%Y-%m-%d(%a)%H:%M:%S')
file_ch.setFormatter(file_formatter)
logger.addHandler(file_ch)
#将大于或等于INFO级别的日志信息输出到StreamHandler(默认为标准错误)
console = logging.StreamHandler()
console.setLevel(showing_stdout_level)
formatter = logging.Formatter('[%(levelname)-8s] %(message)s')
console.setFormatter(formatter)
logger.addHandler(console)
return logger