跳转至

Inbox 模块 API 文档

Inbox 模块提供 Twitter 私信收件箱查询功能,支持获取消息更新、分页查询和用户信息获取。

目录

模块概述

Inbox 模块支持两种使用方式:

方式 1: 通过 Twitter 主入口访问(推荐)

from x_api_rs import Twitter

client = Twitter(cookies)
result = await client.inbox.get_user_updates()

方式 2: 独立创建 InboxClient

from x_api_rs.inbox import InboxClient

inbox_client = InboxClient(cookies, proxy_url="http://proxy:8080")
result = await inbox_client.get_user_updates()

功能列表

功能 方法 说明
获取消息更新 get_user_updates() 获取私信收件箱的最新消息

InboxClient 类

构造函数

InboxClient(
    cookies: str,
    proxy_url: str | None = None
)

创建新的 Inbox 客户端实例。

参数:

  • cookies (str): Twitter 账号的 cookies 字符串
  • proxy_url (str | None): 可选的代理服务器 URL

返回: InboxClient 实例

异常: RuntimeError - 初始化失败

示例:

# 基础使用
client = InboxClient(cookies)

# 使用代理
client = InboxClient(cookies, proxy_url="http://proxy:8080")

get_user_updates

async def get_user_updates(
    active_conversation_id: str | None = None,
    cursor: str | None = None
) -> UserUpdatesResult

获取用户私信更新。

参数:

  • active_conversation_id (str | None): 可选的活跃会话 ID,用于指定特定会话
  • cursor (str | None): 可选的分页游标,用于获取更多历史消息

返回: UserUpdatesResult 对象,包含消息列表和用户信息

异常: RuntimeError - 获取失败

示例:

# 获取最新消息
result = await client.inbox.get_user_updates()

if result.success:
    print(f"数据来源: {result.data_source}")
    print(f"消息数量: {len(result.entries)}")
    print(f"用户数量: {len(result.users)}")

    # 遍历消息
    for entry in result.entries:
        if entry.message:
            msg = entry.message.message_data
            print(f"[{msg.time}] {msg.sender_id}: {msg.text}")

# 分页获取更多消息
if result.cursor:
    more_result = await client.inbox.get_user_updates(cursor=result.cursor)

# 获取特定会话的消息
result = await client.inbox.get_user_updates(
    active_conversation_id="conversation_id_here"
)

类型定义

UserUpdatesResult

获取用户更新的结果。

属性:

  • success (bool): 请求是否成功
  • error_msg (str): 错误消息
  • http_status (int): HTTP 状态码
  • data_source (str | None): 数据来源标识
  • cursor (str | None): 分页游标,用于获取更多数据
  • entries (list[InboxEntry]): 消息条目列表
  • users (dict[str, InboxUser]): 用户信息映射(键为用户 ID)

示例:

result = await client.inbox.get_user_updates()

if result.success:
    # 访问消息
    for entry in result.entries:
        if entry.message:
            print(entry.message.message_data.text)

    # 访问用户信息
    for user_id, user in result.users.items():
        print(f"@{user.screen_name}: {user.name}")

    # 分页
    if result.cursor:
        print(f"还有更多数据,游标: {result.cursor}")

InboxEntry

收件箱消息条目。

属性:

  • message (InboxMessage | None): 消息对象(可能为空)

示例:

for entry in result.entries:
    if entry.message:
        # 处理消息
        print(entry.message.message_data.text)

InboxMessage

收件箱消息。

属性:

  • id (str): 消息 ID
  • time (str): 消息时间戳
  • conversation_id (str): 会话 ID
  • message_data (InboxMessageData): 消息详细数据
  • affects_sort (bool): 是否影响排序
  • request_id (str | None): 请求 ID

示例:

message = entry.message
print(f"消息 ID: {message.id}")
print(f"会话 ID: {message.conversation_id}")
print(f"时间: {message.time}")
print(f"内容: {message.message_data.text}")

InboxMessageData

消息详细数据。

属性:

  • id (str): 消息数据 ID
  • time (str): 时间戳
  • sender_id (str): 发送者用户 ID
  • recipient_id (str): 接收者用户 ID
  • text (str): 消息文本内容
  • entities (MessageEntities): 消息实体(链接、话题标签、@提及)

示例:

data = message.message_data
print(f"发送者: {data.sender_id}")
print(f"接收者: {data.recipient_id}")
print(f"内容: {data.text}")

# 检查实体
if data.entities.urls:
    print("包含链接:")
    for url in data.entities.urls:
        print(f"  - {url.expanded_url}")

MessageEntities

消息实体集合。

属性:

  • urls (list[UrlEntity]): URL 实体列表
  • hashtags (list[HashtagEntity]): 话题标签实体列表
  • user_mentions (list[UserMentionEntity]): 用户提及实体列表

示例:

entities = message.message_data.entities

# 提取所有链接
for url in entities.urls:
    print(f"链接: {url.expanded_url}")

# 提取所有话题标签
for hashtag in entities.hashtags:
    print(f"话题: #{hashtag.text}")

# 提取所有 @ 提及
for mention in entities.user_mentions:
    print(f"提及: @{mention.screen_name}")

UrlEntity

URL 实体。

属性:

  • url (str): 短链接
  • expanded_url (str): 展开后的完整 URL
  • display_url (str): 显示用的 URL

HashtagEntity

话题标签实体。

属性:

  • text (str): 话题标签文本(不含 # 符号)

UserMentionEntity

用户提及实体。

属性:

  • id_str (str | None): 用户 ID
  • screen_name (str): 用户名
  • name (str | None): 显示名称

InboxUser

收件箱用户信息。

属性:

  • id_str (str): 用户 ID(字符串形式)
  • name (str): 显示名称
  • screen_name (str): 用户名(不含 @)
  • profile_image_url (str | None): 头像 URL
  • description (str | None): 个人简介
  • followers_count (int): 粉丝数
  • friends_count (int): 关注数
  • verified (bool): 是否认证
  • is_blue_verified (bool): 是否蓝V认证
  • protected (bool): 是否私密账号

示例:

for user_id, user in result.users.items():
    print(f"用户 ID: {user.id_str}")
    print(f"用户名: @{user.screen_name}")
    print(f"显示名: {user.name}")
    print(f"粉丝数: {user.followers_count}")
    print(f"蓝V: {user.is_blue_verified}")
    print("---")

使用示例

获取最新私信

import asyncio
from x_api_rs import Twitter

async def main():
    client = Twitter(cookies)

    result = await client.inbox.get_user_updates()

    if result.success:
        print(f"获取到 {len(result.entries)} 条消息")

        for entry in result.entries:
            if entry.message:
                msg = entry.message.message_data

                # 获取发送者信息
                sender = result.users.get(msg.sender_id)
                sender_name = sender.screen_name if sender else msg.sender_id

                print(f"[@{sender_name}]: {msg.text}")
    else:
        print(f"获取失败: {result.error_msg}")

asyncio.run(main())

分页获取所有消息

async def get_all_messages(max_pages: int = 5):
    client = Twitter(cookies)

    all_entries = []
    all_users = {}
    cursor = None

    for page in range(max_pages):
        result = await client.inbox.get_user_updates(cursor=cursor)

        if not result.success:
            print(f"获取第 {page + 1} 页失败: {result.error_msg}")
            break

        all_entries.extend(result.entries)
        all_users.update(result.users)

        print(f"第 {page + 1} 页: 获取 {len(result.entries)} 条消息")

        if not result.cursor:
            print("没有更多数据")
            break

        cursor = result.cursor
        await asyncio.sleep(1)  # 避免限流

    print(f"总计获取 {len(all_entries)} 条消息")
    return all_entries, all_users

查找特定用户的消息

async def find_messages_from_user(target_user_id: str):
    client = Twitter(cookies)

    result = await client.inbox.get_user_updates()

    if result.success:
        user_messages = []

        for entry in result.entries:
            if entry.message:
                msg_data = entry.message.message_data
                if msg_data.sender_id == target_user_id:
                    user_messages.append(msg_data)

        # 获取用户信息
        user = result.users.get(target_user_id)
        user_name = user.screen_name if user else target_user_id

        print(f"来自 @{user_name} 的消息:")
        for msg in user_messages:
            print(f"  [{msg.time}] {msg.text}")

        return user_messages

提取消息中的链接

async def extract_links():
    client = Twitter(cookies)

    result = await client.inbox.get_user_updates()

    if result.success:
        links = []

        for entry in result.entries:
            if entry.message:
                entities = entry.message.message_data.entities
                for url in entities.urls:
                    links.append({
                        "short_url": url.url,
                        "expanded_url": url.expanded_url,
                        "display_url": url.display_url,
                        "sender_id": entry.message.message_data.sender_id
                    })

        print(f"找到 {len(links)} 个链接:")
        for link in links:
            print(f"  - {link['expanded_url']}")

        return links

统计会话信息

async def analyze_conversations():
    client = Twitter(cookies)

    result = await client.inbox.get_user_updates()

    if result.success:
        # 按会话分组
        conversations = {}

        for entry in result.entries:
            if entry.message:
                conv_id = entry.message.conversation_id
                if conv_id not in conversations:
                    conversations[conv_id] = []
                conversations[conv_id].append(entry.message)

        print(f"共有 {len(conversations)} 个会话:")
        for conv_id, messages in conversations.items():
            print(f"  会话 {conv_id[:8]}...: {len(messages)} 条消息")

        # 统计用户
        print(f"\n涉及 {len(result.users)} 个用户:")
        for user_id, user in result.users.items():
            verified = "✓" if user.is_blue_verified else ""
            print(f"  @{user.screen_name} {verified}: {user.followers_count} 粉丝")

最佳实践

1. 验证操作结果

result = await client.inbox.get_user_updates()

if result.success:
    # 处理数据
    for entry in result.entries:
        if entry.message:  # 检查 message 是否存在
            process_message(entry.message)
else:
    print(f"Error: {result.error_msg}")

2. 正确使用分页

# 使用游标分页获取更多数据
cursor = None
while True:
    result = await client.inbox.get_user_updates(cursor=cursor)

    if not result.success:
        break

    process_entries(result.entries)

    if not result.cursor:
        break  # 没有更多数据

    cursor = result.cursor
    await asyncio.sleep(1)  # 添加延迟

3. 高效查找用户信息

# 用户信息以字典形式提供,可以高效查找
result = await client.inbox.get_user_updates()

for entry in result.entries:
    if entry.message:
        sender_id = entry.message.message_data.sender_id
        # O(1) 时间复杂度查找
        sender = result.users.get(sender_id)
        if sender:
            print(f"@{sender.screen_name}: {entry.message.message_data.text}")

4. 处理消息实体

def process_message_entities(msg_data):
    """处理消息中的实体"""
    entities = msg_data.entities

    # 替换短链接为完整链接
    text = msg_data.text
    for url in entities.urls:
        text = text.replace(url.url, url.expanded_url)

    # 高亮话题标签
    for hashtag in entities.hashtags:
        text = text.replace(f"#{hashtag.text}", f"[#{hashtag.text}]")

    return text

5. 缓存用户信息

class InboxCache:
    def __init__(self):
        self.users = {}

    async def get_updates(self, client):
        result = await client.inbox.get_user_updates()

        if result.success:
            # 合并用户信息到缓存
            self.users.update(result.users)

        return result

    def get_user(self, user_id: str):
        return self.users.get(user_id)

常见问题

Q1: 为什么 entries 列表为空?

可能的原因:

  • 收件箱中没有私信
  • 认证 cookies 已过期
  • 账号被限制访问私信功能

Q2: 如何获取特定会话的消息?

使用 active_conversation_id 参数:

result = await client.inbox.get_user_updates(
    active_conversation_id="conversation_id_here"
)

Q3: cursor 是什么?什么时候使用?

cursor 是分页游标,用于获取更多历史消息:

# 第一次请求
result1 = await client.inbox.get_user_updates()

# 使用游标获取更多
if result1.cursor:
    result2 = await client.inbox.get_user_updates(cursor=result1.cursor)

Q4: 为什么 entry.message 可能为 None?

收件箱条目不一定都是消息,可能包含其他类型的事件(如系统通知)。始终检查:

for entry in result.entries:
    if entry.message:  # 确保是消息
        process(entry.message)

Q5: 用户 ID 为什么是字符串?

为避免 Python 处理大整数时的精度问题,用户 ID 以字符串形式返回。在比较时使用字符串比较:

# ✅ 正确
if msg.sender_id == "44196397":
    print("Message from Elon")

# ❌ 避免转换为整数
# if int(msg.sender_id) == 44196397:

Q6: 如何判断消息是收到的还是发出的?

比较 sender_id 和当前用户 ID:

my_user_id = "your_user_id"  # 当前登录用户的 ID

for entry in result.entries:
    if entry.message:
        msg = entry.message.message_data
        if msg.sender_id == my_user_id:
            print(f"发出: {msg.text}")
        else:
            print(f"收到: {msg.text}")

下一步