「学习笔记」Python进阶编程(下)

文章目录

1、设计模式

设计模式 是解决软件设计中常见问题的通用解决方案。设计模式提供了一套经过验证的最佳实践,帮助开发者编写可维护、可扩展和可复用的代码。

1.1 工厂模式

封装对象创建逻辑,通过统一接口创建不同类型的对象。

from abc import ABC, abstractmethod

# 抽象产品接口
class LLMProvider(ABC):
    """LLM提供者抽象基类"""
    
    @abstractmethod
    def generate(self, prompt: str) -> str:
        """生成文本响应"""
        pass

# 具体产品:OpenAI
class OpenAIProvider(LLMProvider):
    """OpenAI LLM提供者"""
    
    def generate(self, prompt: str) -> str:
        return f"OpenAI: {prompt}"

# 具体产品:Anthropic
class AnthropicProvider(LLMProvider):
    """Anthropic LLM提供者"""
    
    def generate(self, prompt: str) -> str:
        return f"Anthropic: {prompt}"

# 工厂类
class LLMFactory:
    """ LLM提供者工厂类:根据类型创建对应的LLM提供者实例"""
    # 注册的提供者映射
    _providers = {
        "openai": OpenAIProvider,
        "anthropic": AnthropicProvider
    }
    
    @classmethod
    def create(cls, provider_type: str) -> LLMProvider:
        """ 创建LLM提供者实例
                Args:
                    provider_type: 提供者类型("openai"或"anthropic")
                Returns:
                    LLMProvider实例
                Raises:
                    ValueError: 未知的提供者类型
        """
        provider_class = cls._providers.get(provider_type)
        if not provider_class:
            raise ValueError(f"Unknown provider: {provider_type}")
        return provider_class()

# 使用示例
llm = LLMFactory.create("openai")  # 创建OpenAI提供者
# llm = LLMFactory.create("anthropic")  # 创建Anthropic提供者

1.2 策略模式

定义一系列算法,封装每个算法,并使它们可以互换。

# 策略接口
class RAGStrategy(ABC):
    """RAG检索策略抽象基类"""
    @abstractmethod
    def retrieve(self, query: str, top_k: int):
        """ 检索相关文档
                Args:
                    query: 查询文本
                    top_k: 返回的文档数量
                Returns:
                    检索到的文档列表
        """
        pass

# 具体策略1:向量搜索
class VectorSearchStrategy(RAGStrategy):
    """向量搜索策略"""
    def __init__(self, vector_store):
        """初始化向量搜索策略"""
        self.vector_store = vector_store  # 向量数据库
    
    def retrieve(self, query: str, top_k: int):
        """使用向量数据库进行语义搜索"""
        return self.vector_store.search(query, top_k)

# 具体策略2:混合搜索(可选扩展)
# class HybridSearchStrategy(RAGStrategy):
#     def retrieve(self, query: str, top_k: int):
#         # 结合关键词搜索和向量搜索
#         pass

# 上下文类:使用策略
class RAGPipeline:
    """RAG管道:使用策略模式动态切换检索策略"""
    def __init__(self, strategy: RAGStrategy):
        """初始化RAG管道"""
        self.strategy = strategy  # 注入策略
    
    def set_strategy(self, strategy: RAGStrategy):
        """动态切换策略"""
        self.strategy = strategy
    
    def query(self, question: str):
        """执行RAG查询"""
        # 使用当前策略检索文档
        docs = self.strategy.retrieve(question, top_k=5)
        # 生成最终回答
        return self.generate_answer(question, docs)
    
    def generate_answer(self, question: str, docs: list):
        """根据检索到的文档生成回答"""
        return f"Answer based on {len(docs)} documents"

1.3 观察者模式

定义对象间的一对多依赖,当一个对象状态改变时,所有依赖者都会收到通知。

from typing import List, Callable

class EventEmitter:
    """ 事件发射器:实现观察者模式的核心类
            职责:
            1. 维护事件与监听器的映射
            2. 提供注册/取消注册监听器的接口
            3. 触发事件并通知所有监听器
    """
    def __init__(self):
        """初始化事件发射器"""
        # 事件注册表:事件名 -> 回调函数列表
        self._listeners: dict = {}
    
    def on(self, event: str, callback: Callable):
        """ 注册事件监听器
            Args:
                event: 事件名称
                callback: 事件触发时调用的回调函数
        """
        if event not in self._listeners:
            self._listeners[event] = []
        self._listeners[event].append(callback)
    
    def off(self, event: str, callback: Callable):
        """ 移除事件监听器
            Args:
                event: 事件名称
                callback: 要移除的回调函数
        """
        if event in self._listeners:
            self._listeners[event].remove(callback)
    
    def emit(self, event: str, *args, **kwargs):
        """ 触发事件,通知所有监听器
            Args:
                event: 事件名称
                *args: 传递给回调的位置参数
                **kwargs: 传递给回调的关键字参数
        """
        if event in self._listeners:
            # 遍历所有监听器并调用
            for callback in self._listeners[event]:
                callback(*args, **kwargs)

# 使用示例
emitter = EventEmitter()

def on_message(message):
    """消息事件处理器"""
    print(f"收到消息: {message}")

# 注册监听器
emitter.on("message", on_message)

# 触发事件
emitter.emit("message", "Hello, World!")  # 输出: 收到消息: Hello, World!

# 应用场景:
# 1. GUI事件处理
# 2. 消息队列/事件总线
# 3. 实时数据更新通知
# 4. 插件系统的钩子机制

2、元编程

元编程 是指编写能够操作代码的代码。在Python中,元编程允许我们在运行时动态创建、修改或检查类和函数。

2.1 元类基础

元类是创建类的类,type是Python中所有类的元类。

class SingletonMeta(type):
    """单例元类:控制类的实例创建,确保类只有一个实例"""
    # 类变量:存储所有单例实例
    _instances = {}
    
    def __call__(cls, *args, **kwargs):
        """当类被调用时(如 Database()),此方法被执行
                Args:
                    cls: 被实例化的类
                    args/kwargs: 传递给__init__的参数
        """
        # 检查该类是否已有实例
        if cls not in cls._instances:
            # 使用父类type的__call__创建实例
            cls._instances[cls] = super().__call__(*args, **kwargs)
        # 返回已存在的实例
        return cls._instances[cls]

# 使用单例元类创建单例类
class Database(metaclass=SingletonMeta):
    """数据库连接类(单例)"""
    def __init__(self):
        self.connection = "connected"

# 获取实例
db1 = Database()
db2 = Database()
print(db1 is db2)  # True,两个变量指向同一个实例

2.2 动态类生成

使用 type() 动态创建类,无需先定义类的代码。

  • type() 的三个参数:
    1. class_name: 类名
    2. bases: 父类元组
    3. attributes: 类属性字典(方法和属性)
def create_class(class_name: str, attributes: dict):
    return type(class_name, (object,), attributes)

# 创建一个动态类
Person = create_class("Person", {
    # __init__ 方法:初始化对象
    "__init__": lambda self, name: setattr(self, "name", name),
    # greet 方法:返回问候语
    "greet": lambda self: f"Hello, {self.name}"
})

# 使用动态创建的类
person = Person("Alice")
print(person.greet())  # 输出: Hello, Alice

2.3 类装饰器进阶

类装饰器可以动态为类添加方法,无需修改类的定义;参数为类对象,返回修改后的类对象(添加了新方法)。

def add_method(cls):
    # 定义要添加的方法
    def new_method(self):
        return f"Added method for {self.__class__.__name__}"
    
    # 将方法添加到类中
    cls.new_method = new_method
    return cls

# 使用类装饰器
@add_method
class MyClass:
    """原始类(没有new_method方法)"""
    pass

# 创建实例并调用新增的方法
obj = MyClass()
print(obj.new_method())  # 输出: Added method for MyClass

3、并发编程

在基础篇中我们学习了多线程编程,并发编程 是指同时处理多个任务的编程方式。在Python中,并发可以通过多种方式实现:

  • 多线程:在单个进程中运行多个线程
  • 多进程:运行多个独立进程
  • 协程:在单个线程内实现任务切换

3.1 线程池

import threading

"""简单线程池实现:管理多个工作线程处理任务"""
class ThreadPool:
    def __init__(self, num_threads: int = 4):
        """初始化线程池"""
        self.num_threads = num_threads    # 线程数量
        self._threads = []                # 存储线程对象
        self._lock = threading.Lock()     # 互斥锁:保护任务队列
        self._tasks = []                  # 任务队列
    
    def add_task(self, func, *args, **kwargs):
        """添加任务到队列"""
        with self._lock:  # 获取锁,确保线程安全
            self._tasks.append((func, args, kwargs))
    
    def _worker(self):
        """工作线程:循环从队列中获取并执行任务"""
        while True:
            with self._lock:
                # 队列为空则退出
                if not self._tasks:
                    break
                # 从队列头部取出任务
                func, args, kwargs = self._tasks.pop(0)
            
            # 在锁外执行任务(避免长时间持有锁)
            try:
                func(*args, **kwargs)
            except Exception as e:
                print(f"Task failed: {e}")
    
    def start(self):
        """启动所有工作线程"""
        for _ in range(self.num_threads):
            thread = threading.Thread(target=self._worker)
            thread.start()
            self._threads.append(thread)
    
    def join(self):
        """等待所有线程完成"""
        for thread in self._threads:
            thread.join()

# 使用示例
def process_item(item):
    """处理单个项目"""
    print(f"Processing {item}")

# 创建线程池(3个工作线程)
pool = ThreadPool(num_threads=3)

# 添加10个任务
for i in range(10):
    pool.add_task(process_item, i)

# 启动线程池并等待完成
pool.start()
pool.join()

3.2 多进程

import multiprocessing
from typing import List

def parallel_compute(data: List[int]) -> int:
    """并行计算:计算数据平方和"""
    return sum(x * x for x in data)

def chunk_data(data: List[int], num_chunks: int) -> List[List[int]]:
    """ 将数据分割成多个块
            Args:
                data: 原始数据
                num_chunks: 分割数量
            Returns:
                分割后的数据块列表
    """
    chunk_size = len(data) // num_chunks
    chunks = []
    for i in range(num_chunks):
        start = i * chunk_size
        # 最后一块包含剩余所有数据
        end = start + chunk_size if i < num_chunks - 1 else len(data)
        chunks.append(data[start:end])
    return chunks

if __name__ == "__main__":
    # 注意:多进程代码必须放在 if __name__ == "__main__" 块中
    
    # 大数据集(1000万个整数)
    data = list(range(10_000_000))
    
    # 获取CPU核心数作为进程数
    num_processes = multiprocessing.cpu_count()
    print(f"Using {num_processes} processes")
    
    # 将数据分割成对应数量的块
    chunks = chunk_data(data, num_processes)
    
    # 创建进程池并执行
    with multiprocessing.Pool(num_processes) as pool:
        # pool.map: 将任务分配给各进程并行执行
        results = pool.map(parallel_compute, chunks)
    
    # 汇总各进程结果
    total = sum(results)
    print(f"Total: {total}")

3.3 线程安全数据结构

import threading

class ThreadSafeQueue:
    """ 线程安全队列:使用锁和条件变量实现线程安全的生产者-消费者模式
            特点:
            - 使用 Lock 保护队列访问
            - 使用 Condition 实现阻塞等待(队列为空时get阻塞)
    """
    def __init__(self):
        self._queue = []                          # 底层队列
        self._lock = threading.Lock()             # 互斥锁
        self._not_empty = threading.Condition(self._lock)  # 条件变量
    
    def put(self, item):
        """添加元素到队列(线程安全)"""
        with self._lock:
            self._queue.append(item)
            # 通知等待的线程:队列不再为空
            self._not_empty.notify()
    
    def get(self):
        """获取元素(阻塞等待直到队列非空)"""
        with self._lock:
            # 循环检查:防止虚假唤醒
            while not self._queue:
                self._not_empty.wait()  # 释放锁并等待通知
            return self._queue.pop(0)   # 取出队首元素
    
    def empty(self):
        """检查队列是否为空(线程安全)"""
        with self._lock:
            return len(self._queue) == 0

# 使用示例(生产者-消费者模式):
# queue = ThreadSafeQueue()
# 
# def producer():
#     for i in range(10):
#         queue.put(i)
# 
# def consumer():
#     while True:
#         item = queue.get()
#         print(f"Consumed: {item}")

4、内存管理

内存管理 是程序运行时对内存资源的分配和释放进行管理的过程。Python使用自动内存管理机制,主要包括:

  • 引用计数:跟踪对象的引用数量
  • 垃圾回收:处理循环引用等复杂情况
  • 内存池:优化小对象的分配

4.1 引用计数

import sys

class MyObject:
    """自定义对象,包含__del__方法观察销毁时机"""
    def __del__(self):
        """对象被销毁时调用(引用计数归零时)"""
        print("Object destroyed")

# 创建对象,引用计数为1
obj = MyObject()

# getrefcount() 会临时增加引用计数,所以显示为2
print(sys.getrefcount(obj))  # 输出: 2 (变量obj + getrefcount参数)

# 创建新引用,引用计数变为2
obj2 = obj
print(sys.getrefcount(obj))  # 输出: 3

# 删除引用,引用计数变为1
del obj2
print(sys.getrefcount(obj))  # 输出: 2

# 当obj超出作用域或被del删除时,引用计数归0,对象被销毁

4.2 弱引用

import weakref

class Data:
    """示例数据类"""
    def __init__(self, value):
        self.value = value

# 创建对象
data = Data(42)

# 创建弱引用:不增加引用计数
ref = weakref.ref(data)

# 通过弱引用访问对象
print(ref())  # 输出: <__main__.Data object at ...>

# 删除强引用
del data

# 对象已被垃圾回收,弱引用返回None
print(ref())  # 输出: None

# 弱引用的用途:
# 1. 缓存场景:不阻止对象被回收
# 2. 避免循环引用
# 3. 观察者模式中避免内存泄漏

4.3 垃圾回收

import gc

# ========== 垃圾回收调试模式 ==========
# gc.DEBUG_LEAK 启用泄漏检测(会打印详细信息)
gc.set_debug(gc.DEBUG_LEAK)

# ========== 创建循环引用 ==========
# 循环引用无法通过引用计数回收,需要垃圾回收器处理
class Node:
    def __init__(self):
        self.next = None

a = Node()
b = Node()
a.next = b  # a引用b
b.next = a  # b引用a(形成循环)

# 删除变量引用
del a
del b

# 此时a和b的引用计数都是1(互相引用),无法通过引用计数回收
# 需要垃圾回收器检测并清理循环引用

# ========== 手动触发垃圾回收 ==========
gc.collect()  # 检测并回收循环引用对象

# ========== 关闭调试模式 ==========
gc.set_debug(0)  # 关闭调试输出

# 垃圾回收器的主要作用:
# 1. 检测和回收循环引用
# 2. 处理容器对象(list, dict, class等)的循环引用
# 3. 定期自动运行(可通过 gc.set_threshold() 配置)

5、性能优化

性能优化 是改进程序执行效率和资源使用的过程。在Python中,性能优化通常包括:

  • 算法优化:选择更高效的算法和数据结构
  • 代码优化:优化代码结构和执行流程
  • 工具辅助:使用性能分析工具定位瓶颈
  • 并行计算:利用多线程、多进程或协程

5.1 性能分析

import cProfile
import pstats

def slow_function():
    """慢函数:计算100万个数的平方和"""
    result = 0
    for i in range(1_000_000):
        result += i * i
    return result

# ========== 使用cProfile进行性能分析 ==========
# 创建性能分析器
profiler = cProfile.Profile()

# 启用性能分析
profiler.enable()

# 执行待分析的代码
slow_function()

# 禁用性能分析
profiler.disable()

# 分析结果
# sort_stats('cumulative') - 按累计时间排序
stats = pstats.Stats(profiler).sort_stats('cumulative')

# 打印前10个耗时最多的函数
stats.print_stats(10)

# cProfile输出字段说明:
# ncalls - 调用次数
# tottime - 函数内部耗时(不包括子函数)
# percall - 每次调用耗时(tottime / ncalls)
# cumtime - 累计耗时(包括子函数)
# percall - 每次调用累计耗时(cumtime / ncalls)
# filename:lineno(function) - 函数位置

5.2 优化技巧

# ========== 优化技巧1:使用生成器替代列表 ==========
# 生成器按需生成值,节省内存
def generate_numbers(n):
    for i in range(n):
        yield i  # 每次调用返回一个值,而不是一次性生成所有值

# 对比:列表会一次性生成所有值,占用更多内存
# def generate_numbers_list(n):
#     return list(range(n))

# ========== 优化技巧2:使用内置函数 ==========
# 内置函数用C实现,比Python循环快得多
import math
result = sum(math.sqrt(i) for i in range(1_000_000))

# ========== 优化技巧3:使用lru_cache缓存 ==========
from functools import lru_cache

@lru_cache(maxsize=128)  # 缓存最近128个调用结果
def fibonacci(n):
    """计算斐波那契数(递归方式)"""
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

# 调用示例
fibonacci(100)  # 第一次计算较慢
fibonacci(100)  # 第二次直接返回缓存结果,非常快

# 其他优化技巧:
# 1. 使用局部变量(访问速度更快)
# 2. 避免在循环中进行重复计算
# 3. 使用合适的数据结构(如set用于快速查找)
# 4. 对于CPU密集型任务,使用多进程或C扩展

6、单元测试

单元测试 是软件开发中的关键实践,用于验证代码的正确性和可靠性。单元测试关注代码的最小可测试单元(通常是函数或方法),确保其在各种情况下都能正确工作。

Python中常用的测试框架包括pytestunittest,其中pytest以其简洁的语法和强大的功能而广受欢迎。

6.1 pytest基础

import pytest

# 待测试的函数
def add(a: int, b: int) -> int:
    """简单的加法函数"""
    return a + b

# ========== 基本测试 ==========
# pytest自动发现以 test_ 开头的函数
def test_add_basic():
    """测试基本加法"""
    assert add(1, 2) == 3  # assert断言:验证结果是否符合预期

def test_add_negative():
    """测试负数加法"""
    assert add(-1, -1) == -2

# ========== 参数化测试 ==========
# @pytest.mark.parametrize 装饰器:用多组参数运行同一测试
@pytest.mark.parametrize("a, b, expected", [  # 参数名列表
    (1, 2, 3),    # 测试用例1
    (0, 0, 0),    # 测试用例2
    (-1, 1, 0),   # 测试用例3
])
def test_add_parametrized(a, b, expected):
    """参数化测试:多组输入验证"""
    assert add(a, b) == expected

# ========== 异常测试 ==========
def test_divide_by_zero():
    """测试异常抛出"""
    # pytest.raises 上下文管理器:验证是否抛出指定异常
    with pytest.raises(ZeroDivisionError):
        1 / 0  # 此操作应抛出 ZeroDivisionError

6.2 Fixtures

import pytest

# ========== Fixture:测试夹具 ==========
# @pytest.fixture 装饰器定义可复用的测试资源
@pytest.fixture
def sample_data():
    """提供测试数据的fixture"""
    return {
        "users": [
            {"id": 1, "name": "Alice"},
            {"id": 2, "name": "Bob"}
        ]
    }

@pytest.fixture
def db_connection():
    """数据库连接fixture(带清理逻辑)"""
    # 在 yield 之前的代码:setup阶段
    conn = create_test_db()  # 创建测试数据库连接
    
    # yield 返回值给测试函数
    yield conn
    
    # 在 yield 之后的代码:teardown阶段(无论测试成功或失败都会执行)
    conn.cleanup()  # 清理资源

# ========== 使用Fixtures ==========
# 测试函数通过参数名自动获取fixture实例
def test_users(db_connection, sample_data):
    """测试数据库查询(使用两个fixtures)"""
    users = db_connection.query("SELECT * FROM users")
    assert len(users) == 2

def test_data_structure(sample_data):
    """测试数据结构"""
    assert len(sample_data["users"]) == 2

6.3 异步测试

import pytest
import asyncio

# ========== 异步测试 ==========
# @pytest.mark.asyncio 装饰器:标记异步测试函数
@pytest.mark.asyncio
async def test_async_function():
    """测试异步函数"""
    # await 调用异步操作
    result = await asyncio.sleep(0.1, result=42)
    assert result == 42  # 验证异步操作结果

# ========== 自定义事件循环 ==========
@pytest.fixture
def event_loop():
    """自定义事件循环fixture"""
    loop = asyncio.new_event_loop()  # 创建新的事件循环
    yield loop                       # 提供给测试使用
    loop.close()                     # 清理:关闭事件循环
END .

相关系列文章

×