Inbox 收件箱模块示例¶
本文档提供 Inbox 收件箱模块的完整使用示例。
目录¶
前置准备¶
安装依赖¶
创建客户端¶
import x_api
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
# 或使用代理
twitter = x_api.Twitter(cookies, "http://127.0.0.1:7890")
获取最新私信¶
获取收件箱中的最新消息。
import asyncio
import x_api
async def get_latest_messages():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
result = await twitter.inbox.get_user_updates()
if result.success:
print(f"📬 收件箱更新")
print(f" 数据来源: {result.data_source}")
print(f" 消息数量: {len(result.entries)}")
print(f" 用户数量: {len(result.users)}")
# 遍历消息
print(f"\n📨 最新消息:")
for entry in result.entries:
if entry.message:
msg = entry.message.message_data
# 获取发送者信息
sender = result.users.get(msg.sender_id)
sender_name = f"@{sender.screen_name}" if sender else msg.sender_id
print(f" [{msg.time}] {sender_name}: {msg.text[:50]}...")
else:
print(f"❌ 获取失败: {result.error_msg}")
print(f" HTTP 状态码: {result.http_status}")
asyncio.run(get_latest_messages())
输出示例:
📬 收件箱更新
数据来源: rpc
消息数量: 15
用户数量: 8
📨 最新消息:
[1705123456789] @user1: 你好!最近怎么样?...
[1705123456000] @user2: 收到,我明天给你回复...
[1705123400000] @user3: 感谢你的帮助!...
简单的消息列表¶
import asyncio
import x_api
async def list_messages():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
result = await twitter.inbox.get_user_updates()
if result.success:
for i, entry in enumerate(result.entries[:10], 1):
if entry.message:
msg = entry.message.message_data
sender = result.users.get(msg.sender_id)
name = sender.screen_name if sender else "Unknown"
print(f"{i}. @{name}: {msg.text}")
asyncio.run(list_messages())
分页获取历史消息¶
使用游标分页获取更多历史消息。
import asyncio
import x_api
async def get_all_messages():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
all_entries = []
all_users = {}
cursor = None
max_pages = 5
print(f"📥 开始获取收件箱消息...")
for page in range(max_pages):
result = await twitter.inbox.get_user_updates(cursor=cursor)
if not result.success:
print(f"❌ 第 {page + 1} 页获取失败: {result.error_msg}")
break
# 累积数据
all_entries.extend(result.entries)
all_users.update(result.users)
print(f" 第 {page + 1} 页: {len(result.entries)} 条消息")
# 检查是否还有更多
if not result.cursor:
print(" ✅ 已获取全部消息")
break
cursor = result.cursor
await asyncio.sleep(1) # 避免限流
print(f"\n📊 总计:")
print(f" 消息: {len(all_entries)} 条")
print(f" 用户: {len(all_users)} 人")
return all_entries, all_users
asyncio.run(get_all_messages())
输出示例:
获取特定会话消息¶
获取与特定用户的会话消息。
import asyncio
import x_api
async def get_conversation_messages():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
# 指定会话 ID
conversation_id = "123456789-987654321"
result = await twitter.inbox.get_user_updates(
active_conversation_id=conversation_id
)
if result.success:
print(f"💬 会话 {conversation_id[:16]}... 的消息:")
for entry in result.entries:
if entry.message:
if entry.message.conversation_id == conversation_id:
msg = entry.message.message_data
sender = result.users.get(msg.sender_id)
name = sender.screen_name if sender else msg.sender_id
print(f" @{name}: {msg.text}")
else:
print(f"❌ 获取失败: {result.error_msg}")
asyncio.run(get_conversation_messages())
查找特定用户的消息¶
import asyncio
import x_api
async def find_messages_from_user(target_user_id: str):
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
result = await twitter.inbox.get_user_updates()
if result.success:
# 筛选来自目标用户的消息
user_messages = []
for entry in result.entries:
if entry.message:
msg_data = entry.message.message_data
if msg_data.sender_id == target_user_id:
user_messages.append({
"time": msg_data.time,
"text": msg_data.text,
"conversation_id": entry.message.conversation_id
})
# 获取用户信息
user = result.users.get(target_user_id)
user_name = user.screen_name if user else target_user_id
print(f"📨 来自 @{user_name} 的消息 ({len(user_messages)} 条):")
for msg in user_messages[:10]:
print(f" [{msg['time']}] {msg['text'][:50]}...")
return user_messages
asyncio.run(find_messages_from_user("44196397"))
解析消息内容¶
解析消息的详细信息。
import asyncio
import x_api
from datetime import datetime
async def parse_message_details():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
result = await twitter.inbox.get_user_updates()
if result.success:
for entry in result.entries[:5]:
if entry.message:
message = entry.message
msg_data = message.message_data
print(f"\n{'='*50}")
print(f"📝 消息详情")
print(f"{'='*50}")
# 基本信息
print(f"消息 ID: {message.id}")
print(f"会话 ID: {message.conversation_id}")
# 时间处理
try:
timestamp = int(msg_data.time) / 1000
dt = datetime.fromtimestamp(timestamp)
print(f"发送时间: {dt.strftime('%Y-%m-%d %H:%M:%S')}")
except:
print(f"时间戳: {msg_data.time}")
# 发送者和接收者
sender = result.users.get(msg_data.sender_id)
recipient = result.users.get(msg_data.recipient_id)
sender_name = sender.screen_name if sender else msg_data.sender_id
recipient_name = recipient.screen_name if recipient else msg_data.recipient_id
print(f"发送者: @{sender_name}")
print(f"接收者: @{recipient_name}")
# 消息内容
print(f"内容: {msg_data.text}")
# 其他属性
print(f"影响排序: {message.affects_sort}")
if message.request_id:
print(f"请求 ID: {message.request_id}")
asyncio.run(parse_message_details())
输出示例:
==================================================
📝 消息详情
==================================================
消息 ID: 1234567890123456789
会话 ID: 44196397-123456789
发送时间: 2024-01-13 15:30:45
发送者: @elonmusk
接收者: @myaccount
内容: Hello! How are you?
影响排序: True
提取消息实体¶
提取消息中的链接、话题标签和 @ 提及。
提取链接¶
import asyncio
import x_api
async def extract_links():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
result = await twitter.inbox.get_user_updates()
if result.success:
all_links = []
for entry in result.entries:
if entry.message:
entities = entry.message.message_data.entities
sender_id = entry.message.message_data.sender_id
sender = result.users.get(sender_id)
for url in entities.urls:
all_links.append({
"sender": sender.screen_name if sender else sender_id,
"short_url": url.url,
"expanded_url": url.expanded_url,
"display_url": url.display_url
})
print(f"🔗 找到 {len(all_links)} 个链接:")
for link in all_links[:10]:
print(f" @{link['sender']}: {link['expanded_url']}")
return all_links
asyncio.run(extract_links())
提取话题标签¶
import asyncio
import x_api
from collections import Counter
async def extract_hashtags():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
result = await twitter.inbox.get_user_updates()
if result.success:
all_hashtags = []
for entry in result.entries:
if entry.message:
entities = entry.message.message_data.entities
for hashtag in entities.hashtags:
all_hashtags.append(hashtag.text)
# 统计热门话题
hashtag_counts = Counter(all_hashtags)
print(f"#️⃣ 话题标签统计:")
for tag, count in hashtag_counts.most_common(10):
print(f" #{tag}: {count} 次")
return hashtag_counts
asyncio.run(extract_hashtags())
提取 @ 提及¶
import asyncio
import x_api
async def extract_mentions():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
result = await twitter.inbox.get_user_updates()
if result.success:
all_mentions = []
for entry in result.entries:
if entry.message:
entities = entry.message.message_data.entities
msg_text = entry.message.message_data.text
for mention in entities.user_mentions:
all_mentions.append({
"user_id": mention.id_str,
"screen_name": mention.screen_name,
"name": mention.name,
"context": msg_text[:50]
})
print(f"👥 用户提及 ({len(all_mentions)} 个):")
for mention in all_mentions[:10]:
print(f" @{mention['screen_name']}: {mention['context']}...")
return all_mentions
asyncio.run(extract_mentions())
综合提取所有实体¶
import asyncio
import x_api
async def extract_all_entities():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
result = await twitter.inbox.get_user_updates()
if result.success:
print(f"📊 消息实体分析\n")
for entry in result.entries[:5]:
if entry.message:
msg = entry.message.message_data
entities = msg.entities
sender = result.users.get(msg.sender_id)
print(f"{'='*40}")
print(f"发送者: @{sender.screen_name if sender else msg.sender_id}")
print(f"内容: {msg.text[:60]}...")
# 链接
if entities.urls:
print(f"🔗 链接 ({len(entities.urls)}):")
for url in entities.urls:
print(f" - {url.display_url}")
# 话题
if entities.hashtags:
tags = [f"#{h.text}" for h in entities.hashtags]
print(f"#️⃣ 话题: {', '.join(tags)}")
# 提及
if entities.user_mentions:
mentions = [f"@{m.screen_name}" for m in entities.user_mentions]
print(f"👥 提及: {', '.join(mentions)}")
print()
asyncio.run(extract_all_entities())
获取用户信息¶
访问与消息相关的用户信息。
import asyncio
import x_api
async def get_inbox_users():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
result = await twitter.inbox.get_user_updates()
if result.success:
print(f"👥 收件箱中的用户 ({len(result.users)} 人):\n")
# 按粉丝数排序
sorted_users = sorted(
result.users.values(),
key=lambda u: u.followers_count,
reverse=True
)
for user in sorted_users:
# 认证标识
badges = []
if user.is_blue_verified:
badges.append("✓蓝V")
if user.verified:
badges.append("✓认证")
if user.protected:
badges.append("🔒私密")
badge_str = f" [{', '.join(badges)}]" if badges else ""
print(f"@{user.screen_name}{badge_str}")
print(f" 显示名: {user.name}")
print(f" 粉丝: {user.followers_count:,} | 关注: {user.friends_count:,}")
if user.description:
print(f" 简介: {user.description[:50]}...")
print()
asyncio.run(get_inbox_users())
输出示例:
👥 收件箱中的用户 (8 人):
@elonmusk [✓蓝V]
显示名: Elon Musk
粉丝: 210,345,678 | 关注: 876
简介: Read @America...
@user123
显示名: John Doe
粉丝: 1,234 | 关注: 567
简介: Developer | Tech enthusiast...
快速查找用户¶
import asyncio
import x_api
async def lookup_user_from_inbox(target_username: str):
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
result = await twitter.inbox.get_user_updates()
if result.success:
# 在用户列表中查找
for user_id, user in result.users.items():
if user.screen_name.lower() == target_username.lower():
print(f"✅ 找到用户 @{user.screen_name}")
print(f" ID: {user.id_str}")
print(f" 名称: {user.name}")
print(f" 粉丝: {user.followers_count:,}")
print(f" 蓝V: {'是' if user.is_blue_verified else '否'}")
return user
print(f"❌ 未在收件箱中找到 @{target_username}")
return None
asyncio.run(lookup_user_from_inbox("elonmusk"))
统计和分析¶
对收件箱数据进行统计分析。
会话统计¶
import asyncio
import x_api
from collections import defaultdict
async def analyze_conversations():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
result = await twitter.inbox.get_user_updates()
if result.success:
# 按会话分组
conversations = defaultdict(list)
for entry in result.entries:
if entry.message:
conv_id = entry.message.conversation_id
conversations[conv_id].append(entry.message)
print(f"📊 会话统计")
print(f" 总会话数: {len(conversations)}")
print(f" 总消息数: {len(result.entries)}")
print(f"\n💬 各会话详情:")
for conv_id, messages in sorted(
conversations.items(),
key=lambda x: len(x[1]),
reverse=True
)[:5]:
# 获取会话参与者
participants = set()
for msg in messages:
participants.add(msg.message_data.sender_id)
participants.add(msg.message_data.recipient_id)
# 获取参与者名称
participant_names = []
for pid in participants:
user = result.users.get(pid)
if user:
participant_names.append(f"@{user.screen_name}")
print(f"\n 会话: {conv_id[:16]}...")
print(f" 消息数: {len(messages)}")
print(f" 参与者: {', '.join(participant_names)}")
asyncio.run(analyze_conversations())
消息频率统计¶
import asyncio
import x_api
from collections import Counter
from datetime import datetime
async def analyze_message_frequency():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
result = await twitter.inbox.get_user_updates()
if result.success:
# 统计每个用户的消息数
sender_counts = Counter()
hourly_counts = Counter()
for entry in result.entries:
if entry.message:
msg = entry.message.message_data
# 统计发送者
sender = result.users.get(msg.sender_id)
sender_name = sender.screen_name if sender else msg.sender_id
sender_counts[sender_name] += 1
# 统计时间分布
try:
timestamp = int(msg.time) / 1000
hour = datetime.fromtimestamp(timestamp).hour
hourly_counts[hour] += 1
except:
pass
print(f"📊 消息频率分析\n")
print(f"👤 发送者排行:")
for name, count in sender_counts.most_common(5):
print(f" @{name}: {count} 条")
print(f"\n🕐 时间分布:")
for hour in range(24):
count = hourly_counts.get(hour, 0)
bar = "█" * (count // 2) if count > 0 else ""
print(f" {hour:02d}:00 {bar} ({count})")
asyncio.run(analyze_message_frequency())
实时监控收件箱¶
定期检查新消息。
import asyncio
import x_api
from datetime import datetime
async def monitor_inbox(interval_seconds: int = 30, duration_minutes: int = 5):
"""
监控收件箱新消息
Args:
interval_seconds: 检查间隔(秒)
duration_minutes: 监控时长(分钟)
"""
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
seen_message_ids = set()
end_time = datetime.now().timestamp() + duration_minutes * 60
print(f"👀 开始监控收件箱 (每 {interval_seconds} 秒检查一次)")
print(f" 持续时间: {duration_minutes} 分钟")
print(f" 按 Ctrl+C 停止\n")
try:
while datetime.now().timestamp() < end_time:
result = await twitter.inbox.get_user_updates()
if result.success:
new_messages = []
for entry in result.entries:
if entry.message:
msg_id = entry.message.id
if msg_id not in seen_message_ids:
seen_message_ids.add(msg_id)
new_messages.append(entry.message)
if new_messages:
print(f"\n🔔 [{datetime.now().strftime('%H:%M:%S')}] "
f"收到 {len(new_messages)} 条新消息:")
for msg in new_messages:
sender = result.users.get(msg.message_data.sender_id)
sender_name = sender.screen_name if sender else "Unknown"
print(f" @{sender_name}: {msg.message_data.text[:40]}...")
else:
print(f"⏳ [{datetime.now().strftime('%H:%M:%S')}] 无新消息", end="\r")
await asyncio.sleep(interval_seconds)
except KeyboardInterrupt:
print("\n\n⏹️ 监控已停止")
print(f"\n📊 监控期间共发现 {len(seen_message_ids)} 条消息")
asyncio.run(monitor_inbox(interval_seconds=30, duration_minutes=5))
新消息回调处理¶
import asyncio
import x_api
from typing import Callable
class InboxMonitor:
"""收件箱监控器"""
def __init__(self, cookies: str, proxy_url: str | None = None):
self.twitter = x_api.Twitter(cookies, proxy_url)
self.seen_ids = set()
self.callbacks = []
def on_new_message(self, callback: Callable):
"""注册新消息回调"""
self.callbacks.append(callback)
async def start(self, interval: int = 30):
"""开始监控"""
print("🚀 收件箱监控已启动")
while True:
try:
result = await self.twitter.inbox.get_user_updates()
if result.success:
for entry in result.entries:
if entry.message and entry.message.id not in self.seen_ids:
self.seen_ids.add(entry.message.id)
# 触发所有回调
for callback in self.callbacks:
await callback(entry.message, result.users)
await asyncio.sleep(interval)
except Exception as e:
print(f"❌ 监控错误: {e}")
await asyncio.sleep(interval)
async def handle_new_message(message, users):
"""处理新消息"""
sender = users.get(message.message_data.sender_id)
sender_name = sender.screen_name if sender else "Unknown"
print(f"📬 新消息 from @{sender_name}: {message.message_data.text}")
async def main():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
monitor = InboxMonitor(cookies)
monitor.on_new_message(handle_new_message)
# 运行 2 分钟
await asyncio.wait_for(monitor.start(interval=15), timeout=120)
asyncio.run(main())
错误处理¶
处理收件箱操作中的常见错误。
import asyncio
import x_api
async def handle_inbox_errors():
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)
try:
result = await twitter.inbox.get_user_updates()
if result.success:
print(f"✅ 获取成功: {len(result.entries)} 条消息")
# 安全遍历
for entry in result.entries:
if entry.message: # 始终检查 message 是否存在
process_message(entry.message, result.users)
else:
# 处理 API 返回的错误
match result.http_status:
case 400:
print("❌ 请求参数错误")
case 401:
print("❌ 认证失败,请检查 cookies")
case 403:
print("❌ 权限不足,可能被限制访问私信")
case 429:
print("❌ 请求过于频繁,请稍后重试")
case _:
print(f"❌ 获取失败: {result.error_msg}")
except x_api.TwitterError as e:
print(f"❌ 客户端错误: {e}")
except Exception as e:
print(f"❌ 未知错误: {e}")
def process_message(message, users):
"""安全处理消息"""
try:
msg_data = message.message_data
sender = users.get(msg_data.sender_id)
if sender:
print(f"@{sender.screen_name}: {msg_data.text}")
else:
print(f"[{msg_data.sender_id}]: {msg_data.text}")
except AttributeError as e:
print(f"⚠️ 消息解析错误: {e}")
asyncio.run(handle_inbox_errors())
常见错误码¶
| HTTP 状态码 | 说明 | 处理建议 |
|---|---|---|
| 400 | 请求参数错误 | 检查 cursor 格式 |
| 401 | 认证失败 | 检查 cookies 是否过期 |
| 403 | 权限不足 | 账号可能被限制私信功能 |
| 429 | 请求频率限制 | 降低请求频率 |
| 500 | 服务器错误 | 稍后重试 |
完整示例¶
#!/usr/bin/env python3
"""
Inbox 收件箱模块完整示例
"""
import asyncio
import os
import sys
from pathlib import Path
from datetime import datetime
from collections import defaultdict
try:
import x_api
except ImportError:
print("❌ 请先安装 x_api: maturin develop")
sys.exit(1)
class InboxDemo:
"""收件箱演示类"""
def __init__(self, cookies: str, proxy_url: str | None = None):
self.twitter = x_api.Twitter(cookies, proxy_url)
async def get_messages(self, max_pages: int = 1):
"""获取消息,返回 (entries, users)"""
all_entries = []
all_users = {}
cursor = None
for _ in range(max_pages):
result = await self.twitter.inbox.get_user_updates(cursor=cursor)
if not result.success:
break
all_entries.extend(result.entries)
all_users.update(result.users)
if not result.cursor:
break
cursor = result.cursor
await asyncio.sleep(1)
return all_entries, all_users
async def get_conversation_summary(self):
"""获取会话摘要"""
entries, users = await self.get_messages(max_pages=3)
conversations = defaultdict(list)
for entry in entries:
if entry.message:
conv_id = entry.message.conversation_id
conversations[conv_id].append(entry.message)
return {
"total_conversations": len(conversations),
"total_messages": len(entries),
"total_users": len(users),
"conversations": dict(conversations)
}
async def find_links(self):
"""提取所有链接"""
entries, users = await self.get_messages()
links = []
for entry in entries:
if entry.message:
for url in entry.message.message_data.entities.urls:
links.append(url.expanded_url)
return links
async def main():
# 读取配置
cookies = os.getenv("TWITTER_COOKIES")
if not cookies:
cookies_file = Path("cookies.txt")
if cookies_file.exists():
cookies = cookies_file.read_text().strip()
else:
print("❌ 请设置 TWITTER_COOKIES 或创建 cookies.txt")
return
demo = InboxDemo(cookies)
# 演示获取消息
print("=== 获取收件箱消息 ===")
entries, users = await demo.get_messages()
print(f"✅ 获取到 {len(entries)} 条消息, {len(users)} 个用户")
# 显示前 3 条
for entry in entries[:3]:
if entry.message:
sender = users.get(entry.message.message_data.sender_id)
name = sender.screen_name if sender else "Unknown"
text = entry.message.message_data.text[:40]
print(f" @{name}: {text}...")
# 会话摘要
print("\n=== 会话摘要 ===")
summary = await demo.get_conversation_summary()
print(f"总会话数: {summary['total_conversations']}")
print(f"总消息数: {summary['total_messages']}")
if __name__ == "__main__":
asyncio.run(main())