Examples

Where to get examples

Get feed list

Here is an example (with zh-CN comments!) about how to:

  1. initialize a login manager and an API instance

  2. register hooks

  3. start a fetch process and block until all feeds are processed

 1import asyncio
 2import logging
 3from os import environ as env
 4
 5from aiohttp import ClientSession
 6from aioqzone.api import UpLoginConfig, UpLoginManager
 7from tenacity import RetryError
 8
 9from aioqzone_feed.api import FeedApi
10
11n_dropped = 0
12log = logging.getLogger(__name__)
13
14
15async def amain():
16    async with ClientSession() as client:
17        # 实例化一个账密登录器。目前来说二维码登录器要更稳妥一些。
18        # 如果不改动代码的话,那么需要设置 uin 和 pwd 两个环境变量。
19        loginable = UpLoginManager(client, UpLoginConfig.model_validate(env))
20        # 实例化一个API对象
21        api = FeedApi(client, loginable)
22
23        # 接收舍弃的动态。程序中有一些内建的规则,您也可以重载 drop_rule. 舍弃的动态包含的字段要少一些。
24        api.feed_dropped.add_impl(log_dropped_feeds)
25        # 回调可以有多个,同步异步函数都可以,会按顺序调用。异常的回调会被忽略,之后的继续执行。
26        api.feed_dropped.add_impl(drop_statistic)
27
28        # 当然,也可以用装饰器的写法。
29        @api.feed_processed.add_impl
30        async def send_to_user(bid: int, feed):
31            if bid != batch_id:
32                # 如果不知道怎么处理的话,丢掉就好了
33                return
34
35            # 这里你就得到一条动态了,可以做你想做的操作
36            await send_feed(feed)
37
38        # =======================================================
39        # =======================================================
40        # 以上都是准备工作,可以在初始化的时候完成。下面是每次爬取都要做的。
41
42        # 获取一个 batch id,可以用来判断一个回调是哪次爬取触发的
43        # 这是一个可选的步骤。不过还是推荐这样做。
44        batch_id = api.new_batch()
45
46        try:
47            # 爬取三天内的动态
48            n = await api.get_feeds_by_second(3 * 86400)
49            # 爬取到了多少动态可以马上返回,但动态的处理结果是通过回调下发的
50            # 也就是说,n 表示总共爬取了多少条动态
51            # 但动态内容需要注册 feed_processed 来获取
52        except RetryError as e:
53            log.error("登录错误", exc_info=e.last_attempt.exception())
54            return
55
56        # 调用 wait 表示阻塞等待所有动态处理完毕
57        # 在此期间 feed_processed 和 feed_dropped 两种信号会不断被发送,直到所有动态处理完毕
58        await api.wait()
59        # 到这里 一个爬取流程就结束了
60
61
62async def log_dropped_feeds(bid: int, feed):
63    log.debug(feed)
64
65
66def drop_statistic(bid: int, feed):
67    global n_dropped
68    n_dropped += 1
69
70
71if __name__ == "__main__":
72    asyncio.run(amain())