「学习笔记」Python进阶编程(上)

文章目录

1、类型注解

在基础篇中我们学习了Python的动态类型特性,类型注解 是Python 3.5+引入的特性,允许为变量、函数参数和返回值添加类型提示。

类型注解提供了以下好处:

  • 代码可读性:明确说明变量和函数的预期类型
  • IDE支持:提供更好的代码补全和错误提示
  • 类型检查:可以使用mypy等工具进行静态类型检查
  • 文档生成:自动生成API文档

1.1 基本类型

# ========== 基本类型注解 ==========
# 变量类型注解:变量名: 类型 = 值
name: str = "Alice"       # 字符串类型
age: int = 30             # 整数类型
is_student: bool = False  # 布尔类型
scores: list = [95, 88, 91]  # 列表类型

# 函数类型注解:参数: 类型, 返回值 -> 类型
def greet(name: str) -> str:
    """向用户打招呼"""
    return f"Hello, {name}"

# 注意:
# - 类型注解只是提示,Python解释器不会强制检查
# - 需要使用第三方工具(如mypy)进行类型检查
# - 类型注解可以提高代码质量,但不会影响运行时行为

1.2 typing模块

使用 typing 模块可以定义更复杂的类型。

# ========== typing模块常用类型 ==========
from typing import List, Dict, Tuple, Set, Optional, Union

# ========== 泛型容器类型 ==========
numbers: List[int] = [1, 2, 3, 4, 5]              # 整数列表
student_scores: Dict[str, int] = {"Alice": 95, "Bob": 88}  # 字符串到整数的字典
person_info: Tuple[int, str, bool] = (25, "Alice", True)  # 元组(整数、字符串、布尔)
unique_names: Set[str] = {"Alice", "Bob", "Charlie"}       # 字符串集合

# ========== 特殊类型 ==========
# Optional[T]: 表示值可以是 T 类型或 None
name: Optional[str] = "Alice"  # name 可以是 str 或 None

# Union[T1, T2, ...]: 表示值可以是多种类型之一
def process_input(data: Union[str, int, List[int]]) -> None:
    """处理多种类型的输入"""
    if isinstance(data, str):
        print(f"字符串: {data}")
    elif isinstance(data, int):
        print(f"整数: {data}")
    elif isinstance(data, list):
        print(f"列表: {data}")

# ========== typing模块常用类型说明 ==========
# List[T]: 泛型列表,T是元素类型
# Dict[K, V]: 泛型字典,K是键类型,V是值类型
# Tuple[T1, T2, ...]: 元组,每个位置有指定类型
# Set[T]: 泛型集合
# Optional[T]: T | None (Python 3.10+ 可以用 | 语法)
# Union[T1, T2]: T1 | T2 (Python 3.10+ 可以用 | 语法)
# Any: 任意类型
# NoReturn: 函数永不返回
# Callable: 可调用对象(函数、方法等)

# Python 3.10+ 简化语法:
# old: Union[str, int]
# new: str | int
# old: Optional[str]
# new: str | None

1.3 泛型

泛型允许我们创建可重用的组件,同时保持类型安全。定义类型变量T用于表示任意类型,在使用时指定具体类型。

# ========== 泛型 ==========
from typing import TypeVar, Generic

# 定义类型变量 T
# TypeVar 用于定义泛型类型参数
T = TypeVar('T')  # T 可以是任意类型

""" 创建泛型缓存类 """
class Cache(Generic[T]):
    def __init__(self):
        # 使用 T 作为值的类型
        self._cache: Dict[str, T] = {}
    
    def get(self, key: str) -> T:
        """获取缓存值"""
        return self._cache.get(key)
    
    def set(self, key: str, value: T) -> None:
        """设置缓存值"""
        self._cache[key] = value

# 使用泛型类(指定具体类型)
int_cache: Cache[int] = Cache()  # 整数类型的缓存
int_cache.set("count", 42)

str_cache: Cache[str] = Cache()  # 字符串类型的缓存
str_cache.set("name", "Alice")

# 泛型的好处:
# 1. 类型安全:编译器可以检查类型兼容性
# 2. 代码复用:同一个类可以处理不同类型
# 3. 文档清晰:明确说明类/函数支持的类型

# 多个类型参数示例:
# K = TypeVar('K')
# V = TypeVar('V')
# class Map(Generic[K, V]):
#     def __setitem__(self, key: K, value: V) -> None:
#         pass

1.4 结构子类型(Protocol)

基础篇中的鸭子类型(Duck Typing)就是一种动态类型的概念(只要具备约定的行为,就可以归为同一类型)。 Protocol 是结构子类型(鸭子类型)的类型安全实现。 任何类只要实现了协议中定义的方法,就被认为是该协议的实现。结构化(structural)类型系统的原生支持——只要满足协议,无需任何继承关系。

# ========== Protocol(结构子类型)==========
from typing import Protocol, runtime_checkable

""" 向量器协议(定义接口契约) """
# @runtime_checkable 装饰器:使协议可以用于 isinstance 检查
@runtime_checkable
class Vectorizer(Protocol):

    def vectorize(self, text: str) -> List[float]:
        """将单篇文本向量化"""
        ...  # 省略号表示抽象方法(不需要实现)
    
    def batch_vectorize(self, texts: List[str]) -> List[List[float]]:
        """将多篇文本批量向量化"""
        ...

# 实现协议的类(不需要显式继承)
class TFIDFVectorizer:
    """TF-IDF向量器"""
    
    def vectorize(self, text: str) -> List[float]:
        """将单篇文本向量化"""
        return [0.1, 0.2, 0.3]  # 模拟向量
    
    def batch_vectorize(self, texts: List[str]) -> List[List[float]]:
        """将多篇文本批量向量化"""
        return [[0.1, 0.2, 0.3] for _ in texts]

# 使用协议作为类型注解
def process_documents(vectorizer: Vectorizer, docs: List[str]):
    """处理文档(接受任何实现 Vectorizer 协议的对象)"""
    return vectorizer.batch_vectorize(docs)

# 验证协议匹配
tfidf = TFIDFVectorizer()
print(isinstance(tfidf, Vectorizer))  # True(因为 @runtime_checkable)

# 调用函数
result = process_documents(tfidf, ["document1", "document2"])

# Protocol vs 抽象基类:
# - Protocol:结构子类型(鸭子类型的类型安全版本)
# - ABC(抽象基类):名义子类型(需要显式继承)
# - Protocol 更灵活,适合解耦接口定义和实现

2、上下文管理器

在基础篇中我们学习了文件操作时使用 with 语句,上下文管理器是Python中用于资源管理的重要机制,通过 with 语句实现资源的自动获取和释放。上下文管理器确保资源在使用完毕后被正确清理,即使发生异常也能保证资源释放。

在AI应用中,上下文管理器常用于:

  • 数据库连接:自动打开和关闭连接
  • 文件操作:自动打开和关闭文件
  • 锁管理:自动获取和释放锁
  • 事务管理:自动提交或回滚事务

2.1 类形式上下文管理器

类形式上下文管理器 通过实现两个特殊方法来工作:

  • __enter__():进入上下文时调用,返回上下文对象
  • __exit__():退出上下文时调用,处理资源清理和异常
# ========== 类形式上下文管理器 ==========
"""数据库事务管理器"""
class DatabaseTransaction:
    def __init__(self, connection):
        """初始化事务管理器"""
        self.conn = connection  # 数据库连接
    
    def __enter__(self):
        """进入上下文:开始事务"""
        self.conn.begin()  # 开始数据库事务
        return self        # 返回上下文对象供 with 语句使用
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """ 退出上下文:提交或回滚事务
                Args:
                    exc_type: 异常类型(None表示没有异常)
                    exc_val: 异常值
                    exc_tb: 异常追踪信息
                Returns:
                    True: 异常已处理,不再向上传播
                    False: 异常未处理,继续向上传播
        """
        if exc_type is None:
            # 无异常,提交事务
            self.conn.commit()
            print("Transaction committed")
        else:
            # 有异常,回滚事务
            self.conn.rollback()
            print(f"Transaction rolled back: {exc_val}")

        return False  # 让异常继续传播(如果有)

# 使用示例
# with DatabaseTransaction(db) as tx:
#     tx.conn.execute("INSERT INTO users ...")
#     tx.conn.execute("UPDATE orders ...")
# 执行流程:
# 1. with 语句调用 __enter__,开始事务
# 2. 执行 with 块内的代码
# 3. 如果正常结束,__exit__ 提交事务
# 4. 如果发生异常,__exit__ 回滚事务

# 使用场景:
#    - 数据库事务管理(自动提交或回滚)
#    - 文件操作(自动关闭)
#    - 锁管理(自动释放)

2.2 生成器形式

使用 @contextmanager 装饰器可以更简洁地创建上下文管理器。

# ========== 生成器形式上下文管理器 ==========
from contextlib import contextmanager

""" 资源管理器(生成器形式)
        使用 @contextmanager 装饰器将生成器转换为上下文管理器:
        - yield 之前的代码:进入上下文时执行(相当于 __enter__)
        - yield 语句:返回值给 with 语句
        - yield 之后的代码:退出上下文时执行(相当于 __exit__,无论是否发生异常)
"""
@contextmanager
def managed_resource(name):
    # ========== 进入上下文(__enter__ 部分)==========
    print(f"Acquiring {name}")
    resource = f"Resource:{name}"
    
    try:
        # 返回资源给 with 语句
        yield resource
    finally:
        # ========== 退出上下文(__exit__ 部分)==========
        # 无论是否发生异常,都会执行这里
        print(f"Releasing {name}")

# 使用示例
with managed_resource("GPU") as res:
    print(f"Using {res}")

# 输出:
# Acquiring GPU
# Using Resource:GPU
# Releasing GPU

# 执行流程:
# 1. with 语句调用生成器
# 2. 执行到 yield 之前的代码(获取资源)
# 3. yield 返回资源,赋值给 res
# 4. 执行 with 块内的代码
# 5. 执行 finally 块(释放资源)

2.3 嵌套上下文管理器

使用 ExitStack 可以管理多个上下文管理器。

# ========== 嵌套上下文管理器 ==========
from contextlib import ExitStack

def process_files(filenames):
    """ 处理多个文件(使用 ExitStack 管理多个上下文)
            ExitStack 的作用:
            - 管理多个上下文管理器
            - 自动调用所有上下文的 __exit__ 方法
            - 即使某个操作失败,也能保证所有已获取的资源被正确释放
    """
    # 创建 ExitStack
    with ExitStack() as stack:
        # 使用 enter_context 注册多个上下文管理器
        # 所有文件会在 with 块结束时自动关闭
        files = [
            stack.enter_context(open(fname, 'r'))
            for fname in filenames
        ]
        # 处理所有文件
        for f in files:
            print(f.read())
    # 退出 with 块后,所有文件已自动关闭

# 使用示例
# process_files(['file1.txt', 'file2.txt', 'file3.txt'])

# ExitStack 工作原理:
# 1. 创建 ExitStack 对象
# 2. 调用 enter_context 时,记录上下文管理器
# 3. 退出 with 块时,按相反顺序调用所有上下文的 __exit__
# 4. 如果某个 __exit__ 失败,会继续处理其他资源的清理

# 使用场景:
#    - 动态数量的资源管理
#    - 不确定需要打开多少个文件/连接
#    - 需要同时管理多个资源
#    - 临时文件管理、据库连接池、网络连接管理

3、装饰器(Decorator)

装饰器是Python中一种强大的元编程工具,允许在不修改函数或类源代码的情况下,动态地增强其功能。装饰器的核心优势在于代码复用关注点分离,将横切关注点(如日志、缓存)与业务逻辑解耦。

在AI应用中,装饰器常用于:

  • 缓存:缓存函数返回值,避免重复计算
  • 日志记录:自动记录函数调用信息
  • 性能监控:统计函数执行时间
  • 权限验证:检查用户权限
  • 重试机制:自动重试失败的操作

3.1 基础装饰器

基础装饰器 是最简单的装饰器形式,直接接受一个函数作为参数,并返回一个包装函数。包装函数会在原函数执行前后添加额外逻辑。

使用 @functools.wraps 装饰器可以保留原函数的元信息(如函数名、文档字符串),这对于调试和自省非常重要。

import functools
import time

# ========== 基础装饰器 ==========
""" 计时装饰器:记录函数执行时间
        Args: 被装饰的函数
        Returns: 包装后的函数
"""
def timer(func):
    # @functools.wraps(func) 保留原函数的元信息(函数名、文档字符串等)
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        """包装函数:在原函数执行前后添加计时逻辑"""
        start = time.time()              # 记录开始时间
        result = func(*args, **kwargs)   # 调用原函数,保存返回值
        elapsed = time.time() - start    # 计算耗时
        print(f"{func.__name__} took {elapsed:.2f}s")  # 输出耗时信息
        return result                   # 返回原函数的结果
    return wrapper

# 使用装饰器
@timer
def slow_function():
    """示例慢函数"""
    time.sleep(1)  # 模拟耗时操作
    return "Done"

# 调用被装饰后的函数
slow_function()
# 输出: slow_function took 1.00s

# 装饰器工作原理:
# 1. @timer 等价于 slow_function = timer(slow_function)
# 2. timer 返回 wrapper 函数
# 3. 调用 slow_function() 实际调用的是 wrapper()

3.2 带参数的装饰器

带参数的装饰器 允许在装饰时传入额外的配置参数,使装饰器更加灵活。带参数的装饰器实际上是一个装饰器工厂函数,它接受参数并返回一个装饰器。

# ========== 带参数的装饰器 ==========
""" 重试装饰器工厂函数
        Args: 最大重试次数(默认3次),重试间隔(默认1秒)
        Returns: 装饰器函数
"""
def retry(max_attempts=3, delay=1):
    def decorator(func):
        """装饰器:接受被装饰的函数"""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            """包装函数:实现重试逻辑"""
            for attempt in range(max_attempts):
                try:
                    # 尝试调用原函数
                    return func(*args, **kwargs)
                except Exception as e:
                    # 检查是否是最后一次尝试
                    if attempt == max_attempts - 1:
                        # 最后一次失败,抛出异常
                        raise
                    # 打印失败信息并等待
                    print(f"Attempt {attempt + 1} failed: {e}")
                    time.sleep(delay)
        return wrapper
    return decorator

# 使用带参数的装饰器
@retry(max_attempts=3, delay=2)
def unreliable_api_call():
    """模拟不稳定的API调用"""
    import random
    # 70%概率抛出异常
    if random.random() < 0.7:
        raise ConnectionError("Network error")
    return "Success"

# 调用被装饰后的函数
unreliable_api_call()
# 输出: Attempt 1 failed: Network error
# 输出: Attempt 2 failed: Network error
# 输出: Attempt 3 failed: Network error
# 输出: Success

# 带参数装饰器的工作流程:
# 1. @retry(max_attempts=3, delay=2) 调用 retry(3, 2)
# 2. retry 返回 decorator 函数
# 3. decorator(unreliable_api_call) 返回 wrapper
# 4. unreliable_api_call = wrapper

3.3 类装饰器

类装饰器 是使用类实现的装饰器,核心是通过实现 __call__ 方法使其可调用,该方法接受被装饰的类或函数作为参数并返回修改后的对象。

# ========== 类装饰器 ==========
""" 类装饰器:实现单例模式
        1. 实现 __call__ 方法使类实例可调用
        2. 在 __call__ 中检查类是否已有实例
        3. 如果没有则创建并缓存,否则返回缓存的实例
"""
class Singleton:
    # 类变量:存储所有单例实例
    _instances = {}
    def __call__(self, cls):
        """ 当装饰器被应用时调用
                Args:
                    cls: 被装饰的类
                Returns:
                    该类的唯一实例
        """
        # 检查该类是否已有实例
        if cls not in self._instances:
            # 创建实例并缓存
            self._instances[cls] = cls()
        # 返回已存在的实例
        return self._instances[cls]

# 使用类装饰器(注意需要调用 Singleton())
@Singleton()
class Database:
    """数据库连接类(单例)"""
    def __init__(self):
        self.connection = "connected"

# 创建实例
db1 = Database()
db2 = Database()

# 验证单例:两个变量指向同一个实例
print(db1 is db2)  # 输出: True

# 类装饰器与函数装饰器的区别:
# 1. 类装饰器使用类实现,函数装饰器使用函数实现
# 2. 类装饰器可以维护状态(通过实例变量)
# 3. 类装饰器更适合需要复杂逻辑或状态管理的场景

3.4 缓存装饰器

对于计算密集型操作(如向量计算、模型推理),缓存可以显著提高性能。

# ========== 缓存装饰器 ==========
""" 缓存装饰器:缓存函数返回值,避免重复计算 """
def cached(func):
    # 缓存字典:key -> 函数返回值
    cache = {}
    
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        """包装函数:实现缓存逻辑"""
        # 生成缓存键(基于参数)
        key = str(args) + str(kwargs)
        # 检查缓存是否命中
        if key not in cache:
            # 缓存未命中,调用原函数并缓存结果
            cache[key] = func(*args, **kwargs)
        # 返回缓存的值
        return cache[key]
    
    # 为包装函数添加额外属性
    wrapper.cache = cache                     # 暴露缓存字典(查看缓存内容)
    wrapper.clear_cache = lambda: cache.clear()  # 清空缓存的方法
    
    return wrapper

# 使用缓存装饰器
@cached
def compute_embedding(text: str) -> list:
    """计算文本向量(模拟)"""
    # 模拟耗时计算
    return [hash(text) % 100 for _ in range(10)]

# 使用示例
result1 = compute_embedding("Hello")  # 第一次调用,计算并缓存
result2 = compute_embedding("Hello")  # 第二次调用,直接返回缓存

# 缓存大小(如果有多个不同输入)
# print(len(compute_embedding.cache))
# 清空缓存
# compute_embedding.clear_cache()

# 适用场景:
# - 计算密集型操作(如向量计算、模型推理)
# - 函数返回值只依赖于输入参数
# - 函数没有副作用(不会修改外部状态)

4、异步编程(Asyncio)

在基础篇中我们学习了多线程编程,异步编程是另一种并发编程方式,更适合IO密集型任务。异步编程的核心优势在于非阻塞IO,当一个任务等待IO操作时,可以切换到其他任务执行,从而提高整体吞吐量。

在AI应用中,特别适用于以下场景:

  • WebSocket实时通信:语音助手的双向数据流
  • 流式处理:ASR/TTS的实时音频流
  • 并发API调用:同时调用多个LLM服务
  • IO密集型任务:数据库查询、网络请求

4.1 协程

协程(Coroutine) 是一种轻量级的并发编程方式,允许在单个线程内实现多个任务的切换执行。与线程不同,协程的调度由程序员控制,不需要操作系统介入,因此开销更小、效率更高。

Python中使用 async def 定义协程函数,使用 await 关键字挂起协程执行。协程本质上是可暂停的函数,可以在暂停点保存状态,并在恢复时继续执行。

import asyncio

# ========== 协程定义 ==========
# 使用 async def 定义协程函数
async def task1():
    # await 关键字:挂起当前协程,执行其他任务
    await asyncio.sleep(1)  # 模拟1秒IO操作(非阻塞)
    return "Task 1 done"    # 返回结果

async def task2():
    await asyncio.sleep(2)  # 模拟2秒IO操作
    return "Task 2 done"

# ========== 并发执行 ==========
async def concurrent_demo():
    # asyncio.gather():并发执行多个协程,等待所有完成后收集结果
    results = await asyncio.gather(
        task1(),           # 启动协程1
        task2(),           # 启动协程2(与task1并发执行)
        return_exceptions=True  # 异常不中断其他任务,异常作为结果返回
    )
    print(results)  # 输出: ['Task 1 done', 'Task 2 done']

# ========== 程序入口 ==========
# asyncio.run():运行主协程,管理事件循环
asyncio.run(concurrent_demo())

# 执行流程说明:
# 1. asyncio.run() 创建事件循环并运行 concurrent_demo
# 2. concurrent_demo 调用 asyncio.gather(task1(), task2())
# 3. task1 和 task2 并发执行(不是真正并行,而是在IO等待时切换)
# 4. task1 先完成(1秒),task2 后完成(2秒)
# 5. gather 收集结果并返回

4.2 异步上下文管理器

异步上下文管理器 是上下文管理器的异步版本,允许在异步环境中安全地管理资源。它使用 async with 语句,在进入和退出上下文时执行异步操作。

异步上下文管理器需要实现两个特殊方法:

  • __aenter__():进入上下文时调用,返回上下文对象
  • __aexit__():退出上下文时调用,处理资源清理(无论是否发生异常)
"""实现异步上下文管理器 - 异步数据库连接"""
class AsyncDatabaseConnection:
    
    async def __aenter__(self):
        """ 进入异步上下文时调用:建立数据库连接 """
        await self.connect()  # 异步连接数据库
        return self           # 返回上下文对象供 async with 语句使用
    
    async def __aexit__(self, exc_type, exc_val, exc_tb): # exc_type: 异常类型(如果有),exc_val: 异常值(如果有),exc_tb: 异常追踪信息(如果有)
        """ 退出异步上下文时调用:关闭数据库连接 """
        await self.close()    # 异步关闭连接(确保资源释放)
    
    async def connect(self):
        """建立数据库连接(模拟)"""
        print("Connecting...")
        await asyncio.sleep(0.5)  # 模拟网络延迟
        self.connected = True     # 标记连接状态
    
    async def close(self):
        """关闭数据库连接(模拟)"""
        print("Closing...")
        self.connected = False    # 标记连接已关闭

"""使用异步数据库连接"""
async def use_connection():
    # async with 自动调用 __aenter__ 和 __aexit__
    async with AsyncDatabaseConnection() as conn:
        print(f"Connected: {conn.connected}")
        # 执行数据库操作(在 async with 块内)
    
    # 退出 async with 块后,__aexit__ 自动被调用,连接已关闭
asyncio.run(use_connection())

# 输出:
# Connecting...
# Connected: True
# Closing...

# 应用场景:
# 1. 异步数据库连接的自动管理
# 2. 异步文件操作
# 3. 异步锁的获取和释放

4.3 异步迭代器

异步迭代器 允许在异步环境中进行迭代操作,是处理流式数据的重要工具。

异步迭代器需要实现两个特殊方法:

  • __aiter__():返回迭代器对象本身
  • __anext__():返回下一个元素的协程,当迭代结束时抛出 StopAsyncIteration
"""实现异步迭代器 - 异步数据流"""
class AsyncDataStream:

    def __init__(self, max_items=10):
        """初始化数据流"""
        self.items = list(range(max_items))  # 生成模拟数据
        self.index = 0                       # 当前迭代位置
    
    def __aiter__(self):
        """返回迭代器对象本身"""
        return self
    
    async def __anext__(self):
        """异步返回下一个元素"""
        # 检查是否到达末尾
        if self.index >= len(self.items):
            raise StopAsyncIteration  # 抛出异常表示迭代结束
        
        # 获取当前元素
        item = self.items[self.index]
        self.index += 1
        
        # 模拟异步数据获取延迟(如网络请求)
        await asyncio.sleep(0.1)
        
        return item

"""消费异步数据流"""
async def consume_stream():
    # async for: 异步迭代,每次迭代时等待 __anext__ 完成
    async for item in AsyncDataStream(5):
        print(f"Received: {item}")

asyncio.run(consume_stream())

# 输出(每隔0.1秒输出一行):
# Received: 0
# Received: 1
# Received: 2
# Received: 3
# Received: 4

# 应用场景:
# 1. 处理流式API返回的数据
# 2. 处理实时音频/视频流
# 3. 处理大数据集的分批加载

5、日志系统

Python的logging模块提供了灵活的日志配置能力,支持多种日志级别、格式和输出目标。

5.1 基础配置

# ========== 日志系统基础配置 ==========
import logging

# 配置日志系统
logging.basicConfig(
    level=logging.INFO,  # 设置日志级别(DEBUG < INFO < WARNING < ERROR < CRITICAL)
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',  # 日志格式
    handlers=[
        logging.FileHandler('app.log'),  # 输出到文件
        logging.StreamHandler()          # 输出到控制台
    ]
)

# 获取日志记录器(使用模块名作为日志器名称)
logger = logging.getLogger(__name__)

# 使用日志记录器
logger.debug("This is a debug message")   # 低于 INFO 级别,不会输出
logger.info("Application started")        # 输出到文件和控制台
logger.warning("This is a warning")       # 警告级别
logger.error("This is an error")         # 错误级别

# 日志级别说明:
# - DEBUG: 详细的调试信息
# - INFO: 程序正常运行的信息
# - WARNING: 警告信息(可能的问题)
# - ERROR: 错误信息(功能无法正常执行)
# - CRITICAL: 严重错误(程序可能无法继续运行)

# 日志格式说明:
# - %(asctime)s: 时间戳
# - %(name)s: 日志器名称
# - %(levelname)s: 日志级别
# - %(message)s: 日志消息
# - %(funcName)s: 函数名
# - %(lineno)d: 行号

5.2 高级配置

# ========== 日志系统高级配置 ==========
import logging
from logging.handlers import RotatingFileHandler
import logging.config

# 字典形式的日志配置
LOGGING_CONFIG = {
    'version': 1,  # 配置版本(必须为1)
    'disable_existing_loggers': False,  # 是否禁用已存在的日志器
    
    # ========== 格式器(Formatters)==========
    # 定义日志输出格式
    'formatters': {
        'detailed': {
            # 详细格式:包含时间、日志器名、级别、函数名、行号、消息
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
        },
        'simple': {
            # 简单格式:只包含级别和消息
            'format': '%(levelname)s - %(message)s'
        }
    },
    
    # ========== 处理器(Handlers)==========
    # 定义日志输出目标
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',  # 控制台输出
            'level': 'INFO',                   # 只处理 INFO 及以上级别
            'formatter': 'simple'              # 使用简单格式
        },
        'file': {
            'class': 'logging.handlers.RotatingFileHandler',  # 轮转文件处理器
            'filename': 'app.log',                           # 日志文件名
            'maxBytes': 1024 * 1024 * 10,                   # 单个文件最大10MB
            'backupCount': 5,                               # 保留5个备份文件
            'formatter': 'detailed'                          # 使用详细格式
        }
    },
    
    # ========== 日志器(Loggers)==========
    # 定义具体的日志记录器
    'loggers': {
        'ai': {
            'level': 'DEBUG',             # 日志级别设为 DEBUG
            'handlers': ['console', 'file'],  # 输出到控制台和文件
            'propagate': False            # 不向上传播日志
        }
    }
}

# 应用配置
logging.config.dictConfig(LOGGING_CONFIG)

# 获取特定名称的日志器
ai_logger = logging.getLogger('ai.model')  # 属于 'ai' 日志器的子日志器

# 使用日志器
ai_logger.debug("Model loaded")    # DEBUG 级别,会输出到文件和控制台
ai_logger.info("Processing request")
ai_logger.warning("Deprecated API")

# RotatingFileHandler 特点:
# - 当文件达到 maxBytes 时,自动创建新文件
# - 保留 backupCount 个备份文件
# - 文件命名格式:app.log.1, app.log.2, ..., app.log.5
END .

相关系列文章

×