Upload 模块 API 文档¶
Upload 模块提供完整的媒体上传功能,支持图片和视频上传,采用 Twitter 官方的三阶段上传流程。
目录¶
模块概述¶
Upload 模块支持两种使用方式:
方式 1: 通过 Twitter 主入口访问(推荐)¶
from x_api_rs import Twitter
client = Twitter(cookies)
result = await client.upload.image(image_bytes, "tweet_image")
方式 2: 独立创建 UploadClient¶
from x_api_rs.upload import UploadClient
upload_client = UploadClient(cookies, proxy_url="http://proxy:8080")
result = await upload_client.image(image_bytes, "tweet_image")
支持的媒体类别¶
| 类别 | 值 | 用途 |
|---|---|---|
| 帖子图片 | tweet_image |
发布带图片的帖子 |
| 私信图片 | dm_image |
发送带图片的私信 |
| 背景图片 | banner_image |
用户资料背景图 |
| 帖子视频 | amplify_video |
发布带视频的帖子 |
| 私信视频 | dm_video |
发送带视频的私信 |
UploadClient 类¶
构造函数¶
创建新的 Upload 客户端实例。
参数:
cookies(str): Twitter 账号的 cookies 字符串proxy_url(str | None): 可选的代理服务器 URL
返回: UploadClient 实例
异常: RuntimeError - 初始化失败
示例:
# 基础使用
client = UploadClient(cookies)
# 使用代理
client = UploadClient(cookies, proxy_url="http://proxy:8080")
upload¶
async def upload(
data: bytes,
media_category: str,
options: UploadOptions | None = None
) -> UploadResult
统一上传接口,根据媒体类别自动选择上传流程。
参数:
data(bytes): 媒体二进制数据media_category(str): 媒体类别(见上方表格)options(UploadOptions | None): 上传选项(视频上传必需提供duration_ms)
返回: UploadResult 对象
异常: RuntimeError - 上传失败
示例:
# 上传图片
result = await client.upload.upload(image_bytes, "tweet_image")
# 上传视频(需要 duration_ms)
from x_api_rs.upload import UploadOptions
options = UploadOptions(duration_ms=10819.0)
result = await client.upload.upload(video_bytes, "amplify_video", options)
if result.success:
print(f"Media ID: {result.media_id_string}")
if result.processing_info:
print(f"Processing state: {result.processing_info.state}")
image¶
上传图片的便捷方法。
参数:
image_bytes(bytes): 图片二进制数据media_category(str): 媒体类别,可选值:tweet_image,dm_image,banner_image
返回: UploadResult 对象
异常: RuntimeError - 上传失败或使用了视频类别
示例:
with open("image.jpg", "rb") as f:
image_bytes = f.read()
result = await client.upload.image(image_bytes, "dm_image")
if result.success:
print(f"Media ID: {result.media_id_string}")
video¶
async def video(
video_bytes: bytes,
media_category: str,
duration_ms: float,
processing_timeout: int | None = None
) -> UploadResult
上传视频的便捷方法。
参数:
video_bytes(bytes): 视频二进制数据media_category(str): 媒体类别,可选值:amplify_video,dm_videoduration_ms(float): 视频时长(毫秒),必需processing_timeout(int | None): 处理超时(秒),默认 300
返回: UploadResult 对象,包含 processing_info
异常: RuntimeError - 上传失败或使用了图片类别
示例:
with open("video.mp4", "rb") as f:
video_bytes = f.read()
result = await client.upload.video(
video_bytes,
"amplify_video",
duration_ms=10819.0,
processing_timeout=600 # 10 分钟超时
)
if result.success:
print(f"Media ID: {result.media_id_string}")
print(f"Processing state: {result.processing_info.state}")
image_multiple_times¶
async def image_multiple_times(
image_bytes: bytes,
media_category: str,
count: int
) -> BatchUploadResult
批量上传同一张图片多次,获取多个独立的 media_id。
通过添加随机扰动,每次上传都会获得不同的 media_id,适用于批量发送私信时为每个用户使用独立的媒体附件。
参数:
image_bytes(bytes): 图片二进制数据media_category(str): 媒体类别count(int): 上传次数(必须 > 0)
返回: BatchUploadResult 对象
异常: RuntimeError - 批量上传失败
示例:
# 上传 3 次,获得 3 个不同的 media_id
result = await client.upload.image_multiple_times(
image_bytes,
"dm_image",
count=3
)
print(f"成功: {result.success_count}, 失败: {result.failure_count}")
print(f"Media IDs: {result.media_ids}")
# 配合批量私信使用
dm_result = await client.dm.send_batch(
user_ids=["123", "456", "789"],
text="请看这张图片",
media_ids=result.media_ids
)
类型定义¶
UploadResult¶
单次上传结果。
属性:
success(bool): 是否上传成功media_id(int | None): 媒体 ID(数字形式)media_id_string(str | None): 媒体 ID(字符串形式,推荐使用)error_msg(str): 错误消息(失败时有值)processing_info(ProcessingInfo | None): 视频处理信息(仅视频上传时有值)
方法:
is_video()-> bool: 是否为视频上传结果
示例:
result = await client.upload.image(image_bytes, "tweet_image")
print(result.success) # True
print(result.media_id) # 1234567890123456789
print(result.media_id_string) # "1234567890123456789"
print(result.is_video()) # False
BatchUploadResult¶
批量上传结果。
属性:
success_count(int): 成功上传的数量failure_count(int): 失败上传的数量media_ids(list[str]): 成功上传的 media_id 列表results(list[UploadResult]): 详细结果列表
示例:
result = await client.upload.image_multiple_times(image_bytes, "dm_image", 5)
print(f"成功: {result.success_count}")
print(f"失败: {result.failure_count}")
print(f"Media IDs: {result.media_ids}")
for r in result.results:
if r.success:
print(f"✓ {r.media_id_string}")
else:
print(f"✗ {r.error_msg}")
UploadOptions¶
上传选项,用于视频上传。
属性:
duration_ms(float | None): 视频时长(毫秒),视频上传必需processing_timeout(int | None): 处理超时(秒),默认 300
示例:
from x_api_rs.upload import UploadOptions
# 基础使用
options = UploadOptions(duration_ms=10819.0)
# 设置较长的超时时间
options = UploadOptions(
duration_ms=60000.0, # 1 分钟视频
processing_timeout=600 # 10 分钟超时
)
result = await client.upload.upload(video_bytes, "amplify_video", options)
ProcessingInfo¶
视频处理信息(仅视频上传时返回)。
属性:
state(ProcessingState): 处理状态check_after_secs(int | None): 建议等待时间(秒)progress_percent(int | None): 处理进度(百分比)error(ProcessingError | None): 处理错误信息(仅失败时有值)
示例:
result = await client.upload.video(video_bytes, "amplify_video", duration_ms=10000.0)
if result.processing_info:
info = result.processing_info
print(f"State: {info.state}")
print(f"Progress: {info.progress_percent}%")
print(f"Check after: {info.check_after_secs}s")
ProcessingState¶
视频处理状态。
属性:
state(str): 状态字符串,可能的值:"pending": 等待处理"in_progress": 处理中"succeeded": 处理完成"failed": 处理失败
方法:
is_succeeded()-> bool: 是否处理成功is_failed()-> bool: 是否处理失败is_pending()-> bool: 是否仍在处理中
示例:
if result.processing_info:
state = result.processing_info.state
if state.is_succeeded():
print("视频处理完成")
elif state.is_failed():
print("视频处理失败")
elif state.is_pending():
print("视频处理中...")
ProcessingError¶
视频处理错误信息。
属性:
code(int): 错误代码name(str): 错误名称message(str): 错误消息
示例:
if result.processing_info and result.processing_info.error:
error = result.processing_info.error
print(f"Error {error.code}: {error.name}")
print(f"Message: {error.message}")
使用示例¶
基础图片上传¶
import asyncio
from x_api_rs import Twitter
async def main():
client = Twitter(cookies)
# 读取图片
with open("image.jpg", "rb") as f:
image_bytes = f.read()
# 上传图片
result = await client.upload.image(image_bytes, "tweet_image")
if result.success:
print(f"上传成功!Media ID: {result.media_id_string}")
else:
print(f"上传失败: {result.error_msg}")
asyncio.run(main())
视频上传¶
async def upload_video():
client = Twitter(cookies)
with open("video.mp4", "rb") as f:
video_bytes = f.read()
# 上传视频(需要提供时长)
result = await client.upload.video(
video_bytes,
"amplify_video",
duration_ms=10819.0
)
if result.success:
print(f"上传成功!Media ID: {result.media_id_string}")
# 检查处理状态
if result.processing_info:
state = result.processing_info.state
print(f"处理状态: {state}")
if state.is_succeeded():
print("视频已可用")
elif state.is_pending():
print("视频仍在处理中")
批量上传用于私信¶
async def batch_dm_with_images():
client = Twitter(cookies)
# 读取图片
with open("promo.jpg", "rb") as f:
image_bytes = f.read()
# 批量上传获取多个 media_id
upload_result = await client.upload.image_multiple_times(
image_bytes,
"dm_image",
count=3
)
if upload_result.success_count == 3:
# 批量发送私信
user_ids = ["123", "456", "789"]
dm_result = await client.dm.send_batch(
user_ids,
"请查看这张推广图片",
media_ids=upload_result.media_ids
)
print(f"发送成功: {dm_result.success_count}")
上传后发帖¶
async def tweet_with_image():
client = Twitter(cookies)
with open("photo.jpg", "rb") as f:
image_bytes = f.read()
# 上传图片
upload_result = await client.upload.image(image_bytes, "tweet_image")
if upload_result.success:
# 发布带图帖子
tweet_result = await client.posts.create_tweet(
text="分享一张照片 📸",
media_ids=[upload_result.media_id_string]
)
if tweet_result.success:
print(f"发帖成功!Tweet ID: {tweet_result.tweet_id}")
上传后发私信¶
async def dm_with_image():
client = Twitter(cookies)
with open("gift.png", "rb") as f:
image_bytes = f.read()
# 上传图片(使用 dm_image 类别)
upload_result = await client.upload.image(image_bytes, "dm_image")
if upload_result.success:
# 发送带图私信
dm_result = await client.dm.send_message(
"123456",
"送你一张图片 🎁",
media_id=upload_result.media_id_string
)
if dm_result.success:
print(f"发送成功!Event ID: {dm_result.event_id}")
最佳实践¶
1. 选择正确的媒体类别¶
# 发帖用 tweet_image
await client.upload.image(image_bytes, "tweet_image")
# 私信用 dm_image
await client.upload.image(image_bytes, "dm_image")
# 背景图用 banner_image
await client.upload.image(image_bytes, "banner_image")
# 错误:私信时使用 tweet_image 可能导致问题
# ❌ await client.upload.image(image_bytes, "tweet_image") # 用于私信
2. 验证上传结果¶
result = await client.upload.image(image_bytes, "tweet_image")
# 始终检查 success 状态
if not result.success:
print(f"上传失败: {result.error_msg}")
return
# 使用 media_id_string 而不是 media_id(避免精度问题)
media_id = result.media_id_string # ✓ 推荐
# media_id = str(result.media_id) # ✗ 可能有精度问题
3. 处理视频上传¶
# 视频上传必须提供 duration_ms
result = await client.upload.video(
video_bytes,
"amplify_video",
duration_ms=10819.0 # 必需参数
)
# 检查处理状态
if result.success and result.processing_info:
if result.processing_info.state.is_pending():
# 视频仍在处理中,等待后再使用
print("视频处理中,请稍后...")
4. 批量上传优化¶
# 批量上传会并发执行,效率高
result = await client.upload.image_multiple_times(
image_bytes,
"dm_image",
count=10 # 一次获取 10 个 media_id
)
# 对于大量上传,分批执行
total = 100
batch_size = 20
all_media_ids = []
for i in range(0, total, batch_size):
count = min(batch_size, total - i)
result = await client.upload.image_multiple_times(
image_bytes, "dm_image", count
)
all_media_ids.extend(result.media_ids)
await asyncio.sleep(1) # 避免限流
5. 错误处理¶
try:
result = await client.upload.image(image_bytes, "tweet_image")
if result.success:
# 成功处理
return result.media_id_string
else:
# 业务失败
log_error(f"上传失败: {result.error_msg}")
return None
except Exception as e:
# 异常处理
log_exception(f"上传异常: {e}")
return None
常见问题¶
Q1: 图片上传支持哪些格式?¶
支持 JPEG、PNG、GIF、WebP 等常见图片格式。建议使用 JPEG 或 PNG 格式。
Q2: 视频上传支持哪些格式?¶
支持 MP4(H.264 编码)格式。建议:
- 视频编码:H.264
- 音频编码:AAC
- 最大文件大小:512MB
- 最大时长:140 秒(帖子视频)
Q3: 为什么需要批量上传同一张图片?¶
Twitter 要求每条私信使用独立的 media_id。如果多条私信使用相同的 media_id,只有第一条会显示图片。使用 image_multiple_times 可以为同一张图片生成多个独立的 media_id。
Q4: 上传时出现 "无效的媒体类别" 错误?¶
确保使用正确的类别字符串:
# 正确的类别值
"tweet_image" # 帖子图片
"dm_image" # 私信图片
"banner_image" # 背景图片
"amplify_video" # 帖子视频
"dm_video" # 私信视频
# 错误示例
"image" # ✗ 错误
"tweetImage" # ✗ 错误
"TWEET_IMAGE" # ✗ 错误
Q5: 视频上传后状态一直是 "processing"?¶
视频上传后需要 Twitter 服务器处理,这可能需要几秒到几分钟。可以增加 processing_timeout 参数:
result = await client.upload.video(
video_bytes,
"amplify_video",
duration_ms=60000.0,
processing_timeout=600 # 10 分钟超时
)
Q6: 如何获取视频时长?¶
可以使用第三方库如 moviepy 或 ffprobe 获取视频时长:
from moviepy.editor import VideoFileClip
clip = VideoFileClip("video.mp4")
duration_ms = clip.duration * 1000 # 转换为毫秒
clip.close()
result = await client.upload.video(video_bytes, "amplify_video", duration_ms)
下一步¶
- 查看 DM 模块文档 了解如何发送带媒体的私信
- 查看 Posts 模块文档 了解如何发布带媒体的帖子
- 查看 示例代码 了解更多用法