Python json 模块完整使用指南
一、json 模块核心函数
- 基本函数对照表
| 函数 | 作用 | 输入 | 输出 |
| --- | --- | --- | --- |
| json.dumps() | Python对象 → JSON字符串 | Python对象 | JSON字符串 |
| json.loads() | JSON字符串 → Python对象 | JSON字符串 | Python对象 |
| json.dump() | Python对象 → JSON文件 | Python对象 + 文件对象 | 无(写入文件) |
| json.load() | JSON文件 → Python对象 | 文件对象 | Python对象 |
二、数据类型映射
Python 到 JSON 转换
import json
# Reference table: how each Python type is rendered in JSON
data = {
    "string": "Hello World",       # str   -> JSON string
    "integer": 42,                 # int   -> JSON number
    "float": 3.14159,              # float -> JSON number
    "boolean_true": True,          # True  -> JSON true
    "boolean_false": False,        # False -> JSON false
    "none": None,                  # None  -> JSON null
    "list": [1, 2, 3],             # list  -> JSON array
    "tuple": (4, 5, 6),            # tuple -> JSON array
    "dict": {"key": "value"},      # dict  -> JSON object
    "nested": {
        "a": 1,
        "b": [1, 2, 3],
        "c": {"x": 1, "y": 2},
    },
}

# Pretty-print with a two-space indent and leave non-ASCII text unescaped.
json_str = json.dumps(data, indent=2, ensure_ascii=False)
print(json_str)
三、json.dumps() 详细用法
- 基础序列化
import json
# Basic usage: by default every non-ASCII character is escaped as \uXXXX
data = dict(name="张三", age=25, city="北京")
json_str = json.dumps(data)
print(json_str)  # {"name": "\u5f20\u4e09", "age": 25, "city": "\u5317\u4eac"}

# Keep Chinese text readable by switching off ASCII escaping
json_str = json.dumps(data, ensure_ascii=False)
print(json_str)  # {"name": "张三", "age": 25, "city": "北京"}
- 格式化输出
import json
data = {
    "name": "李四",
    "age": 30,
    "hobbies": ["读书", "游泳", "编程"],
    "address": {"province": "广东", "city": "深圳"},
}

# indent sets how many spaces each nesting level is indented by
json_str = json.dumps(data, indent=2, ensure_ascii=False)
print(json_str)
- 排序键
import json
data = dict(name="王五", age=28, city="上海", email="wang@example.com")

# sort_keys=True emits the object members in sorted key order
json_str = json.dumps(data, sort_keys=True, indent=2)
print(json_str)
- 分隔符定制
import json
data = {"name": "赵六", "age": 35, "hobbies": ["音乐", "电影"]}

# Without indent, the default separators are (', ', ': ')
default_text = json.dumps(data)
print(default_text)
# {"name": "\u8d75\u516d", "age": 35, "hobbies": ["\u97f3\u4e50", "\u7535\u5f71"]}

# Compact form: drop the spaces after ',' and ':'
compact_text = json.dumps(data, separators=(',', ':'))
print(compact_text)
# {"name":"\u8d75\u516d","age":35,"hobbies":["\u97f3\u4e50","\u7535\u5f71"]}
四、json.loads() 详细用法
- 基础反序列化
import json
# Turn a JSON object string into a Python dict
json_str = '{"name": "张三", "age": 25, "city": "北京"}'
data = json.loads(json_str)

print(data)          # {'name': '张三', 'age': 25, 'city': '北京'}
print(type(data))    # <class 'dict'>
print(data['name'])  # 张三
- 解析不同格式
import json
# A JSON array becomes a Python list; true/false/null map to True/False/None
json_array = '[1, 2, 3, "hello", true, false, null]'
data = json.loads(json_array)
print(data)        # [1, 2, 3, 'hello', True, False, None]
print(type(data))  # <class 'list'>

# Nested JSON: an object holding an array of objects
json_complex = '''
{
"users": [
{"name": "张三", "age": 25},
{"name": "李四", "age": 30}
],
"total": 2
}
'''
data = json.loads(json_complex)
print(data['users'][0]['name'])  # 张三
五、json.dump() 和 json.load() 文件操作
- 写入 JSON 文件
import json
data = {
    "name": "张三",
    "age": 25,
    "hobbies": ["编程", "阅读"],
    "address": {
        "city": "北京",
        "district": "朝阳区",
    },
}

# Write the object to data.json; the with-block closes the file even on error.
# ensure_ascii=False keeps the Chinese text readable in the file, which is why
# the file is opened with an explicit UTF-8 encoding.
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=2)
print("文件写入成功")
- 读取 JSON 文件
import json
# Read data.json back into a Python object; the with-block closes the file.
with open('data.json', 'r', encoding='utf-8') as f:
    data = json.load(f)
print(data)
print(data['name'])  # 张三
六、实际案例
案例1:处理 journalctl JSON 日志
import json
from typing import List, Dict
def parse_journal_logs(filename: str) -> List[Dict]:
    """Parse a JSON Lines file (one JSON document per line).

    Blank lines are skipped. A line that is not valid JSON is reported on
    stdout together with its 1-based line number and then skipped, so one
    corrupt line does not abort the whole parse.

    Args:
        filename: Path of the UTF-8 encoded JSON Lines file.

    Returns:
        The successfully parsed entries, in file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    logs = []
    with open(filename, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                log_entry = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"第 {line_num} 行解析失败: {e}")
            else:
                logs.append(log_entry)
    return logs
# Usage
logs = parse_journal_logs('/tmp/jour.json')
print(f"共解析 {len(logs)} 条日志")

# Pick out error entries. MESSAGE is passed through str() first because a
# parsed entry is not guaranteed to hold a string there (JSON null or an
# array would make .lower() raise AttributeError).
errors = [log for log in logs if 'error' in str(log.get('MESSAGE', '')).lower()]
print(f"错误日志: {len(errors)} 条")

# Show the first 3 entries, truncated to 200 characters each
for log in logs[:3]:
    print(json.dumps(log, ensure_ascii=False, indent=2)[:200])
案例2:配置文件管理
import json
import os
class ConfigManager:
    """Load, query and persist a JSON configuration file.

    Nested settings are addressed with dot-separated keys such as
    ``"database.host"``.
    """

    def __init__(self, config_file='config.json'):
        """Remember the file path and load its current contents.

        Args:
            config_file: Path of the JSON file backing this manager.
        """
        self.config_file = config_file
        self.config = self.load()

    def load(self):
        """Read the configuration file.

        Returns:
            The parsed configuration dict, or an empty dict when the file is
            missing, is not valid JSON, or does not hold a JSON object.
        """
        if not os.path.exists(self.config_file):
            return {}
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except json.JSONDecodeError:
            print("配置文件格式错误")
            return {}
        if not isinstance(loaded, dict):
            # get() and set() walk nested dicts, so any other top-level
            # value (list, number, ...) is unusable as a configuration.
            print("配置文件格式错误")
            return {}
        return loaded

    def save(self):
        """Write the current configuration back to the file as indented JSON."""
        with open(self.config_file, 'w', encoding='utf-8') as f:
            json.dump(self.config, f, ensure_ascii=False, indent=2)

    def get(self, key, default=None):
        """Return the value stored under a dot-separated key.

        Args:
            key: Dot-separated path, e.g. ``"database.host"``.
            default: Value returned when the path cannot be resolved.

        Returns:
            The stored value, or ``default`` when any path component is
            missing, is stored as ``None``, or an intermediate value is not
            a dict.
        """
        value = self.config
        for k in key.split('.'):
            if not isinstance(value, dict):
                return default
            value = value.get(k)
            if value is None:
                return default
        return value

    def set(self, key, value):
        """Store a value under a dot-separated key and save the file.

        Missing intermediate levels are created. An intermediate entry that
        exists but is not a dict is replaced by a new dict, because the
        nested key could not be stored otherwise.

        Args:
            key: Dot-separated path, e.g. ``"database.port"``.
            value: JSON-serialisable value to store.
        """
        keys = key.split('.')
        config = self.config
        for k in keys[:-1]:
            if not isinstance(config.get(k), dict):
                config[k] = {}
            config = config[k]
        config[keys[-1]] = value
        self.save()
# Usage example
config = ConfigManager('app_config.json')

# Store a few settings; each call to set() also saves the file
for option, setting in (
    ('database.host', 'localhost'),
    ('database.port', 3306),
    ('app.name', 'MyApp'),
):
    config.set(option, setting)

# Read the settings back
db_host = config.get('database.host')
db_port = config.get('database.port', 3306)
app_name = config.get('app.name')

print(f"数据库地址: {db_host}:{db_port}")
print(f"应用名称: {app_name}")
案例3:处理复杂嵌套数据
import json
class DataProcessor:
    """Filter, group and summarise a list of JSON records (dicts)."""

    def __init__(self, data=None):
        """Start with the given records, or with an empty list.

        Args:
            data: Optional list of dict records.
        """
        self.data = data or []

    def load_from_file(self, filename):
        """Replace the current records with the contents of a file.

        The file may hold a single JSON document or JSON Lines (one document
        per line). A top-level value that is not a list is wrapped in a
        one-element list so the other methods can iterate over records.

        Args:
            filename: Path of the UTF-8 encoded input file.

        Raises:
            json.JSONDecodeError: If the content is neither valid JSON nor
                valid JSON Lines.
        """
        with open(filename, 'r', encoding='utf-8') as f:
            text = f.read()
        try:
            loaded = json.loads(text)
        except json.JSONDecodeError:
            # Not one JSON document: retry line by line. Split on '\n' only,
            # because str.splitlines() would also break on characters such
            # as U+2028 that may legally appear inside a JSON string.
            loaded = [json.loads(line) for line in text.split('\n') if line.strip()]
        self.data = loaded if isinstance(loaded, list) else [loaded]

    def save_to_file(self, filename):
        """Write the current records to a file as one indented JSON array.

        Args:
            filename: Path of the output file.
        """
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.data, f, ensure_ascii=False, indent=2)

    def filter_by_keyword(self, keyword, field='MESSAGE'):
        """Return the records whose field contains the keyword.

        The match is case-insensitive and is done on ``str(value)``, so
        non-string field values do not raise.

        Args:
            keyword: Text to look for.
            field: Name of the record field to search.

        Returns:
            The matching records, in their original order.
        """
        needle = keyword.lower()
        return [
            item for item in self.data
            if needle in str(item.get(field, '')).lower()
        ]

    def group_by_field(self, field):
        """Group the records by the value of one field.

        Args:
            field: Name of the record field to group on.

        Returns:
            A dict mapping each field value to its list of records; records
            without the field are collected under ``'unknown'``.
        """
        groups = {}
        for item in self.data:
            groups.setdefault(item.get(field, 'unknown'), []).append(item)
        return groups

    def statistics(self):
        """Summarise the loaded records.

        Returns:
            An empty dict when there are no records; otherwise a dict with
            ``total`` (record count), ``fields`` (every field name, in
            first-seen order) and ``sample`` (the first record).
        """
        if not self.data:
            return {}
        # A dict used as an ordered set keeps the field list deterministic.
        fields = {}
        for item in self.data:
            fields.update(dict.fromkeys(item.keys()))
        return {
            'total': len(self.data),
            'fields': list(fields),
            'sample': self.data[0],
        }
# Usage example
processor = DataProcessor()

# Load the JSON data
processor.load_from_file('/tmp/jour.json')

# Summary. statistics() returns an empty dict when nothing was loaded, so
# read the keys with defaults instead of indexing (which would raise KeyError).
stats = processor.statistics()
print(f"总条数: {stats.get('total', 0)}")
print(f"字段列表: {stats.get('fields', [])[:10]}")

# Filter the error entries
errors = processor.filter_by_keyword('error')
print(f"错误日志: {len(errors)} 条")

# Group by priority
groups = processor.group_by_field('PRIORITY')
for priority, items in groups.items():
    print(f"优先级 {priority}: {len(items)} 条")
案例4:处理 JSON Lines 大文件(流式处理)
import json
from typing import Iterator, Dict
def stream_json_lines(filename: str) -> Iterator[Dict]:
    """Lazily yield the parsed entries of a JSON Lines file.

    The file is read one line at a time, so it is never held in memory as a
    whole. Blank lines and lines that are not valid JSON are skipped
    silently (best-effort parsing).

    Args:
        filename: Path of the UTF-8 encoded JSON Lines file.

    Yields:
        One parsed JSON value per valid line, in file order.
    """
    with open(filename, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                # Deliberately tolerant: a corrupt line must not stop the stream.
                continue
def process_large_file(filename: str, batch_size: int = 100):
    """Process a JSON Lines file in fixed-size batches.

    Entries are pulled from ``stream_json_lines`` and collected until
    ``batch_size`` is reached; each full batch, and the final partial batch,
    is reported on stdout.

    Args:
        filename: Path of the JSON Lines file.
        batch_size: Number of entries per batch.

    Returns:
        The total number of entries processed.
    """
    batch = []
    count = 0
    for log in stream_json_lines(filename):
        batch.append(log)
        count += 1
        # Flush once batch_size entries have accumulated
        if len(batch) >= batch_size:
            print(f"处理批次: {len(batch)} 条")
            # Process the batch here
            batch = []
    # Flush the final, partially filled batch
    if batch:
        print(f"处理最后批次: {len(batch)} 条")
    return count
# Usage: process the file in batches of 1000 entries
process_large_file(filename='/tmp/jour.json', batch_size=1000)
七、常见错误处理
import json
def safe_json_loads(json_str):
    """Parse a JSON document without raising on bad input.

    Args:
        json_str: The JSON text (``str``, ``bytes`` or ``bytearray``).

    Returns:
        A ``(value, error)`` tuple: ``(parsed value, None)`` on success, or
        ``(None, error message)`` when the input is malformed JSON, is bytes
        that cannot be decoded, or has an unsupported type.
    """
    try:
        return json.loads(json_str), None
    except json.JSONDecodeError as e:
        return None, f"JSON 解析错误: {e}"
    except UnicodeDecodeError as e:
        # bytes input is decoded before parsing; undecodable bytes raise
        # this instead of JSONDecodeError.
        return None, f"编码错误: {e}"
    except TypeError as e:
        return None, f"类型错误: {e}"
def safe_json_dumps(data):
    """Serialise a Python object to JSON without raising.

    Args:
        data: The object to serialise.

    Returns:
        A ``(text, error)`` tuple: ``(JSON string, None)`` on success, or
        ``(None, error message)`` when the object cannot be serialised.
    """
    try:
        return json.dumps(data, ensure_ascii=False), None
    except (TypeError, ValueError) as e:
        # TypeError: unsupported type; ValueError: circular reference.
        return None, f"序列化错误: {e}"
# Usage example: the closing brace is missing, so parsing reports an error
json_str = '{"name": "张三", "age": 25'
data, error = safe_json_loads(json_str)
if error:
    print(f"错误: {error}")
else:
    print(data)

# An object the json module cannot serialise
from datetime import datetime
data = {'time': datetime.now()}
json_str, error = safe_json_dumps(data)
if error:
    print(f"需要自定义编码器: {error}")
八、实战:完整的日志分析工具
import json
import sys
from collections import Counter
from datetime import datetime
class LogAnalyzer:
    """Load a JSON Lines log file and analyse, search or export its entries."""

    def __init__(self, filename):
        """Remember the file path and load its entries immediately.

        Args:
            filename: Path of the UTF-8 encoded JSON Lines log file.
        """
        self.filename = filename
        self.logs = []
        self.load()

    @staticmethod
    def _message_text(entry):
        """Return the entry's MESSAGE if it is a string, else an empty string."""
        message = entry.get('MESSAGE', '')
        return message if isinstance(message, str) else ''

    def load(self):
        """Append the entries of the log file to ``self.logs``.

        Blank lines, lines that are not valid JSON and lines whose value is
        not a JSON object are skipped. A missing file is reported on stdout
        instead of raising.
        """
        try:
            with open(self.filename, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        entry = json.loads(line)
                    except json.JSONDecodeError:
                        # Tolerate corrupt lines, but only parse errors:
                        # a bare except would also hide KeyboardInterrupt.
                        continue
                    # analyze() and search() call .get() on every entry
                    if isinstance(entry, dict):
                        self.logs.append(entry)
            print(f"✅ 加载 {len(self.logs)} 条日志")
        except FileNotFoundError:
            print(f"❌ 文件不存在: {self.filename}")

    def analyze(self):
        """Compute summary statistics over the loaded entries.

        Returns:
            ``None`` when no entries are loaded; otherwise a dict with
            ``total`` (entry count), ``errors`` (entries whose MESSAGE
            contains "error", case-insensitively), ``priorities`` (count per
            PRIORITY value) and ``top_processes`` (the 10 most frequent _PID
            values with their counts).
        """
        if not self.logs:
            return None
        error_count = sum(
            1 for entry in self.logs
            if 'error' in self._message_text(entry).lower()
        )
        priorities = Counter(entry.get('PRIORITY', 'unknown') for entry in self.logs)
        processes = Counter(entry.get('_PID', 'unknown') for entry in self.logs)
        return {
            'total': len(self.logs),
            'errors': error_count,
            'priorities': dict(priorities.most_common()),
            'top_processes': dict(processes.most_common(10)),
        }

    def search(self, keyword, case_sensitive=False):
        """Return the entries whose MESSAGE contains the keyword.

        Args:
            keyword: Text to look for.
            case_sensitive: When false (the default), ignore case.

        Returns:
            The matching entries, in load order. Entries whose MESSAGE is
            not a string never match.
        """
        if case_sensitive:
            return [
                entry for entry in self.logs
                if keyword in self._message_text(entry)
            ]
        needle = keyword.lower()
        return [
            entry for entry in self.logs
            if needle in self._message_text(entry).lower()
        ]

    def export(self, output_file, format='json'):
        """Write the loaded entries to a file.

        Args:
            output_file: Path of the file to write.
            format: ``'json'`` for one indented JSON array, ``'jsonl'`` for
                one JSON document per line.

        Raises:
            ValueError: If ``format`` is not supported. Nothing is written
                and no success message is printed in that case.
        """
        if format not in ('json', 'jsonl'):
            raise ValueError(f"unsupported export format: {format!r}")
        with open(output_file, 'w', encoding='utf-8') as f:
            if format == 'json':
                json.dump(self.logs, f, ensure_ascii=False, indent=2)
            else:
                for log in self.logs:
                    f.write(json.dumps(log, ensure_ascii=False) + '\n')
        print(f"✅ 导出到 {output_file}")
# Usage
if __name__ == "__main__":
    analyzer = LogAnalyzer('/tmp/jour.json')
    stats = analyzer.analyze()
    # analyze() returns None when no entries were loaded
    if stats:
        print(f"\n📊 统计信息:")
        print(f" 总日志: {stats['total']}")
        print(f" 错误日志: {stats['errors']}")
        print(f" 优先级分布: {stats['priorities']}")
    # Search for entries mentioning "error"
    errors = analyzer.search('error')
    print(f"\n🔍 找到 {len(errors)} 条包含 'error' 的日志")