主要流程如下
- 预上传
- 上传
- 开始上传
- 分片上传
- 上传完成
- 标签
- 分区预测
- 获取标签
- 发布
preupload 预上传
url
from urllib.parse import urlencode, quote, unquote

# Pre-upload request: GET https://member.bilibili.com/preupload with the
# file name and size in the query string.
# The query dict must be defined before it is encoded and appended to the
# URL; encoding it first raises NameError.
url_query = {
    "probe_version": "20221109",  # alternative value: "20200224"
    "upcdn": "qn",
    "zone": "cs",
    "name": quote(self._name),
    "r": "upos",
    "profile": "ugcfx/bup",
    "ssl": "0",
    "version": "2.14.0.0",  # alternative value: "2.7.1"
    "build": "2140000",  # alternative value: "2070100"
    "size": self.size,
    # "webVersion": "2.14.0",
}
url_query = urlencode(url_query)
lk = "https://member.bilibili.com/preupload"
lk = f"{lk}?{url_query}"
# NOTE(review): verify=False disables TLS certificate verification.
result = requests.get(lk, headers=self._headers, verify=False)
上传
upload 开始上传
# Start-upload request: POST the file size and part size to the upos
# endpoint derived from self.upos_uri, authenticated via "x-upos-auth".
url_query = urlencode(
    dict(
        uploads=None,
        output="json",
        profile="ugcfx/bup",
        filesize=self.size,
        partsize=self.chunk_size,
        biz_id=self.biz_id,
        # Keys left disabled by the author:
        #   "meta_upos_uri": self.upos_uri
        #   "webVersion": "2.14.0"
    )
)
# Drop the "upos://" scheme; the remainder is used as the URL path.
temp_path = self.upos_uri.replace("upos://", "")
lk_base = "https://upos-cs-upcdnqn.bilivideo.com/" + temp_path
lk = lk_base + "?" + url_query
data = "uploads&output=json"
# The auth header stays on self._headers for the requests that follow.
self._headers["x-upos-auth"] = self.auth
result = requests.post(lk, data=data, headers=self._headers, verify=False)
视频分片上传(视频片段)
# Upload one chunk of the video file with a PUT request.
# part_num is the 1-based index of the current chunk; part_number is the
# total number of chunks.
start = (part_num - 1) * self.chunk_size
# The last chunk is normally shorter than chunk_size, so cap the end offset
# at the total size instead of pointing past the end of the file.
# (assumes self.size is the file size in the same unit as chunk_size — TODO confirm)
end = min(part_num * self.chunk_size, self.size)
url_query = {
    "partNumber": part_num,  # index of the current chunk (starts at 1)
    "uploadId": self.upload_id,
    "chunk": part_num - 1,  # index of the current chunk (starts at 0)
    "chunks": part_number,  # total number of chunks
    "partsize": self.chunk_size,
    "start": start,  # offset where this chunk starts
    "end": end,  # offset where this chunk ends
    "total": self.size,  # total size of the video
}
url_query = urlencode(url_query)
lk_base = f"https://upos-cs-upcdnqn.bilivideo.com/{temp_path}"
lk = f"{lk_base}?{url_query}"
# Read exactly this chunk's bytes from the open file.
open_file.seek(start)
content_data = open_file.read(self.chunk_size)
result = requests.put(lk, data=content_data, headers=self._headers, verify=False)
视频上传完成
# Finish-upload request: POST the list of uploaded parts as a JSON body to
# the same upos URL (lk_base) used for the chunk uploads.
url_query = urlencode(
    {
        "partNumber": part_num,
        "output": "json",
        "name": quote(self._name),
        "profile": "ugcfx/bup",
        "uploadId": self.upload_id,
        "biz_id": self.biz_id,
    }
)
lk = "?".join((lk_base, url_query))
payload_data = json.dumps({"parts": parts_list})
result = requests.post(lk, data=payload_data, headers=self._headers, verify=False)
标签
分区预测
def archive_types_predict(self):
    """
    Predict the category (partition) of the uploaded video.

    Sets ``self.filename`` (derived from ``self.upos_uri``) and
    ``self.title`` (``self._name`` with ".mp4" removed), POSTs them together
    with ``self.upload_id`` to the predict endpoint, and stores the ``id`` of
    the first returned entry in ``self.subtype_id``.

    Returns None. A non-200 status, or a response without a usable ``data``
    entry, is reported with ``print`` and leaves ``self.subtype_id`` unset.
    """
    import re

    lk = "https://member.bilibili.com/x/vupre/web/archive/types/predict"
    t = int(time.time() * 1000)  # millisecond timestamp
    url_query = urlencode({"t": t, "csrf": self._csrf})
    lk = f"{lk}?{url_query}"

    # filename: second-to-last piece of upos_uri when split on "/" and ".".
    # Raw string avoids the invalid "\." escape of a normal string literal.
    self.filename = re.split(r"[/.]", self.upos_uri)[-2]
    self.title = self._name.replace(".mp4", "")

    payload_data = {
        "upload_id": self.upload_id,
        "title": self.title,  # alternative: self._name[:self._name.find(".")]
        "filename": self.filename,
    }
    result = requests.post(lk, data=payload_data, headers=self._headers, verify=False)

    if result.status_code != 200:
        print("="*20)
        print(f"ERROR: {result.status_code}")
        print("="*20)
        return

    result_dict = json.loads(result.text)
    # Guard against a missing or empty "data" list before indexing into it.
    predictions = result_dict.get("data") if isinstance(result_dict, dict) else None
    if not predictions:
        print("result_dict is error")
        return

    # The first entry's id becomes the subtype_id.
    self.subtype_id = predictions[0]["id"]
获取标签
def get_tag(self):
    """
    Fetch recommended tags for the uploaded video.

    Sends ``self.upload_id``, ``self.subtype_id``, the title and the
    filename to the tag-recommend endpoint and stores the returned tag names,
    joined with commas, in ``self.tags``.

    Returns None. A non-200 status, or a response without ``data``, is
    reported with ``print`` and leaves ``self.tags`` unset.
    """
    import re

    lk = "https://member.bilibili.com/x/vupre/web/tag/recommend"
    t = int(time.time() * 1000)  # millisecond timestamp

    # filename: second-to-last piece of upos_uri when split on "/" and ".".
    # Raw string avoids the invalid "\." escape of a normal string literal.
    filename = re.split(r"[/.]", self.upos_uri)[-2]
    title = self._name.replace(".mp4", "")

    url_query = {
        "upload_id": self.upload_id,
        "subtype_id": self.subtype_id,
        "title": title,  # alternative: self._name[:self._name.find(".")]
        "filename": filename,
        # "description": "",
        # "cover_url": "",
        "t": t,
    }
    url_query = urlencode(url_query)
    lk = f"{lk}?{url_query}"
    result = requests.get(lk, headers=self._headers, verify=False)

    if result.status_code != 200:
        print("="*20)
        print(f"ERROR: {result.status_code}")
        print("="*20)
        return

    result_dict = json.loads(result.text)
    # A missing or null "data" is an error; an empty list still yields "".
    recommendations = result_dict.get("data") if isinstance(result_dict, dict) else None
    if recommendations is None:
        print("result_dict is error")
        return

    self.tags = ",".join(item["tag"] for item in recommendations)
    print()
发布
def release(self):
    """
    Publish the uploaded video.

    POSTs a JSON body built from ``self.title``, ``self.subtype_id``,
    ``self.tags``, ``self.filename``, ``self.biz_id`` and ``self._csrf`` to
    the add/v3 endpoint. Returns None in every case; a non-200 status is
    printed, and response code 601 (captcha, see TODO) returns silently.
    """
    endpoint = "https://member.bilibili.com/x/vu/web/add/v3"
    t = int(time.time() * 1000)  # millisecond timestamp
    query_string = urlencode({"t": t, "csrf": self._csrf})
    lk = endpoint + "?" + query_string

    # Optional fields the author left disabled in the payload:
    #   "cover" (cover image URL), "desc_format_id": 9999, "desc": "",
    #   "dtime" (scheduled publish timestamp), "recreate": -1, "dynamic": "",
    #   "interactive": 0, "act_reserve_create": 0, "no_disturbance": 0,
    #   "no_reprint": 1, "subtitle": {"open": 0, "lan": ""}, "open_elec": 1,
    #   "dolby": 0, "lossless_music": 0, "up_selection_reply": false,
    #   "up_close_reply": false, "up_close_danmu": false, "web_os": 2
    body = json.dumps(
        {
            "title": self.title,
            "copyright": 1,  # copyright type: 1 = original, 2 = repost
            "tid": self.subtype_id,
            "tag": self.tags,
            "videos": [
                {
                    "filename": self.filename,
                    "title": self.title,
                    "desc": "",
                    "cid": self.biz_id,
                }
            ],
            "csrf": self._csrf,
        }
    )
    result = requests.post(lk, data=body, headers=self._headers, verify=False)

    if result.status_code != 200:
        print("="*20)
        print(f"ERROR: {result.status_code}")
        print("="*20)
        return

    result_dict = json.loads(result.text)
    if result_dict["code"] == 601:
        # TODO captcha handling:
        # self.v_voucher = result_dict["data"]["v_voucher"]
        # self.register()
        return

    if not result_dict:
        print("result_dict is error")
        return
其他猜测
# 参与话题
https://member.bilibili.com/x/vupre/web/topic/type?type_id=122&pn=0&ps=6&title=&t=1703380719968
# 封面上传
https://member.bilibili.com/x/vu/web/cover/up?t=1703385286457
- 查询参数
- t: 1703385286457
- body
- 二进制图片数据
在此感谢 菜鸟才能学的更多 的文章帮我省了很多的事
到此结 DragonFangQy 2023.12.24