Download_video --功能说明
@功能1:检测能否连接到数据库、获取数据库集合中总共的数据(条)
@功能2:连接数据库查询数据
@功能3:批量下载视频、异步非阻塞多线程
@功能4:工作进度保存机制
-
实际效果
开启 “从上一次下载进度的位置启动.png
关闭 “从上一次下载进度的位置启动.png
运行(多线程下载4线程).png
Requests_video --功能说明
@功能1:批量爬取数据(发送请求、获取标题、获取高清链接、获取标清链接、)
@功能2:可视化进度条
@功能3:连接数据库存储数据
-
实际效果
运行状态.png
声明:该文章所有代码仅供学习参考使用,若使用不法途径本人一概不负。
# -*- encoding: utf-8 -*-
"""
@File : Download_video.py
@Contact : t.ianxi@foxmail.com
@License : // Copyright (C) 2018 Milo Yip<dell>
@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
2022/8/30 20:55 MuKe~ 1.0 None
"""
import sys
import time
import pymongo
import os
import threading
# ================================================= 检测网络\统计 ========================================================
def Mongodb_find_net_num(Host,Post,input_Nome):
"""
检测能否连接到数据库、获取数据库集合中总共的数据(条)
:param Host: ip地址
:param Post: 端口号
:param input_Nome: 数据库集合
:return: 返回当前数据库集合中总共的数据(条)
"""
try: # 捕获连接数据库时可能发生的异常
Client = pymongo.MongoClient(host=Host, port=Post) # 连接数据库
db = Client[Mongodb_name] # 数据库名称
Data_db = db[input_Nome] # 集合名称
Mongodb_num = Data_db.find().count() # 查询数据库当前集合中数据的总共数量(条)
return Mongodb_num # 返回统计后的数量
except: # 发生异常后,并返回False
return False
# ==================================================== 查询数据库 ========================================================
def Mongodb_post(Host,Post,Mongodb_name,input_Nome,page_data):
"""
连接数据库获取数据
:param Host: ip地址
:param Post: 端口号
:param Mongodb_name:数据库名称
:param input_Nome:数据库集合
:param Yema: ‘当前页面’id
:return: 字典、数据内容:(作品名称、上架时间、下载链接)
"""
Client = pymongo.MongoClient(host=Host, port=Post) # 连接数据库
db = Client[Mongodb_name] # 数据库名称
print("[INFO] 当前数据库名称:" + input_Nome) # 输出集合名称
Data_db = db[input_Nome] # 输出集合名称
results = Data_db.find({"写入序号":str(page_data)}) # 查询下数据库中的key值
for key in results: # 历遍获取到的字典数据
# print("video_nome:", key['作品标题'] + ' ' + key['时间数据'], key['标清下载'])
Mongodb_DataDB = {
"Data_db_name":key['作品标题'],
"Data_db_page":key['页面位置'],
"Data_db_date":key['时间数据'],
"Data_db_download":key['标清下载']
}
return Mongodb_DataDB # 对字典中的数据重命名,并返回
# ==================================================== 下载文件 =======================================================
def Download_URL_video(Mongodb_DataDB,video_path):
"""
下载视频、异步非阻塞多进程
Mongodb_DataDB = {
:param video_name: 作品名称
:param video_date: 上架时间
:param video_downloadurl: 下载链接
:param video_path: 文件存储路径
}
:return: Nome
"""
video_name = Mongodb_DataDB['Data_db_name'] # 获取字典中key值
video_date = Mongodb_DataDB['Data_db_date']
video_downloadurl = Mongodb_DataDB['Data_db_download']
print('==============================================================================================================')
print("[INFO] "+video_name)
print("[INFO] "+video_date)
print("[++++] "+video_path)
print("[INFO] "+video_downloadurl)
# you-get使用方法参考文章 https://github.com/soimort/you-get/blob/develop/README.md
# you-get -o 路径 -O 文件名 链接
# print(f'you-get -o {video_path} -O {video_name} {video_date} {video_downloadurl}')
if not os.path.exists(video_path + '\\' + video_name + '.mp4'): # 判断文件是否存在
os.system(f'you-get -o {video_path} -O {video_name} {video_date} {video_downloadurl}') # 通过传参调用’you-get‘下载文件
else:
print("[INFO] 已存在此作品",video_name + '.mp4')
# =================================================== 记录下载进度 ====================================================
def Detect_MP4(video_path,page_data):
"""
写入下载进度数据(计数)
:param video_path: 文件的存储路径
:param page_data: 数据库“当前页面”数据
:return: None
"""
try: # 用于捕获IO偶尔会发生的意外(如:写入文件被占用)
with open(video_path+'\\count.txt','w',encoding='utf-8') as count:
count.write(str(page_data + 1)) # 写入字符串 (计数)
except OSError as Os: # 捕获并输出异常
print(f'[INFO] 无法写入下载进度,异常原因:{Os}')
# ==================================================== 配置信息 ======================================================
Host = "10.52.xx.xx" # Mongodb数据库地址(ip)
Post = 27017 # Mongodb数据库端口
Mongodb_name = "video_URL" # Mongodb数据库名
# (批量操作)Mongodb数据库集合名,以及起始位置
URL_class_dict = {
"vlg_抖音":2,
"国产网剧":2,
"卡通动漫":2,
"微视频":3
}
# input_Nome = "yazhouqingse" # (单个操作)Mongodb数据库集合名
OFF_ON_page = True # 开启/关闭 “从上一次下载进度的位置启动”(若读取失败或者为0则默认为字典的默认初始化值)
video_path = r'D:\Pyfile\脚本\视频url收集' # 文件存储路径
thread_time = int(0.2) # 线程减速
thread_list = [] # 存储线程
thread_num = int(os.cpu_count()) # 定义线程数量(检测cpu核数)
# ==================================================== 程序入口(main) ====================================================
if __name__=='__main__': # 定义一个main
# 该方法只适合(批量操作),数据库集合只有一个情况不需要一下三行代码
for D in URL_class_dict: # 历遍包含数据库集合名称字典及起始位置
input_Nome = D # 获取key(集合名称)
page_data = URL_class_dict[D] # 获取起始位置
Mongodb_num = Mongodb_find_net_num(Host, Post, input_Nome) # 实例化“检测网络\统计”方法
if Mongodb_num != False: # 判断数据库连接是否异常
print("[INFO] 数据库已连接")
for i in range(1,Mongodb_num): # 历遍数据库当前集合中数据的总数(条)
if OFF_ON_page == True: # 判断“配置信息”如果用户开启则读取上一次的下载进度
try: # 捕获IO在读写时偶尔会发生的意外
with open(video_path+'\\count.txt','r',encoding='utf-8') as count0: # 读取“count.txt”配置文件(用于存储下载进度)
page_data = int(count0.read()) # 读取数值
print(f'[++++] 上一次进度停留在:{page_data}')
except: # 捕获异常并赋值为字典的默认初始化值作为数据库查询条件
print("[INFO] 存储进度文件“count.txt”已丢失!")
with open(video_path + '\\count.txt', 'w+', encoding='utf-8'):
pass
page_data = URL_class_dict[D]
print(f"[INFO] 已改为字典的默初始化值{page_data}")
Mongodb_DataDB = Mongodb_post(Host, Post,Mongodb_name,input_Nome,page_data) # 实例化“查询数据库”方法
t = threading.Thread(target=Download_URL_video, args=(Mongodb_DataDB,video_path)) # 创建多线程、调用“下载文件”方法
thread_list.append(t) # 向列表中加入线程池
t.start() # 启动线程
print('[INFO] 线程池',thread_list[0]) # 输出线程池信息
while len(thread_list) > thread_num: # 统计线程池总数,若大于获取到cpu核数(如:4核)
thread_list = [x for x in thread_list if x.is_alive()] # 历遍线程池中的线程,判断线程是否在运行
time.sleep(thread_time) # 设备性能较差防止循环过快
# print('剩余线程池________' + str(thread_list))
if page_data <= Mongodb_num: # 判断如果数据库中总数小于当前下载进度 则继续写入下载进度(用于切换数据库集合时防止下载进度累计增加)
Detect_MP4(video_path,page_data) # 实例化“记录下载进度”方法,写入下载进度一边下次启动使用
elif page_data >= Mongodb_num:
print(f"[INFO] 已达到数据库阈值!{input_Nome}集合总数:{Mongodb_num}")
time.sleep(10)
Detect_MP4(video_path,0) # 判断如果数据库中总数大于当前下载进度 则重置写入下载进度(用于切换数据库集合时重置下载进度)
break # 跳出循环切换数据库集合
elif Mongodb_num == False: # 判断“检测网络\统计”是否异常,并输出信息
print("[INFO] 数据库连接失败!")
# -*- encoding: utf-8 -*-
"""
@File : Requests_video.py
@Contact : t.ianxi@foxmail.com
@License : // Copyright (C) 2018 Milo Yip<dell>
@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
2022/7/4 22:23 MuKe~ 1.0 None
"""
"""
@File : Requests_video.py
@Contact : t.ianxi@foxmail.com
@License : // Copyright (C) 2018 Milo Yip<dell>
@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
2022/7/4 22:23 MuKe~ 1.0 None
"""
import time
import requests
import csv
import os
import datetime
import pymongo
from lxml import etree
from tqdm import tqdm
# 网页类目,字典:卡通动漫、国产网剧、vlg_抖音、微视频,列表:开始位置,结束位置
URL_class_dict = {
"卡通动漫":[2,58],
"国产网剧":[2,123],
"vlg_抖音:[2,135],
"微视频":[3,78]
}
for i in URL_class_dict.items():
il = list(i)
input_Start = il[1][0] # 起始页
input_Stop = il[1][1] # 终止页
input_Nome = il[0] # 数据库集合名(表)、网页视频类目参数
# ======================================================================================================================
def GET_urlnome():
"""
发送请求、获取标题、获取高清链接、获取标清链接、
:return:"页面位置":i+1,
"时间数据":time_data,
"作品标题":nome_video,
"高清下载":HD_video,
"""
for i in range(input_Start,input_Stop): # 实现翻页功能
URL = "https://taxm.com/{Class_url}/index_{i}.html".format(i=i,Class_url=input_Nome)
headers = {
"referer": "https://taxm.com/",
"User-Agent": "Mozilla/4.0 (Windows NT 11.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4430.85 Safari/537.16"
}
# 参考文章https://www.cnblogs.com/Renyi-Fan/p/13266518.html#:~:text=requests.Session,%28%29%EF%BC%9A%E7%BB%B4%E6%8C%81%E4%BC%9A%E8%AF%9D%EF%BC%8C%E5%8F%AF%E4%BB%A5%E8%AE%A9%E6%88%91%E4%BB%AC%E5%9C%A8%E8%B7%A8%E8%AF%B7%E6%B1%82%E6%97%B6%E4%BF%9D%E5%AD%98%E6%9F%90%E4%BA%9B%E5%8F%82%E6%95%B0%EF%BC%8C%E8%BF%99%E6%A0%B7%E5%8F%AF%E4%BB%A5%E5%BE%97%E5%88%B0%E4%B8%80%E4%B8%AAsession%E5%AF%B9%E8%B1%A1%EF%BC%8C%E8%80%8C%E8%BF%99%E4%B8%AAsession%E5%AF%B9%E8%B1%A1%E6%98%AF%E5%8F%AF%E4%BB%A5%E5%8F%91%E9%80%81%E5%90%84%E7%A7%8D%E8%AF%B7%E6%B1%82%E7%9A%84
Session = requests.Session() # Session() 持会话,可以让我们在跨请求时保存某些参数,这样可以得到一个session对象,而这个session对象是可以发送各种请求的
GET_url = Session.get(URL,headers=headers).text # 发送get请求,并调用text函数将数据类型转换为文本信息
GET_html = etree.HTML(GET_url) # 使用lxml库的etree下的HTML()方法解析text文本为html
bue = tqdm(range(1,36)) # 实例tqdm对象 历遍url,从1开始36结束
for i_s in bue:
Ye_url = GET_html.xpath('/html/body/div[11]/div[5]/ul/li[{i}]/a/@href'.format(i=int(i_s))) # 获取作品链接 href属于属性
video_url = "https://taxm.com"+Ye_url[0] # 拼接作品url(每页共有36个作品)
response = Session.get(video_url, headers=headers) # 调用Session的get()方法发送请求,参数(url,请求头)
response.encoding = response.apparent_encoding # html文本为外语,转换编码 注意:没有转换会出现乱码
video_html = etree.HTML(response.text)# 发送get请求,并调用text函数将数据类型转换为文本信息,并使用lxml库的etree下的HTML()方法解析text文本为html
HD_video = video_html.xpath('/html/body/div[11]/div[5]/div[7]/ul[2]/li[1]/a/@href') # 获取标清链接
nome_video = video_html.xpath('/html/body/div[11]/div[3]/span[4]/text()') # 获取作品标题
# print("标清下载:",HD_video)
# print("作品标题:",nome_video) # 进度条标题,格式化字符串参数(页码,标题)
bue.set_description('第{State}页:{print_nome}'.format(State=i+1,print_nome=nome_video))
now = datetime.datetime.now() # 实例化datetime库的now()方法
time_data = now.strftime('%Y-%m-%d %H:%M:%S') # 获取当前时间
input_Mongdb = {
"页面位置":i+1,
"时间数据":time_data,
"作品标题":nome_video[0],
"高清下载":HD_video[0],
}
# print(input_Mongdb)
# 实例化save_to_mongo()(自定义的方法,连接Mongdb数据库、存储数据)
save_to_mongo(result=input_Mongdb)
# 定义一个数据库
MONGO_DB = 'video_URL'
# 指定集合名(表)
MONGO_COLLECTION = input_Nome
# 连接MongoDB数据库参数一为主机或IP,参数二为端口号默认27017 (连接数据库)
client = pymongo.MongoClient(host='localhost',port=27017)
# 指定数据库
db = client[MONGO_DB]
# ======================================================================================================================
def save_to_mongo(result):
"""
:param result:结果
:return: None
"""
try:
# 调用insert()方法将数据插入到Mongdb数据库中,result变量是get_products()方法中传来的product变量(包含着商品相关数据)
if db[MONGO_COLLECTION].insert(result):
# print("储存到MongDB成功!")
pass
except Exception:
print("储存到MongDB失败!")
if __name__=='__main__':
GET_urlnome()



