Python 3.15:没有成为头条新闻的功能

2026-05-21 1 阅读 rbanffy
又到了一年中的这个时候,Python 的新版本即将到来。随着 Python 3.15.0b1 功能冻结,我们知道今年晚些时候 Python 将会发生什么。即将推出许多重要功能,包括惰性导入和我之前介绍过的快子分析器。去年,我非常喜欢研究 Python 3.14 的较小功能。我发现其中许多功能与大型 PEP 一样有趣,值得更多关注。今年的情况也不例外。 Asyncio 任务组取消 此版本中没有太多 Asyncio 更改。这里的主要功能是能够优雅地取消任务组。 TaskGroup 是结构化并发的一种形式,它使开发人员能够以干净的方式创建多个并发任务。与 asyncio 异步。 TaskGroup() 作为 tg : tg 。创建任务(运行())tg。 create_task ( run ()) # 等待所有任务完成 假设我们想在后台等待某种信号来中断任务组的执行,这在 asyncio 中看起来很简单,但实际上这样做有点尴尬。类中断(异常):...与抑制(中断):异步与异步。 TaskGroup() 作为 tg : tg 。创建任务(运行())tg。 create_task(run()) ifawaitwait_for_signal():raiseInterrupt() 这之所以有效,是因为任务组内引发的异常会导致其他任务取消。自定义中断异常作为 ExceptionGroup 的一部分引发,然后由 contextlib.suppress 过滤,从而正常退出。抑制与 ExceptionGroup 一起工作的方式是 3.12 中另一个被忽视的功能。这是我在研究本文时偶然了解到的一个更改。新的 TaskGroup.cancel 使这个过程变得更加容易: async 与 asyncio 。 TaskGroup() 作为 tg : tg 。创建任务(运行())tg。 create_task ( run ()) 如果等待 wait_for_signal (): tg 。 cancel() 与以前不同,它如此简单,几乎没有任何解释的意义。它只是取消该组而不引发任何异常。上下文管理器的改进 装饰器非常难编写,以至于它成为了面试的首选问题。但您知道上下文管理器也可以兼作装饰器吗? @contextmanager def period ( message : str ) -> 迭代器 [ None ]: start = time 。 perf_counter () try :ieldfinally:print(f"{message}elapsed{time.perf_counter()-start:.2f}Seconds") 这里我有一个非常常用的上下文管理器来打印块中花费的持续时间。从 Python 3.3 开始,我们也可以直接使用它作为装饰器: @duration ( 'workload' ) def Workload (): ... # 或者简单地作为包装器持续时间 ( 'stuff' )( other_workload )( ... ) 但虽然它很方便,但在某些情况下它根本不起作用: @duration ( 'async Workload' ) async def async_workload (): ... @duration ( 'generator Workload' ) defworkload(): whileTrue:yield... 迭代器、异步函数和异步迭代器在这里不能很好地工作,因为它们与标准函数具有不同的语义。当您调用它们时,它们立即分别返回生成器对象、协程函数和异步生成器对象。因此,装饰器立即完成,而不是它所包装的整个生命周期。这是我多次遇到的一个不幸的问题,而且对于普通的装饰器来说这也经常是一个问题。但这种情况在 3.15 中发生了变化,现在 ContextDecorator 将检查它所包装的函数的类型,并确保装饰器覆盖整个生命周期。在我看来,这使得上下文管理器成为创建装饰器的最佳方式!它避免了一些常见的枪法并提供了更清晰的语法。我建议更多的人开始以这种方式使用它。线程安全迭代器迭代器是现代Python 的基础之一。迭代器类型允许我们将数据源与数据消费者分开,如下所示,从而产生更清晰的抽象: 从输入中惰性导入 Iterator def stream_events ( ... ) -> Iterator [ str ]: while True : Yieldblocking_get_event ( ... ) events = Stream_events ( ... ) for event in events : Consumer ( event ) 但是,当使用线程或自由线程时,这种抽象会中断。默认情况下,迭代器不是线程安全的,因此我们可能会看到跳过的值或只是损坏的内部迭代器状态。这个问题在 3.15 中通过 threading.serialize_iterator 解决了,我们只需用它包装我们的原始迭代器,瞧: import threading events = threading 。 Serialize_iterator (stream_events(...)) 以 ThreadPoolExecutor() 作为执行器:fut1 = 执行器。提交(消费,事件)fut2=执行者。还有一个 threading.synchronized_iterator 装饰器,它只将 threading.serialize_iterator 应用于生成器函数的结果。最后,我们还有 threading.concurrent_tee,它不会分割值,而是在多个迭代器之间复制值:source1