整个自动化流程已经完全走通,代码内增加了多种异常处理
第一版链接(很粗糙):https://www.jianshu.com/p/d837e70eb33a
提醒:有些模块是在我项目文件夹下自己写的,pip安装不了;
from pytube import YouTube
import requests
from fake_useragent import UserAgent
import re
from selenium import webdriver
import ssl
from lxml import etree
import time
from selenium.webdriver.common.keys import Keys
from Google import GoogleTranslate
import redis
opt = webdriver.ChromeOptions()
opt.add_argument('--headless')
#更换头部
cookie = [{'name': 'pt2gguin', 'value': 'o0654921690'}, {'name': 'RK', 'value': 'mSglPLOFQ8'}, {'name': 'ptcz', 'value': '5eb1aeb628b6ab67aa306285a78434959b385f8e39415bda4ff4db3ea7763d75'}, {'name': 'pgv_pvid', 'value': '116706325'}, {'name': 'pgv_pvi', 'value': '1972684800'}, {'name': 'pgv_si', 'value': 's3116448768'}, {'name': 'pgv_info', 'value': 'ssid'}, {'name': 'uin', 'value': 'o0654921690'}, {'name': 'skey', 'value': '@7pMFazBu1'}, {'name': 'ptisp', 'value': 'ctc'}, {'name': 'ts_refer', 'value': 'www.baidu.com/link'}, {'name': 'ts_uid', 'value': '3500005680'}, {'name': 'userid', 'value': '5661917'}, {'name': 'fname', 'value': '%E5%BC%BA%E8%BF%AB%E7%97%87%E7%9A%84%E5%B0%8F%E4%B9%90%E8%B6%A3'}, {'name': 'fimgurl', 'value': 'http%3A%2F%2Finews.gtimg.com%2Fnewsapp_ls%2F0%2F1807099981_200200%2F0'}, {'name': 'omtoken', 'value': '3799d6a2c9'}, {'name': 'omtoken_expire', 'value': '1531158354'}, {'name': 'alertclicked', 'value': '%7C%7C'}, {'name': 'rmod', 'value': '1'}]
ua = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.62 Safari/537.36'
opt.add_argument('user-agent="%s"' % ua)
# opt.add_argument('cookie=%s'% cookie)
driver = webdriver.Chrome(chrome_options=opt)
driver.maximize_window()
redis_cli = redis.Redis(host='127.0.0.1',port=6379)
class QIE:
def __init__(self):
self.ua = UserAgent()
# 上传异常计数,超过阈值放弃当前上传的视频
self.upload_error_handling_num = 0
# 拿到目标视频的链接
def get_video_urls(self):
while True:
# index_page = 'https://www.youtube.com/channel/UC6XKMuLChaxW4Isw0BTZI8A/videos'
index_page = 'https://www.youtube.com/channel/UC6XKMuLChaxW4Isw0BTZI8A/videos?view=0&flow=grid&sort=dd'
headers = {
'User-Agent':self.ua.random
}
proxies = {
'https':'127.0.0.1:1087'
}
response = requests.get(index_page,headers=headers,proxies=proxies).text
list_urls = re.findall(r'"(/watch\?v=.+?)"',response)
if len(list_urls)>0:
list_urls = set(list_urls)
list_urls = list(list_urls)
print(list_urls)
# 取前3个视频
if len(list_urls)>3:
list_urls = list_urls[:3]
return list_urls
else:
print('匹配失败')
print(response)
time.sleep(2)
# 下载所有需要的视频
def download_vides(self):
result = self.get_video_urls()
if result != 'no':
# 文件名列表
self.names = []
# 下载计数
num = 0
for url in result:
redis_result = redis_cli.get(url)
if redis_result == None:
redis_cli.set(url,1)
print('{}已经写入redis'.format(url))
print('一共{}个视频'.format(len(result)))
ssl._create_default_https_context = ssl._create_unverified_context
url = 'https://www.youtube.com' + url
yt = YouTube(url=url,proxies={'https':'127.0.0.1:1087'})
data = yt.streams.first()
file_name = data.default_filename
print('视频开始下载......')
# 调用google翻译生成新的文件名
go = GoogleTranslate(file_name)
file_name = go.get_translated()
print(file_name)
data.download(filename=file_name)
num += 1
print('第%d个视频下载完成,文件名是:%s' % (num,file_name))
self.names.append(file_name + '.mp4')
print(self.names)
if num > 0:
return 'ok'
else:
return 'no'
else:
return 'no'
# 连接youtube和腾讯
def context(self):
result = self.download_vides()
if result == 'ok':
for file_name in self.names:
self.set_cookie()
self.upload(file_name)
# 上传所有本地的视频,每日最多3条
def set_cookie(self):
# 请求目标地址
url = 'https://om.qq.com/article/articlePublish#/!/view:article?typeName=multivideos'
driver.get(url)
driver.implicitly_wait(18)
# 给目标地址添加cookie
driver.delete_all_cookies()
for c in cookie:
new = dict(c,**{
"domain": ".qq.com",
"expires": "",
'path': '/',
'httpOnly': False,
'HostOnly': False,
'Secure': False,
})
driver.add_cookie(new)
# 再次请求目标地址
driver.get(url)
driver.implicitly_wait(18)
# 文件导入,上传
def upload(self,file_name):
# 将本地文件上传到页面的input标签
path1 = '/Users/admin/Documents/qiehao2/'
# path1 = '/home/ubuntu/Documents/tencent/'
# file_name = "化妆变身迪士尼公主.mp4"
# 标题名需要小于30个字符
# file_name = file_name.split(' ')[0]
file_name2 = path1+file_name
driver.find_element_by_xpath('//input').send_keys(file_name2)
path = '进入视频上传主页.png'
driver.save_screenshot(path)
print('进入视频上传主页截图保存成功')
# 尝试cookie登陆,如果失败模拟登陆
try:
status = driver.find_element_by_xpath('//span[@class="text-title"]').get_attribute('textContent').strip()
print(status)
except:
print('进入失败,可能cookie已经过期,重新模拟登陆')
driver.delete_all_cookies()
url = 'https://om.qq.com/article/articlePublish#/!/view:article?typeName=multivideos'
driver.get(url)
driver.implicitly_wait(18)
driver.find_element_by_xpath('//input[@name="email"]').send_keys('xxxx你的账号')
driver.find_element_by_xpath('//input[@name="password"]').send_keys('oooo你的密码')
driver.find_element_by_xpath('//button[@class="btnLogin btn btn-primary"]').click()
time.sleep(30)
driver.get(url)
driver.implicitly_wait(18)
self.upload(file_name)
else:
# 等待上传完成
num = 0
while True:
time.sleep(60)
num += 1
status = driver.find_element_by_xpath('//span[@class="text-title"]').get_attribute('textContent').strip()
path = '载入进度.png'
driver.save_screenshot(path)
print('已经上传了第%d分钟'%num)
if status == '共1个视频,已上传1个':
print('ok')
break
path = 'success.png'
driver.save_screenshot(path)
print('文件导入成功')
#滚动到目标位置 选择分类
# 上传错误处理
self.upload_error_handling(file_name)
# 封装上传错误处理
def upload_error_handling(self,file_name):
try:
command = driver.find_element_by_xpath('//span[contains(./text(),"请选择分类")]')
except:
# 不超过阈值重试
self.upload_error_handling_num +=1
if self.upload_error_handling_num < 4:
print('文件上传出现异常,准备重试,重试第{}次'.format(self.upload_error_handling_num))
self.upload(file_name)
else:
# 超过阈值初始化计数器,放弃本条视频上传;
self.upload_error_handling_num = 0
print('上传异常次数过多,可能本条视频已经传过了,放弃。准备上传下一条......')
pass
else:
driver.execute_script("arguments[0].scrollIntoView();", command)
command.click()
time.sleep(1)
# 滚动到目标位置,输入内容并且回车
command = driver.find_element_by_xpath('//div[@class="chosen-search"]/input')
driver.execute_script("arguments[0].scrollIntoView();", command)
command.send_keys('少儿节目')
command.send_keys(Keys.ENTER)
# 将页面滚动到底部
js = "var q=document.documentElement.scrollTop=10000"
driver.execute_script(js)
time.sleep(10)
# 选择标签,默认选择9个标签
self.get_tag()
# 选择封面图
driver.find_element_by_xpath('//button[contains(./text(),"设置封面")]').click()
# 滚动到目标位置并且点击
# 如果封面图加载超时过长那么就放弃本次上传
num = 0
while True:
time.sleep(60)
try:
command = driver.find_element_by_xpath(
'//div[@class="upload-cover-block"]/span/img[contains(./@src,"1.jpg")]')
print('图片加载好了')
driver.execute_script("arguments[0].scrollIntoView();", command)
command.click()
break
except:
num += 1
print('图片还没加载好,继续等待')
# 放弃上传
if num > 10:
print('放弃本次上传')
break
# 如果加载超时,放弃执行接下来语句
try:
driver.find_element_by_xpath('//button[contains(./text(),"确定")]').click()
path = '封面设置成功.png'
driver.save_screenshot(path)
# 发布
time.sleep(3)
num = 0
while True:
try:
driver.find_element_by_xpath('//button[@action="publish"]').click()
path = '上床成功.png'
driver.save_screenshot(path)
print('上传成功')
break
except:
print('设置封面图窗口关闭中')
time.sleep(5)
num += 1
if num > 3:
print('上传按钮等待超时')
break
except:
# 放弃上传
pass
# 填写标签
def get_tag(self):
single = driver.find_element_by_xpath('//li[contains(./text(),"推荐标签")]/following::li[1]/a').get_attribute('textContent').strip()
print(single)
if single != '暂无':
# 滚动到标签位置
command = driver.find_element_by_xpath('//li[contains(./text(),"推荐标签")]/following::li[1]/a')
driver.execute_script("arguments[0].scrollIntoView();", command)
try:
driver.find_element_by_xpath('//li[contains(./text(),"推荐标签")]/following::li[1]/a').click()
except:
print('没有推荐的标签')
driver.find_element_by_xpath('//div/input[contains(./@id,"videoTags")]').send_keys('玩具')
driver.find_element_by_xpath('//div/input[contains(./@id,"videoTags")]').send_keys(Keys.ENTER)
# driver.find_element_by_xpath('//label[contains(./text(),"标签")]').click()
try:
driver.find_element_by_xpath('//li[contains(./text(),"推荐标签")]/following::li[2]/a').click()
except:
print('没有第二个推荐的标签')
driver.find_element_by_xpath('//div/input[contains(./@id,"videoTags")]').send_keys('欧美流行')
driver.find_element_by_xpath('//div/input[contains(./@id,"videoTags")]').send_keys(Keys.ENTER)
# driver.find_element_by_xpath('//label[contains(./text(),"标签")]').click()
try:
driver.find_element_by_xpath('//li[contains(./text(),"推荐标签")]/following::li[3]/a').click()
except:
print('没有第三个推荐的biaoqian ')
try:
driver.find_element_by_xpath('//li[contains(./text(),"推荐标签")]/following::li[4]/a').click()
except:
print('没有第四个推荐的标签')
try:
driver.find_element_by_xpath('//li[contains(./text(),"推荐标签")]/following::li[5]/a').click()
except:
print('没有第五个推荐的标签')
try:
driver.find_element_by_xpath('//li[contains(./text(),"推荐标签")]/following::li[6]/a').click()
except:
print('没有第六个推荐的标签')
try:
driver.find_element_by_xpath('//li[contains(./text(),"推荐标签")]/following::li[7]/a').click()
except:
print('没有第七个推荐的标签')
try:
driver.find_element_by_xpath('//li[contains(./text(),"推荐标签")]/following::li[8]/a').click()
except:
print('没有第八个推荐的标签')
try:
driver.find_element_by_xpath('//li[contains(./text(),"推荐标签")]/following::li[9]/a').click()
except:
print('没有第九个推荐的标签')
else:
# 滚动到目标位置,手动输入标签
command = driver.find_element_by_xpath('//div/input[contains(./@id,"videoTags")]')
driver.execute_script("arguments[0].scrollIntoView();", command)
command.send_keys('玩具')
command.send_keys(Keys.ENTER)
command.send_keys('欧美流行')
command.send_keys(Keys.ENTER)
# 每24小时更新一次
def main(self):
while True:
self.context()
time.sleep(24*60*60)
if __name__ == '__main__':
qie = QIE()
qie.main()