yield 的一些特殊用法

  • https://blog.csdn.net/mieleizhi0522/article/details/82142856
  • 带 yield 的函数是一个生成器,而不是一个函数了,这个生成器有一个函数就是 next 函数,next 就相当于“下一步”生成哪个数,这一次的 next 开始的地方是接着上一次的 next 停止的地方执行的,所以调用 next 的时候,生成器并不会从 foo 函数的开始执行,只是接着上一步停止的地方开始,然后遇到 yield 后,return 出要生成的数,此步就结束。
def foo():
    print("starting...")
    while True:
        res = yield 4
        print("res:",res)
g = foo()
print(next(g))
print("*"*20)
print(next(g))

# starting...
# 4
# ********************
# res: None (这里是因为最后一步为yield 4, 返回了之后下次进入函数就没有右值了!)
# 4

python import 踩坑指南

  • 参考文章:https://blog.csdn.net/karmayh/article/details/108697835
  • 问题就来自于 sys.path 的路径设置问题

    • 详细的教程如下: http://www.coolpython.net/python_senior/module_concept/modify-sys-path.html
    • sys.path 是一个列表,存放的是 python 搜索模块时可以搜索的路径,启动 python 脚本时,会将执行当前命令所在的目录添加到这个列表中,而且是在列表的最前面,正是因为这个操作,你才能在自己的项目里引用自己编写的模块,当模块名称与第三方模块或系统模块冲突时,优先引用项目里的模块。通常,sys.python 里的内容如下所示:
    • 执行打印 sys.path 的操作结果如下:
>>> import sys
>>> sys.path
['', '/home/linchangxiao/anaconda3/envs/py36tf112/lib/python36.zip', '/home/linchangxiao/anaconda3/envs/py36tf112/lib/python3.6', '/home/linchangxiao/anaconda3/envs/py36tf112/lib/python3.6/lib-dynload', '/home/linchangxiao/anaconda3/envs/py36tf112/lib/python3.6/site-packages']

虚拟环境与包管理器的相关问题

Conda 安装与设置相关操作

wget https://mirrors.bfsu.edu.cn/anaconda/archive/[Anaconda3-2023.07-2-Linux-x86_64.sh](https://repo.anaconda.com/archive/Anaconda3-2023.07-2-Linux-x86_64.sh) --no-check-certificate

bash Anaconda3-2023.07-2-Linux-x86_64.sh

source ~/.bashrc

Conda 和 Pip 源 - 建议直接安装 conda 后直接修改

vim ~/.condarc


channels:
  - defaults
show_channel_urls: true
default_channels:
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/r
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/msys2
custom_channels:
  conda-forge: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  msys2: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  bioconda: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  menpo: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  pytorch: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  pytorch-lts: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  simpleitk: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  deepmodeling: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/

pip 包管理器的完整执行流程

Pipenv

  • 有一些仓库会出现 Pipfile 和 Pipfile.lock,故使用这个会比较好一些?
  • 参考文献:Pipenv 的基本使用 — pipenv 2020.8.13.dev0 文档 (pipenv-zh.readthedocs.io)
  • 但是突然发现这个 Pipfile 根本没有任何的进度条提示,非常不方便,所以想转换为 requirements.txt:

    • pipenv requirements > requirements.txt && pipenv requirements –dev-only > requirements-dev.txt

pip 生成 requirements.txt

pip freeze > requirements.txt

从源码在 conda 虚拟环境中安装一个 pip 包的流程

python -m pip install -e . 是一个用于安装当前 Python 项目的命令,其中 -e 表示使用可编辑模式(editable mode)。这种模式下,安装的包实际上是在当前项目的源代码目录上创建的符号链接,而不是将包复制到 Python 安装目录。

以下是这个命令的执行流程:

  1. python -m pip:使用 Python 解释器的 pip 模块进行包管理。
  2. install:指示 pip 安装包。
  3. -e .:使用可编辑模式安装当前目录的包。

    • -e--editable 表示使用编辑模式。
    • . 表示当前目录,即安装当前项目。

在执行这个命令时,发生的一些关键步骤包括:

  • pip 会在当前目录查找 setup.py 文件,该文件描述了项目的元信息和依赖关系。
  • pip 将项目安装到一个虚拟环境中(如果存在),或者全局 Python 环境中。
  • 使用可编辑模式安装时,pip 不会将包复制到 Python 安装目录。相反,它会在当前项目的虚拟环境或全局环境中创建一个符号链接,指向项目的源代码目录。这样,对项目源代码的修改会立即反映在安装的包中,无需重新安装。

这种可编辑模式通常用于开发过程中,允许开发人员实时调试和修改项目代码,而无需反复安装包。在项目开发过程中,经常会使用这种模式来快速测试和验证代码修改。

import os
from pathlib import Path

from setuptools import find_packages
from skbuild import setup

import legate.install_info as lg_install_info

legate_dir = Path(lg_install_info.libpath).parent.as_posix()

cmake_flags = [
    f"-Dlegate_core_ROOT:STRING={legate_dir}",
]

env_cmake_args = os.environ.get("CMAKE_ARGS")
if env_cmake_args is not None:
    cmake_flags.append(env_cmake_args)
os.environ["CMAKE_ARGS"] = " ".join(cmake_flags)


setup(
    name="Legate Hello",
    version="0.1",
    description="A Hello World for Legate",
    author="NVIDIA Corporation",
    license="Apache 2.0",
    classifiers=[
        "Intended Audience :: Developers",
        "Topic :: Database",
        "Topic :: Scientific/Engineering",
        "License :: OSI Approved :: Apache Software License",
        "Programming Language :: Python",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
    ],
    packages=find_packages(
        where=".",
        include=["hello", "hello.*"],
    ),
    include_package_data=True,
    zip_safe=False,
)

伟大的 mamba

conda install mamba -c conda-forge

conda 删除 env

Vscode 的相关操作

搭建 conda 环境、搭建 pylint

远程连接 notebook

远程安装 vscode 插件非常慢的解决方案

Debug 问题

Invalid value encountered in long_scalars

Vscode 的 python 插件不支持 python 3.6 版本的问题

VScode 调试带参数

  • 直接创建.vscode/launch.json 即可
{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [

        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "integratedTerminal",
            "justMyCode": true,
            "args": [
                "--cluster_job_log", "trace/cluster_job_log",
                "--analyze_trace",
            ]
        },
    ]
}

条件断点

debug 中出现的 open()相对路径找不到的解决

{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [


        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "integratedTerminal",
            "justMyCode": true,
            "args": [
                
            ],
            // "env": {"PYTHONPATH": "${workspaceFolder}/yafs${pathSeparator}${env:PYTHONPATH}"}
            "cwd": "${fileDirname}",
        },
    ]
}

##

Python 原生语法

装饰器学习

Python 遍历删除

lst = [x for x in lst if x != 0]
print lst

with 关键字

  • 这个关键字一直在用,但是之前没有很清晰地去理解含义

  • 其实就是一个 Try-catch 结构的修改版

    • 支持场景:1、文件操作。2、进程线程之间互斥对象。3、支持上下文其他对象。
  • 原理:

    • with 语句实质是上下文管理。
    • 1、上下文管理协议。包含方法__enter__() 和 exit(),支持该协议对象要实现这两个方法。
    • 2、上下文管理器,定义执行 with 语句时要建立的运行时上下文,负责执行 with 语句块上下文中的进入与退出操作。
    • 3、进入上下文的时候执行__enter__方法,如果设置 as var 语句,var 变量接受__enter__()方法返回值。
    • 4、如果运行时发生了异常,就退出上下文管理器。调用管理器__exit__方法

*号 与 **号

  • 后面会增加和 zip 绑定在一起的操作,该操作经常出现在很多源码中。
  • 用于乘法计算(*) 和幂乘(**)
  • 用于元组打包与解包

    • 打包:将传递给函数的任意多个(也可以是 0 个)非关键字参数/关键字参数打包成一个元组(*号)/字典(**号)【元组只能接收非关键字参数,字典只能接收关键字参数】
  • 解包:就是传递给函数的列表、元组、字典拆分成独立的多个元素然后赋值给函数中参变量(包括普通的位置参数、关键字参数、非关键字参数)。【解出来传给函数的只有键值(.key)】

zip()和 zip(*)

  • 参考文献 1: https://blog.nowcoder.net/n/f1caca2e5e2a4943b1697133f084bf6b
    • zip() 函数用于将可迭代的对象作为参数,将对象中对应的元素打包成一个个元组,然后返回由这些元组组成的列表。
    • 如果各个迭代器的元素个数不一致,则返回列表长度与最短的对象相同。利用 * 号操作符,可以将元组解压为列表。zip(*)解压函数,将对象中的每一个元素/元祖/字符串的相同位置的子元素组合成一个新的元祖,并将所有组成的元祖以列表的形式进行展示。
>>> a = [1,2,3] 
>>> b = [4,5,6] 
>>> c = [4,5,6,7,8] 
>>> zipped = zip(a,b) 
# 打包为元组的列表 [(1, 4), (2, 5), (3, 6)] 
>>> zip(a,c) 
# 元素个数与最短的列表一致 [(1, 4), (2, 5), (3, 6)] 
>>> zip(*zipped) 
#  zip 相反,可理解为解压,返回二维矩阵式 [(1, 2, 3), (4, 5, 6)]
l = ['flower', 'flow', 'flight']
print(zip(l))
print(zip(*l))
print(list(zip(l))) #转换为list显示
print(list(zip(*l)))
print(tuple(zip(l)))
print(tuple(zip(*l)))

# <zip object at 0x0000018B0BBD4F08>
# <zip object at 0x0000018B0BBD4F08>
# [('flower',), ('flow',), ('flight',)]
# [('f', 'f', 'f'), ('l', 'l', 'l'), ('o', 'o', 'i'), ('w', 'w', 'g')]
# (('flower',), ('flow',), ('flight',))
# (('f', 'f', 'f'), ('l', 'l', 'l'), ('o', 'o', 'i'), ('w', 'w', 'g'))

l = ['flower', 'flow', 'flight']
m = ['a', 'b', 'c']
k = zip(l, m)
print k
print zip(*k)

# [('flower', 'a'), ('flow', 'b'), ('flight', 'c')]
# [('flower', 'flow', 'flight'), ('a', 'b', 'c')]
  • 参考文献 2:https://developer.aliyun.com/article/373387
    • 应用场景 1: 矩阵转置
      • zip(*a):将 list 看成 tuple 解压,恰好得到我们“行列互换”的效果;
      • map(list,zip(*a)):再通过对每个元素应用 list()函数,将 tuple 转换为 list;
      • zip 函数接受任意多个序列作为参数,将所有序列按相同的索引组合成一个元素是各个序列合并成的 tuple 的新序列,新的序列的长度以参数中最短的序列为准。
      • 另外(*)操作符与 zip 函数配合可以实现与 zip 相反的功能,即将合并的序列拆成多个 tuple。
>>> a = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
>>> zip(*a)
[(1, 4, 7), (2, 5, 8), (3, 6, 9)]
>>> map(list,zip(*a))
[[1, 4, 7], [2, 5, 8], [3, 6, 9]]

###

List

获取二维及以上维度的 List 的最大最小值语法糖

print(min(min(row) for row in simUsers), max(max(row) for row in simUsers))

Map

获取 map 中 value 为特定值的 key

Tuple

nametuple

  • 参考文献:
  • namedtuple 是继承自 tuple 的子类。namedtuple 创建一个和 tuple 类似的对象,而且对象拥有可访问的属性。
from collections import namedtuple

# 定义一个namedtuple类型User,并包含name,sex和age属性。
User = namedtuple('User', ['name', 'sex', 'age'])

# 创建一个User对象
user = User(name='kongxx', sex='male', age=21)

# 也可以通过一个list来创建一个User对象,这里注意需要使用"_make"方法
user = User._make(['kongxx', 'male', 21])

print user
# User(name='user1', sex='male', age=21)

# 获取用户的属性
print user.name
print user.sex
print user.age

# 修改对象属性,注意要使用"_replace"方法
user = user._replace(age=22)
print user
# User(name='user1', sex='male', age=21)

# 将User对象转换成字典,注意要使用"_asdict"
print user._asdict()
# OrderedDict([('name', 'kongxx'), ('sex', 'male'), ('age', 22)])

JSON 操作

JSON 格式转化

json 读取和写入

Collection 语法

获取 List 的分布/频数

  • 使用:
from collections import Counter
origin_label_distribution = Counter(train_all_dataset.get_subset_targets())
print(origin_label_distribution)

global()与 local()

  • globals()函数和 locals()函数都是 python 的内置函数,他们的作用分别是以 dict 数据类型返回 python 程序的全局变量和某个局部的变量。globals()可读可写,而 locals()是只读!
>>> globals() 
{'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': , '__spec__': None, '__annotations__': {}, '__builtins__': } 
>>> 
>>> a = 1 
>>> b = 2 
>>> import os 
>>> import sys 
>>> globals() 
{'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': , '__spec__': None, '__annotations__': {}, '__builtins__': , 'a': 1, 'b': 2, 'os': , 'sys': } 
>>>

文件操作

判断路径存在与创建新文件夹

os.path.exists(path)

列出文件夹目录下的所有文件名

Sorted

# 按列表a中元素的值进行排序,并返回元素对应索引序列
a = [1, 3, 4, 5, 2, 7, 9]
print('a:', a)
sorted_id = sorted(range(len(a)), key=lambda k: a[k], reverse=True)
print('元素索引序列:', sorted_id)

# 结果
# a: [1, 3, 4, 5, 2, 7, 9]
# 元素索引序列: [6, 5, 3, 2, 1, 4, 0]

datetime 语法中的问题

多进程与多线程相关主题

Multiprocess

multiprocessing.SimpleQueue

  • multiprocessing.SimpleQueue 主要用于多进程编程中,允许多个进程之间安全地传递数据。它是一种轻量级的队列实现,适用于简单的生产者-消费者模型,其中一个或多个进程生产数据,而其他进程消费数据。
import multiprocessing

def producer(queue, data):
    for item in data:
        queue.put(item)

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumer got: {item}")

if __name__ == "__main__":
    data_to_process = [1, 2, 3, 4, 5]

    # 创建一个 SimpleQueue
    queue = multiprocessing.SimpleQueue()

    # 创建生产者和消费者进程
    producer_process = multiprocessing.Process(target=producer, args=(queue, data_to_process))
    consumer_process = multiprocessing.Process(target=consumer, args=(queue))

    producer_process.start()
    consumer_process.start()

    producer_process.join()
    # 发送一个终止信号给消费者
    queue.put(None)
    consumer_process.join()
  • 缺点:
    • 功能有限:相对于其他队列实现(如 multiprocessing.Queue),multiprocessing.SimpleQueue 的功能有限。它不支持进程等待或超时,不允许设置队列的最大大小等高级特性。
    • 仅适用于简单场景:由于其功能有限,multiprocessing.SimpleQueue 适用于相对简单的生产者-消费者场景,对于复杂的需求可能不够灵活。
    • 不支持阻塞操作:它没有提供阻塞式的 get 操作,因此在没有数据的情况下,调用 get 会立即返回。

multiprocessing.JoinableQueue

multiprocess.JoinableQueue 是 Python multiprocessing 模块中的一个队列类型,它的主要作用是在多进程编程中实现进程之间的通信和同步。与普通队列不同,JoinableQueue 具有一些额外的特性,最重要的是可以跟踪队列中的任务并知道何时队列为空,以及何时可以安全地终止进程。

以下是 multiprocess.JoinableQueue 的主要作用和示例:

  1. 进程间通信JoinableQueue 允许多个进程之间安全地传递数据。一个进程可以将数据放入队列,而另一个进程可以从队列中获取数据,从而实现进程之间的通信。
from multiprocessing import Process, JoinableQueue

def worker(q):
   data = q.get()
   print(f"Worker got data: {data}")

if __name__ == "__main__":
   queue = JoinableQueue()
   p = Process(target=worker, args=(queue,))
   p.start()
   queue.put("Hello from main process")
   queue.join()
   p.join()

在上面的示例中,主进程向队列中放入了数据,并调用 queue.join() 来等待队列中的任务全部完成,以确保工作进程执行完毕。

  1. 跟踪任务完成JoinableQueue 允许主进程跟踪子进程完成的任务数量。当子进程完成任务时,它会通过调用 queue.task_done() 来通知队列,从而使主进程知道何时所有任务都已完成。
from multiprocessing import Process, JoinableQueue

def worker(q):
   data = q.get()
   print(f"Worker got data: {data}")
   q.task_done()

if __name__ == "__main__":
   queue = JoinableQueue()
   p = Process(target=worker, args=(queue,))
   p.start()
   queue.put("Hello from main process")
   queue.join()
   p.join()

在这个示例中,工作进程在完成任务后调用了 q.task_done(),主进程通过 q.join() 来等待任务完成。

总之,JoinableQueue 是多进程编程中用于进程间通信和同步的一种重要工具,它允许主进程与子进程之间安全地传递数据和等待任务完成,从而更好地管理和控制多进程应用程序。

自定义 multi parallel queue

  • 解决缓冲区大小限制问题

    • 在多进程编程中,使用队列来在进程之间传递数据非常常见。但是,某些队列实现(例如 multiprocessing.SimpleQueue)使用系统管道(pipe),而这些管道的缓冲区大小是有限的,通常很小。这就意味着当队列中的数据达到管道缓冲区的上限时,进程可能会阻塞等待队列中的空间。
    • ParallelQueue 的主要目的是通过使用多个队列,以轮询的方式从中选择一个队列来解决这个问题。每个队列都有自己的缓冲区,因此总体上可以容纳更多的数据。这可以帮助避免进程之间的阻塞,提高并行性能。
  • 轮询多个队列

    • ParallelQueue 维护一个队列列表 _queues,并在每次调用 putget 方法时,根据轮询的方式选择一个队列来执行操作。这确保了每个队列都有机会被使用,从而均衡了数据分布。
  • 提高管道缓冲区大小

    • 在初始化过程中,ParallelQueue 使用 fcntl 模块来增加每个队列的管道缓冲区大小,将其设置为最大的管道大小(PIPE_MAX_SIZE)。这可以帮助提高每个队列的容量,从而减少阻塞的可能性。
  • 可关闭

    • ParallelQueue 提供了 close() 方法,可以用于关闭所有包含的队列。
import fcntl

F_SETPIPE_SZ = 1031
F_GETPIPE_SZ = 1032
PIPE_MAX_SIZE = 1048576


class ParallelQueue(object):
    '''
    ParallelQueue is a wrapper around multiple queues which are scheduled in
    round robin fashion. This is done to overcome the issue of buffer size limit
    per queue. For example, mp.SimpleQueue uses system pipe and it's not straighforward 
    to increase the Pipe's buffer size and also the max size limit. 
    Therefore, using multiple parallel queues might overcome the issue of size limit.
    
    '''

    def __init__(self, QueueType, count=1):
        self._cur_queue_idx = 0
        self._total_queues = count
        self._queues = [QueueType() for _ in range(count)]

        self._init_queues()

    def _init_queues(self):
        ''' 
        We increase the default pipe  size to max pipe size
        '''
        for queue in self._queues:
            r_fd = queue._reader.fileno()
            w_fd = queue._writer.fileno()
            fcntl.fcntl(r_fd,  F_SETPIPE_SZ, PIPE_MAX_SIZE)
            fcntl.fcntl(w_fd,  F_SETPIPE_SZ, PIPE_MAX_SIZE)

    def close(self):
        [queue.close() for queue in self._queues]

    def put(self, msg): # 每次都会选择当前队列的下一个
        idx = self._cur_queue_idx
        self._cur_queue_idx = (self._cur_queue_idx + 1) % self._total_queues
        return self._queues[idx].put(msg)

    def empty(self):
        idx = self._cur_queue_idx
        return self._queues[idx].empty()

    def get(self):
        idx = self._cur_queue_idx
        self._cur_queue_idx = (self._cur_queue_idx + 1) % self._total_queues
        return self._queues[idx].get()

multiprocess 中多种 queue 的选型问题

  1. multiprocessing.SimpleQueue

    • 功能简单SimpleQueue 是最简单的队列实现,它提供了基本的队列操作,包括 putget
    • 无法阻塞:它不支持阻塞式的 get 操作,因此在没有数据的情况下,调用 get 会立即返回。
    • 不支持超时:与阻塞操作相关,它也不支持设置超时等待。
    • 适用范围:适用于简单的生产者-消费者模型,对于复杂的场景功能有限。
  2. multiprocessing.Queue

    • 功能丰富Queue 提供了比 SimpleQueue 更丰富的功能,包括阻塞式的 get 操作,支持设置队列的最大大小,允许设置超时等待。
    • 阻塞操作:可以使用 get 方法进行阻塞等待,直到队列中有数据可用。
    • 最大大小:可以通过参数设置队列的最大大小,以控制队列中的元素数量。
    • 适用范围:适用于大多数多进程通信需求,功能相对完备。
  3. multiprocessing.JoinableQueue

    • 功能更强大JoinableQueueQueue 的一个子类,除了 Queue 的功能外,还具备了一些额外的特性。
    • 支持任务追踪JoinableQueue 允许跟踪进程的任务完成情况,通过 task_done 方法标记任务已完成,并通过 join 方法等待所有任务完成。
    • 适用范围:适用于需要追踪任务完成状态的场景,例如,当所有任务完成后,执行某个特定操作。

总之,选择合适的队列实现取决于你的多进程通信需求。如果只需要基本的队列操作,可以使用 SimpleQueue。如果需要更多功能,如阻塞操作、设置队列大小等,可以使用 Queue。如果需要追踪任务完成状态,可以使用 JoinableQueue。根据具体场景的要求,选择合适的队列可以提高多进程编程的效率和可维护性。

multiprocess.event 的作用和示例

import multiprocessing,time,random
def restaurant_handle(event): #餐厅的处理进程
    print("1、【餐厅】为食客安排座位,并在一旁等待食客点餐。。。")
    time.sleep(random.randint(1,3))
    event.set()#解除阻塞状态
    event.clear()#清除已有的状态
    event.wait()#等待食客后续处理

    print("3、【餐厅】厨师接到菜单,开始烹饪美食。。。")
    time.sleep(random.randint(1,3))
    event.set() #解除阻塞状态
    event.clear()  # 之前的状态清空
    event.wait()

    print("5、【餐厅】收银台算正在算账。。。")
    time.sleep(random.randint(1,3))
    event.set()  # 解除阻塞状态
    event.clear()  # 之前的状态清空
    event.wait()

    print("7、【餐厅】收银台收到钱。。。")
    time.sleep(random.randint(1,3))
    event.set()
    event.clear()
    event.wait()

def diners_hangle(event):#食客的处理进程
    event.wait() #等待之前的第一步完成  两个进程所以先阻塞,让另一个执行

    print("2、【食客】食客看完菜单,选好了自己心仪的美食。。。")
    time.sleep(random.randint(1,3))
    event.set() #解除阻塞状态
    event.clear()#之前的状态清空
    event.wait()#继续等待后续的处理步骤

    print("4、【食客】享用丰盛的美食。。。")
    time.sleep(random.randint(1,3))
    event.set()
    event.clear()
    event.wait()

    print("6、【食客】食客吃晚餐走向收银台付款。。。")
    time.sleep(random.randint(1,3))
    event.set()
    event.clear()
    event.wait()

    print("8、【食客】食客离开")
    event.set()

def main():
    event = multiprocessing.Event()#定义一个event同步处理
    restaurant_process = multiprocessing.Process(target=restaurant_handle,args=(event,),name="餐厅服务进程")
    diners_process = multiprocessing.Process(target=diners_hangle,args=(event,),name="食客进程")
    restaurant_process.start()
    diners_process.start()
if __name__ == '__main__':
    main()

Thread

RPC 专题

gRPC

  • 为 gRPC 注册服务、函数和处理方式
    1. grpc.stream_stream_rpc_method_handler
      • 用于定义双向流式 RPC 方法的处理器。
      • 双向流式 RPC 是一种 RPC 类型,它允许客户端和服务器之间的双向通信,可以发送和接收多个消息。
      • 在 gRPC 中,这种方法的处理器需要指定请求消息的反序列化器(request_deserializer)和响应消息的序列化器(response_serializer)。
    2. grpc.unary_stream_rpc_method_handler
      • 用于定义客户端流式 RPC 方法的处理器。
      • 客户端流式 RPC 允许客户端向服务器流式发送多个消息,但服务器只能返回单个响应消息。
      • 这种方法的处理器需要指定请求消息的反序列化器(request_deserializer)和响应消息的序列化器(response_serializer)。
    3. grpc.unary_unary_rpc_method_handler
      • 用于定义一元 RPC 方法的处理器。
      • 一元 RPC 是最简单的 RPC 类型,客户端发送一个请求消息,服务器处理后返回一个响应消息。
      • 这种方法的处理器需要指定请求消息的反序列化器(request_deserializer)和响应消息的序列化器(response_serializer)。
def add_ModelServingServicer_to_server(servicer, server):
    rpc_method_handlers = {
            'predict': grpc.stream_stream_rpc_method_handler(
                    servicer.predict,
                    request_deserializer=predict__pb2.PredictRequest.FromString,
                    response_serializer=predict__pb2.PredictResponse.SerializeToString,
            ),
            'predict_ack': grpc.unary_stream_rpc_method_handler(
                    servicer.predict_ack,
                    request_deserializer=predict__pb2.ClientToken.FromString,
                    response_serializer=predict__pb2.PredictRequestAck.SerializeToString,
            ),
            'register': grpc.unary_unary_rpc_method_handler(
                    servicer.register,
                    request_deserializer=predict__pb2.RegisterRequest.FromString,
                    response_serializer=predict__pb2.RegisterResponse.SerializeToString,
            ),
            'unregister': grpc.unary_unary_rpc_method_handler(
                    servicer.unregister,
                    request_deserializer=predict__pb2.ClientToken.FromString,
                    response_serializer=predict__pb2.Empty.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'model_serving.ModelServing', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))

argparse

–store_true

打印所有的传入参数

-

Numpy

初始化一个特定的 Tensor


Argwhere

  • 返回:非 0 的数组元素的索引,其中 a 是待索引数组的条件
import numpy as np
x = np.arange(6).reshape(2,3)
print(x)
# x=[[0 1 2]
# [3 4 5]]
 
y = np.argwhere(x>1) #x所有元素中大于1的元素的索引,分别是第0行第2列...以此类推
print(y)
# y=[[0 2]
# [1 0]
# [1 1]
# [1 2]]

Where

  • 满足条件(condition),输出 x,不满足输出 y。
>>> aa = np.arange(10)
>>> np.where(aa,1,-1)
array([-1,  1,  1,  1,  1,  1,  1,  1,  1,  1])  # 0False,所以第一个输出-1
>>> np.where(aa > 5,1,-1)
array([-1, -1, -1, -1, -1, -1,  1,  1,  1,  1])

>>> np.where([[True,False], [True,True]],    # 官网上的例子
             [[1,2], [3,4]],
             [[9,8], [7,6]])
array([[1, 8],
       [3, 4]])

求和

拼接 Tensor

对应 Item 操作

  • 就是我们简单的单元操作符: + - * /

数组排序与翻转数组

  • 排序后得到 index 的数组:
#!/usr/bin/python3
#code-python(3.6)
import numpy as np
a = np.array([4,2,5,7])
b = a.argsort() #将数组升序排列,但不改变数组,且返回对应的索引
print(a)    #[4 2 5 7],其索引是[0,1,2,3]
print(b)    #升序后的索引是[1 0 2 3],对应元素[2,4,5,7]
#!/usr/bin/python3
#code-python(3.6)
import numpy as np
a = np.array([4,2,5,7])
b = a.argsort()[::-1] #将数组降序排列,但不改变数组,且返回对应的索引
print(a)    #[4 2 5 7],其索引是[0,1,2,3]
print(b)    #降序后的索引是[3 2 0 1],对应元素[7,5,4,2]
  • 翻转数组:
[::-1]
data = data[data[:,2].argsort()] <em>#按照第3列对行排序</em>
  • 按行排序
data = data[:,data[2].argsort()]
  • 获取数组的排序 index

计数操作

>>> import numpy as np
>>> y = np.array([1, 2, 2, 2, 2, 0, 2, 3, 3, 3, 0, 0, 2, 2, 0])

>>> np.count_nonzero(y == 1)
1
>>> np.count_nonzero(y == 2)
7
>>> np.count_nonzero(y == 3)
3

升高维度和降低维度

-

增加一列

保存/读取 csv 文件

提取子矩阵

x=np.arange(16).reshape((4,4))
x[range(1,3), :][:,range(1,3)]

获取唯一的元素

Pandas 一些语法

获取子表格

# 训练集
train_df = UIT_header[UIT_header['day'] <= 18]
print(len(train_df))

val_df = UIT_header[(UIT_header['day'] > 18) & (UIT_header['day'] <= 24)]
print(len(val_df))

test_df = UIT_header[UIT_header['day'] > 24]
print(len(test_df))
  • 根据一堆 index 获取子表格的方法:每次遍历得到的 c 都是一个混乱的 index 组合
for ind, c in enumerate(client_idcs):
        sub_labels = labels[c]
        sub_test_df = test_df.iloc[c, :]
        all_split_df.append(sub_test_df)

行遍历与行遍历赋值

for idx,data in df.iterrows():
    print("[{}]: {}".format(idx,data))
  • 行遍历赋值
import pandas as pd

# 创建一个示例 DataFrame
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})

# 定义一个函数,在每行 DataFrame 中添加一个新的 'C' 字段
def add_new_column(row):
    row['C'] = row['A'] + row['B']
    return row

# 使用 apply 方法对每行 DataFrame 调用 add_new_column 函数
df = df.apply(add_new_column, axis=1)

# 输出添加新字段后的 DataFrame
print(df)

nan 元素处理

pd.isnull("A") 
> False 

pd.isnull(3) 
> False 

pd.isnull(np.nan) 
> True 

pd.isnull(None)
> True
#fill all Nan value with zero
df = df.fillna(0)

df = df.fillna(method='ffill', axis=0) # 用上一行的值替换空值
df = df.fillna(method='ffill', axis=1) # 用上一列的值替换空值

拼表操作(join/merge)

  • 参考文献:https://blog.csdn.net/brucewong0516/article/details/82707492
  • 主要涉及几个操作

    • on:拼接列
    • Suffixes:若出现相同的列时,如何重新命名
    • How:如何拼接的问题
      • Left: 只保留左表的完整数据
      • inner:取交集:比如 left:[‘A’,‘B’,‘C’];right[’’A,‘C’,‘D’];inner 取交集的话,left 中出现的 A 会和 right 中出现的买一个 A 进行匹配拼接,如果没有是 B,在 right 中没有匹配到,则会丢失。
      • outer:取并集。比如 left:[‘A’,‘B’,‘C’];right[’’A,‘C’,‘D’];’outer’取并集,出现的 A 会进行一一匹配,没有同时出现的会将缺失的部分添加缺失值。
# bchain_df大,gold_df小,则会按大表来,没有的内容填充空值
all_df = bchain_df.merge(gold_df, how='outer')

loc 操作 / iloc 操作

dfa.loc[dfa.start_time==0, 'start_time'] = np.nan # 取'start_time'中为0的项目,赋值为np.nan
stock.loc[0:5,['股票代码','股票名称','当前价','涨跌额','涨跌幅','年初至今','成交量','成交额']]
  • iloc 参考文献:https://zhuanlan.zhihu.com/p/129898162
    • 根据行标签的位置索引。iloc 就是 integer loc 。可以理解为是在 loc 的基础上,用整数索引。即 iloc 是基于位置索引的,也就是我们常说的几行几列。
stock.iloc[0:6,0:8]

保存文件

保存为 CSV 文件[效率很慢]

df.to_csv("tzzs_data2.csv", index_label="index_label")
  • 直接将 numpy 保存为 csv
for index in trange(len(UIT_header)):
    i_size[UIT_header.loc[index, 'i']] = UIT_header.loc[index, 'video_size']
print("get content size finished!")
columns = ['content_size']
i_size_pd = pd.DataFrame(columns=columns, data=i_size)
i_size_pd.to_csv('./dataset/i_size_pd.csv',encoding='utf-8')
  • 不保存那个新的 Unnamed: 0

直接保存为 npy 文件

保存为特殊的二进制格式(pkl) [效率很快]

先随便用 numpy 生成一个二位数组 a = np.random.rand(500000, 50),查看大小约 200MB print( a._sizeof_()),执行时间 236ms(内存里当然快) 转为一个 df df = pd.DataFrame(a)

  • test1: 直接用 df.to_csv('test1.csv'),保存时间 35.7 秒,文件大小 474MB
  • test2: 用 df.to_pickle('test2.pkl'),保存时间 402ms,几乎是瞬间完成,文件大小 195MB
  • test3: 用 df.to_hdf(‘test3.h5’, ‘table’),用时 1.72 秒,文件大小 199MB

增加操作

data['d'] = 0
  • 按条件赋值的列
df['x1'] = df.apply(lambda x: x.amount if x.name != "" else 0, axis=1)

删除操作

df_clear = df.drop(df[df['x']<0.01].index)
<em># 也可以使用多个条件
</em>df_clear = df.drop(df[(df['x']<0.01) | (df['x']>10)].index) <em>#删除x小于0.01或大于10的行</em>
  • 删除特定列
small_df = df.drop(columns=['Unnamed: 0', 'index', 'job_name', 'task_name', 
                'group', 'start_time_j', 'end_time_j', 'start_time_i', 
                'end_time_i', 'run_time_i', 'duration_min', 'wait_time',
                'start_date'])

统计 value 操作

df['收入'].value_counts(ascending=True) # 统计整一列中的每个特定值的数量
df['收入'].counts("123") # 计算一列中特定值的数量

向 DataFrame 中增加数据

整列操作

  • 【风险:可能会引发链式操作,导致一些奇怪的 bug 的产生】map 操作:传入一个 lambda 函数进行执行即可:
small_df['plan_gpu'] = small_df['plan_gpu'].map(lambda x: x / 100.0)
  • 修改方案:直接使用 loc 进行更改比较好!
sub_test_df.loc[sub_test_df['label'] == 1, 'label'] = 0
sub_test_df.loc[sub_test_df['label'] == 2, 'label'] = 1

Groupby 操作

list_drop_items = small_df['job_id'].drop_duplicates().values.tolist()
job_map = {}
idx = 0
for item in list_drop_items:
    job_map[item] = idx
    idx += 1
list_drop_items = []
copy_small_df = small_df.copy()
small_df['job_id'] = small_df['job_id'].map(lambda x: job_map[x])
print(small_df)
job_map = {}

修改列顺序


表头信息与表头重命名

  • 获取表头的方法:
df.columns # 返回表头的信息
df.rename(columns={'a': 'beijing_count'})
  • 读取文件后给表头赋值

排序操作

df.sort_value(by=["???"])

采样操作

privacy_origin_df.sample(n=100000, replace=False)

链式操作的 bug 解决 [SettingwithCopy 解决]

TensorFlow 一些语法

TensorFlow 的版本依赖关系

静态图获取变量

Session

  • 参考文档:https://zhuanlan.zhihu.com/p/32869210
  • session 是客户端与整个 TensorFlow 系统交互的接口

    • session 的创建:session 拥有和管理物理资源 CPUGPU、网络连接的功能,它最典型的使用方式是作为上下文管理器使用
#  创建本地 session
with tf.Session() as sess:
  # ...

# 创建远程 session
with tf.Session("grpc://example.org:2222"):
  # ...
  • session 的参数:可以见下一节 配置 Config
  • session 的运行:tf.Session.run 是运行 OP 和获取 tensor 的值的主要方式,可以一次性传入多个 OP 和 tensor 给它,然后 TensorFlow 会自动执行所有需要的 OP 来得到结果
x = tf.constant([[37.0, -23.0], [1.0, 4.0]])
w = tf.Variable(tf.random_uniform([2, 2]))
y = tf.matmul(x, w)
output = tf.nn.softmax(y)
init_op = w.initializer

with tf.Session() as sess:
  # Run the initializer on `w`.
  sess.run(init_op)

  # 虽然只计算图了 output,但是所有相关的x,w,y 都会被执行,最后返回 NumPy 数组
  print(sess.run(output))

  # 这里既执行了 y,又执行了output,可能有人会问,y 是不是会被执行两次,实际并不是,
  # y 只会执行一次,然后作为 output 的输入
  y_val, output_val = sess.run([y, output])

配置 Config

  • 参考文档:https://zhuanlan.zhihu.com/p/32869210
  • tf.ConfigProto(): session 运行配置

    • allow_soft_placement 如果指定为 TRUE,那么 session 就会自动把不适合在 GPU 上运行的 OP 全部放到 CPU 上运行。
    • gpu_options.allow_growth 设置会使得程序在开始时候逐步的增长 GPU 显存使用量,而不是一开始就最大化的使用所有显存。

初始化

全局变量初始化

  • tf.global_variables_initializer()
    • 添加节点用于初始化全局变量(GraphKeys.GLOBAL_VARIABLES)。返回一个初始化所有全局变量的操作(Op)。在你构建完整个模型并在会话中加载模型后,运行这个节点。
    • 能够将所有的变量一步到位的初始化,非常的方便。通过 feed_dict, 你也可以将指定的列表传递给它,只初始化列表中的变量。
sess.run(tf.global_variables_initializer(), 
    feed_dict= {
        learning_rate_dis: learning_rate_val_dis,
        adam_beta1_d_tf: adam_beta1_d,
        learning_rate_proj: learning_rate_val_proj,
        lambda_ratio_tf: lambda_ratio,
        lambda_l2_tf: lambda_l2,
        lambda_latent_tf: lambda_latent,
        lambda_img_tf: lambda_img,
        lambda_de_tf: lambda_de,
        adam_beta1_g_tf: adam_beta1_g,
    }
) 
# learning_rate_dis为设置的变量,learning_rate_val_dis为我设置的具体的值。后续同理

Summary 和 Tensorboard

  • 参考文档:https://zhuanlan.zhihu.com/p/102776848
  • tf.summary()提供了各类方法(支持各种多种格式)用于保存训练过程中产生的数据(比如 loss_value、accuracy、整个 variable),这些数据以日志文件的形式保存到指定的文件夹中。
tensorboard --logdir=./ # 默认localhost和6006
tensorboard --logdir=./ --host 服务器ip --port 23445

Saver

模型保存,先要创建一个 Saver 对象:

saver=tf.train.Saver()

保存操作

saver.save(sess,'ckpt/mnist.ckpt',global_step=step)

在创建这个 Saver 对象的时候,有一个参数我们经常会用到,就是 max_to_keep 参数,这个是用来设置保存模型的个数,默认为 5,即 max_to_keep=5,保存最近的 5 个模型。如果你想每训练一代(epoch)就想保存一次模型,则可以将 max_to_keep 设置为 None 或者 0。

恢复操作

model_file=tf.train.latest_checkpoint('ckpt/')
saver.restore(sess,model_file)

Multinomial 操作

tf.GradientTape

with tf.GradientTape() as tape:
    predictions = model(images)
    loss = loss_object(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)

tf.losses 函数

sparse_softmax_cross_entropy

Coordinator

###

Torch 一些语法

Torch sparse 安装失败原因

torch 实现图神经网络

  • 参考文献:https://zhuanlan.zhihu.com/p/94491664
  • PyG(PyTorch Geometric Library)

    • 基于 PyTorch 的用于处理不规则数据(比如图)的库,或者说是一个用于在图等数据上快速实现表征学习的框架。它的运行速度很快,训练模型速度可以达到 DGL(Deep Graph Library )v0.2 的 40 倍(数据来自论文)。
    • 集成了很多论文中提出的方法(GCN,SGC,GAT,SAGE 等等)和常用数据集。
  • Data 数据结构

    • 节点的属性/特征
x = torch.tensor([[2,1],[5,6],[3,7],[12,0]], dtype=torch.float) # 特征
y = torch.tensor([0,1,0,1], dtype=torch.float) # label
  • 邻接/边连接信息:图的节点连接信息要以 COO 格式进行存储。在 COO 格式中,COO list 是一个 2*E 维的 list。第一个维度的节点是源节点(source nodes),第二个维度中是目标节点(target nodes),连接方式是由源节点指向目标节点。对于无向图来说,存贮的 source nodes 和 target node 是成对存在的。
edge_index = torch.tensor([[0,1,2,0,3],
                          [1,0,1,3,2]],dtype=torch,long)

model.train()和 model.eval()

  • 参考文献:https://blog.csdn.net/qq_38410428/article/details/101102075
    • 如果模型中有 BN 层(Batch Normalization)和 Dropout,需要在训练时添加 model.train()。model.train()是保证 BN 层能够用到每一批数据的均值和方差。对于 Dropout,model.train()是随机取一部分网络连接来训练更新参数。
    • 如果模型中有 BN 层(Batch Normalization)和 Dropout,在测试时添加 model.eval()。model.eval()是保证 BN 层能够用全部训练数据的均值和方差,即测试过程中要保证 BN 层的均值和方差不变。对于 Dropout,model.eval()是利用到了所有网络连接,即不进行随机舍弃神经元。
    • 训练完 train 样本后,生成的模型 model 要用来测试样本。在 model(test)之前,需要加上 model.eval(),否则的话,有输入数据,即使不训练,它也会改变权值。这是 model 中含有 BN 层和 Dropout 所带来的的性质。
    • 如果不在意显存大小和计算时间的话,仅仅使用 model.eval() 已足够得到正确的 validation/test 的结果;而 with torch.no_grad() 则是更进一步加速和节省 gpu 空间(因为不用计算和存储梯度),从而可以更快计算,也可以跑更大的 batch 来测试。

学习率 lr_scheduler

保存模型与加载模型

# 加载模型
seq2seq_model.load_state_dict(torch.load(cfg.lstm_model_path))

Categorical

  • 参考文献:https://www.cnblogs.com/pprp/p/14285062.html
    • distributions 包主要是实现了参数化的概率分布和采样函数。参数化是为了让模型能够具有反向传播的能力,这样才可以用随机梯度下降的方法来进行优化。

Detach 函数

  • 参考文献:https://zhuanlan.zhihu.com/p/410199046
  • 作用:detach()就是返回一个新的 tensor,并且这个 tensor 是从当前的计算图中分离出来的。但是返回的 tensor 和原来的 tensor 是共享内存空间的。
import torch

a = torch.tensor([1.0, 2.0, 3.0], requires_grad = True)
a = a.detach() # 会将requires_grad 属性设置为False
print(a.requires_grad)

问题修复: 同时 backward 两个网络

  • 参考文献:https://blog.csdn.net/weixin_44058333/article/details/99701876
  • retain_graph 这个参数在平常中我们是用不到的,但是在特殊的情况下我们会用到它:

    • 一个网络有两个 output 分别执行 backward 进行回传的时候: output1.backward(), output2.backward().
    • 一个网络有两个 loss 需要分别执行 backward 进行回传的时候: loss1.backward(), loss1.backward().
#两个网络的情况需要分别为两个网络分别定义optimizer
optimizer1= torch.optim.SGD(net1.parameters(), learning_rate, momentum,weight_decay)
optimizer2= torch.optim.SGD(net2.parameters(), learning_rate, momentum,weight_decay)

#train 部分的loss回传处理
loss1 = loss()
loss2 = loss()

optimizer1.zero_grad() #set the grade to zero
loss1.backward(retain_graph=True) #保留backward后的中间参数。
optimizer1.step()

optimizer2.zero_grad() #set the grade to zero
loss2.backward() 
optimizer2.step()

ge(num)函数

chunk()函数

  • 参考文献:https://cloud.tencent.com/developer/article/1794326
  • 函数会将输入张量(input)沿着指定维度(dim)均匀的分割成特定数量的张量块(chunks),并返回元素为张量块的元组tuple。torch.chunk 函数有三个参数:

    • input(Tensor)- 待分割的输入张量
    • chunks(int)- 均匀分割张量块的数量
    • dim(int)- 进行分割的维度
import torch
A = torch.randint(0, 255, (128, 3, 32, 32))

result = torch.chunk(input=A,
                     chunks=14,
                     dim=0)

print(len(result))
# 13
print(result[0].size())
# torch.Size([10, 3, 32, 32])
print(result[-1].size())
# torch.Size([8, 3, 32, 32])
  • 注意,最后如果要重新变成 Tensor,可以执行这个函数
B = torch.randn((1185024, 2))
res = torch.stack(B.chunk(64,dim=0))
print(res.shape) # torch.Size([64, 18516, 2])

masked_select

  • 参考文献:https://zhuanlan.zhihu.com/p/348035584
  • torch.masked_select(input, mask, out=None) 函数返回一个根据布尔掩码 (boolean mask) 索引输入张量的 1D 张量,其中布尔掩码和输入张量就是 torch.masked_select(input, mask, out = None) 函数的两个关键参数,函数的参数有:

    • input(Tensor) - 需要进行索引操作的输入张量;
    • mask(BoolTensor) - 要进行索引的布尔掩码;
    • out(Tensor, optional) - 指定输出的张量。比如执行 torch.zeros([2, 2], out = tensor_a),相当于执行 tensor_a = torch.zeros([2, 2]);
import torch

x = torch.randn([3, 4])
print(x)
# 将x中的每一个元素与0.5进行比较
# 当元素大于等于0.5返回True,否则返回False
mask = x.ge(0.5)
print(mask)
print(torch.masked_select(x, mask))

'''
tensor([[ 1.2001,  1.2968, -0.6657, -0.6907],
        [-2.0099,  0.6249, -0.5382,  1.4458],
        [ 0.0684,  0.4118,  0.1011, -0.5684]])
tensor([[ True,  True, False, False],
        [False,  True, False,  True],
        [False, False, False, False]])
tensor([1.2001, 1.2968, 0.6249, 1.4458])
'''

损失函数 Loss Function

Squeeze 和 Unsqueeze 函数

  • 参考文献: https://blog.csdn.net/xiexu911/article/details/80820028
  • torch.squeeze(): 这个函数主要对数据的维度进行压缩,去掉维数为 1 的的维度,比如是一行或者一列这种,一个一行三列(1,3)的数去掉第一个维数为一的维度之后就变成(3)行。

    • a.squeeze(N) 就是在 a 中指定位置 N 加上一个维数为 1 的维度。还有一种形式就是 b=torch.squeeze(a,N) a 就是在 a 中指定位置 N 加上一个维数为 1 的维度。
  • torch.unsqueeze(): 这个函数主要是对数据维度进行扩充。给指定位置加上维数为一的维度,比如原本有个三行的数据(3),在 0 的位置加了一维就变成一行三列(1,3)。

拼接 Tensor

torch.stack(sequence, dim=0)

判断 Tensor/Model 是否在外部设备上

import torch
import torchvision.models as models
model=models.vgg11(pretrained=False)
print(next(model.parameters()).is_cuda) #False

data=torch.ones((5,10))
print(data.device) #cpu

只打印数值,不打印设备信息

print('batch_loss: {:.3f} batch acc: {:.3f}'.format(loss.data, acc.data))

梯度传播因为更改的时候 RuntimeError

class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Actor, self).__init__()
        self.actor = nn.Sequential(
            nn.Linear(state_dim, action_dim),
            nn.Tanh(),
        ) 

    def forward(self, state):
        raw_action = self.actor(state)
        raw_action[:, 0:3] = F.softmax(raw_action[:, 0:3], dim=1) # 会报错!!!
        return raw_action

gather()

获得向量中特定值所在的 index

DataLoader 和 Subset 和自定义 Dataset

import torch 
import numpy as np 
from torch.utils.data import TensorDataset, DataLoader

my_x = [np.array([[1.0,2],[3,4]]),np.array([[5.,6],[7,8]])] # a list of numpy arrays 
my_y = [np.array([4.]), np.array([2.])] # another list of numpy arrays (targets) 
tensor_x = torch.Tensor(my_x) # transform to torch tensor 
tensor_y = torch.Tensor(my_y) 
my_dataset = TensorDataset(tensor_x,tensor_y) # create your datset 
my_dataloader = DataLoader(my_dataset) # create your dataloader

cuda 操作的异步性和手动同步方案

在执行 torch.matmul(x, y) 这样的 CUDA 操作后需要进行同步的原因是为了确保操作的完成。CUDA 操作通常是异步的,这意味着当你调用一个 CUDA 操作时,它可能会立即返回并开始在 GPU 上执行,而不会等待操作的实际完成。这是为了充分利用 GPU 的并行性能。

但在某些情况下,你可能需要确保 CUDA 操作已经完成,才能继续执行后续的操作,例如:

  1. 测量操作执行时间:如果你想测量某个 CUDA 操作的执行时间,那么在执行操作后立即同步可以确保计时的准确性。如果不同步,计时可能会在操作实际完成之前结束。
  2. 操作顺序的依赖性:如果你的后续操作依赖于前面的 CUDA 操作的结果,那么同步可以确保前面的操作已经完成,从而避免数据依赖性错误。
  3. 结果一致性:在某些情况下,你可能需要确保 CUDA 操作的结果在继续执行后续操作之前已经准备好,以避免数据不一致性。

因此,同步 CUDA 设备是一种控制操作执行顺序和确保操作已经完成的方式。你可以使用 torch.cuda.synchronize() 函数来实现同步,确保在继续执行后续操作之前等待 CUDA 操作的完成。这对于需要精确控制和管理 CUDA 操作的应用非常重要。

import torch

# 创建两个张量并将它们移到 CUDA 设备上
x = torch.randn(1000, 1000).cuda()
y = torch.randn(1000, 1000).cuda()

# 执行一些 CUDA 操作
result = torch.matmul(x, y)

# 同步 CUDA 设备
torch.cuda.synchronize()

# 执行其他操作
z = torch.randn(1000, 1000).cuda()

torch.cuda.Stream()

  • Steam 是 CUDA 命令线性执行的抽象形式,分配给设备的 CUDA 命令按照入队序列的顺序执行。每个设备都有一个默认的 Steam,也可以通过 torch.cuda.Stream()创建新的 Stream。如果不同 Stream 中的命令交互执行,那么就不能保证命令绝对按顺序执行。下面的这个例子不同的 Stream 就可能会产生错误。
cuda = torch.device("cuda")
# 创建默认的stream,A就是使用的默认stream
s = torch.cuda.Stream()
A = torch.randn((1,10), device=cuda)
for i in range(100):
    # 在新的stream上对默认的stream上创建的tensor进行求和
    with torch.cuda.stream(s):
        # 存在的问题是:torch.sum()可能会在torch.randn()之前执行
        B = torch.sum(A)
        print(B)
  • 这个例子存在的问题是 torch.sum()可能会在 torch.randn()之前就执行。为了保证 Stream 中的命令绝对按顺序执行,接下来使用 Synchronize 同步方法解决上面例子的问题:
cuda = torch.device("cuda")
s = torch.cuda.Stream()
A = torch.randn((1,10), device=cuda)
default_stream = torch.cuda.current_stream()
print("Default Stream: {}".format(default_stream))
# 等待创建A的stream执行完毕
torch.cuda.Stream.synchronize(default_stream)
for i in range(100):
    # 在新的stream上对默认的stream上创建的tensor进行求和
    with torch.cuda.stream(s):
        print("current stream: {}".format(torch.cuda.current_stream()))
        B = torch.sum(A)
        print(B)

固定缓冲区与 pin_memory

  • 缓存就是当计算机内存不足的时候,就会把内存中的数据存储到硬盘上。固定缓冲区就是说常驻内存,不能把这部分数据缓存到硬盘上。可以直接使用 pin_memory 方法或在 Tensor 上直接调用 pin_memory 方法将 Tensor 复制到固定缓冲区。为什么要做固定缓冲区呢?目的只有一个,就是把 CPU 上的固定缓冲区拷贝到 GPU 上时速度快。Tensor 上的 is_pinned 方法可以查看该 Tensor 是否加载到固定缓冲区中。
from torch.utils.data._utils.pin_memory import pin_memory
x = torch.Tensor([[1,2,4], [5, 7, 9], [3, 7, 10]])
# 通过pin_memory()方法将x复制到固定缓冲区
y = pin_memory(x)
# 在tensor上直接调用pin_memory()方法将tensor复制到固定缓冲区
z = x.pin_memory()
# id()方法返回tensor的内存地址,pin_memory()返回tensor对象的拷贝,因此内存地址是不同的
print("id: {}".format(id(x)))
print("id: {}".format(id(y)))
print("id: {}".format(id(z)))
# 当tensor放入固定缓冲区后,就可以异步将数据复制到gpu设备上了
a = z.cuda(non_blocking=True)
print(a)
print("is_pinned: {}/{}".format(x.is_pinned(), z.is_pinned()))

# id: 1605289350472
# id: 1605969660408
# id: 1605969660248
# tensor([[ 1.,  2.,  4.],
#         [ 5.,  7.,  9.],
#         [ 3.,  7., 10.]], device='cuda:0')
# is_pinned: False/True

Pytorch 原理 - 如何把数据从 CPU 传到 GPU 中(dispatcher, cuda, 调度)

cuda、cudnn、cuda toolkit 的区别

  • 参考文献:【精选】一文讲清楚 CUDA、CUDA toolkit、CUDNN、NVCC 关系_健 0000 的博客-CSDN 博客
  • 区分

    • CUDA:为“GPU 通用计算”构建的运算平台。
    • cudnn:为深度学习计算设计的软件库。
    • CUDA Toolkit (nvidia): CUDA 完整的工具安装包,其中提供了 Nvidia 驱动程序、开发 CUDA 程序相关的开发工具包等可供安装的选项。包括 CUDA 程序的编译器、IDE、调试器等,CUDA 程序所对应的各式库文件以及它们的头文件。
    • CUDA Toolkit (Pytorch): CUDA 不完整的工具安装包,其主要包含在使用 CUDA 相关的功能时所依赖的动态链接库。不会安装驱动程序。
    • NVCC 是 CUDA 的编译器,只是 CUDA Toolkit 中的一部分
  • 注:CUDA Toolkit 完整和不完整的区别:在安装了 CUDA Toolkit (Pytorch)后,只要系统上存在与当前的 cudatoolkit 所兼容的 Nvidia 驱动,则已经编译好的 CUDA 相关的程序就可以直接运行,不需要重新进行编译过程。如需要为 Pytorch 框架添加 CUDA 相关的拓展时(Custom C++ and CUDA Extensions),需要对编写的 CUDA 相关的程序进行编译等操作,则需安装完整的 Nvidia 官方提供的 CUDA Toolkit。
  • pytorch 运行时的 CUDA 版本
# Pytorch 实际使用的运行时的 cuda 目录
import torch.utils.cpp_extension
torch.utils.cpp_extension.CUDA_HOME
# '/usr/local/cuda-11.6'
# 编译该 Pytorch release 版本时使用的 cuda 版本
import torch
torch.version.cuda 
# '10.2'
  • Pytorch 寻找 cuda 的流程
    • 环境变量 CUDA_HOME 或 CUDA_PATH
    • /usr/local/cuda
    • which nvcc 的上级上级目录(which nvcc 会在环境变量 PATH 中找)
    • 如果上述都不存在,则 torch.utils.cpp_extension.CUDA_HOME 为 None,会使用 conda 安装的 cudatoolkit,其路径为 cudart 库文件目录的上级目录(此时可能是通过 conda 安装的 cudatoolkit,一般直接用 conda install cudatoolkit,就是在这里搜索到 cuda 库的)。

进度条库 tqdm

import time
from tqdm import tqdm, trange

#trange(i)是tqdm(range(i))的一种简单写法
for i in trange(100):
    time.sleep(0.05)

for i in tqdm(range(100), desc='Processing'):
    time.sleep(0.05)

dic = ['a', 'b', 'c', 'd', 'e']
pbar = tqdm(dic)
for i in pbar:
    pbar.set_description('Processing '+i)
    time.sleep(0.2)
100%|██████████| 100/100 [00:06<00:00, 16.04it/s]
Processing: 100%|██████████| 100/100 [00:06<00:00, 16.05it/s]
Processing e: 100%|██████████| 5/5 [00:01<00:00,  4.69it/s]

Label 的 bug

opencv2 的一些实用

将视频中某时间段内的所有帧提取出来

import cv2

# 定义视频文件路径
video_path = "testdownload.mp4"

# 定义提取的时间范围(单位:秒)
start_time = 1 * 60 + 17  # 1分17秒
end_time = 1 * 60 + 19  # 1分19秒

# 打开视频文件
cap = cv2.VideoCapture(video_path)

# 获取视频的帧率
fps = cap.get(cv2.CAP_PROP_FPS)

# 计算要提取的帧的起始和结束帧索引
start_frame = int(start_time * fps)
end_frame = int(end_time * fps)

# 设置当前帧为起始帧
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

# 循环读取视频帧并保存
frame_count = start_frame
while frame_count <= end_frame:
    # 读取视频帧
    ret, frame = cap.read()

    # 如果读取成功,则保存帧图像
    if ret:
        frame_file_path = f"frame_{frame_count}.jpg"
        cv2.imwrite(frame_file_path, frame)
        print(f"Saved frame {frame_count}")

    frame_count += 1

    # 如果到达结束帧,则跳出循环
    if frame_count > end_frame:
        break

# 关闭视频文件
cap.release()

超分辨率

强化学习

A2C 的一个很好的例子

loss 函数的问题

概率与采样

probs = self.policy_net(state)
m = Bernoulli(probs)
# 公式6的实现:本质上是个分类问题,负号是为了将梯度上升转为梯度下降
loss = -m.log_prob(action) * reward  # Negtive score function x reward

策略网络的训练问题

matplot 画图

log 坐标轴与刻度

import matplotlib.pyplot as plt

x = [5,10,15,20,25,30]
y = [1,2,3,4,5,6]

fig=plt.figure()
ax = fig.add_subplot(111)
ax.plot(x, y, marker='v', color='r', label='example')
ax.legend( loc=(1.36/5,0.05/5),ncol=3) #fontsize=10,loc是图例左下角所在的点的x和y值,ncol是图例分几列显示
ax.grid() # 网格

ax.set_xlabel('x', fontsize=13) # 坐标轴标签
ax.set_xlim(5,30) # x轴只显示5到30区间
ax.set_xticks(x) # x轴上的刻度显示,可以把x改成字符串列表

ax.set_ylabel('y', fontsize=13)
ax.set_ylim(0.25,52) # y 轴只显示0.25到52区间
ax.set_yscale("log",base=4,subs=[2,3]) # y 轴上以4为底数呈对数显示,2、3表示会标记出2倍、3倍的位置
# ax.set_yticks([0,2,8,32]) # 用了log就不能用这个
ax.set_yticklabels(['0','0.25','1', '4', '16']) # 使y轴上刻度显示为这几个数,其中第一个数必须是0
ax.set_zorder(0)

legend 相关问题

File 读写文件

mmap 内存映射文件

Mmap 的原理和底层 C 函数介绍

利用 mmap 写一个布隆过滤器

Logging

import logging

def get_logger(logging_file, enable_multiprocess, showing_stdout_level=logging.INFO):
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    
    file_ch = logging.FileHandler(logging_file)
    file_ch.setLevel(logging.DEBUG)
    
    if enable_multiprocess:
        file_formatter = logging.Formatter('%(asctime)s [%(levelname)s] at [process_id: %(process)d] %(filename)s,%(lineno)d: %(message)s', 
                                            datefmt='%Y-%m-%d(%a)%H:%M:%S')
    else:
        file_formatter = logging.Formatter('%(asctime)s [%(levelname)s] at %(filename)s,%(lineno)d: %(message)s', 
                                            datefmt='%Y-%m-%d(%a)%H:%M:%S')
    file_ch.setFormatter(file_formatter)
    logger.addHandler(file_ch)

    #将大于或等于INFO级别的日志信息输出到StreamHandler(默认为标准错误)
    console = logging.StreamHandler()
    console.setLevel(showing_stdout_level) 
    formatter = logging.Formatter('[%(levelname)-8s] %(message)s')
    console.setFormatter(formatter)
    logger.addHandler(console)
    return logger

Profiler 专题

时间 Profiler

空间 Profiler