Python 基于hdfs路径统计hive表存储信息

背景

你的Hive数据库是不是也悄悄“膨胀”了?

开发过程中产生的各种中间表,日积月累,占用了大量存储空间。手动清理时却犯了难:

  • 哪些表最占空间?

  • 哪些表早已过时?

  • 谁创建的?能不能删?

为解决这个问题,我写了一个Python脚本,能一键生成统计报表,清晰列出:

📊 表存储大小

📅 最后修改时间

👨💻 表使用人

让你对库表情况了如指掌,精准清理,彻底告别存储焦虑!

小编环境

import sys

print('python 版本:',sys.version.split('|')[0])
#python 版本: 3.11.11

统计结果示例

结果示例

完整代码

"""
Hive表存储信息统计脚本 - 基于HDFS路径检查
"""

import subprocess
import re
import datetime
import logging
import sys
import os
from typing import Dict, List, Tuple, Optional

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("hive_table_stats_hdfs.log", encoding='utf-8'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

class HiveTableStatsHDFS:
    def __init__(self, hdfs_cmd: str = "hadoop fs"):
        self.hdfs_cmd = hdfs_cmd
        # self.warehouse_base = "/user/hive/warehouse"
        
    def execute_hdfs_command(self, command: str) -> Tuple[bool, str]:
        """执行HDFS命令"""
        try:
            cmd = f"{self.hdfs_cmd} {command}"
            result = subprocess.run(
                cmd, 
                shell=True, 
                stdout=subprocess.PIPE, 
                stderr=subprocess.PIPE, 
                universal_newlines=True,
                timeout=60
            )
            
            if result.returncode == 0:
                return True, result.stdout.strip()
            else:
                logger.error(f"HDFS命令失败: {result.stderr}")
                return False, result.stderr
                
        except subprocess.TimeoutExpired:
            logger.error(f"HDFS命令超时: {command}")
            return False, "Command timeout"
        except Exception as e:
            logger.error(f"执行HDFS命令时发生异常: {e}")
            return False, str(e)
    
    def get_tables_from_database(self, db_path: str) -> List[str]:
        """获取指定数据库中的所有表"""
        logger.info(f"获取数据库 {db_path} 中的表列表...")
        success, output = self.execute_hdfs_command(f"-ls {db_path}")
        
        if not success:
            logger.warning(f"无法访问数据库路径: {db_path}")
            return []
        
        tables = []
        for line in output.split('\n'):
            if line.startswith('d'):
                parts = line.split()
                if len(parts) >= 8:
                    table_path = parts[-1]
                    table_name = table_path.split('/')[-1]
                    tables.append((table_name, table_path))
        
        logger.info(f"在 {db_path} 中找到 {len(tables)} 张表")
        return tables
    
    def get_table_size(self, table_path: str) -> Optional[str]:
        """获取表的存储大小"""
        success, output = self.execute_hdfs_command(f"-du -s -h {table_path}")
        
        if not success:
            return None
        
        # 解析输出,获取总大小
        if output:
            parts = output.split()
            if len(parts) == 5:
                return parts[0], parts[1]  # 返回人类可读的大小、字节类型
            
            if len(parts) == 3:
                return parts[0], 'B'  # 返回人类可读的大小、字节类型
        
        return None
    
    def get_table_details(self, table_path: str) -> Tuple[Optional[str], Optional[str], bool]:
        """获取表的详细信息:最后修改时间、修改人员、是否分区表"""
        success, output = self.execute_hdfs_command(f"-ls -t {table_path} | head -n 5")
        
        if not success:
            return None, None, False
        
        lines = output.strip().split('\n')
        if not lines:
            return None, None, False
        
        # 获取第一条记录(最新的)
        if 'Found' in lines[0]:
            first_line = lines[1]
        else:
            first_line = lines[0]
        
        parts = first_line.split()
        
        if len(parts) < 8:
            return None, None, False
        
        # 解析修改时间和修改人员
        mod_time = f"{parts[5]} {parts[6]}"
        modifier = parts[2]  # 文件所有者
        
        # 检查是否为分区表:查看表目录下是否有子目录
        is_partitioned = False
        if parts[0].startswith('d'):
            # 如果有子目录,则认为是分区表
            is_partitioned = True
        
        return modifier, mod_time, is_partitioned
    
    def get_all_table_stats(self, specific_database: str = None) -> List[Dict]:
        """获取所有表的统计信息"""
        results = []
        
        # 数据库名字
        db_name = specific_database.split('/')[-1].replace('.db','')
        
        # 获取数据库路径
        tables = self.get_tables_from_database(specific_database)
        total_tables = len(tables)
        
        logger.info(f"开始处理数据库 {db_name} 中的 {len(tables)} 张表...")
            
        for i, (table_name, table_path) in enumerate(tables, 1):
            logger.info(f"处理表 [{i}/{len(tables)}]: {db_name}.{table_name}")
            
            try:
                # 获取表大小
                size_1, size_2 = self.get_table_size(table_path)
                
                # 获取表详细信息
                modifier, mod_time, is_partitioned = self.get_table_details(table_path)
                
                table_info = {
                    'table_name': f'{db_name}.{table_name}',
                    'storage_size_1': float(size_1),
                    'storage_size_2': size_2,
                    'storage_location': table_path,
                    'last_modification_date': mod_time or "未知",
                    'last_modifier': modifier or "未知",
                    'is_partitioned': '是' if is_partitioned else '否'
                }
                
                results.append(table_info)
                
            except Exception as e:
                logger.error(f"处理表 {db_name}.{table_name} 时发生错误: {e}")
                error_info = {
                    'database': db_name,
                    'table_name': table_name,
                    'storage_size': '错误',
                    'storage_location': table_path,
                    'last_modification_date': '错误',
                    'last_modifier': '错误',
                    'is_partitioned': '错误'
                }
                results.append(error_info)
        
        logger.info(f"总共处理了 {len(results)} 张表")
        return results
    
    def export_to_excel(self, data: List[Dict], filename: str = None):
        """导出结果到Excel文件"""        
        try:
            import pandas as pd
            
            pd_data = pd.DataFrame(data)
            
            # 按大小进行排序
            key_type={'T':1,'G':2,'M':3,'K':4}
            # 创建临时列用于排序
            pd_data['temp_storage_size_2'] = pd_data['storage_size_2'].map(key_type)
            pd_data_sorted = pd_data.sort_values(
                by=['temp_storage_size_2','storage_size_1'],
                ascending=[True,False]
            )
            # 删除临时列
            pd_data_sorted = pd_data_sorted.drop('temp_storage_size_2', axis=1)
            
            # 重命名字段
            dic_columns={
                "table_name":"表名",
                "storage_size_1":"大小-1",
                "storage_size_2":"大小-2",
                "storage_location":"存储位置",
                "last_modification_date":"最后一次修改时间",
                "last_modifier":"人员",
                "is_partitioned":"是否分区"
            }
            pd_data_sorted_rename = pd_data_sorted.rename(columns=dic_columns)
            pd_data_sorted_rename.to_excel(filename, index=False, )
            
            logger.info(f"结果已导出到: {filename}")
            return True
        except ModuleNotFoundError as e:
            logger.error(f"没有安装 pandas : {e}")
            return False
        except Exception as e:
            logger.error(f"导出失败: {e}")
            return False

def main():
    """主函数"""  
    # 手动指定数据库目录
    specific_db = '/user/hive/warehouse/warehouse.db'
    
    # 创建统计对象
    stats = HiveTableStatsHDFS()
    
    try:
        # 获取表统计信息
        print(f"\n开始统计表信息...")
        results = stats.get_all_table_stats(specific_db)        
        
        # 导出到Excel
        stats.export_to_excel(results, 'hdfs_hive_table_statistics.xlsx')
        
        print("\n统计完成! 请查看日志文件 'hive_table_stats_hdfs.log' 获取详细信息")
        
    except KeyboardInterrupt:
        print("\n用户中断操作")
    except Exception as e:
        print(f"\n执行过程中发生错误: {e}")
        logger.exception("主程序异常")


if __name__ == "__main__":
    main()

历史相关文章


以上是自己实践中遇到的一些问题,分享出来供大家参考学习,欢迎关注微信公众号:DataShare ,不定期分享干货

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

友情链接更多精彩内容