元数据管理系统 Amundsen 安装及使用

1、线上安装环境

3、安装docker

安装docker-ce

  • 安装/升级Docker客户端,安装必要的一些系统工具。

    yum update -y

    yum install -y yum-utils device-mapper-persistent-data lvm2

  • 添加软件源信息

    yum-config-manager --add-repo [http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo](http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo)

  • 更新并安装

    yum makecache fast
    yum -y install docker-ce

  • 开启Docker服务

    service docker start

配置镜像加速器,为docker容器设置默认网段

  • 添加docker 配置文件,若已存在,则修改配置文件

tee /etc/docker/daemon.json <<-'EOF'
{
  "debug" : true,
  "registry-mirrors": ["https://dpayzz9i.mirror.aliyuncs.com"],
  "default-address-pools" : [
    {
      "base" : "192.168.0.0/16",
      "size" : 24
    }
  ]
}
EOF
  • 重启,并设置开机自启

    systemctl daemon-reload
    systemctl restart docker
    systemctl enable docker

docker-compose安装

  • 下载

    curl -L "https://get.daocloud.io/docker/compose/releases/download/1.27.3/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

  • 加上可执行权限:

    chmod +x /usr/local/bin/docker-compose

修改docker数据路径,修改到数据盘

默认是在根目录:/var/lib/docker ,修改到数据盘: /data/docker ,使用软链接方式。

service docker stop

mv /var/lib/docker /data/
ln -sf /data/docker /var/lib/docker
service docker start

4、安装python3.7

5、安装Amundsen

确保您至少有 3GB 可用空间供 docker 使用。

通过git克隆Amundsen:

git clone --recursive [https://github.com/amundsen-io/amundsen.git](https://github.com/amundsen-io/amundsen.git)

进入克隆的目录并在下面启动docker:

# For Neo4j Backend
docker-compose -f docker-amundsen.yml up

如有报错,解决方法可参考 FAQ

后台启动命令:

docker-compose -f docker-amundsen.yml up -d

修改es容器,添加ik中文分词器

  • 参考: https://zhuanlan.zhihu.com/p/377433737

  • 下载与es版本匹配的ik分词器 (7.13.3) : https://github.com/medcl/elasticsearch-analysis-ik/releases?page=2

  • 进入es容器,创建/usr/share/elasticsearch/plugins/ik/ 文件夹

    docker exec -it 3d701cddd320 /bin/bash
    mkdir /usr/share/elasticsearch/plugins/ik/

  • 将压缩包复制到容器内

    docker cp ./elasticsearch-analysis-ik-7.13.3.zip 3d701cddd320:/usr/share/elasticsearch/plugins/ik/

  • 进入容器,解压压缩包,并删除压缩包

    docker exec -it 3d701cddd320 /bin/bash
    cd /usr/share/elasticsearch/plugins/ik/
    unzip elasticsearch-analysis-ik-7.13.3.zip
    rm -rf elasticsearch-analysis-ik-7.13.3.zip

    exit

  • 重启es容器

    docker stop 3d701cddd320
    docker-compose -f docker-es.yml up -d

  • 修改导入元数据代码,修改mapping,添加ik分词器,详情查看svn代码。重新导入元数据。

  • 把修改过的es容器保存为镜像

# 生成自己的镜像:
docker commit 82fb415dcf0c  my/elasticsearch:7.13.3

#修改docker-amundsen.yml 文件,使用上面生成的镜像
elasticsearch:
      image: my/elasticsearch:7.13.3

# 把镜像保存成文件
docker save -o my_es_docker_image.tar my/elasticsearch:7.13.3

添加prometheus+grafana监控

prometheus+grafana搭建及配置参考
使用cadvisor服务监控docker容器运行,它本身也是一个容器。

运行 cadvisor 容器

docker run \
--volume=/:/rootfs:ro \
--volume=/var/run:/var/run:ro \
--volume=/sys:/sys:ro \
--volume=/var/lib/docker/:/var/lib/docker:ro \
--volume=/dev/disk/:/dev/disk:ro \
--publish=8080:8080 \
--detach=true \
--name=cadvisor \
google/cadvisor:v0.24.1

添加prometheus配置

- targets: ['ip:8080']
  labels:
    instance: amundsen_docker

添加grafana监控,导入code码: 193

导入元数据

  • 创建python3虚拟环境,安装依赖
cd /data/service/amundsen
yum install gcc
yum install python3-devel mysql-devel

python3 -m venv venv
source venv/bin/activate
/data/service/amundsen/venv/bin/pip3 install --upgrade pip

/data/service/amundsen/venv/bin/pip3 install -r databuilder/requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
/data/service/amundsen/venv/bin/python3 databuilder/setup.py install

/data/service/amundsen/venv/bin/pip3 install Mysqlclient -i https://mirrors.aliyun.com/pypi/simple/

/data/service/amundsen/venv/bin/pip3 install pyyaml  -i https://mirrors.aliyun.com/pypi/simple/

  • 脚本 部署路径
amundsen/databuilder/my_metadata
  • 使用python3虚拟环境运行脚本,导入元数据
# 先导入 mysql
amundsen/venv/bin/python3 /data/service/amundsen/databuilder/my_metadata/my_mysql_loader.py

# 再导入 hive
amundsen/venv/bin/python3 /data/service/amundsen/databuilder/my_metadata/my_hive_loader.py

Kibana 简单语法

# 查询所有 mappinf
GET _mapping


#查看elasticsearch版本
GET /

#查看健康状况
GET /_cat/health?v

#查看节点
GET /_cat/nodes?v

#查看索引
GET /_cat/indices?v

#查看JVM内存
GET /_nodes/stats/jvm?pretty

#查看磁盘
GET /_cat/allocation?v

#查看安装插件
GET /_cat/plugins?v

#查询全部数据
GET /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_search
{
    "query" : {
        "match_all" : {}
    }
}

#DSL查询语法
GET /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_search
{
    "query" : {
        "match" : { "cluster" : "ndz_fund"}
    }
}



#统计index下的document数量 hive:837 ,mysql:14034
GET _cat/count/tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1?v
{
    "query" : {
        "match" : { "database": "mysql" }
    }
}

#分词效果测试:
POST _analyze
{
  "analyzer": "ik_max_word",
  "text":     "借方本年累计发生额"
}

#分词效果测试:
POST _analyze
{
  "analyzer": "ik_smart",
  "text":     "借方本年累计发生额"
}

# 删除,标记为删除,不会立即删除
POST tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_delete_by_query
{
  "query": { 
    "match": { 
       "database": "hive"
    }
  }
}

# 合并,删除被标记删除的数据。高资源消耗动作
POST /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_forcemerge


导入脚本代码

  • my_util.py代码
import textwrap
import yaml
import sys


# 读取配置文件
def read_config(config_file):
    with open(config_file, 'rb') as f:
        config_data = list(yaml.safe_load_all(f))
        if len(config_data) == 0:
            print("------配置文件: {} 为空------".format(config_file))
            sys.exit(1)
        # print(config_data[0].get(key1))
        return config_data[0]


# es mapping
YZF_TABLE_ES_INDEX_MAPPING = textwrap.dedent(
    """
    {
    "mappings":{
        "table":{
          "properties": {
            "name": {
              "type":"text",
              "analyzer": "simple",
              "fields": {
                "raw": {
                  "type": "keyword"
                }
              }
            },
            "schema": {
              "type":"text",
              "analyzer": "simple",
              "fields": {
                "raw": {
                  "type": "keyword"
                }
              }
            },
            "display_name": {
              "type": "keyword"
            },
            "last_updated_timestamp": {
              "type": "date",
              "format": "epoch_second"
            },
            "description": {
              "type": "text",
              "analyzer": "ik_max_word"
            },
            "column_names": {
              "type":"text",
              "analyzer": "simple",
              "fields": {
                "raw": {
                  "type": "keyword"
                }
              }
            },
            "column_descriptions": {
              "type": "text",
              "analyzer": "ik_max_word"
            },
            "tags": {
              "type": "keyword"
            },
            "badges": {
              "type": "keyword"
            },
            "cluster": {
              "type": "text",
              "analyzer": "simple",
              "fields": {
                "raw": {
                  "type": "keyword"
                }
              }
            },
            "database": {
              "type": "text",
              "analyzer": "simple",
              "fields": {
                "raw": {
                  "type": "keyword"
                }
              }
            },
            "key": {
              "type": "keyword"
            },
            "total_usage":{
              "type": "long"
            },
            "unique_usage": {
              "type": "long"
            },
            "programmatic_descriptions": {
              "type": "text",
              "analyzer": "ik_max_word"
            }
          }
        }
      }
    }
    """
)
  • my_mysql_loader.py 代码
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

"""
This is a example script which demo how to load data
into Neo4j and Elasticsearch without using an Airflow DAG.

"""

import logging
import sys
import textwrap
import uuid

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base

from databuilder.extractor.mysql_metadata_extractor import MysqlMetadataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
from my_util import YZF_TABLE_ES_INDEX_MAPPING
from my_util import read_config

import yaml

es_host = None
neo_host = None
if len(sys.argv) > 1:
    es_host = sys.argv[1]
if len(sys.argv) > 2:
    neo_host = sys.argv[2]

es = Elasticsearch([
    {'host': es_host or 'localhost'},
])

DB_FILE = '/tmp/test.db'
SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
Base = declarative_base()

NEO4J_ENDPOINT = f'bolt://{neo_host or "localhost"}:7687'

neo4j_endpoint = NEO4J_ENDPOINT

neo4j_user = 'neo4j'
neo4j_password = 'test'

# 连接串后缀  &useSSL=false
mysql_conn_ = '?charset=utf8'

LOGGER = logging.getLogger(__name__)


def run_mysql_job(conn_str, connect_name):
    #where_clause_suffix = textwrap.dedent("""
    #    where c.table_schema = 'yzf_biz'
    #""")
    where_clause_suffix = textwrap.dedent("""
        where 1 = 1
    """)
    connect = conn_str + mysql_conn_
    logging.info("Begin load mysql conn: {}".format(connect))

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'

    job_config = ConfigFactory.from_dict({
        f'extractor.mysql_metadata.{MysqlMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
        f'extractor.mysql_metadata.{MysqlMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME}': False,
        f'extractor.mysql_metadata.{MysqlMetadataExtractor.CLUSTER_KEY}': connect_name,
        f'extractor.mysql_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connect,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': connect_name,  # should use unique tag here like {ds}
    })
    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=MysqlMetadataExtractor(), loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    return job


def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
                                   elasticsearch_doc_type_key='table',
                                   model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
                                   cypher_query=None,
                                   elasticsearch_mapping=YZF_TABLE_ES_INDEX_MAPPING):
    """
    :param elasticsearch_index_alias:  alias for Elasticsearch used in
                                       amundsensearchlibrary/search_service/config.py as an index
    :param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
                                       `table_search_index`
    :param model_name:                 the Databuilder model class used in transporting between Extractor and Loader
    :param cypher_query:               Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
                                       it uses the `Table` query baked into the Extractor
    :param elasticsearch_mapping:      Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
                                       if None is given (default) it uses the `Table` query baked into the Publisher
    """
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())

    # elastic search client instance
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())

    job_config = ConfigFactory.from_dict({
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}': model_name,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
            elasticsearch_client,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
            elasticsearch_new_index_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}':
            elasticsearch_doc_type_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
            elasticsearch_index_alias,
    })

    # only optionally add these keys, so need to dynamically `put` them
    if cypher_query:
        job_config.put(f'extractor.search_data.{Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY}',
                       cypher_query)
    if elasticsearch_mapping:
        job_config.put(f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY}',
                       elasticsearch_mapping)

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    return job


if __name__ == "__main__":
    # Uncomment next line to get INFO level logging
    logging.basicConfig(level=logging.INFO)

    conf_data = read_config('/data/service/amundsen/databuilder/yzf_metadata/mysql_connect.yaml')
    mysql_conf = conf_data.get('conn')
    for conn_name, mysql_list in mysql_conf.items():
        for mysql_conn in enumerate(mysql_list):
            loading_job = run_mysql_job(mysql_conn[1], conn_name)
            loading_job.launch()

            job_es_table = create_es_publisher_sample_job(
                elasticsearch_index_alias='table_search_index',
                elasticsearch_doc_type_key='table',
                model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
            job_es_table.launch()
  • my_hive_loader.py 代码
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

"""
This is a example script which demo how to load data
into Neo4j and Elasticsearch without using an Airflow DAG.

"""

import logging
import sys
import textwrap
import uuid

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base

from databuilder.extractor.hive_table_metadata_extractor import HiveTableMetadataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.models.table_metadata import DESCRIPTION_NODE_LABEL
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
from databuilder.extractor.mysql_metadata_extractor import MysqlMetadataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
from my_util import YZF_TABLE_ES_INDEX_MAPPING

es_host = None
neo_host = None
if len(sys.argv) > 1:
    es_host = sys.argv[1]
if len(sys.argv) > 2:
    neo_host = sys.argv[2]

es = Elasticsearch([
    {'host': es_host or 'localhost'},
])

DB_FILE = '/tmp/test.db'
SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
Base = declarative_base()

NEO4J_ENDPOINT = f'bolt://{neo_host or "localhost"}:7687'

neo4j_endpoint = NEO4J_ENDPOINT

neo4j_user = 'neo4j'
neo4j_password = 'test'

LOGGER = logging.getLogger(__name__)

# Todo: user provides a list of schema for indexing

SUPPORTED_HIVE_SCHEMAS = ['accounting_collect', 'accounting_company', 'accounting_report', 'ads', 'bi_dm_ods', \
                          'biz_dm', 'biz_dw', 'common', 'common_kudu', 'common_sim', 'companyinfo_ods', 'customer', \
                          'customer_kudu', 'datax', 'default', 'di', 'dingtalk', 'dingtalk_kudu', 'dwd', 'dwd_sim', \
                          'dws', 'fintax_account', 'fintax_application', 'fintax_asset', 'fintax_data_init',
                          'fintax_fund', 'fintax_invoice', \
                          'fintax_salary', 'fintax_statistics', 'fintax_stock', 'fintax_task', 'fintax_tax',
                          'fintax_user_point', 'flink_database', \
                          'invoice_lake', 'log_ods', 'monitor', 'octopus_ods', 'sale_ods', 'taxops_ods', 'ucenter',
                          'upm_paas', 'view', 'yzf_biz', \
                          'yzf_biz_init', 'yzf_common', 'yzf_config', 'yzf_report', 'yzf_report_init']
# Global used in all Hive metastore queries.
# String format - ('schema1', schema2', .... 'schemaN')
SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE = "('{schemas}')".format(schemas="', '".join(SUPPORTED_HIVE_SCHEMAS))


# todo: connection string needs to change
def connection_string():
    user = 'root'
    host = 'ip'
    port = '3306'
    db = 'hivemetastore'
    pd = '123456'
    return "mysql://%s:%s@%s:%s/%s?charset=utf8" % (user, pd, host, port, db)


def create_table_wm_job(templates_dict):
    sql = textwrap.dedent("""
        SELECT From_unixtime(A0.create_time) as create_time,
               'hive'                        as `database`,
               C0.NAME                       as `schema`,
               B0.tbl_name as table_name,
               {func}(A0.part_name) as part_name,
               {watermark} as part_type
        FROM   PARTITIONS A0
               LEFT OUTER JOIN TBLS B0
                            ON A0.tbl_id = B0.tbl_id
               LEFT OUTER JOIN DBS C0
                            ON B0.db_id = C0.db_id
        WHERE  C0.NAME IN {schemas}
               AND B0.tbl_type IN ( 'EXTERNAL_TABLE', 'MANAGED_TABLE' )
               AND A0.PART_NAME NOT LIKE '%%__HIVE_DEFAULT_PARTITION__%%'
        GROUP  BY C0.NAME, B0.tbl_name
        ORDER by create_time desc
    """).format(func=templates_dict.get('agg_func'),
                watermark=templates_dict.get('watermark_type'),
                schemas=SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE)

    LOGGER.info('SQL query: %s', sql)
    tmp_folder = '/var/tmp/amundsen/table_{hwm}'.format(hwm=templates_dict.get('watermark_type').strip("\""))
    node_files_folder = f'{tmp_folder}/nodes'
    relationship_files_folder = f'{tmp_folder}/relationships'

    hwm_extractor = SQLAlchemyExtractor()
    csv_loader = FsNeo4jCSVLoader()

    task = DefaultTask(extractor=hwm_extractor,
                       loader=csv_loader,
                       transformer=NoopTransformer())

    job_config = ConfigFactory.from_dict({
        f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.CLUSTER_KEY}': 'tencent_hive_kudu',
        f'extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
        f'extractor.sqlalchemy.{SQLAlchemyExtractor.EXTRACT_SQL}': sql,
        'extractor.sqlalchemy.model_class': 'databuilder.models.watermark.Watermark',
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.job_publish_tag': 'load_hive_unique_tag'  # TO-DO unique tag must be added
    })
    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=Neo4jCsvPublisher())
    job.launch()


def run_hive_job():
    """
        Launches data builder job that extracts table and column metadata from MySQL Hive metastore database,
        and publishes to Neo4j.
        @param kwargs:
        @return:
        """

    # Adding to where clause to scope schema, filter out temp tables which start with numbers and views
    where_clause_suffix = textwrap.dedent("""
            WHERE d.NAME IN {schemas}
            AND t.TBL_NAME NOT REGEXP '^[0-9]+'
            AND t.TBL_TYPE IN ( 'EXTERNAL_TABLE', 'MANAGED_TABLE' )
        """).format(schemas=SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE)

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'

    job_config = ConfigFactory.from_dict({
        f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
        f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.CLUSTER_KEY}': 'tencent_hive_kudu',
        f'extractor.hive_table_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_CREATE_ONLY_NODES}': [DESCRIPTION_NODE_LABEL],
        'publisher.neo4j.job_publish_tag': 'load_hive_unique_tag'  # TO-DO unique tag must be added
    })

    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=HiveTableMetadataExtractor(), loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    return job


def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
                                   elasticsearch_doc_type_key='table',
                                   model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
                                   cypher_query=None,
                                   elasticsearch_mapping=YZF_TABLE_ES_INDEX_MAPPING):
    """
    :param elasticsearch_index_alias:  alias for Elasticsearch used in
                                       amundsensearchlibrary/search_service/config.py as an index
    :param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
                                       `table_search_index`
    :param model_name:                 the Databuilder model class used in transporting between Extractor and Loader
    :param cypher_query:               Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
                                       it uses the `Table` query baked into the Extractor
    :param elasticsearch_mapping:      Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
                                       if None is given (default) it uses the `Table` query baked into the Publisher
    """
    # loader saves data to this location and publisher reads it from here
    # 临时文件,可删除,对查询不影响
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())

    # elastic search client instance
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())

    job_config = ConfigFactory.from_dict({
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}': model_name,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
            elasticsearch_client,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
            elasticsearch_new_index_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}':
            elasticsearch_doc_type_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
            elasticsearch_index_alias,
    })

    # only optionally add these keys, so need to dynamically `put` them
    if cypher_query:
        job_config.put(f'extractor.search_data.{Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY}',
                       cypher_query)
    if elasticsearch_mapping:
        job_config.put(f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY}',
                       elasticsearch_mapping)

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    return job


if __name__ == "__main__":
    # Uncomment next line to get INFO level logging
    logging.basicConfig(level=logging.INFO)

    loading_job = run_hive_job()
    loading_job.launch()

    templates_dict = {'agg_func': 'min',
                      'watermark_type': '"low_watermark"',
                      'part_regex': '{{ ds }}'}
    templates_dict.get('agg_func')

    create_table_wm_job(templates_dict)

    templates_dict = {'agg_func': 'max',
                      'watermark_type': '"high_watermark"',
                      'part_regex': '{{ ds }}'}

    create_table_wm_job(templates_dict)

    job_es_table = create_es_publisher_sample_job(
        elasticsearch_index_alias='table_search_index',
        elasticsearch_doc_type_key='table',
        model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
    job_es_table.launch()
  • mysql_connect.yaml 配置文件
conn:
  mysql1:
    - mysql://root:123456@ip1:3306/<db>
  mysql2:
    - mysql://root:123456@ip2:3306/<db>

FAQ

1. docker-compose -f docker-amundsen.yml up启动容器报错:

[2021-12-01T08:02:56,599][INFO ][o.e.b.BootstrapChecks ] [PD4Rw8t] bound or publishing to a non-loopback address, enforcing bootstrap checks
es_amundsen_atlas | ERROR: [1] bootstrap checks failed
es_amundsen_atlas | [1]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

  • 增加堆内存,修改 vim /etc/sysctl.conf ,添加内容: vm.max_map_count = 262144

  • 重加载修改配置: sysctl -p

  • 重跑 docker-compose -f docker-amundsen.yml up

2.页面搜索元数据信息,中文注释显示乱码

  • 修改连接mysql url,在连接串后添加utf-8设置: ?charset=utf8

3.导入Hive的mysql元数据时报错:

sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1055, "Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column 'hive.A0.CREATE_TIME' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by")
[SQL:
SELECT From_unixtime(A0.create_time) as create_time,

....

  • 参考https://blog.csdn.net/fansili/article/details/78664267,修改mysql配置:

    mysql> set global sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';

    mysql> set session sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';

4. 阿蒙森官方FAQ: https://www.amundsen.io/amundsen/installation/#troubleshooting

5.防火墙是关闭的,通过215.5还可以访问不通阿蒙森机器的5000端口

解决办法:先打开防火墙,打开5000端口限制,再关闭防火墙。然后搜索正常访问5000端口。

systemctl start firewalld
firewall-cmd --zone=public --add-port=5000/tcp --permanent
firewall-cmd --reload
systemctl stop firewalld
systemctl disable firewalld

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容