1. Production environment
- Paths: amundsen: /opt/amundsen, kibana: /opt/kibana-7.13.3-linux-x86_64
- Amundsen web UI: http://localhost:5000/
- Kibana console: http://localhost:5601/app/dev_tools#/console (connected to Amundsen's ES; use it for queries only, do not PUT data)
3. Install Docker
Install docker-ce
- Install or upgrade the Docker client and install the required system tools.
yum update -y
yum install -y yum-utils device-mapper-persistent-data lvm2
- Add the package repository
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
- Refresh the package cache and install
yum makecache fast
yum -y install docker-ce
- Start the Docker service
service docker start
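Configure a registry mirror and set the default address pool for Docker containers
- Create the Docker configuration file; if it already exists, edit it instead (the tee command below overwrites the file)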
tee /etc/docker/daemon.json <<-'EOF'
{
"debug" : true,
"registry-mirrors": ["https://dpayzz9i.mirror.aliyuncs.com"],
"default-address-pools" : [
{
"base" : "192.168.0.0/16",
"size" : 24
}
]
}
EOF
- Restart Docker and enable it at boot
systemctl daemon-reload
systemctl restart docker
systemctl enable docker
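Because tee overwrites /etc/docker/daemon.json, an existing file is better merged than replaced. The following is a minimal sketch of such a merge, assuming the settings shown above; merge_daemon_config is a hypothetical helper and not part of Docker.

```python
import json

# Settings taken from the daemon.json shown above.
DESIRED = {
    "debug": True,
    "registry-mirrors": ["https://dpayzz9i.mirror.aliyuncs.com"],
    "default-address-pools": [{"base": "192.168.0.0/16", "size": 24}],
}


def merge_daemon_config(existing_text, desired=DESIRED):
    """Return daemon.json text with `desired` keys layered over the existing ones."""
    existing = json.loads(existing_text) if existing_text.strip() else {}
    merged = {**existing, **desired}  # keys in `desired` win; unrelated keys are kept
    return json.dumps(merged, indent=2)


if __name__ == "__main__":
    # Hypothetical existing file that already sets a log driver.
    print(merge_daemon_config('{"log-driver": "json-file"}'))
```

The merged text can then be written back to /etc/docker/daemon.json before restarting Docker.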
Install docker-compose
- Download
curl -L "https://get.daocloud.io/docker/compose/releases/download/1.27.3/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
- Make it executable:
chmod +x /usr/local/bin/docker-compose
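Move the Docker data directory to the data disk
By default the data lives on the root filesystem at /var/lib/docker. Move it to /data/docker on the data disk and leave a symlink at the old path.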
service docker stop
mv /var/lib/docker /data/
ln -sf /data/docker /var/lib/docker
service docker start
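The move-then-symlink pattern above can be rehearsed safely before touching the real paths. This is a small sketch on throwaway directories; relocate_with_symlink is a hypothetical helper, and Docker must still be stopped before doing the same with /var/lib/docker.

```python
import os
import shutil
import tempfile


def relocate_with_symlink(src, dest_parent):
    """Move `src` under `dest_parent` and leave a symlink at the old path."""
    dest = os.path.join(dest_parent, os.path.basename(src))
    shutil.move(src, dest)
    os.symlink(dest, src)
    return dest


if __name__ == "__main__":
    # Demonstrate on temporary directories instead of the real Docker paths.
    with tempfile.TemporaryDirectory() as root:
        old = os.path.join(root, "var_lib", "docker")
        data = os.path.join(root, "data")
        os.makedirs(old)
        os.makedirs(data)
        new = relocate_with_symlink(old, data)
        print(os.path.islink(old), new)
```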
4. Install Python 3.7
5. Install Amundsen
Make sure Docker has at least 3 GB of free space available.
Clone Amundsen with git:
git clone --recursive https://github.com/amundsen-io/amundsen.git
Change into the cloned directory and start the containers:
# For Neo4j Backend
docker-compose -f docker-amundsen.yml up
If startup fails, see the FAQ for possible fixes.
To start in the background:
docker-compose -f docker-amundsen.yml up -d
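- Once startup succeeds, open the web UI: http://localhost:5000/
Modify the ES container to add the IK Chinese analyzer
Download the IK analyzer release that matches the ES version (7.13.3): https://github.com/medcl/elasticsearch-analysis-ik/releases?page=2
- Enter the ES container, create the /usr/share/elasticsearch/plugins/ik/ directory, then exit
docker exec -it 3d701cddd320 /bin/bash
mkdir /usr/share/elasticsearch/plugins/ik/
exit
- Copy the archive into the container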
docker cp ./elasticsearch-analysis-ik-7.13.3.zip 3d701cddd320:/usr/share/elasticsearch/plugins/ik/
- Enter the container, unzip the archive, and delete the archive
docker exec -it 3d701cddd320 /bin/bash
cd /usr/share/elasticsearch/plugins/ik/
unzip elasticsearch-analysis-ik-7.13.3.zip
rm -rf elasticsearch-analysis-ik-7.13.3.zip
exit
- Restart the ES container
docker stop 3d701cddd320
docker-compose -f docker-es.yml up -d
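Modify the metadata import code: change the mapping to use the IK analyzer (see the code in SVN for details), then re-import the metadata.
Save the modified ES container as an image
# Build your own image: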
docker commit 82fb415dcf0c my/elasticsearch:7.13.3
# Edit docker-amundsen.yml to use the image built above
elasticsearch:
  image: my/elasticsearch:7.13.3
# Save the image to a file
docker save -o my_es_docker_image.tar my/elasticsearch:7.13.3
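Add Prometheus + Grafana monitoring
For installing and configuring Prometheus + Grafana, see the separate setup reference.
cAdvisor monitors the running Docker containers; it runs as a container itself.
Run the cAdvisor container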
docker run \
--volume=/:/rootfs:ro \
--volume=/var/run:/var/run:ro \
--volume=/sys:/sys:ro \
--volume=/var/lib/docker/:/var/lib/docker:ro \
--volume=/dev/disk/:/dev/disk:ro \
--publish=8080:8080 \
--detach=true \
--name=cadvisor \
google/cadvisor:v0.24.1
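Add the Prometheus scrape configuration
- targets: ['ip:8080']
  labels:
    instance: amundsen_docker
Add the Grafana dashboard by importing dashboard ID 193
Import metadata
- Create a Python 3 virtual environment and install the dependencies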
cd /data/service/amundsen
yum install gcc
yum install python3-devel mysql-devel
python3 -m venv venv
source venv/bin/activate
/data/service/amundsen/venv/bin/pip3 install --upgrade pip
/data/service/amundsen/venv/bin/pip3 install -r databuilder/requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
/data/service/amundsen/venv/bin/python3 databuilder/setup.py install
/data/service/amundsen/venv/bin/pip3 install Mysqlclient -i https://mirrors.aliyun.com/pypi/simple/
/data/service/amundsen/venv/bin/pip3 install pyyaml -i https://mirrors.aliyun.com/pypi/simple/
- Script deployment path
amundsen/databuilder/my_metadata
- Run the scripts with the Python 3 virtual environment to import the metadata
# Import MySQL first
amundsen/venv/bin/python3 /data/service/amundsen/databuilder/my_metadata/my_mysql_loader.py
# Then import Hive
amundsen/venv/bin/python3 /data/service/amundsen/databuilder/my_metadata/my_hive_loader.py
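The two loaders have to run in order, and both accept the ES host and the Neo4j host as optional positional arguments (sys.argv[1] and sys.argv[2] in the scripts). A minimal sketch of a wrapper that builds and runs the commands, assuming the paths used above; loader_commands and run_all are hypothetical names.

```python
import subprocess

VENV_PYTHON = "/data/service/amundsen/venv/bin/python3"
SCRIPT_DIR = "/data/service/amundsen/databuilder/my_metadata"
LOADERS = ["my_mysql_loader.py", "my_hive_loader.py"]  # MySQL first, then Hive


def loader_commands(es_host=None, neo_host=None):
    """Build one argv list per loader; the hosts are optional positional args."""
    extra = []
    if es_host:
        extra.append(es_host)
        if neo_host:
            extra.append(neo_host)
    return [[VENV_PYTHON, f"{SCRIPT_DIR}/{name}", *extra] for name in LOADERS]


def run_all(es_host=None, neo_host=None):
    """Run the loaders in order and stop at the first failure."""
    for argv in loader_commands(es_host, neo_host):
        subprocess.run(argv, check=True)
```

Calling run_all() with no arguments matches the commands above, since both scripts fall back to localhost.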
Basic Kibana queries
# List all mappings
GET _mapping
# Show the Elasticsearch version
GET /
# Check cluster health
GET /_cat/health?v
# List nodes
GET /_cat/nodes?v
# List indices
GET /_cat/indices?v
# Show JVM memory
GET /_nodes/stats/jvm?pretty
# Show disk allocation
GET /_cat/allocation?v
# List installed plugins
GET /_cat/plugins?v
# Query all documents
GET /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_search
{
"query" : {
"match_all" : {}
}
}
# Query DSL syntax
GET /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_search
{
"query" : {
"match" : { "cluster" : "ndz_fund"}
}
}
# Count the documents in an index (hive: 837, mysql: 14034)
GET _cat/count/tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1?v
{
"query" : {
"match" : { "database": "mysql" }
}
}
# Test the analyzer output (ik_max_word):
POST _analyze
{
"analyzer": "ik_max_word",
"text": "借方本年累计发生额"
}
# Test the analyzer output (ik_smart):
POST _analyze
{
"analyzer": "ik_smart",
"text": "借方本年累计发生额"
}
# Delete: documents are only marked as deleted, not removed immediately
POST tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_delete_by_query
{
"query": {
"match": {
"database": "hive"
}
}
}
# Force merge: purges the documents marked as deleted. Resource-intensive operation
POST /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_forcemerge
Import script code
- my_util.py
import textwrap
import yaml
import sys
# Read the configuration file
def read_config(config_file):
    with open(config_file, 'rb') as f:
        config_data = list(yaml.safe_load_all(f))
    if len(config_data) == 0:
        print("------ Config file {} is empty ------".format(config_file))
        sys.exit(1)
    # print(config_data[0].get(key1))
    return config_data[0]
# ES mapping
YZF_TABLE_ES_INDEX_MAPPING = textwrap.dedent(
"""
{
"mappings":{
"table":{
"properties": {
"name": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"schema": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"display_name": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"description": {
"type": "text",
"analyzer": "ik_max_word"
},
"column_names": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"column_descriptions": {
"type": "text",
"analyzer": "ik_max_word"
},
"tags": {
"type": "keyword"
},
"badges": {
"type": "keyword"
},
"cluster": {
"type": "text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"database": {
"type": "text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"key": {
"type": "keyword"
},
"total_usage":{
"type": "long"
},
"unique_usage": {
"type": "long"
},
"programmatic_descriptions": {
"type": "text",
"analyzer": "ik_max_word"
}
}
}
}
}
"""
)
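A malformed mapping string only fails at publish time, so it can help to parse it up front and confirm which fields use the IK analyzer. The sketch below assumes the structure of the mapping above (mappings, then the table type, then properties); fields_using_analyzer and the trimmed SAMPLE are illustrative, not part of databuilder.

```python
import json


def fields_using_analyzer(mapping_text, analyzer, doc_type="table"):
    """Return the sorted property names whose analyzer equals `analyzer`."""
    mapping = json.loads(mapping_text)  # raises ValueError on malformed JSON
    props = mapping["mappings"][doc_type]["properties"]
    return sorted(name for name, spec in props.items()
                  if spec.get("analyzer") == analyzer)


# Trimmed copy of the mapping above, kept short for illustration.
SAMPLE = """
{"mappings": {"table": {"properties": {
    "name": {"type": "text", "analyzer": "simple"},
    "description": {"type": "text", "analyzer": "ik_max_word"},
    "column_descriptions": {"type": "text", "analyzer": "ik_max_word"},
    "key": {"type": "keyword"}
}}}}
"""

if __name__ == "__main__":
    print(fields_using_analyzer(SAMPLE, "ik_max_word"))
```

Passing YZF_TABLE_ES_INDEX_MAPPING instead of SAMPLE checks the real mapping the same way.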
- my_mysql_loader.py
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
"""
This is an example script that demonstrates how to load data
into Neo4j and Elasticsearch without using an Airflow DAG.
"""
import logging
import sys
import textwrap
import uuid
from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base
from databuilder.extractor.mysql_metadata_extractor import MysqlMetadataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
from my_util import YZF_TABLE_ES_INDEX_MAPPING
from my_util import read_config
import yaml
es_host = None
neo_host = None
# Optional positional arguments: ES host, then Neo4j host
if len(sys.argv) > 1:
    es_host = sys.argv[1]
if len(sys.argv) > 2:
    neo_host = sys.argv[2]
es = Elasticsearch([
    {'host': es_host or 'localhost'},
])
DB_FILE = '/tmp/test.db'
SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
Base = declarative_base()
NEO4J_ENDPOINT = f'bolt://{neo_host or "localhost"}:7687'
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'
# Connection-string suffix; &useSSL=false can be appended here as well
mysql_conn_ = '?charset=utf8'
LOGGER = logging.getLogger(__name__)
def run_mysql_job(conn_str, connect_name):
    # where_clause_suffix = textwrap.dedent("""
    #     where c.table_schema = 'yzf_biz'
    # """)
    where_clause_suffix = textwrap.dedent("""
        where 1 = 1
    """)
    connect = conn_str + mysql_conn_
    logging.info("Begin load mysql conn: {}".format(connect))
    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'
    job_config = ConfigFactory.from_dict({
        f'extractor.mysql_metadata.{MysqlMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
        f'extractor.mysql_metadata.{MysqlMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME}': False,
        f'extractor.mysql_metadata.{MysqlMetadataExtractor.CLUSTER_KEY}': connect_name,
        f'extractor.mysql_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connect,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': connect_name,  # should use unique tag here like {ds}
    })
    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=MysqlMetadataExtractor(), loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    return job
def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
                                   elasticsearch_doc_type_key='table',
                                   model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
                                   cypher_query=None,
                                   elasticsearch_mapping=YZF_TABLE_ES_INDEX_MAPPING):
    """
    :param elasticsearch_index_alias: alias for Elasticsearch used in
        amundsensearchlibrary/search_service/config.py as an index
    :param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
        `table_search_index`
    :param model_name: the Databuilder model class used in transporting between Extractor and Loader
    :param cypher_query: Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
        it uses the `Table` query baked into the Extractor
    :param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class.
        Defaults to YZF_TABLE_ES_INDEX_MAPPING; if None is given it uses the `Table` mapping baked into the Publisher
    """
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'
    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())
    # elastic search client instance
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
    job_config = ConfigFactory.from_dict({
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}': model_name,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
            elasticsearch_client,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
            elasticsearch_new_index_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}':
            elasticsearch_doc_type_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
            elasticsearch_index_alias,
    })
    # only optionally add these keys, so need to dynamically `put` them
    if cypher_query:
        job_config.put(f'extractor.search_data.{Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY}',
                       cypher_query)
    if elasticsearch_mapping:
        job_config.put(f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY}',
                       elasticsearch_mapping)
    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    return job
if __name__ == "__main__":
    # INFO-level logging is enabled
    logging.basicConfig(level=logging.INFO)
    conf_data = read_config('/data/service/amundsen/databuilder/yzf_metadata/mysql_connect.yaml')
    mysql_conf = conf_data.get('conn')
    # Load every connection string listed under each cluster name
    for conn_name, mysql_list in mysql_conf.items():
        for mysql_conn in mysql_list:
            loading_job = run_mysql_job(mysql_conn, conn_name)
            loading_job.launch()
    # Publish the search index once, after all MySQL sources are loaded into Neo4j
    job_es_table = create_es_publisher_sample_job(
        elasticsearch_index_alias='table_search_index',
        elasticsearch_doc_type_key='table',
        model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
    job_es_table.launch()
- my_hive_loader.py
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
"""
This is an example script that demonstrates how to load data
into Neo4j and Elasticsearch without using an Airflow DAG.
"""
import logging
import sys
import textwrap
import uuid
from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base
from databuilder.extractor.hive_table_metadata_extractor import HiveTableMetadataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.models.table_metadata import DESCRIPTION_NODE_LABEL
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from my_util import YZF_TABLE_ES_INDEX_MAPPING
es_host = None
neo_host = None
# Optional positional arguments: ES host, then Neo4j host
if len(sys.argv) > 1:
    es_host = sys.argv[1]
if len(sys.argv) > 2:
    neo_host = sys.argv[2]
es = Elasticsearch([
    {'host': es_host or 'localhost'},
])
DB_FILE = '/tmp/test.db'
SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
Base = declarative_base()
NEO4J_ENDPOINT = f'bolt://{neo_host or "localhost"}:7687'
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'
LOGGER = logging.getLogger(__name__)
# TODO: user provides a list of schemas for indexing
SUPPORTED_HIVE_SCHEMAS = ['accounting_collect', 'accounting_company', 'accounting_report', 'ads', 'bi_dm_ods',
                          'biz_dm', 'biz_dw', 'common', 'common_kudu', 'common_sim', 'companyinfo_ods', 'customer',
                          'customer_kudu', 'datax', 'default', 'di', 'dingtalk', 'dingtalk_kudu', 'dwd', 'dwd_sim',
                          'dws', 'fintax_account', 'fintax_application', 'fintax_asset', 'fintax_data_init',
                          'fintax_fund', 'fintax_invoice',
                          'fintax_salary', 'fintax_statistics', 'fintax_stock', 'fintax_task', 'fintax_tax',
                          'fintax_user_point', 'flink_database',
                          'invoice_lake', 'log_ods', 'monitor', 'octopus_ods', 'sale_ods', 'taxops_ods', 'ucenter',
                          'upm_paas', 'view', 'yzf_biz',
                          'yzf_biz_init', 'yzf_common', 'yzf_config', 'yzf_report', 'yzf_report_init']
# Global used in all Hive metastore queries.
# String format - ('schema1', 'schema2', ..., 'schemaN')
SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE = "('{schemas}')".format(schemas="', '".join(SUPPORTED_HIVE_SCHEMAS))
# TODO: connection string needs to change
def connection_string():
    user = 'root'
    host = 'ip'
    port = '3306'
    db = 'hivemetastore'
    pd = '123456'
    return "mysql://%s:%s@%s:%s/%s?charset=utf8" % (user, pd, host, port, db)
def create_table_wm_job(templates_dict):
    sql = textwrap.dedent("""
        SELECT From_unixtime(A0.create_time) as create_time,
               'hive' as `database`,
               C0.NAME as `schema`,
               B0.tbl_name as table_name,
               {func}(A0.part_name) as part_name,
               {watermark} as part_type
        FROM PARTITIONS A0
        LEFT OUTER JOIN TBLS B0
            ON A0.tbl_id = B0.tbl_id
        LEFT OUTER JOIN DBS C0
            ON B0.db_id = C0.db_id
        WHERE C0.NAME IN {schemas}
          AND B0.tbl_type IN ( 'EXTERNAL_TABLE', 'MANAGED_TABLE' )
          AND A0.PART_NAME NOT LIKE '%%__HIVE_DEFAULT_PARTITION__%%'
        GROUP BY C0.NAME, B0.tbl_name
        ORDER by create_time desc
    """).format(func=templates_dict.get('agg_func'),
                watermark=templates_dict.get('watermark_type'),
                schemas=SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE)
    LOGGER.info('SQL query: %s', sql)
    tmp_folder = '/var/tmp/amundsen/table_{hwm}'.format(hwm=templates_dict.get('watermark_type').strip("\""))
    node_files_folder = f'{tmp_folder}/nodes'
    relationship_files_folder = f'{tmp_folder}/relationships'
    hwm_extractor = SQLAlchemyExtractor()
    csv_loader = FsNeo4jCSVLoader()
    task = DefaultTask(extractor=hwm_extractor,
                       loader=csv_loader,
                       transformer=NoopTransformer())
    job_config = ConfigFactory.from_dict({
        f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.CLUSTER_KEY}': 'tencent_hive_kudu',
        f'extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
        f'extractor.sqlalchemy.{SQLAlchemyExtractor.EXTRACT_SQL}': sql,
        'extractor.sqlalchemy.model_class': 'databuilder.models.watermark.Watermark',
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        'publisher.neo4j.job_publish_tag': 'load_hive_unique_tag'  # TO-DO unique tag must be added
    })
    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=Neo4jCsvPublisher())
    job.launch()
def run_hive_job():
    """
    Launches data builder job that extracts table and column metadata from MySQL Hive metastore database,
    and publishes to Neo4j.
    @param kwargs:
    @return:
    """
    # Adding to where clause to scope schema, filter out temp tables which start with numbers and views
    where_clause_suffix = textwrap.dedent("""
        WHERE d.NAME IN {schemas}
        AND t.TBL_NAME NOT REGEXP '^[0-9]+'
        AND t.TBL_TYPE IN ( 'EXTERNAL_TABLE', 'MANAGED_TABLE' )
    """).format(schemas=SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE)
    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'
    job_config = ConfigFactory.from_dict({
        f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
        f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.CLUSTER_KEY}': 'tencent_hive_kudu',
        f'extractor.hive_table_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_CREATE_ONLY_NODES}': [DESCRIPTION_NODE_LABEL],
        'publisher.neo4j.job_publish_tag': 'load_hive_unique_tag'  # TO-DO unique tag must be added
    })
    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=HiveTableMetadataExtractor(), loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    return job
def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
                                   elasticsearch_doc_type_key='table',
                                   model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
                                   cypher_query=None,
                                   elasticsearch_mapping=YZF_TABLE_ES_INDEX_MAPPING):
    """
    :param elasticsearch_index_alias: alias for Elasticsearch used in
        amundsensearchlibrary/search_service/config.py as an index
    :param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
        `table_search_index`
    :param model_name: the Databuilder model class used in transporting between Extractor and Loader
    :param cypher_query: Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
        it uses the `Table` query baked into the Extractor
    :param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class.
        Defaults to YZF_TABLE_ES_INDEX_MAPPING; if None is given it uses the `Table` mapping baked into the Publisher
    """
    # loader saves data to this location and publisher reads it from here
    # Temporary file; it can be deleted without affecting queries
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'
    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())
    # elastic search client instance
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
    job_config = ConfigFactory.from_dict({
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}': model_name,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
            elasticsearch_client,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
            elasticsearch_new_index_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}':
            elasticsearch_doc_type_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
            elasticsearch_index_alias,
    })
    # only optionally add these keys, so need to dynamically `put` them
    if cypher_query:
        job_config.put(f'extractor.search_data.{Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY}',
                       cypher_query)
    if elasticsearch_mapping:
        job_config.put(f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY}',
                       elasticsearch_mapping)
    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    return job
if __name__ == "__main__":
    # INFO-level logging is enabled
    logging.basicConfig(level=logging.INFO)
    # Load table and column metadata
    loading_job = run_hive_job()
    loading_job.launch()
    # Low watermark: earliest partition per table
    templates_dict = {'agg_func': 'min',
                      'watermark_type': '"low_watermark"',
                      'part_regex': '{{ ds }}'}
    create_table_wm_job(templates_dict)
    # High watermark: latest partition per table
    templates_dict = {'agg_func': 'max',
                      'watermark_type': '"high_watermark"',
                      'part_regex': '{{ ds }}'}
    create_table_wm_job(templates_dict)
    # Publish the search index from Neo4j to Elasticsearch
    job_es_table = create_es_publisher_sample_job(
        elasticsearch_index_alias='table_search_index',
        elasticsearch_doc_type_key='table',
        model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
    job_es_table.launch()
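The Hive loader builds SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE by joining schema names straight into SQL text. The sketch below reproduces that format call on a short list so the resulting string is visible; the name check is an added assumption (the script itself does not validate), and sql_in_clause is a hypothetical helper.

```python
import re


def sql_in_clause(schemas):
    """Render schema names as a SQL IN list, mirroring the loader's format call."""
    bad = [s for s in schemas if not re.fullmatch(r"\w+", s)]
    if bad:
        # Names are interpolated unescaped, so reject anything unusual.
        raise ValueError("unsafe schema names: {}".format(bad))
    return "('{schemas}')".format(schemas="', '".join(schemas))


if __name__ == "__main__":
    print(sql_in_clause(["dwd", "dws", "ads"]))  # ('dwd', 'dws', 'ads')
```

Since the list is hard-coded in the script the risk is low, but the check matters if the schema list ever comes from user input.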
- mysql_connect.yaml configuration file
conn:
  mysql1:
    - mysql://root:123456@ip1:3306/<db>
  mysql2:
    - mysql://root:123456@ip2:3306/<db>
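Each key under conn becomes the cluster name in Amundsen, and each list entry is one connection string that gets the ?charset=utf8 suffix appended by the MySQL loader. A minimal sketch of that traversal follows; it works on an already parsed dict (what yaml.safe_load would return for the file above) so it runs without PyYAML, and iter_connections is a hypothetical helper.

```python
MYSQL_CONN_SUFFIX = "?charset=utf8"  # same suffix the MySQL loader appends


def iter_connections(conf):
    """Yield (cluster_name, connection_string) pairs from the parsed config."""
    for cluster_name, urls in conf.get("conn", {}).items():
        for url in urls:
            yield cluster_name, url + MYSQL_CONN_SUFFIX


# Parsed form of the YAML above, with the same placeholder hosts.
SAMPLE_CONF = {
    "conn": {
        "mysql1": ["mysql://root:123456@ip1:3306/<db>"],
        "mysql2": ["mysql://root:123456@ip2:3306/<db>"],
    }
}

if __name__ == "__main__":
    for name, conn in iter_connections(SAMPLE_CONF):
        print(name, conn)
```

A cluster can list several connection strings; each one is loaded under the same cluster name.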
FAQ
1. docker-compose -f docker-amundsen.yml up fails when starting the containers:
[2021-12-01T08:02:56,599][INFO ][o.e.b.BootstrapChecks ] [PD4Rw8t] bound or publishing to a non-loopback address, enforcing bootstrap checks
es_amundsen_atlas | ERROR: [1] bootstrap checks failed
es_amundsen_atlas | [1]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
Raise the kernel limit on memory-mapped areas (vm.max_map_count, not heap memory): edit /etc/sysctl.conf and add: vm.max_map_count = 262144
Reload the configuration: sysctl -p
Re-run docker-compose -f docker-amundsen.yml up
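The bootstrap check can be anticipated before starting the containers by reading the current kernel value. A small sketch, assuming a Linux host where the setting is exposed at /proc/sys/vm/max_map_count; both function names are hypothetical.

```python
REQUIRED_MAX_MAP_COUNT = 262144  # minimum named in the bootstrap error above


def max_map_count_ok(raw_value, required=REQUIRED_MAX_MAP_COUNT):
    """Return True when the kernel value meets the Elasticsearch minimum."""
    return int(raw_value.strip()) >= required


def read_max_map_count(path="/proc/sys/vm/max_map_count"):
    """Read the current value as text from procfs (Linux only)."""
    with open(path) as f:
        return f.read()


if __name__ == "__main__":
    print(max_map_count_ok("65530\n"))  # the failing value from the log above
```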
2. Chinese comments appear garbled when searching metadata in the web UI
- Append the UTF-8 charset setting to the MySQL connection URL: ?charset=utf8
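Appending ?charset=utf8 by string concatenation breaks if the URL already carries query parameters. The sketch below sets the parameter with the standard library's URL tools instead; with_charset is a hypothetical helper and the host and database names are placeholders.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_charset(conn_url, charset="utf8"):
    """Return `conn_url` with its charset query parameter set to `charset`."""
    parts = urlsplit(conn_url)
    query = dict(parse_qsl(parts.query))
    query["charset"] = charset  # add, or overwrite an existing value
    return urlunsplit(parts._replace(query=urlencode(query)))


if __name__ == "__main__":
    print(with_charset("mysql://root:123456@ip1:3306/hivemetastore"))
    print(with_charset("mysql://root:123456@ip1:3306/hivemetastore?useSSL=false"))
```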
3. Importing the Hive metadata from its MySQL metastore fails with:
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1055, "Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column 'hive.A0.CREATE_TIME' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by")
[SQL:
SELECT From_unixtime(A0.create_time) as create_time,
....
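- Following https://blog.csdn.net/fansili/article/details/78664267, set a MySQL sql_mode that omits ONLY_FULL_GROUP_BY (NO_AUTO_CREATE_USER was removed in MySQL 8.0, so drop it from the list on that version):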
mysql> set global sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';
mysql> set session sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';
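The statements above replace the whole sql_mode. A gentler variant is to read the current value (for example with SELECT @@GLOBAL.sql_mode) and remove only ONLY_FULL_GROUP_BY. A minimal sketch of that string handling, with without_mode as a hypothetical helper:

```python
def without_mode(sql_mode, mode="ONLY_FULL_GROUP_BY"):
    """Return the comma-separated sql_mode string with `mode` removed."""
    names = [m.strip() for m in sql_mode.split(",")]
    return ",".join(m for m in names if m and m.upper() != mode)


if __name__ == "__main__":
    current = "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_DATE"
    # Paste the result into: set global sql_mode='...';
    print(without_mode(current))
```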
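4. Official Amundsen FAQ: https://www.amundsen.io/amundsen/installation/#troubleshooting
5. The firewall is disabled, yet port 5000 on the Amundsen machine is still unreachable from 215.5
Fix: start the firewall, open port 5000, then stop the firewall again. After that, port 5000 is reachable.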
systemctl start firewalld
firewall-cmd --zone=public --add-port=5000/tcp --permanent
firewall-cmd --reload
systemctl stop firewalld
systemctl disable firewalld