Inbox 模块 API 文档¶
Inbox 模块提供 Twitter 私信收件箱查询功能,支持获取消息更新、分页查询和用户信息获取。
目录¶
模块概述¶
Inbox 模块支持两种使用方式:
方式 1: 通过 Twitter 主入口访问(推荐)¶
from x_api_rs import Twitter
client = await Twitter.create(cookies)
result = await client.inbox.get_user_updates()
方式 2: 独立创建 InboxClient¶
from x_api_rs.inbox import InboxClient
inbox_client = InboxClient(cookies, proxy_url="http://proxy:8080")
result = await inbox_client.get_user_updates()
功能列表¶
| 功能 | 方法 | 说明 |
|---|---|---|
| 获取消息更新 | get_user_updates() |
获取私信收件箱的最新消息(旧版 REST API) |
| 获取对话列表 | get_conversations() |
获取 XChat 对话列表(GraphQL API,支持分页) |
| 获取对话总数 | get_conversation_count() |
自动翻页统计对话总数量 |
InboxClient 类¶
构造函数¶
创建新的 Inbox 客户端实例。
参数:
cookies(str): Twitter 账号的 cookies 字符串proxy_url(str | None): 可选的代理服务器 URL
返回: InboxClient 实例
异常: RuntimeError - 初始化失败
示例:
# 基础使用
client = InboxClient(cookies)
# 使用代理
client = InboxClient(cookies, proxy_url="http://proxy:8080")
get_user_updates¶
async def get_user_updates(
active_conversation_id: str | None = None,
cursor: str | None = None
) -> UserUpdatesResult
获取用户私信更新。
参数:
active_conversation_id(str | None): 可选的活跃会话 ID,用于指定特定会话cursor(str | None): 可选的分页游标,用于获取更多历史消息
返回: UserUpdatesResult 对象,包含消息列表和用户信息
异常: RuntimeError - 获取失败
示例:
# 获取最新消息
result = await client.inbox.get_user_updates()
if result.success:
print(f"数据来源: {result.data_source}")
print(f"消息数量: {len(result.entries)}")
print(f"用户数量: {len(result.users)}")
# 遍历消息
for entry in result.entries:
if entry.message:
msg = entry.message.message_data
print(f"[{msg.time}] {msg.sender_id}: {msg.text}")
# 分页获取更多消息
if result.cursor:
more_result = await client.inbox.get_user_updates(cursor=result.cursor)
# 获取特定会话的消息
result = await client.inbox.get_user_updates(
active_conversation_id="conversation_id_here"
)
get_conversations¶
async def get_conversations(
cursor: InboxCursor | None = None,
conversation_limit: int | None = None
) -> ConversationListResult
获取 XChat 对话列表(单页)。使用 GraphQL API,返回结构化的对话数据,包含参与者信息、最近消息和已读状态。
参数:
cursor(InboxCursor | None): 分页游标,首次调用传 None,后续翻页传上一页返回的 cursorconversation_limit(int | None): 每页对话数量,默认 20
返回: ConversationListResult 对象
异常: RuntimeError - 请求失败
示例:
# 获取首页对话
result = await client.inbox.get_conversations()
if result.success:
print(f"获取到 {result.count} 个对话")
for conv in result.conversations:
print(f"对话 {conv.conversation_id}:")
for p in conv.participants:
print(f" - @{p.screen_name}")
if conv.latest_message and conv.latest_message.text:
print(f" 最近: {conv.latest_message.text[:50]}")
# 翻页
if result.cursor:
page2 = await client.inbox.get_conversations(cursor=result.cursor)
# 自定义每页数量
result = await client.inbox.get_conversations(conversation_limit=10)
get_conversation_count¶
获取对话总数量。自动翻页直到获取所有对话,返回总数量。
返回: ConversationCountResult 对象
异常: RuntimeError - 请求失败
示例:
result = await client.inbox.get_conversation_count()
if result.success:
print(f"对话总数: {result.total_count}")
print(f"翻页次数: {result.pages_fetched}")
类型定义¶
UserUpdatesResult¶
获取用户更新的结果。
属性:
success(bool): 请求是否成功error_msg(str): 错误消息http_status(int): HTTP 状态码data_source(str | None): 数据来源标识cursor(str | None): 分页游标,用于获取更多数据entries(list[InboxEntry]): 消息条目列表users(dict[str, InboxUser]): 用户信息映射(键为用户 ID)
示例:
result = await client.inbox.get_user_updates()
if result.success:
# 访问消息
for entry in result.entries:
if entry.message:
print(entry.message.message_data.text)
# 访问用户信息
for user_id, user in result.users.items():
print(f"@{user.screen_name}: {user.name}")
# 分页
if result.cursor:
print(f"还有更多数据,游标: {result.cursor}")
InboxEntry¶
收件箱消息条目。
属性:
message(InboxMessage | None): 消息对象(可能为空)
示例:
InboxMessage¶
收件箱消息。
属性:
id(str): 消息 IDtime(str): 消息时间戳conversation_id(str): 会话 IDmessage_data(InboxMessageData): 消息详细数据affects_sort(bool): 是否影响排序request_id(str | None): 请求 ID
示例:
message = entry.message
print(f"消息 ID: {message.id}")
print(f"会话 ID: {message.conversation_id}")
print(f"时间: {message.time}")
print(f"内容: {message.message_data.text}")
InboxMessageData¶
消息详细数据。
属性:
id(str): 消息数据 IDtime(str): 时间戳sender_id(str): 发送者用户 IDrecipient_id(str): 接收者用户 IDtext(str): 消息文本内容entities(MessageEntities): 消息实体(链接、话题标签、@提及)
示例:
data = message.message_data
print(f"发送者: {data.sender_id}")
print(f"接收者: {data.recipient_id}")
print(f"内容: {data.text}")
# 检查实体
if data.entities.urls:
print("包含链接:")
for url in data.entities.urls:
print(f" - {url.expanded_url}")
MessageEntities¶
消息实体集合。
属性:
urls(list[UrlEntity]): URL 实体列表hashtags(list[HashtagEntity]): 话题标签实体列表user_mentions(list[UserMentionEntity]): 用户提及实体列表
示例:
entities = message.message_data.entities
# 提取所有链接
for url in entities.urls:
print(f"链接: {url.expanded_url}")
# 提取所有话题标签
for hashtag in entities.hashtags:
print(f"话题: #{hashtag.text}")
# 提取所有 @ 提及
for mention in entities.user_mentions:
print(f"提及: @{mention.screen_name}")
UrlEntity¶
URL 实体。
属性:
url(str): 短链接expanded_url(str): 展开后的完整 URLdisplay_url(str): 显示用的 URL
HashtagEntity¶
话题标签实体。
属性:
text(str): 话题标签文本(不含 # 符号)
UserMentionEntity¶
用户提及实体。
属性:
id_str(str | None): 用户 IDscreen_name(str): 用户名name(str | None): 显示名称
InboxUser¶
收件箱用户信息。
属性:
id_str(str): 用户 ID(字符串形式)name(str): 显示名称screen_name(str): 用户名(不含 @)profile_image_url(str | None): 头像 URLdescription(str | None): 个人简介followers_count(int): 粉丝数friends_count(int): 关注数verified(bool): 是否认证is_blue_verified(bool): 是否蓝V认证protected(bool): 是否私密账号
示例:
for user_id, user in result.users.items():
print(f"用户 ID: {user.id_str}")
print(f"用户名: @{user.screen_name}")
print(f"显示名: {user.name}")
print(f"粉丝数: {user.followers_count}")
print(f"蓝V: {user.is_blue_verified}")
print("---")
InboxCursor¶
XChat 分页游标,用于对话列表翻页。
属性:
cursor_id(str): 游标 IDgraph_snapshot_id(str): 图快照 ID
构造函数:
from x_api_rs.inbox import InboxCursor
cursor = InboxCursor(cursor_id="xxx", graph_snapshot_id="yyy")
ConversationParticipant¶
对话参与者的用户信息。
属性:
rest_id(str): 用户 IDname(str): 显示名称screen_name(str): 用户名(不含 @)avatar_url(str | None): 头像 URLis_blue_verified(bool): 是否蓝V认证verified(bool): 是否认证protected(bool): 是否私密账号suspended(bool): 是否被封禁can_dm(bool): 是否可以私信created_at_ms(int | None): 账号创建时间(毫秒时间戳)
ReadEvent¶
已读事件,表示某参与者已读到的消息位置。
属性:
participant_id(str): 参与者用户 IDevent_id(str | None): 已读到的事件/消息 IDtimestamp(str | None): 已读时间戳
LatestMessage¶
对话中的最近消息(从 Thrift 二进制解码)。
属性:
event_id(str | None): 消息事件 IDsender_id(str | None): 发送者用户 IDconversation_id(str | None): 所属对话 IDtimestamp(str | None): 发送时间戳text(str | None): 消息文本内容
ConversationItem¶
单个对话条目。
属性:
conversation_id(str): 对话 IDis_muted(bool): 是否静音has_more(bool): 是否还有更多历史消息participants(list[ConversationParticipant]): 参与者列表latest_message(LatestMessage | None): 最近消息read_events(list[ReadEvent]): 已读事件列表
示例:
for conv in result.conversations:
print(f"对话: {conv.conversation_id}")
print(f"参与者: {[p.screen_name for p in conv.participants]}")
if conv.latest_message and conv.latest_message.text:
print(f"最近消息: {conv.latest_message.text}")
for re in conv.read_events:
print(f"已读: {re.participant_id} -> {re.event_id}")
ConversationListResult¶
对话列表查询结果。
属性:
success(bool): 请求是否成功error_msg(str): 错误消息http_status(int): HTTP 状态码has_message_requests(bool): 是否有待处理的消息请求conversations(list[ConversationItem]): 对话列表cursor(InboxCursor | None): 下一页游标(None 表示已到最后一页)count(int): 当前页对话数量
ConversationCountResult¶
对话数量统计结果。
属性:
success(bool): 请求是否成功error_msg(str): 错误消息total_count(int): 对话总数量pages_fetched(int): 翻页次数
使用示例¶
获取最新私信¶
import asyncio
from x_api_rs import Twitter
async def main():
client = await Twitter.create(cookies)
result = await client.inbox.get_user_updates()
if result.success:
print(f"获取到 {len(result.entries)} 条消息")
for entry in result.entries:
if entry.message:
msg = entry.message.message_data
# 获取发送者信息
sender = result.users.get(msg.sender_id)
sender_name = sender.screen_name if sender else msg.sender_id
print(f"[@{sender_name}]: {msg.text}")
else:
print(f"获取失败: {result.error_msg}")
asyncio.run(main())
分页获取所有消息¶
async def get_all_messages(max_pages: int = 5):
client = await Twitter.create(cookies)
all_entries = []
all_users = {}
cursor = None
for page in range(max_pages):
result = await client.inbox.get_user_updates(cursor=cursor)
if not result.success:
print(f"获取第 {page + 1} 页失败: {result.error_msg}")
break
all_entries.extend(result.entries)
all_users.update(result.users)
print(f"第 {page + 1} 页: 获取 {len(result.entries)} 条消息")
if not result.cursor:
print("没有更多数据")
break
cursor = result.cursor
await asyncio.sleep(1) # 避免限流
print(f"总计获取 {len(all_entries)} 条消息")
return all_entries, all_users
获取对话列表并显示摘要¶
import asyncio
from x_api_rs import Twitter
async def show_conversations():
client = await Twitter.create(cookies)
result = await client.inbox.get_conversations()
if result.success:
print(f"共 {result.count} 个对话")
for conv in result.conversations:
# 获取对方用户名
names = [p.screen_name for p in conv.participants]
muted = " [静音]" if conv.is_muted else ""
# 显示最近消息预览
preview = ""
if conv.latest_message and conv.latest_message.text:
preview = conv.latest_message.text[:40]
print(f" {', '.join(names)}{muted}: {preview}")
asyncio.run(show_conversations())
统计对话总数¶
async def count_conversations():
client = await Twitter.create(cookies)
result = await client.inbox.get_conversation_count()
if result.success:
print(f"对话总数: {result.total_count}")
print(f"请求页数: {result.pages_fetched}")
asyncio.run(count_conversations())
遍历所有对话(手动分页)¶
async def list_all_conversations():
client = await Twitter.create(cookies)
all_conversations = []
cursor = None
page = 0
while True:
page += 1
result = await client.inbox.get_conversations(cursor=cursor)
if not result.success:
print(f"第 {page} 页获取失败: {result.error_msg}")
break
all_conversations.extend(result.conversations)
print(f"第 {page} 页: {result.count} 个对话")
if not result.cursor:
break
cursor = result.cursor
print(f"总计: {len(all_conversations)} 个对话")
return all_conversations
查找特定用户的消息¶
async def find_messages_from_user(target_user_id: str):
client = await Twitter.create(cookies)
result = await client.inbox.get_user_updates()
if result.success:
user_messages = []
for entry in result.entries:
if entry.message:
msg_data = entry.message.message_data
if msg_data.sender_id == target_user_id:
user_messages.append(msg_data)
# 获取用户信息
user = result.users.get(target_user_id)
user_name = user.screen_name if user else target_user_id
print(f"来自 @{user_name} 的消息:")
for msg in user_messages:
print(f" [{msg.time}] {msg.text}")
return user_messages
提取消息中的链接¶
async def extract_links():
client = await Twitter.create(cookies)
result = await client.inbox.get_user_updates()
if result.success:
links = []
for entry in result.entries:
if entry.message:
entities = entry.message.message_data.entities
for url in entities.urls:
links.append({
"short_url": url.url,
"expanded_url": url.expanded_url,
"display_url": url.display_url,
"sender_id": entry.message.message_data.sender_id
})
print(f"找到 {len(links)} 个链接:")
for link in links:
print(f" - {link['expanded_url']}")
return links
统计会话信息¶
async def analyze_conversations():
client = await Twitter.create(cookies)
result = await client.inbox.get_user_updates()
if result.success:
# 按会话分组
conversations = {}
for entry in result.entries:
if entry.message:
conv_id = entry.message.conversation_id
if conv_id not in conversations:
conversations[conv_id] = []
conversations[conv_id].append(entry.message)
print(f"共有 {len(conversations)} 个会话:")
for conv_id, messages in conversations.items():
print(f" 会话 {conv_id[:8]}...: {len(messages)} 条消息")
# 统计用户
print(f"\n涉及 {len(result.users)} 个用户:")
for user_id, user in result.users.items():
verified = "✓" if user.is_blue_verified else ""
print(f" @{user.screen_name} {verified}: {user.followers_count} 粉丝")
最佳实践¶
1. 验证操作结果¶
result = await client.inbox.get_user_updates()
if result.success:
# 处理数据
for entry in result.entries:
if entry.message: # 检查 message 是否存在
process_message(entry.message)
else:
print(f"Error: {result.error_msg}")
2. 正确使用分页¶
# 使用游标分页获取更多数据
cursor = None
while True:
result = await client.inbox.get_user_updates(cursor=cursor)
if not result.success:
break
process_entries(result.entries)
if not result.cursor:
break # 没有更多数据
cursor = result.cursor
await asyncio.sleep(1) # 添加延迟
3. 高效查找用户信息¶
# 用户信息以字典形式提供,可以高效查找
result = await client.inbox.get_user_updates()
for entry in result.entries:
if entry.message:
sender_id = entry.message.message_data.sender_id
# O(1) 时间复杂度查找
sender = result.users.get(sender_id)
if sender:
print(f"@{sender.screen_name}: {entry.message.message_data.text}")
4. 处理消息实体¶
def process_message_entities(msg_data):
"""处理消息中的实体"""
entities = msg_data.entities
# 替换短链接为完整链接
text = msg_data.text
for url in entities.urls:
text = text.replace(url.url, url.expanded_url)
# 高亮话题标签
for hashtag in entities.hashtags:
text = text.replace(f"#{hashtag.text}", f"[#{hashtag.text}]")
return text
5. 缓存用户信息¶
class InboxCache:
def __init__(self):
self.users = {}
async def get_updates(self, client):
result = await client.inbox.get_user_updates()
if result.success:
# 合并用户信息到缓存
self.users.update(result.users)
return result
def get_user(self, user_id: str):
return self.users.get(user_id)
常见问题¶
Q1: 为什么 entries 列表为空?¶
可能的原因:
- 收件箱中没有私信
- 认证 cookies 已过期
- 账号被限制访问私信功能
Q2: 如何获取特定会话的消息?¶
使用 active_conversation_id 参数:
Q3: cursor 是什么?什么时候使用?¶
cursor 是分页游标,用于获取更多历史消息:
# 第一次请求
result1 = await client.inbox.get_user_updates()
# 使用游标获取更多
if result1.cursor:
result2 = await client.inbox.get_user_updates(cursor=result1.cursor)
Q4: 为什么 entry.message 可能为 None?¶
收件箱条目不一定都是消息,可能包含其他类型的事件(如系统通知)。始终检查:
Q5: 用户 ID 为什么是字符串?¶
为避免 Python 处理大整数时的精度问题,用户 ID 以字符串形式返回。在比较时使用字符串比较:
# ✅ 正确
if msg.sender_id == "44196397":
print("Message from Elon")
# ❌ 避免转换为整数
# if int(msg.sender_id) == 44196397:
Q6: 如何判断消息是收到的还是发出的?¶
比较 sender_id 和当前用户 ID:
my_user_id = "your_user_id" # 当前登录用户的 ID
for entry in result.entries:
if entry.message:
msg = entry.message.message_data
if msg.sender_id == my_user_id:
print(f"发出: {msg.text}")
else:
print(f"收到: {msg.text}")
Q7: get_user_updates() 和 get_conversations() 有什么区别?¶
| 特性 | get_user_updates() |
get_conversations() |
|---|---|---|
| API 类型 | 旧版 REST API | XChat GraphQL API |
| 返回数据 | 消息列表 + 用户信息 | 对话列表 + 参与者 + 最近消息 |
| 分页方式 | 字符串 cursor | InboxCursor 对象 |
| 消息详情 | 完整消息内容 | 最近一条消息摘要(Thrift 解码) |
| 推荐场景 | 获取具体消息内容 | 获取对话概览、统计对话数 |
Q8: InboxCursor 和普通字符串 cursor 有什么区别?¶
InboxCursor 包含两个字段 cursor_id 和 graph_snapshot_id,用于 XChat GraphQL API 分页。普通字符串 cursor 用于旧版 REST API。两者不可互换。
# XChat API 分页
result = await client.inbox.get_conversations()
if result.cursor: # InboxCursor 对象
next_page = await client.inbox.get_conversations(cursor=result.cursor)
# 旧版 REST API 分页
result = await client.inbox.get_user_updates()
if result.cursor: # 字符串
next_page = await client.inbox.get_user_updates(cursor=result.cursor)