跳转至

Inbox 收件箱模块示例

本文档提供 Inbox 收件箱模块的完整使用示例。

目录

前置准备

安装依赖

maturin develop

创建客户端

import x_api

# Placeholder cookie string; substitute the values of a real logged-in session.
cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
twitter = x_api.Twitter(cookies)

# Alternatively, pass a proxy URL as the second argument.
twitter = x_api.Twitter(cookies, "http://127.0.0.1:7890")

获取最新私信

获取收件箱中的最新消息。

import asyncio
import x_api

async def get_latest_messages():
    """Fetch the latest inbox updates and print a one-line preview per message.

    On failure the error message and HTTP status are printed instead.
    """
    client = x_api.Twitter("ct0=xxx; auth_token=yyy; twid=u%3D123456789")
    updates = await client.inbox.get_user_updates()

    # Handle the failure case first so the happy path stays flat.
    if not updates.success:
        print(f"❌ 获取失败: {updates.error_msg}")
        print(f"   HTTP 状态码: {updates.http_status}")
        return

    print("📬 收件箱更新")
    print(f"   数据来源: {updates.data_source}")
    print(f"   消息数量: {len(updates.entries)}")
    print(f"   用户数量: {len(updates.users)}")

    # Walk the entries; only those carrying a message are printed.
    print("\n📨 最新消息:")
    for item in updates.entries:
        if not item.message:
            continue
        data = item.message.message_data

        # Resolve the sender; fall back to the raw ID when the user is unknown.
        author = updates.users.get(data.sender_id)
        if author:
            label = f"@{author.screen_name}"
        else:
            label = data.sender_id

        print(f"   [{data.time}] {label}: {data.text[:50]}...")

asyncio.run(get_latest_messages())

输出示例:

📬 收件箱更新
   数据来源: rpc
   消息数量: 15
   用户数量: 8

📨 最新消息:
   [1705123456789] @user1: 你好!最近怎么样?...
   [1705123456000] @user2: 收到,我明天给你回复...
   [1705123400000] @user3: 感谢你的帮助!...

简单的消息列表

import asyncio
import x_api

async def list_messages():
    """Print a numbered list of the messages among the first ten inbox entries."""
    client = x_api.Twitter("ct0=xxx; auth_token=yyy; twid=u%3D123456789")
    updates = await client.inbox.get_user_updates()

    if not updates.success:
        return

    # The number is the 1-based entry position, so entries without a
    # message leave gaps in the numbering.
    position = 0
    for item in updates.entries[:10]:
        position += 1
        if not item.message:
            continue
        data = item.message.message_data
        author = updates.users.get(data.sender_id)
        handle = author.screen_name if author else "Unknown"
        print(f"{position}. @{handle}: {data.text}")

asyncio.run(list_messages())

分页获取历史消息

使用游标分页获取更多历史消息。

import asyncio
import x_api

async def get_all_messages():
    """Page through the inbox with a cursor, accumulating entries and users.

    At most five pages are requested. Paging stops early when a request
    fails or when the response carries no further cursor.

    Returns:
        A ``(entries, users)`` tuple with everything collected so far.
    """
    client = x_api.Twitter("ct0=xxx; auth_token=yyy; twid=u%3D123456789")

    collected_entries = []
    collected_users = {}
    next_cursor = None
    page_limit = 5

    print("📥 开始获取收件箱消息...")

    page_no = 0
    while page_no < page_limit:
        page_no += 1
        batch = await client.inbox.get_user_updates(cursor=next_cursor)

        if not batch.success:
            print(f"❌ 第 {page_no} 页获取失败: {batch.error_msg}")
            break

        # Merge this page into the running totals.
        collected_entries.extend(batch.entries)
        collected_users.update(batch.users)

        print(f"   第 {page_no} 页: {len(batch.entries)} 条消息")

        # A falsy cursor means there is nothing more to fetch.
        next_cursor = batch.cursor
        if not next_cursor:
            print("   ✅ 已获取全部消息")
            break

        await asyncio.sleep(1)  # pause between pages to avoid rate limiting

    print("\n📊 总计:")
    print(f"   消息: {len(collected_entries)} 条")
    print(f"   用户: {len(collected_users)} 人")

    return collected_entries, collected_users

asyncio.run(get_all_messages())

输出示例:

📥 开始获取收件箱消息...
   第 1 页: 20 条消息
   第 2 页: 20 条消息
   第 3 页: 15 条消息
   ✅ 已获取全部消息

📊 总计:
   消息: 55 条
   用户: 23 人

获取特定会话消息

获取与特定用户的会话消息。

import asyncio
import x_api

async def get_conversation_messages():
    """Request updates for one conversation and print the messages belonging to it."""
    client = x_api.Twitter("ct0=xxx; auth_token=yyy; twid=u%3D123456789")

    # Conversation to inspect.
    conversation_id = "123456789-987654321"

    updates = await client.inbox.get_user_updates(
        active_conversation_id=conversation_id
    )

    if not updates.success:
        print(f"❌ 获取失败: {updates.error_msg}")
        return

    print(f"💬 会话 {conversation_id[:16]}... 的消息:")

    for item in updates.entries:
        # Skip entries without a message or from a different conversation.
        if not (item.message and item.message.conversation_id == conversation_id):
            continue
        data = item.message.message_data
        author = updates.users.get(data.sender_id)
        handle = author.screen_name if author else data.sender_id
        print(f"   @{handle}: {data.text}")

asyncio.run(get_conversation_messages())

查找特定用户的消息

import asyncio
import x_api

async def find_messages_from_user(target_user_id: str):
    """Print and return the inbox messages sent by ``target_user_id``.

    Args:
        target_user_id: ID of the sender to filter on.

    Returns:
        A list of dicts with ``time``, ``text`` and ``conversation_id`` keys
        on success, otherwise ``None``.
    """
    client = x_api.Twitter("ct0=xxx; auth_token=yyy; twid=u%3D123456789")
    updates = await client.inbox.get_user_updates()

    if not updates.success:
        return None

    # Keep only the messages whose sender is the target user.
    matches = []
    for item in updates.entries:
        if not item.message:
            continue
        data = item.message.message_data
        if data.sender_id != target_user_id:
            continue
        matches.append({
            "time": data.time,
            "text": data.text,
            "conversation_id": item.message.conversation_id
        })

    # Resolve a display handle; fall back to the raw ID when unknown.
    profile = updates.users.get(target_user_id)
    handle = profile.screen_name if profile else target_user_id

    print(f"📨 来自 @{handle} 的消息 ({len(matches)} 条):")
    for record in matches[:10]:
        print(f"   [{record['time']}] {record['text'][:50]}...")

    return matches

asyncio.run(find_messages_from_user("44196397"))

解析消息内容

解析消息的详细信息。

import asyncio
import x_api
from datetime import datetime

async def parse_message_details():
    """Print a detailed breakdown of the messages among the first five inbox entries.

    For each message this prints its ID, conversation ID, send time,
    sender, recipient, text, the ``affects_sort`` flag and, when present,
    the request ID. Nothing is printed when the request fails.
    """
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    result = await twitter.inbox.get_user_updates()

    if result.success:
        for entry in result.entries[:5]:
            if entry.message:
                message = entry.message
                msg_data = message.message_data

                print(f"\n{'='*50}")
                print(f"📝 消息详情")
                print(f"{'='*50}")

                # Basic identifiers.
                print(f"消息 ID: {message.id}")
                print(f"会话 ID: {message.conversation_id}")

                # Time handling: the value is interpreted as epoch
                # milliseconds and rendered in local time.
                try:
                    timestamp = int(msg_data.time) / 1000
                    dt = datetime.fromtimestamp(timestamp)
                    print(f"发送时间: {dt.strftime('%Y-%m-%d %H:%M:%S')}")
                except (TypeError, ValueError, OverflowError, OSError):
                    # Only conversion failures fall back to the raw value;
                    # a bare ``except`` would also swallow KeyboardInterrupt
                    # and SystemExit.
                    print(f"时间戳: {msg_data.time}")

                # Sender and recipient, falling back to raw IDs when unknown.
                sender = result.users.get(msg_data.sender_id)
                recipient = result.users.get(msg_data.recipient_id)

                sender_name = sender.screen_name if sender else msg_data.sender_id
                recipient_name = recipient.screen_name if recipient else msg_data.recipient_id

                print(f"发送者: @{sender_name}")
                print(f"接收者: @{recipient_name}")

                # Message body.
                print(f"内容: {msg_data.text}")

                # Remaining attributes.
                print(f"影响排序: {message.affects_sort}")
                if message.request_id:
                    print(f"请求 ID: {message.request_id}")

asyncio.run(parse_message_details())

输出示例:

==================================================
📝 消息详情
==================================================
消息 ID: 1234567890123456789
会话 ID: 44196397-123456789
发送时间: 2024-01-13 15:30:45
发送者: @elonmusk
接收者: @myaccount
内容: Hello! How are you?
影响排序: True

提取消息实体

提取消息中的链接、话题标签和 @ 提及。

提取链接

import asyncio
import x_api

async def extract_links():
    """Collect every URL entity found in inbox messages and print the first ten.

    Returns:
        A list of dicts (``sender``, ``short_url``, ``expanded_url``,
        ``display_url``) on success, otherwise ``None``.
    """
    client = x_api.Twitter("ct0=xxx; auth_token=yyy; twid=u%3D123456789")
    updates = await client.inbox.get_user_updates()

    if not updates.success:
        return None

    found = []
    for item in updates.entries:
        if not item.message:
            continue
        entities = item.message.message_data.entities
        author_id = item.message.message_data.sender_id
        author = updates.users.get(author_id)

        # One record per URL entity in this message.
        found.extend(
            {
                "sender": author.screen_name if author else author_id,
                "short_url": link.url,
                "expanded_url": link.expanded_url,
                "display_url": link.display_url
            }
            for link in entities.urls
        )

    print(f"🔗 找到 {len(found)} 个链接:")
    for record in found[:10]:
        print(f"   @{record['sender']}: {record['expanded_url']}")

    return found

asyncio.run(extract_links())

提取话题标签

import asyncio
import x_api
from collections import Counter

async def extract_hashtags():
    """Count the hashtags used in inbox messages and print the ten most common.

    Returns:
        A ``Counter`` mapping hashtag text to occurrences on success,
        otherwise ``None``.
    """
    client = x_api.Twitter("ct0=xxx; auth_token=yyy; twid=u%3D123456789")
    updates = await client.inbox.get_user_updates()

    if not updates.success:
        return None

    # Feed the counter message by message instead of building a flat list first.
    tag_counter = Counter()
    for item in updates.entries:
        if not item.message:
            continue
        entities = item.message.message_data.entities
        tag_counter.update(tag.text for tag in entities.hashtags)

    print("#️⃣ 话题标签统计:")
    for tag, occurrences in tag_counter.most_common(10):
        print(f"   #{tag}: {occurrences} 次")

    return tag_counter

asyncio.run(extract_hashtags())

提取 @ 提及

import asyncio
import x_api

async def extract_mentions():
    """Collect the user mentions found in inbox messages and print the first ten.

    Returns:
        A list of dicts (``user_id``, ``screen_name``, ``name``, ``context``)
        on success, otherwise ``None``. ``context`` holds the first 50
        characters of the message text.
    """
    client = x_api.Twitter("ct0=xxx; auth_token=yyy; twid=u%3D123456789")
    updates = await client.inbox.get_user_updates()

    if not updates.success:
        return None

    found = []
    for item in updates.entries:
        if not item.message:
            continue
        entities = item.message.message_data.entities
        body = item.message.message_data.text

        found.extend(
            {
                "user_id": ref.id_str,
                "screen_name": ref.screen_name,
                "name": ref.name,
                "context": body[:50]
            }
            for ref in entities.user_mentions
        )

    print(f"👥 用户提及 ({len(found)} 个):")
    for record in found[:10]:
        print(f"   @{record['screen_name']}: {record['context']}...")

    return found

asyncio.run(extract_mentions())

综合提取所有实体

import asyncio
import x_api

async def extract_all_entities():
    """Print links, hashtags and mentions for the messages among the first five entries."""
    client = x_api.Twitter("ct0=xxx; auth_token=yyy; twid=u%3D123456789")
    updates = await client.inbox.get_user_updates()

    if not updates.success:
        return

    print("📊 消息实体分析\n")

    for item in updates.entries[:5]:
        if not item.message:
            continue

        data = item.message.message_data
        entities = data.entities
        author = updates.users.get(data.sender_id)
        handle = author.screen_name if author else data.sender_id

        print("=" * 40)
        print(f"发送者: @{handle}")
        print(f"内容: {data.text[:60]}...")

        # Links
        if entities.urls:
            print(f"🔗 链接 ({len(entities.urls)}):")
            for link in entities.urls:
                print(f"   - {link.display_url}")

        # Hashtags
        if entities.hashtags:
            tag_list = ", ".join(f"#{tag.text}" for tag in entities.hashtags)
            print(f"#️⃣ 话题: {tag_list}")

        # Mentions
        if entities.user_mentions:
            mention_list = ", ".join(
                f"@{ref.screen_name}" for ref in entities.user_mentions
            )
            print(f"👥 提及: {mention_list}")

        print()

asyncio.run(extract_all_entities())

获取用户信息

访问与消息相关的用户信息。

import asyncio
import x_api

async def get_inbox_users():
    """Print the users referenced by the inbox, ordered by follower count (descending)."""
    client = x_api.Twitter("ct0=xxx; auth_token=yyy; twid=u%3D123456789")
    updates = await client.inbox.get_user_updates()

    if not updates.success:
        return

    print(f"👥 收件箱中的用户 ({len(updates.users)} 人):\n")

    # Most-followed accounts first.
    ranked = list(updates.users.values())
    ranked.sort(key=lambda profile: profile.followers_count, reverse=True)

    for profile in ranked:
        # Pair each status flag with its badge label, then keep the set ones.
        flags = (
            (profile.is_blue_verified, "✓蓝V"),
            (profile.verified, "✓认证"),
            (profile.protected, "🔒私密"),
        )
        badges = [label for is_set, label in flags if is_set]

        suffix = ""
        if badges:
            suffix = f" [{', '.join(badges)}]"

        print(f"@{profile.screen_name}{suffix}")
        print(f"   显示名: {profile.name}")
        print(f"   粉丝: {profile.followers_count:,} | 关注: {profile.friends_count:,}")
        if profile.description:
            print(f"   简介: {profile.description[:50]}...")
        print()

asyncio.run(get_inbox_users())

输出示例:

👥 收件箱中的用户 (8 人):

@elonmusk [✓蓝V]
   显示名: Elon Musk
   粉丝: 210,345,678 | 关注: 876
   简介: Read @America...

@user123
   显示名: John Doe
   粉丝: 1,234 | 关注: 567
   简介: Developer | Tech enthusiast...

快速查找用户

import asyncio
import x_api

async def lookup_user_from_inbox(target_username: str):
    """Find a user among the inbox participants by screen name (case-insensitive).

    Args:
        target_username: Screen name to look for, without the leading ``@``.

    Returns:
        The matching user object, or ``None`` when the request fails or no
        participant matches.
    """
    client = x_api.Twitter("ct0=xxx; auth_token=yyy; twid=u%3D123456789")
    updates = await client.inbox.get_user_updates()

    if not updates.success:
        return None

    # Only the user objects are needed, not their IDs.
    for profile in updates.users.values():
        if profile.screen_name.lower() != target_username.lower():
            continue
        blue = "是" if profile.is_blue_verified else "否"
        print(f"✅ 找到用户 @{profile.screen_name}")
        print(f"   ID: {profile.id_str}")
        print(f"   名称: {profile.name}")
        print(f"   粉丝: {profile.followers_count:,}")
        print(f"   蓝V: {blue}")
        return profile

    print(f"❌ 未在收件箱中找到 @{target_username}")
    return None

asyncio.run(lookup_user_from_inbox("elonmusk"))

统计和分析

对收件箱数据进行统计分析。

会话统计

import asyncio
import x_api
from collections import defaultdict

async def analyze_conversations():
    """Group inbox messages by conversation and print the five busiest ones."""
    client = x_api.Twitter("ct0=xxx; auth_token=yyy; twid=u%3D123456789")
    updates = await client.inbox.get_user_updates()

    if not updates.success:
        return

    # conversation ID -> messages, in the order they appear in the response.
    grouped = defaultdict(list)
    for item in updates.entries:
        if item.message:
            grouped[item.message.conversation_id].append(item.message)

    print("📊 会话统计")
    print(f"   总会话数: {len(grouped)}")
    print(f"   总消息数: {len(updates.entries)}")
    print("\n💬 各会话详情:")

    # Largest conversations first; only the top five are shown.
    ranked = sorted(grouped.items(), key=lambda pair: len(pair[1]), reverse=True)

    for conv_id, messages in ranked[:5]:
        # Everyone who sent or received a message in this conversation.
        participants = set()
        for message in messages:
            participants.update(
                (message.message_data.sender_id, message.message_data.recipient_id)
            )

        # Only participants present in the user map get a handle.
        handles = [
            f"@{profile.screen_name}"
            for pid in participants
            if (profile := updates.users.get(pid))
        ]

        print(f"\n   会话: {conv_id[:16]}...")
        print(f"   消息数: {len(messages)}")
        print(f"   参与者: {', '.join(handles)}")

asyncio.run(analyze_conversations())

消息频率统计

import asyncio
import x_api
from collections import Counter
from datetime import datetime

async def analyze_message_frequency():
    """Print the top senders and an hourly histogram of inbox messages.

    Messages whose timestamp cannot be converted are still counted per
    sender but are left out of the hourly distribution.
    """
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    result = await twitter.inbox.get_user_updates()

    if result.success:
        # Messages per sender and per hour of day (local time).
        sender_counts = Counter()
        hourly_counts = Counter()

        for entry in result.entries:
            if entry.message:
                msg = entry.message.message_data

                # Count by sender, falling back to the raw ID when unknown.
                sender = result.users.get(msg.sender_id)
                sender_name = sender.screen_name if sender else msg.sender_id
                sender_counts[sender_name] += 1

                # Count by hour; the time value is interpreted as epoch
                # milliseconds.
                try:
                    timestamp = int(msg.time) / 1000
                    hour = datetime.fromtimestamp(timestamp).hour
                except (TypeError, ValueError, OverflowError, OSError):
                    # Best effort: skip unparseable timestamps, but do not
                    # swallow KeyboardInterrupt/SystemExit as a bare
                    # ``except`` would.
                    continue
                hourly_counts[hour] += 1

        print(f"📊 消息频率分析\n")

        print(f"👤 发送者排行:")
        for name, count in sender_counts.most_common(5):
            print(f"   @{name}: {count} 条")

        print(f"\n🕐 时间分布:")
        for hour in range(24):
            count = hourly_counts.get(hour, 0)
            # One bar segment per two messages.
            bar = "█" * (count // 2)
            print(f"   {hour:02d}:00 {bar} ({count})")

asyncio.run(analyze_message_frequency())

实时监控收件箱

定期检查新消息。

import asyncio
import x_api
from datetime import datetime

async def monitor_inbox(interval_seconds: int = 30, duration_minutes: int = 5):
    """
    Poll the inbox for new messages for a fixed period of time.

    Each poll prints the messages whose ID has not been seen before; a
    failed poll is reported and monitoring continues. A summary line is
    always printed when monitoring ends.

    Args:
        interval_seconds: Delay between two polls, in seconds.
        duration_minutes: Total monitoring time, in minutes.
    """
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"
    twitter = x_api.Twitter(cookies)

    seen_message_ids = set()

    # Use the event loop's monotonic clock so wall-clock adjustments
    # cannot stretch or shorten the monitoring window.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + duration_minutes * 60

    print(f"👀 开始监控收件箱 (每 {interval_seconds} 秒检查一次)")
    print(f"   持续时间: {duration_minutes} 分钟")
    print(f"   按 Ctrl+C 停止\n")

    try:
        while loop.time() < deadline:
            result = await twitter.inbox.get_user_updates()
            stamp = datetime.now().strftime('%H:%M:%S')

            if result.success:
                new_messages = []

                for entry in result.entries:
                    if entry.message:
                        msg_id = entry.message.id
                        if msg_id not in seen_message_ids:
                            seen_message_ids.add(msg_id)
                            new_messages.append(entry.message)

                if new_messages:
                    print(f"\n🔔 [{stamp}] "
                          f"收到 {len(new_messages)} 条新消息:")

                    for msg in new_messages:
                        sender = result.users.get(msg.message_data.sender_id)
                        sender_name = sender.screen_name if sender else "Unknown"
                        print(f"   @{sender_name}: {msg.message_data.text[:40]}...")
                else:
                    print(f"⏳ [{stamp}] 无新消息", end="\r")
            else:
                # Report the failure instead of silently skipping the poll.
                print(f"\n❌ [{stamp}] 获取失败: {result.error_msg}")

            # Never sleep past the deadline.
            remaining = deadline - loop.time()
            await asyncio.sleep(max(0, min(interval_seconds, remaining)))

    except KeyboardInterrupt:
        print("\n\n⏹️ 监控已停止")
    except asyncio.CancelledError:
        # Under asyncio.run() on Python 3.11+, Ctrl+C cancels the main
        # task instead of raising KeyboardInterrupt here. Announce the
        # stop, then let the cancellation propagate.
        print("\n\n⏹️ 监控已停止")
        raise
    finally:
        print(f"\n📊 监控期间共发现 {len(seen_message_ids)} 条消息")

asyncio.run(monitor_inbox(interval_seconds=30, duration_minutes=5))

新消息回调处理

import asyncio
import x_api
from typing import Callable

class InboxMonitor:
    """收件箱监控器"""

    def __init__(self, cookies: str, proxy_url: str | None = None):
        self.twitter = x_api.Twitter(cookies, proxy_url)
        self.seen_ids = set()
        self.callbacks = []

    def on_new_message(self, callback: Callable):
        """注册新消息回调"""
        self.callbacks.append(callback)

    async def start(self, interval: int = 30):
        """开始监控"""
        print("🚀 收件箱监控已启动")

        while True:
            try:
                result = await self.twitter.inbox.get_user_updates()

                if result.success:
                    for entry in result.entries:
                        if entry.message and entry.message.id not in self.seen_ids:
                            self.seen_ids.add(entry.message.id)

                            # 触发所有回调
                            for callback in self.callbacks:
                                await callback(entry.message, result.users)

                await asyncio.sleep(interval)

            except Exception as e:
                print(f"❌ 监控错误: {e}")
                await asyncio.sleep(interval)


async def handle_new_message(message, users):
    """Print a one-line notification for a newly received message.

    Args:
        message: Message object exposing ``message_data.sender_id`` and
            ``message_data.text``.
        users: Mapping from user ID to user object, used to resolve the
            sender's screen name.
    """
    data = message.message_data
    author = users.get(data.sender_id)
    if author:
        handle = author.screen_name
    else:
        handle = "Unknown"

    print(f"📬 新消息 from @{handle}: {message.message_data.text}")


async def main():
    """Run the inbox monitor for two minutes, then stop cleanly."""
    cookies = "ct0=xxx; auth_token=yyy; twid=u%3D123456789"

    monitor = InboxMonitor(cookies)
    monitor.on_new_message(handle_new_message)

    # monitor.start() never returns on its own: wait_for() cancels it after
    # the timeout and raises TimeoutError. That is the expected end of this
    # demo, so handle it instead of exiting with a traceback.
    try:
        await asyncio.wait_for(monitor.start(interval=15), timeout=120)
    except asyncio.TimeoutError:
        print("⏹️ 监控已停止")

asyncio.run(main())

错误处理

处理收件箱操作中的常见错误。

import asyncio
import x_api

async def handle_inbox_errors():
    """Fetch inbox updates and report the common failure modes.

    Failures reported by the API are mapped from the HTTP status code to a
    message; exceptions raised by the client are caught and printed.
    """
    client = x_api.Twitter("ct0=xxx; auth_token=yyy; twid=u%3D123456789")

    # Messages for the status codes that get a dedicated explanation.
    status_messages = {
        400: "❌ 请求参数错误",
        401: "❌ 认证失败,请检查 cookies",
        403: "❌ 权限不足,可能被限制访问私信",
        429: "❌ 请求过于频繁,请稍后重试",
    }

    try:
        updates = await client.inbox.get_user_updates()

        if not updates.success:
            # Any other status falls back to the API's own error message.
            text = status_messages.get(updates.http_status)
            if text is None:
                text = f"❌ 获取失败: {updates.error_msg}"
            print(text)
        else:
            print(f"✅ 获取成功: {len(updates.entries)} 条消息")

            # Safe traversal: always check that the entry carries a message.
            for item in updates.entries:
                if not item.message:
                    continue
                process_message(item.message, updates.users)

    except x_api.TwitterError as e:
        print(f"❌ 客户端错误: {e}")
    except Exception as e:
        print(f"❌ 未知错误: {e}")


def process_message(message, users):
    """Print one message, tolerating objects with missing attributes.

    Args:
        message: Message object exposing ``message_data``.
        users: Mapping from user ID to user object.
    """
    try:
        data = message.message_data
        author = users.get(data.sender_id)

        # Known senders are shown by handle, unknown ones by bracketed ID.
        label = f"@{author.screen_name}" if author else f"[{data.sender_id}]"
        print(f"{label}: {data.text}")

    except AttributeError as e:
        print(f"⚠️ 消息解析错误: {e}")

asyncio.run(handle_inbox_errors())

常见错误码

| HTTP 状态码 | 说明 | 处理建议 |
| --- | --- | --- |
| 400 | 请求参数错误 | 检查 cursor 格式 |
| 401 | 认证失败 | 检查 cookies 是否过期 |
| 403 | 权限不足 | 账号可能被限制私信功能 |
| 429 | 请求频率限制 | 降低请求频率 |
| 500 | 服务器错误 | 稍后重试 |

完整示例

#!/usr/bin/env python3
"""
Inbox 收件箱模块完整示例
"""

import asyncio
import os
import sys
from pathlib import Path
from datetime import datetime
from collections import defaultdict

try:
    import x_api
except ImportError:
    print("❌ 请先安装 x_api: maturin develop")
    sys.exit(1)


class InboxDemo:
    """收件箱演示类"""

    def __init__(self, cookies: str, proxy_url: str | None = None):
        self.twitter = x_api.Twitter(cookies, proxy_url)

    async def get_messages(self, max_pages: int = 1):
        """获取消息,返回 (entries, users)"""
        all_entries = []
        all_users = {}
        cursor = None

        for _ in range(max_pages):
            result = await self.twitter.inbox.get_user_updates(cursor=cursor)

            if not result.success:
                break

            all_entries.extend(result.entries)
            all_users.update(result.users)

            if not result.cursor:
                break

            cursor = result.cursor
            await asyncio.sleep(1)

        return all_entries, all_users

    async def get_conversation_summary(self):
        """获取会话摘要"""
        entries, users = await self.get_messages(max_pages=3)

        conversations = defaultdict(list)
        for entry in entries:
            if entry.message:
                conv_id = entry.message.conversation_id
                conversations[conv_id].append(entry.message)

        return {
            "total_conversations": len(conversations),
            "total_messages": len(entries),
            "total_users": len(users),
            "conversations": dict(conversations)
        }

    async def find_links(self):
        """提取所有链接"""
        entries, users = await self.get_messages()

        links = []
        for entry in entries:
            if entry.message:
                for url in entry.message.message_data.entities.urls:
                    links.append(url.expanded_url)

        return links


async def main():
    """Run the complete demo: load cookies, list a few messages, print a summary.

    Cookies come from the ``TWITTER_COOKIES`` environment variable, or from
    ``cookies.txt`` in the current directory when the variable is unset or
    empty.
    """
    # Load configuration.
    cookies = os.getenv("TWITTER_COOKIES")
    if not cookies:
        cookies_file = Path("cookies.txt")
        if not cookies_file.exists():
            print("❌ 请设置 TWITTER_COOKIES 或创建 cookies.txt")
            return
        cookies = cookies_file.read_text().strip()

    demo = InboxDemo(cookies)

    # Fetch messages.
    print("=== 获取收件箱消息 ===")
    entries, users = await demo.get_messages()
    print(f"✅ 获取到 {len(entries)} 条消息, {len(users)} 个用户")

    # Preview the messages among the first three entries.
    for item in entries[:3]:
        if not item.message:
            continue
        data = item.message.message_data
        author = users.get(data.sender_id)
        handle = author.screen_name if author else "Unknown"
        preview = item.message.message_data.text[:40]
        print(f"   @{handle}: {preview}...")

    # Conversation summary.
    print("\n=== 会话摘要 ===")
    summary = await demo.get_conversation_summary()
    print(f"总会话数: {summary['total_conversations']}")
    print(f"总消息数: {summary['total_messages']}")


if __name__ == "__main__":
    asyncio.run(main())

相关链接