工作需要用到我国环境监测站点的污染物浓度数据(感谢大佬的分享), 数据很全, 不过csv格式在分析的时候尤其大量数据分析的时候并不友好, 所以一般要二次处理一下
上学的时候搞过一次, 当时为了查询数据方便, 塞到了sqlite3的单文件数据库里, 不过制作起来很慢(可能是我没用并行支持好的数据库), 近期更新了一下数据且又重新调整了一下数据处理逻辑, 这里记录一下
主要的思路是以netcdf保存(此前做出来db之后还要再转nc, 麻烦的很), 然后数据规模的话, 物种一共15个, 时间是2014年到现在逐小时, 站点大概在1000多
采用每月一个nc文件存储, 每个物种单独变量, 每个变量的维度是(hour, sites)或者是(hour, cities), 因为数据除了站点的还有城市的值
然后希望处理流程尽量通用一些, 以后一些其他的观测数据也基于类似的逻辑和形式去做
那么处理流程大概是三步走
1. 制作站点列表
现有的站点信息文件有点乱, 需要制作一份完整的站点列表信息, 并且最好涵盖地理位置信息和行政区信息, 以便处理的时候筛选用, 如下 (risk后续讨论):
code,name,city,prov,nation,lat,lon,risk
1013A,市监测中心(停运150702),天津市,天津市,中国,39.097,117.151,1
1014A,南口路(停运161122),天津市,天津市,中国,39.173,117.193,0
在制作过程中大概会遇到一些问题:
- 数据中的站点编号对应的名字会变, 这里以新名字为准
- 数据中的站点编号对应的经纬度会变, 这里以新的经纬为准, 但是不知道是什么原因, 是站点搬迁了还是老的定位不准咋的, 所以基于经纬度变化幅度(0~0.1度), 给risk(风险等级)设定2-11
- 数据中站点的经纬度信息缺失, 根据名称自己在经纬度地图上找的点, risk等级设定12
- 数据中站点编号对应的我国行政区信息不统一, 这个比较麻烦, 有时候是叫法问题, 有时候是到不同级别的问题, 有时候甚至是行政区变迁的问题, 为了保持统一, 我直接根据最新的shapefile地图配合经纬度信息标注站点的行政区信息, 具体到地级市/自治州/直辖市
对于risk问题, 稳定的站点是0, 仅改名字也是0, 经纬度信息有/无变化的设为1, 2-12如上, 目的主要是在处理的时候可以做个筛选, risk越高, 观测结果和地理信息对应偏差的可能性越高, 可以考虑在对比模式验证的时候舍去试试
2. 制作nc数据文件
由于要逐月去生成, 且为了同步初始化数据和后续更新数据的逻辑, 采用的方式是先init好这个月的文件, 然后就都是根据csv文件去update了
注意的点要是:
- 每个月的文件要以这个月的sites为准, 不要用上一步制作的全部sites, 不过站点信息可以从上一步的结果里获取, 这一步用wsl跑个bash脚本来实现
- sites和cities的结果放一个文件里
- 数据较多, 所以基于MS-MPI和mpi4py做了个并行支持, 单机4核一起跑也能快很多! 不用py自带的多进程是因为mpi更熟悉...而且显然更好使
最终得到的结果如下, 包含了站点信息和观测数值
netcdf CNMEE.aqo.site-city.201808 {
dimensions:
site = 1605 ;
city = 367 ;
ymdh = 744 ;
variables:
string site(site) ;
string city(city) ;
string ymdh(ymdh) ;
string site_name(site) ;
string site_city(site) ;
string site_prov(site) ;
string site_nation(site) ;
double site_lat(site) ;
site_lat:_FillValue = NaN ;
double site_lon(site) ;
site_lon:_FillValue = NaN ;
int64 site_risk(site) ;
float AQI(ymdh, site) ;
float PM2.5(ymdh, site) ;
PM2.5:units = "ug/m3" ;
float PM2.5_24h(ymdh, site) ;
PM2.5_24h:units = "ug/m3" ;
float PM10(ymdh, site) ;
PM10:units = "ug/m3" ;
... ...
float CO(ymdh, site) ;
CO:units = "mg/m3" ;
float CO_24h(ymdh, site) ;
CO_24h:units = "mg/m3" ;
float AQI_city(ymdh, city) ;
float PM2.5_city(ymdh, city) ;
PM2.5_city:units = "ug/m3" ;
float PM2.5_24h_city(ymdh, city) ;
PM2.5_24h_city:units = "ug/m3" ;
... ...
// global attributes:
:source = "https://quotsoft.net/air/" ;
}
3. 后处理查询脚本支持
本质上数据整理的目的还是为了用, 查询或者后处理等等, 目前先搞了个简单的查询并且屏幕打印的功能
通过config.py来控制参数, 如下
# coding=utf-8
name = 'test'
mode = 'slice'
sites = ['1223A', '1224A']
datehours = ['2022070100', '2022070101', '2021080100']
variables = ['PM2.5', 'O3']
后续还得持续优化, 争取把什么数据提取到nc/db/csv, 然后对应wrf-cmaq网格化, 计算MDA8等等常规处理流程都搞起来
代码放在了这里, 仅供参考