跳转至

Upload 模块 API 文档

Upload 模块提供完整的媒体上传功能,支持图片和视频上传,采用 Twitter 官方的三阶段上传流程。

目录

模块概述

Upload 模块支持两种使用方式:

方式 1: 通过 Twitter 主入口访问(推荐)

from x_api_rs import Twitter

client = Twitter(cookies)
result = await client.upload.image(image_bytes, "tweet_image")

方式 2: 独立创建 UploadClient

from x_api_rs.upload import UploadClient

upload_client = UploadClient(cookies, proxy_url="http://proxy:8080")
result = await upload_client.image(image_bytes, "tweet_image")

支持的媒体类别

类别 用途
帖子图片 tweet_image 发布带图片的帖子
私信图片 dm_image 发送带图片的私信
背景图片 banner_image 用户资料背景图
帖子视频 amplify_video 发布带视频的帖子
私信视频 dm_video 发送带视频的私信

UploadClient 类

构造函数

UploadClient(
    cookies: str,
    proxy_url: str | None = None
)

创建新的 Upload 客户端实例。

参数:

  • cookies (str): Twitter 账号的 cookies 字符串
  • proxy_url (str | None): 可选的代理服务器 URL

返回: UploadClient 实例

异常: RuntimeError - 初始化失败

示例:

# 基础使用
client = UploadClient(cookies)

# 使用代理
client = UploadClient(cookies, proxy_url="http://proxy:8080")

upload

async def upload(
    data: bytes,
    media_category: str,
    options: UploadOptions | None = None
) -> UploadResult

统一上传接口,根据媒体类别自动选择上传流程。

参数:

  • data (bytes): 媒体二进制数据
  • media_category (str): 媒体类别(见上方表格)
  • options (UploadOptions | None): 上传选项(视频上传必需提供 duration_ms

返回: UploadResult 对象

异常: RuntimeError - 上传失败

示例:

# 上传图片
result = await client.upload.upload(image_bytes, "tweet_image")

# 上传视频(需要 duration_ms)
from x_api_rs.upload import UploadOptions

options = UploadOptions(duration_ms=10819.0)
result = await client.upload.upload(video_bytes, "amplify_video", options)

if result.success:
    print(f"Media ID: {result.media_id_string}")
    if result.processing_info:
        print(f"Processing state: {result.processing_info.state}")

image

async def image(
    image_bytes: bytes,
    media_category: str
) -> UploadResult

上传图片的便捷方法。

参数:

  • image_bytes (bytes): 图片二进制数据
  • media_category (str): 媒体类别,可选值: tweet_image, dm_image, banner_image

返回: UploadResult 对象

异常: RuntimeError - 上传失败或使用了视频类别

示例:

with open("image.jpg", "rb") as f:
    image_bytes = f.read()

result = await client.upload.image(image_bytes, "dm_image")

if result.success:
    print(f"Media ID: {result.media_id_string}")

video

async def video(
    video_bytes: bytes,
    media_category: str,
    duration_ms: float,
    processing_timeout: int | None = None
) -> UploadResult

上传视频的便捷方法。

参数:

  • video_bytes (bytes): 视频二进制数据
  • media_category (str): 媒体类别,可选值: amplify_video, dm_video
  • duration_ms (float): 视频时长(毫秒),必需
  • processing_timeout (int | None): 处理超时(秒),默认 300

返回: UploadResult 对象,包含 processing_info

异常: RuntimeError - 上传失败或使用了图片类别

示例:

with open("video.mp4", "rb") as f:
    video_bytes = f.read()

result = await client.upload.video(
    video_bytes,
    "amplify_video",
    duration_ms=10819.0,
    processing_timeout=600  # 10 分钟超时
)

if result.success:
    print(f"Media ID: {result.media_id_string}")
    print(f"Processing state: {result.processing_info.state}")

image_multiple_times

async def image_multiple_times(
    image_bytes: bytes,
    media_category: str,
    count: int
) -> BatchUploadResult

批量上传同一张图片多次,获取多个独立的 media_id

通过添加随机扰动,每次上传都会获得不同的 media_id,适用于批量发送私信时为每个用户使用独立的媒体附件。

参数:

  • image_bytes (bytes): 图片二进制数据
  • media_category (str): 媒体类别
  • count (int): 上传次数(必须 > 0)

返回: BatchUploadResult 对象

异常: RuntimeError - 批量上传失败

示例:

# 上传 3 次,获得 3 个不同的 media_id
result = await client.upload.image_multiple_times(
    image_bytes,
    "dm_image",
    count=3
)

print(f"成功: {result.success_count}, 失败: {result.failure_count}")
print(f"Media IDs: {result.media_ids}")

# 配合批量私信使用
dm_result = await client.dm.send_batch(
    user_ids=["123", "456", "789"],
    text="请看这张图片",
    media_ids=result.media_ids
)

类型定义

UploadResult

单次上传结果。

属性:

  • success (bool): 是否上传成功
  • media_id (int | None): 媒体 ID(数字形式)
  • media_id_string (str | None): 媒体 ID(字符串形式,推荐使用)
  • error_msg (str): 错误消息(失败时有值)
  • processing_info (ProcessingInfo | None): 视频处理信息(仅视频上传时有值)

方法:

  • is_video() -> bool: 是否为视频上传结果

示例:

result = await client.upload.image(image_bytes, "tweet_image")

print(result.success)           # True
print(result.media_id)          # 1234567890123456789
print(result.media_id_string)   # "1234567890123456789"
print(result.is_video())        # False

BatchUploadResult

批量上传结果。

属性:

  • success_count (int): 成功上传的数量
  • failure_count (int): 失败上传的数量
  • media_ids (list[str]): 成功上传的 media_id 列表
  • results (list[UploadResult]): 详细结果列表

示例:

result = await client.upload.image_multiple_times(image_bytes, "dm_image", 5)

print(f"成功: {result.success_count}")
print(f"失败: {result.failure_count}")
print(f"Media IDs: {result.media_ids}")

for r in result.results:
    if r.success:
        print(f"✓ {r.media_id_string}")
    else:
        print(f"✗ {r.error_msg}")

UploadOptions

上传选项,用于视频上传。

属性:

  • duration_ms (float | None): 视频时长(毫秒),视频上传必需
  • processing_timeout (int | None): 处理超时(秒),默认 300

示例:

from x_api_rs.upload import UploadOptions

# 基础使用
options = UploadOptions(duration_ms=10819.0)

# 设置较长的超时时间
options = UploadOptions(
    duration_ms=60000.0,        # 1 分钟视频
    processing_timeout=600     # 10 分钟超时
)

result = await client.upload.upload(video_bytes, "amplify_video", options)

ProcessingInfo

视频处理信息(仅视频上传时返回)。

属性:

  • state (ProcessingState): 处理状态
  • check_after_secs (int | None): 建议等待时间(秒)
  • progress_percent (int | None): 处理进度(百分比)
  • error (ProcessingError | None): 处理错误信息(仅失败时有值)

示例:

result = await client.upload.video(video_bytes, "amplify_video", duration_ms=10000.0)

if result.processing_info:
    info = result.processing_info
    print(f"State: {info.state}")
    print(f"Progress: {info.progress_percent}%")
    print(f"Check after: {info.check_after_secs}s")

ProcessingState

视频处理状态。

属性:

  • state (str): 状态字符串,可能的值:
  • "pending": 等待处理
  • "in_progress": 处理中
  • "succeeded": 处理完成
  • "failed": 处理失败

方法:

  • is_succeeded() -> bool: 是否处理成功
  • is_failed() -> bool: 是否处理失败
  • is_pending() -> bool: 是否仍在处理中

示例:

if result.processing_info:
    state = result.processing_info.state
    if state.is_succeeded():
        print("视频处理完成")
    elif state.is_failed():
        print("视频处理失败")
    elif state.is_pending():
        print("视频处理中...")

ProcessingError

视频处理错误信息。

属性:

  • code (int): 错误代码
  • name (str): 错误名称
  • message (str): 错误消息

示例:

if result.processing_info and result.processing_info.error:
    error = result.processing_info.error
    print(f"Error {error.code}: {error.name}")
    print(f"Message: {error.message}")

使用示例

基础图片上传

import asyncio
from x_api_rs import Twitter

async def main():
    client = Twitter(cookies)

    # 读取图片
    with open("image.jpg", "rb") as f:
        image_bytes = f.read()

    # 上传图片
    result = await client.upload.image(image_bytes, "tweet_image")

    if result.success:
        print(f"上传成功!Media ID: {result.media_id_string}")
    else:
        print(f"上传失败: {result.error_msg}")

asyncio.run(main())

视频上传

async def upload_video():
    client = Twitter(cookies)

    with open("video.mp4", "rb") as f:
        video_bytes = f.read()

    # 上传视频(需要提供时长)
    result = await client.upload.video(
        video_bytes,
        "amplify_video",
        duration_ms=10819.0
    )

    if result.success:
        print(f"上传成功!Media ID: {result.media_id_string}")

        # 检查处理状态
        if result.processing_info:
            state = result.processing_info.state
            print(f"处理状态: {state}")
            if state.is_succeeded():
                print("视频已可用")
            elif state.is_pending():
                print("视频仍在处理中")

批量上传用于私信

async def batch_dm_with_images():
    client = Twitter(cookies)

    # 读取图片
    with open("promo.jpg", "rb") as f:
        image_bytes = f.read()

    # 批量上传获取多个 media_id
    upload_result = await client.upload.image_multiple_times(
        image_bytes,
        "dm_image",
        count=3
    )

    if upload_result.success_count == 3:
        # 批量发送私信
        user_ids = ["123", "456", "789"]
        dm_result = await client.dm.send_batch(
            user_ids,
            "请查看这张推广图片",
            media_ids=upload_result.media_ids
        )
        print(f"发送成功: {dm_result.success_count}")

上传后发帖

async def tweet_with_image():
    client = Twitter(cookies)

    with open("photo.jpg", "rb") as f:
        image_bytes = f.read()

    # 上传图片
    upload_result = await client.upload.image(image_bytes, "tweet_image")

    if upload_result.success:
        # 发布带图帖子
        tweet_result = await client.posts.create_tweet(
            text="分享一张照片 📸",
            media_ids=[upload_result.media_id_string]
        )
        if tweet_result.success:
            print(f"发帖成功!Tweet ID: {tweet_result.tweet_id}")

上传后发私信

async def dm_with_image():
    client = Twitter(cookies)

    with open("gift.png", "rb") as f:
        image_bytes = f.read()

    # 上传图片(使用 dm_image 类别)
    upload_result = await client.upload.image(image_bytes, "dm_image")

    if upload_result.success:
        # 发送带图私信
        dm_result = await client.dm.send_message(
            "123456",
            "送你一张图片 🎁",
            media_id=upload_result.media_id_string
        )
        if dm_result.success:
            print(f"发送成功!Event ID: {dm_result.event_id}")

最佳实践

1. 选择正确的媒体类别

# 发帖用 tweet_image
await client.upload.image(image_bytes, "tweet_image")

# 私信用 dm_image
await client.upload.image(image_bytes, "dm_image")

# 背景图用 banner_image
await client.upload.image(image_bytes, "banner_image")

# 错误:私信时使用 tweet_image 可能导致问题
# ❌ await client.upload.image(image_bytes, "tweet_image")  # 用于私信

2. 验证上传结果

result = await client.upload.image(image_bytes, "tweet_image")

# 始终检查 success 状态
if not result.success:
    print(f"上传失败: {result.error_msg}")
    return

# 使用 media_id_string 而不是 media_id(避免精度问题)
media_id = result.media_id_string  # ✓ 推荐
# media_id = str(result.media_id)  # ✗ 可能有精度问题

3. 处理视频上传

# 视频上传必须提供 duration_ms
result = await client.upload.video(
    video_bytes,
    "amplify_video",
    duration_ms=10819.0  # 必需参数
)

# 检查处理状态
if result.success and result.processing_info:
    if result.processing_info.state.is_pending():
        # 视频仍在处理中,等待后再使用
        print("视频处理中,请稍后...")

4. 批量上传优化

# 批量上传会并发执行,效率高
result = await client.upload.image_multiple_times(
    image_bytes,
    "dm_image",
    count=10  # 一次获取 10 个 media_id
)

# 对于大量上传,分批执行
total = 100
batch_size = 20
all_media_ids = []

for i in range(0, total, batch_size):
    count = min(batch_size, total - i)
    result = await client.upload.image_multiple_times(
        image_bytes, "dm_image", count
    )
    all_media_ids.extend(result.media_ids)
    await asyncio.sleep(1)  # 避免限流

5. 错误处理

try:
    result = await client.upload.image(image_bytes, "tweet_image")
    if result.success:
        # 成功处理
        return result.media_id_string
    else:
        # 业务失败
        log_error(f"上传失败: {result.error_msg}")
        return None
except Exception as e:
    # 异常处理
    log_exception(f"上传异常: {e}")
    return None

常见问题

Q1: 图片上传支持哪些格式?

支持 JPEG、PNG、GIF、WebP 等常见图片格式。建议使用 JPEG 或 PNG 格式。

Q2: 视频上传支持哪些格式?

支持 MP4(H.264 编码)格式。建议:

  • 视频编码:H.264
  • 音频编码:AAC
  • 最大文件大小:512MB
  • 最大时长:140 秒(帖子视频)

Q3: 为什么需要批量上传同一张图片?

Twitter 要求每条私信使用独立的 media_id。如果多条私信使用相同的 media_id,只有第一条会显示图片。使用 image_multiple_times 可以为同一张图片生成多个独立的 media_id

Q4: 上传时出现 "无效的媒体类别" 错误?

确保使用正确的类别字符串:

# 正确的类别值
"tweet_image"    # 帖子图片
"dm_image"       # 私信图片
"banner_image"   # 背景图片
"amplify_video"  # 帖子视频
"dm_video"       # 私信视频

# 错误示例
"image"          # ✗ 错误
"tweetImage"     # ✗ 错误
"TWEET_IMAGE"    # ✗ 错误

Q5: 视频上传后状态一直是 "processing"?

视频上传后需要 Twitter 服务器处理,这可能需要几秒到几分钟。可以增加 processing_timeout 参数:

result = await client.upload.video(
    video_bytes,
    "amplify_video",
    duration_ms=60000.0,
    processing_timeout=600  # 10 分钟超时
)

Q6: 如何获取视频时长?

可以使用第三方库如 moviepyffprobe 获取视频时长:

from moviepy.editor import VideoFileClip

clip = VideoFileClip("video.mp4")
duration_ms = clip.duration * 1000  # 转换为毫秒
clip.close()

result = await client.upload.video(video_bytes, "amplify_video", duration_ms)

下一步