多线程下载视频、批量爬取数据

Download_video --功能说明

@功能1:检测能否连接到数据库、获取数据库集合中总共的数据(条)
@功能2:连接数据库查询数据
@功能3:批量下载视频、异步非阻塞多线程
@功能4:工作进度保存机制

  • 实际效果


    开启 “从上一次下载进度的位置启动.png

    关闭 “从上一次下载进度的位置启动.png

    运行(多线程下载4线程).png

Requests_video --功能说明

@功能1:批量爬取数据(发送请求、获取标题、获取高清链接、获取标清链接、)
@功能2:可视化进度条
@功能3:连接数据库存储数据

  • 实际效果


    运行状态.png
声明:该文章所有代码仅供学习参考使用,若使用不法途径本人一概不负。
# -*- encoding: utf-8 -*-
"""
@File    :   Download_video.py
@Contact :   t.ianxi@foxmail.com
@License :   // Copyright (C) 2018 Milo Yip<dell>

@Modify Time      @Author    @Version    @Desciption
------------      -------    --------    -----------
2022/8/30 20:55     MuKe~       1.0          None
"""
import sys
import time
import pymongo
import os
import threading




# ================================================= 检测网络\统计 ========================================================
def Mongodb_find_net_num(Host,Post,input_Nome):
    """
    检测能否连接到数据库、获取数据库集合中总共的数据(条)
    :param Host: ip地址
    :param Post: 端口号
    :param input_Nome: 数据库集合
    :return: 返回当前数据库集合中总共的数据(条)
    """
    try:                                                                                            # 捕获连接数据库时可能发生的异常
        Client = pymongo.MongoClient(host=Host, port=Post)                                          # 连接数据库
        db = Client[Mongodb_name]                                                                   # 数据库名称
        Data_db = db[input_Nome]                                                                    # 集合名称
        Mongodb_num = Data_db.find().count()                                                        # 查询数据库当前集合中数据的总共数量(条)
        return Mongodb_num                                                                          # 返回统计后的数量

    except:                                                                                         # 发生异常后,并返回False
        return False


# ==================================================== 查询数据库 ========================================================
def Mongodb_post(Host,Post,Mongodb_name,input_Nome,page_data):
    """
    连接数据库获取数据
    :param Host: ip地址
    :param Post: 端口号
    :param Mongodb_name:数据库名称
    :param input_Nome:数据库集合
    :param Yema: ‘当前页面’id
    :return: 字典、数据内容:(作品名称、上架时间、下载链接)
    """
    Client = pymongo.MongoClient(host=Host, port=Post)                                               # 连接数据库
    db = Client[Mongodb_name]                                                                        # 数据库名称
    print("[INFO] 当前数据库名称:" + input_Nome)                                                       # 输出集合名称
    Data_db = db[input_Nome]                                                                         # 输出集合名称
    results = Data_db.find({"写入序号":str(page_data)})                                                # 查询下数据库中的key值
    for key in results:                                                                              # 历遍获取到的字典数据
        # print("video_nome:", key['作品标题'] + ' ' + key['时间数据'], key['标清下载'])
        Mongodb_DataDB = {
            "Data_db_name":key['作品标题'],
            "Data_db_page":key['页面位置'],
            "Data_db_date":key['时间数据'],
            "Data_db_download":key['标清下载']
        }
        return Mongodb_DataDB                                                                        # 对字典中的数据重命名,并返回


# ====================================================   下载文件  =======================================================
def Download_URL_video(Mongodb_DataDB,video_path):
    """
    下载视频、异步非阻塞多进程
    Mongodb_DataDB = {
        :param video_name: 作品名称
        :param video_date: 上架时间
        :param video_downloadurl: 下载链接
        :param video_path: 文件存储路径

    }
    :return: Nome
    """
    video_name = Mongodb_DataDB['Data_db_name']                                                      # 获取字典中key值
    video_date = Mongodb_DataDB['Data_db_date']
    video_downloadurl = Mongodb_DataDB['Data_db_download']
    print('==============================================================================================================')
    print("[INFO] "+video_name)
    print("[INFO] "+video_date)
    print("[++++] "+video_path)
    print("[INFO] "+video_downloadurl)


    # you-get使用方法参考文章 https://github.com/soimort/you-get/blob/develop/README.md
    # you-get -o 路径 -O 文件名 链接
    # print(f'you-get -o {video_path} -O {video_name} {video_date} {video_downloadurl}')
    if not os.path.exists(video_path + '\\' + video_name + '.mp4'):                                  # 判断文件是否存在
        os.system(f'you-get -o {video_path} -O {video_name} {video_date} {video_downloadurl}')       # 通过传参调用’you-get‘下载文件
    else:
        print("[INFO] 已存在此作品",video_name + '.mp4')


# ===================================================  记录下载进度   ====================================================
def Detect_MP4(video_path,page_data):
    """
    写入下载进度数据(计数)
    :param video_path: 文件的存储路径
    :param page_data: 数据库“当前页面”数据
    :return: None
    """
    try:                                                                                             # 用于捕获IO偶尔会发生的意外(如:写入文件被占用)
        with open(video_path+'\\count.txt','w',encoding='utf-8') as count:
            count.write(str(page_data + 1))                                                          # 写入字符串 (计数)
    except OSError as Os:                                                                            # 捕获并输出异常
        print(f'[INFO] 无法写入下载进度,异常原因:{Os}')



# ====================================================   配置信息   ======================================================

Host = "10.52.xx.xx"                                                    # Mongodb数据库地址(ip)
Post = 27017                                                            # Mongodb数据库端口
Mongodb_name = "video_URL"                                              # Mongodb数据库名
                                                                        # (批量操作)Mongodb数据库集合名,以及起始位置
URL_class_dict = {
    "vlg_抖音":2,
    "国产网剧":2,
    "卡通动漫":2,
    "微视频":3
}
# input_Nome = "yazhouqingse"                                           # (单个操作)Mongodb数据库集合名

OFF_ON_page = True                                                      # 开启/关闭 “从上一次下载进度的位置启动”(若读取失败或者为0则默认为字典的默认初始化值)
video_path = r'D:\Pyfile\脚本\视频url收集'                                # 文件存储路径
thread_time = int(0.2)                                                  # 线程减速
thread_list = []                                                        # 存储线程
thread_num = int(os.cpu_count())                                        # 定义线程数量(检测cpu核数)

# ==================================================== 程序入口(main) ====================================================
if __name__=='__main__':                                                                            # 定义一个main
    # 该方法只适合(批量操作),数据库集合只有一个情况不需要一下三行代码
    for D in URL_class_dict:                                                                        # 历遍包含数据库集合名称字典及起始位置
        input_Nome = D                                                                              # 获取key(集合名称)
        page_data = URL_class_dict[D]                                                               # 获取起始位置

        Mongodb_num = Mongodb_find_net_num(Host, Post, input_Nome)                                  # 实例化“检测网络\统计”方法
        if Mongodb_num != False:                                                                    # 判断数据库连接是否异常
            print("[INFO] 数据库已连接")
            for i in range(1,Mongodb_num):                                                          # 历遍数据库当前集合中数据的总数(条)
                if OFF_ON_page == True:                                                             # 判断“配置信息”如果用户开启则读取上一次的下载进度
                    try:                                                                            # 捕获IO在读写时偶尔会发生的意外
                        with open(video_path+'\\count.txt','r',encoding='utf-8') as count0:         # 读取“count.txt”配置文件(用于存储下载进度)
                            page_data = int(count0.read())                                          # 读取数值
                            print(f'[++++] 上一次进度停留在:{page_data}')
                    except:                                                                         # 捕获异常并赋值为字典的默认初始化值作为数据库查询条件
                        print("[INFO] 存储进度文件“count.txt”已丢失!")
                        with open(video_path + '\\count.txt', 'w+', encoding='utf-8'):
                            pass
                        page_data = URL_class_dict[D]
                        print(f"[INFO] 已改为字典的默初始化值{page_data}")

                Mongodb_DataDB = Mongodb_post(Host, Post,Mongodb_name,input_Nome,page_data)        # 实例化“查询数据库”方法
                t = threading.Thread(target=Download_URL_video, args=(Mongodb_DataDB,video_path))   # 创建多线程、调用“下载文件”方法

                thread_list.append(t)                                                               # 向列表中加入线程池
                t.start()                                                                           # 启动线程
                print('[INFO] 线程池',thread_list[0])                                                # 输出线程池信息

                while len(thread_list) > thread_num:                                                # 统计线程池总数,若大于获取到cpu核数(如:4核)
                    thread_list = [x for x in thread_list if x.is_alive()]                          # 历遍线程池中的线程,判断线程是否在运行
                    time.sleep(thread_time)                                                         # 设备性能较差防止循环过快
                    # print('剩余线程池________' + str(thread_list))

                if page_data <= Mongodb_num:                                                        # 判断如果数据库中总数小于当前下载进度 则继续写入下载进度(用于切换数据库集合时防止下载进度累计增加)
                    Detect_MP4(video_path,page_data)                                                # 实例化“记录下载进度”方法,写入下载进度一边下次启动使用
                elif page_data >= Mongodb_num:
                    print(f"[INFO] 已达到数据库阈值!{input_Nome}集合总数:{Mongodb_num}")
                    time.sleep(10)
                    Detect_MP4(video_path,0)                                                        # 判断如果数据库中总数大于当前下载进度 则重置写入下载进度(用于切换数据库集合时重置下载进度)
                    break                                                                           # 跳出循环切换数据库集合

        elif Mongodb_num == False:                                                                  # 判断“检测网络\统计”是否异常,并输出信息
            print("[INFO] 数据库连接失败!")



















# -*- encoding: utf-8 -*-
"""
@File    :   Requests_video.py
@Contact :   t.ianxi@foxmail.com
@License :   // Copyright (C) 2018 Milo Yip<dell>

@Modify Time      @Author    @Version    @Desciption
------------      -------    --------    -----------
2022/7/4 22:23     MuKe~       1.0          None
"""
"""
@File    :   Requests_video.py
@Contact :   t.ianxi@foxmail.com
@License :   // Copyright (C) 2018 Milo Yip<dell>

@Modify Time      @Author    @Version    @Desciption
------------      -------    --------    -----------
2022/7/4 22:23     MuKe~       1.0          None
"""
import time
import requests
import csv
import os
import datetime
import pymongo
from lxml import etree
from tqdm import tqdm

# 网页类目,字典:卡通动漫、国产网剧、vlg_抖音、微视频,列表:开始位置,结束位置
URL_class_dict = {
    "卡通动漫":[2,58],
    "国产网剧":[2,123],
    "vlg_抖音:[2,135],
    "微视频":[3,78]
}

for i in URL_class_dict.items():
    il = list(i)

    input_Start = il[1][0]              # 起始页
    input_Stop = il[1][1]               # 终止页
    input_Nome = il[0]                  # 数据库集合名(表)、网页视频类目参数


# ======================================================================================================================
    def GET_urlnome():
        """
        发送请求、获取标题、获取高清链接、获取标清链接、
        :return:"页面位置":i+1,
                "时间数据":time_data,
                "作品标题":nome_video,
                "高清下载":HD_video,
        """
        for i in range(input_Start,input_Stop):                                     # 实现翻页功能
            URL = "https://taxm.com/{Class_url}/index_{i}.html".format(i=i,Class_url=input_Nome)
            headers = {

                "referer": "https://taxm.com/",
                "User-Agent": "Mozilla/4.0 (Windows NT 11.0; Win64; x64) "
                              "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4430.85 Safari/537.16"
            }
            # 参考文章https://www.cnblogs.com/Renyi-Fan/p/13266518.html#:~:text=requests.Session,%28%29%EF%BC%9A%E7%BB%B4%E6%8C%81%E4%BC%9A%E8%AF%9D%EF%BC%8C%E5%8F%AF%E4%BB%A5%E8%AE%A9%E6%88%91%E4%BB%AC%E5%9C%A8%E8%B7%A8%E8%AF%B7%E6%B1%82%E6%97%B6%E4%BF%9D%E5%AD%98%E6%9F%90%E4%BA%9B%E5%8F%82%E6%95%B0%EF%BC%8C%E8%BF%99%E6%A0%B7%E5%8F%AF%E4%BB%A5%E5%BE%97%E5%88%B0%E4%B8%80%E4%B8%AAsession%E5%AF%B9%E8%B1%A1%EF%BC%8C%E8%80%8C%E8%BF%99%E4%B8%AAsession%E5%AF%B9%E8%B1%A1%E6%98%AF%E5%8F%AF%E4%BB%A5%E5%8F%91%E9%80%81%E5%90%84%E7%A7%8D%E8%AF%B7%E6%B1%82%E7%9A%84
            Session = requests.Session()                                            # Session()  持会话,可以让我们在跨请求时保存某些参数,这样可以得到一个session对象,而这个session对象是可以发送各种请求的
            GET_url = Session.get(URL,headers=headers).text                         # 发送get请求,并调用text函数将数据类型转换为文本信息
            GET_html = etree.HTML(GET_url)                                          # 使用lxml库的etree下的HTML()方法解析text文本为html
            bue = tqdm(range(1,36))                                                 # 实例tqdm对象 历遍url,从1开始36结束
            for i_s in bue:
                Ye_url = GET_html.xpath('/html/body/div[11]/div[5]/ul/li[{i}]/a/@href'.format(i=int(i_s)))  # 获取作品链接 href属于属性
                video_url = "https://taxm.com"+Ye_url[0]                         # 拼接作品url(每页共有36个作品)
                response = Session.get(video_url, headers=headers)                  # 调用Session的get()方法发送请求,参数(url,请求头)
                response.encoding = response.apparent_encoding                      # html文本为外语,转换编码 注意:没有转换会出现乱码
                video_html = etree.HTML(response.text)# 发送get请求,并调用text函数将数据类型转换为文本信息,并使用lxml库的etree下的HTML()方法解析text文本为html

                HD_video = video_html.xpath('/html/body/div[11]/div[5]/div[7]/ul[2]/li[1]/a/@href')   # 获取标清链接
                nome_video = video_html.xpath('/html/body/div[11]/div[3]/span[4]/text()')             # 获取作品标题
                # print("标清下载:",HD_video)
                # print("作品标题:",nome_video)                                      # 进度条标题,格式化字符串参数(页码,标题)
                bue.set_description('第{State}页:{print_nome}'.format(State=i+1,print_nome=nome_video))

                now = datetime.datetime.now()                                       # 实例化datetime库的now()方法
                time_data = now.strftime('%Y-%m-%d %H:%M:%S')                       # 获取当前时间
                input_Mongdb = {
                    "页面位置":i+1,
                    "时间数据":time_data,
                    "作品标题":nome_video[0],
                    "高清下载":HD_video[0],
                }

                # print(input_Mongdb)
                # 实例化save_to_mongo()(自定义的方法,连接Mongdb数据库、存储数据)
                save_to_mongo(result=input_Mongdb)

    # 定义一个数据库
    MONGO_DB = 'video_URL'
    # 指定集合名(表)
    MONGO_COLLECTION = input_Nome
    # 连接MongoDB数据库参数一为主机或IP,参数二为端口号默认27017 (连接数据库)
    client = pymongo.MongoClient(host='localhost',port=27017)
    # 指定数据库
    db = client[MONGO_DB]

# ======================================================================================================================
    def save_to_mongo(result):
        """
        :param result:结果
        :return: None
        """
        try:
            # 调用insert()方法将数据插入到Mongdb数据库中,result变量是get_products()方法中传来的product变量(包含着商品相关数据)
            if db[MONGO_COLLECTION].insert(result):
                # print("储存到MongDB成功!")
                pass
        except Exception:
            print("储存到MongDB失败!")

    if __name__=='__main__':
        GET_urlnome()






最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容