Apache Airflow单机/分布式环境搭建

[TOC]


Airflow简介

Apache Airflow是一个提供基于DAG(有向无环图)来编排工作流的、可视化的分布式任务调度平台(也可单机),与Oozie、Azkaban等调度平台类似。Airflow在2014年由Airbnb发起,2016年3月进入Apache基金会,在2019年1月成为顶级项目。Airflow采用Python语言编写,并提供可编程方式定义DAG工作流(编写Python代码)。当工作流通过代码来定义时,它们变得更加可维护、可版本化、可测试和协作。

Airflow的可视化界面提供了工作流节点的运行监控,可以查看每个节点的运行状态、运行耗时、执行日志等。也可以在界面上对节点的状态进行操作,如:标记为成功、标记为失败以及重新运行等。在Airflow中工作流上每个task都是原子可重试的,一个工作流某个环节的task失败可自动或手动进行重试,不必从头开始跑。

Airflow通常用在数据处理领域,也属于大数据生态圈的一份子。当然Airflow也可以用于调度非数据处理的任务,只不过数据处理任务之间通常都会存在依赖关系。而且这个关系可能还比较复杂,用crontab等基础工具无法满足,因此更需要被调度平台编排和管理。例如:

  • 时间依赖:任务需要等待某一个时间点触发
  • 外部系统依赖:任务依赖外部系统需要调用接口去访问
  • 任务间依赖:任务 A 需要在任务 B 完成后启动,两个任务互相间会产生影响
  • 资源环境依赖:任务消耗资源非常多, 或者只能在特定的机器上执行

Airflow的架构图如下:


image.png
  • Metadata Database:Airflow的元数据库,用于Webserver、Executor及Scheduler存储各种状态数据,通常是MySQL或PostgreSQL
  • User Interface:用户界面,即前端web界面
  • Webserver:web服务器,用于提供用户界面的操作接口
  • Scheduler:调度器,负责处理触发调度的工作流,并将工作流中的任务提交给执行器处理
  • Executor:执行器,负责处理任务实例。在本地模式下会运行在调度器中,并负责所有任务实例的处理。但是大多数适合于生产的执行器实际上是一个消息队列(RabbitMQ、Redis),负责将任务实例推送给工作节点执行
  • Workers:工作节点,真正负责调起任务进程、执行任务的节点,worker可以有多个,是独立的进程
  • DAG Directory:存放DAG任务图定义的Python代码的目录,代表一个Airflow的处理流程。代码文件所在的位置通过Airflow配置dags_folder指定,需要保证执行器、调度器以及工作节点都能够访问到

关于Airflow的更多内容可以参考官方文档:


准备工作

1、准备虚拟机或云服务环境,我这里使用的是本地的虚拟机:

  • 操作系统:CentOS7
  • CPU:8核
  • 内存:16G
  • 硬盘:20G
  • IP:192.168.243.175

2、编译安装Python3,安装步骤可以参考下文:

我这里安装的版本是3.9.1:

[root@localhost ~]# python3 --version
Python 3.9.1

3、安装Docker环境,安装步骤可以参考下文:

我这里安装的版本是19.03.12:

[root@localhost ~]# docker version
Client: Docker Engine - Community
 Version:           19.03.12
 API version:       1.40
 Go version:        go1.13.10
 Git commit:        48a66213fe
 Built:             Mon Jun 22 15:46:54 2020
 OS/Arch:           linux/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.12
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.13.10
  Git commit:       48a66213fe
  Built:            Mon Jun 22 15:45:28 2020
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          1.2.13
  GitCommit:        7ad184331fa3e55e52b890ea95e65ba581ae3429
 runc:
  Version:          1.0.0-rc10
  GitCommit:        dc9208a3303feef5b3839f4323d9beb36df0a9dd
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683

4、安装MySQL数据库,安装步骤可以参考下文或MySQL官方文档

我这里安装的版本是8.0.21:

> select version();
8.0.21

Airflow单机环境搭建

完成准备工作后,我们就先来搭建Airflow的单机环境,先上官方文档:

设置一下Airflow的文件存储目录:

[root@localhost ~]# vim /etc/profile
export AIRFLOW_HOME=/usr/local/airflow
[root@localhost ~]# source /etc/profile

Airflow的安装很简单,只需要一条命令就能完成:

$ pip3 install "apache-airflow==2.1.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.0/constraints-3.9.txt" -i https://pypi.tuna.tsinghua.edu.cn/simple --default-timeout=6000

安装完成后,执行如下命令初始化数据库:

[root@localhost ~]# airflow db init
Traceback (most recent call last):
  File "/usr/local/python/bin/airflow", line 5, in <module>
    from airflow.__main__ import main
  File "/usr/local/python/lib/python3.9/site-packages/airflow/__init__.py", line 34, in <module>
    from airflow import settings
  File "/usr/local/python/lib/python3.9/site-packages/airflow/settings.py", line 35, in <module>
    from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf  # NOQA F401
  File "/usr/local/python/lib/python3.9/site-packages/airflow/configuration.py", line 1115, in <module>
    conf = initialize_config()
  File "/usr/local/python/lib/python3.9/site-packages/airflow/configuration.py", line 877, in initialize_config
    conf.validate()
  File "/usr/local/python/lib/python3.9/site-packages/airflow/configuration.py", line 202, in validate
    self._validate_config_dependencies()
  File "/usr/local/python/lib/python3.9/site-packages/airflow/configuration.py", line 234, in _validate_config_dependencies
    import sqlite3
  File "/usr/local/python/lib/python3.9/sqlite3/__init__.py", line 23, in <module>
    from sqlite3.dbapi2 import *
  File "/usr/local/python/lib/python3.9/sqlite3/dbapi2.py", line 27, in <module>
    from _sqlite3 import *
ModuleNotFoundError: No module named '_sqlite3'

这时肯定会报错,因为我们还没有配置数据相关信息。之所以要先执行一下这条命令是为了让Airflow在我们设定的目录下生成配置文件:

[root@localhost ~]# ls /usr/local/airflow/
airflow.cfg  webserver_config.py
[root@localhost ~]# 

修改配置文件:

[root@localhost ~]# vim /usr/local/airflow/airflow.cfg
[core]
dags_folder = /usr/local/airflow/dags
default_timezone = Asia/Shanghai
# 配置数据库
sql_alchemy_conn = mysql+mysqldb://airflow:123456a.@192.168.1.7:3306/airflow?use_unicode=true&charset=utf8
# Are DAGs paused by default at creation
dags_are_paused_at_creation = False

[webserver]
default_ui_timezone = Asia/Shanghai
# Default DAG view. Valid values are: ``tree``, ``graph``, ``duration``, ``gantt``, ``landing_times``
dag_default_view = graph

[scheduler]
# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 30

到MySQL上创建数据库和用户:

CREATE DATABASE airflow CHARACTER SET utf8;
create user 'airflow'@'%' identified by '123456a.';
grant all privileges on airflow.* to 'airflow'@'%';
flush privileges;
  • Tips:数据库编码需为utf8,否则Airflow初始化数据库时可能会失败

安装MySQL客户端相关依赖包,需要具备如下依赖才能成功安装Python的mysqlclient库:

[root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-devel-8.0.25-1.el7.x86_64.rpm
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-libs-8.0.25-1.el7.x86_64.rpm
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-client-plugins-8.0.25-1.el7.x86_64.rpm
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-client-8.0.25-1.el7.x86_64.rpm
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-common-8.0.25-1.el7.x86_64.rpm
[root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-libs-compat-8.0.25-1.el7.x86_64.rpm
[root@localhost ~]# yum install -y ./*.rpm

安装gcc:

[root@localhost ~]# yum install -y gcc make libffi-devel zlib*

安装mysqlclient:

[root@localhost ~]# pip3 install mysqlclient -i https://pypi.tuna.tsinghua.edu.cn/simple

再次初始化数据库:

[root@localhost ~]# airflow db init
...
Initialization done

初始化成功后,数据库表如下:


image.png

然后创建管理员用户:

[root@localhost ~]# airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@superhero.org

启动webserver:

[root@localhost ~]# airflow webserver --port 8080

启动scheduler:

[root@localhost ~]# airflow scheduler

执行官方的示例任务,测试下Airflow是否已正常启动,如下输出success代表没问题:

[root@localhost ~]# airflow tasks run example_bash_operator runme_0 2015-01-01
[2021-06-19 21:44:47,149] {dagbag.py:487} INFO - Filling up the DagBag from /usr/local/airflow/dags
Running <TaskInstance: example_bash_operator.runme_0 2015-01-01T00:00:00+08:00 [success]> on host localhost.localdomain
[2021-06-19 21:44:47,763] {dagbag.py:487} INFO - Filling up the DagBag from /usr/local/python/lib/python3.9/site-packages/airflow/example_dags/example_bash_operator.py
Running <TaskInstance: example_bash_operator.runme_0 2015-01-01T00:00:00+08:00 [success]> on host localhost.localdomain
[root@localhost ~]# 

Airflow的常用命令

# 守护进程运行webserver
$ airflow webserver -D       

# 守护进程运行调度器
$ airflow scheduler -D       

# 守护进程运行调度器
$ airflow worker -D          

# 守护进程运行celery worker并指定任务并发数为1
$ airflow worker -c 1 -D     

# 暂停任务
$ airflow pause $dag_id     

# 取消暂停,等同于在管理界面打开off按钮
$ airflow unpause $dag_id    

# 查看task列表
$ airflow list_tasks $dag_id  

# 清空任务实例
$ airflow clear $dag_id       

# 运行整个dag文件
$ airflow trigger_dag $dag_id -r $RUN_ID -e $EXEC_DATE  

# 运行task
$ airflow run $dag_id $task_id $execution_date       

常用页面操作

接着访问http://192.168.243.175:8080,登录airflow的用户界面:


image.png

登录成功,首页如下:


image.png

右上角可以选择时区:


image.png

页面上有些示例的任务,我们可以手动触发一些任务进行测试:


image.png

image.png

点击具体的DAG,就可以查看该DAG的详细信息和各个节点的运行状态:


image.png

点击DAG中的节点,就可以对该节点进行操作:


image.png

自定义DAG

接下来我们自定义一个简单的DAG给Airflow运行,创建Python代码文件:

[root@localhost ~]# mkdir /usr/local/airflow/dags
[root@localhost ~]# vim /usr/local/airflow/dags/my_dag_example.py

代码示例:

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# 默认参数
args = {
    'owner': 'admin',
}

with DAG(
        dag_id='my_dag_example',
        default_args=args,
        schedule_interval='@once',
        start_date=days_ago(2),
        dagrun_timeout=timedelta(minutes=60),
        tags=['my_dag'],
        params={"example_key": "example_value"}
) as dag:
    # 定义DAG中的节点
    first = BashOperator(
        task_id='first',
        bash_command='echo "run first task"',
    )
    middle = BashOperator(
        task_id='middle',
        bash_command='echo "run middle task"',
    )
    last = BashOperator(
        task_id='last',
        bash_command='echo "run last task"',
    )
    
    # 定义节点的上下游关系
    first >> middle >> last

等待一会在Web界面上可以看到我们自定义的DAG任务已经被运行完了,因为比较简单,所以执行得很快:


image.png

查看下节点的关系是否与我们在代码中定义的一样:


image.png

关于DAG的代码定义可以参考官方的示例代码和官方文档,自带的例子在如下目录:

  • /usr/local/python/lib/python3.9/site-packages/airflow/example_dags

Airflow分布式环境搭建

如果Airflow要支持分布式的话,需要安装RabbitMQ或Redis作为Airflow的Executor,安装步骤可以参考下文:

文本采用的是RabbitMQ,版本为3.8.9。若只是测试的话可以使用Docker快速安装,如下:

[root@localhost ~]# docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.9-management
[root@localhost ~]# docker exec -it rabbitmq bash
root@49c8ebed2525:/# rabbitmqctl add_user airflow password  # 添加用户
root@49c8ebed2525:/# rabbitmqctl add_vhost airflow_vhost   # 添加虚拟主机
root@49c8ebed2525:/# rabbitmqctl set_user_tags airflow airflow_vhost  # 为用户绑定虚拟主机
root@49c8ebed2525:/# rabbitmqctl set_user_tags airflow administrator  # 设置用户权限为管理员
root@49c8ebed2525:/# rabbitmqctl  set_permissions -p airflow_vhost airflow '.*' '.*' '.*' # 设置远程登录权限

在分布式这一环节我们使用Docker来部署,因为容器的弹性能力更强,而且部署方便,可以快速扩展多个worker。首先,拉取airflow的docker镜像:

[root@localhost ~]# docker pull apache/airflow

拷贝之前本地安装时生成的airflow配置文件:

[root@localhost ~]# cp /usr/local/airflow/airflow.cfg ~
[root@localhost ~]# vim airflow.cfg

然后修改配置文件的内容如下:

[core]
# 存放dag定义文件的目录
dags_folder = /opt/airflow/dags
default_timezone = Asia/Shanghai
# 配置数据库
sql_alchemy_conn = mysql+mysqldb://airflow:123456a.@192.168.1.7:3306/airflow?use_unicode=true&charset=utf8
# The executor class that airflow should use
executor = CeleryExecutor
# Are DAGs paused by default at creation
dags_are_paused_at_creation = False
plugins_folder = /opt/airflow/plugins

[webserver]
default_ui_timezone = Asia/Shanghai
# Default DAG view. Valid values are: ``tree``, ``graph``, ``duration``, ``gantt``, ``landing_times``
dag_default_view = graph

[scheduler]
# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 30
child_process_log_directory = /opt/airflow/logs/scheduler

[logging]
base_log_folder = /opt/airflow/logs
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log

[celery]
# worker的并发度,worker可以执行的任务实例的数量
worker_concurrency = 16
# worker日志服务的端口
worker_log_server_port = 8795
# RabbitMQ的连接地址
broker_url = amqp://airflow:password@192.168.243.175:5672/airflow_vhost
result_backend = db+mysql://airflow:123456a.@192.168.1.7:3306/airflow?use_unicode=true&charset=utf8
flower_host = 0.0.0.0
flower_port = 5555

[operators]
default_queue = airflow_queue

创建一个airflow专属的docker网络,为了启动容器时能够指定各个节点的ip以及设置host,也利于与其他容器的网络隔离:

[root@localhost ~]# docker network create --driver bridge --subnet=172.18.12.0/16 --gateway=172.18.1.1 airflow

然后从镜像中创建各个节点的容器,注意ip和host的设置:

[root@localhost ~]# docker run -d -p 8080:8080 --name airflow_webserver \
--network=airflow --ip 172.18.12.1 --hostname airflow_webserver \
--add-host=airflow_scheduler:172.18.12.2 --add-host=airflow_flower:172.18.12.3 \
--add-host=airflow_worker1:172.18.12.4 --add-host=airflow_worker2:172.18.12.5 \
apache/airflow webserver

[root@localhost ~]# docker run -d --name airflow_scheduler \
--network=airflow --ip 172.18.12.2 --hostname airflow_scheduler \
--add-host=airflow_webserver:172.18.12.1 --add-host=airflow_flower:172.18.12.3 \
--add-host=airflow_worker1:172.18.12.4 --add-host=airflow_worker2:172.18.12.5 \
apache/airflow scheduler

[root@localhost ~]# docker run -d -p 5555:5555 --name airflow_flower \
--network=airflow --ip 172.18.12.3 --hostname airflow_flower \
--add-host=airflow_webserver:172.18.12.1 --add-host=airflow_scheduler:172.18.12.2 \
--add-host=airflow_worker1:172.18.12.4 --add-host=airflow_worker2:172.18.12.5 \
apache/airflow celery flower

[root@localhost ~]# docker run -d -p 8795:8795 --name airflow_worker1 \
--network=airflow --ip 172.18.12.4 --hostname airflow_worker1 \
--add-host=airflow_webserver:172.18.12.1 --add-host=airflow_flower:172.18.12.3 \
--add-host=airflow_scheduler:172.18.12.2 --add-host=airflow_worker2:172.18.12.5 \
apache/airflow celery worker

[root@localhost ~]# docker run -d -p 8796:8795 --name airflow_worker2 \
--network=airflow --ip 172.18.12.5 --hostname airflow_worker2 \
--add-host=airflow_webserver:172.18.12.1 --add-host=airflow_flower:172.18.12.3 \
--add-host=airflow_worker1:172.18.12.4 --add-host=airflow_scheduler:172.18.12.2 \
apache/airflow celery worker

将宿主机上修改后的配置文件替换容器内的配置文件:

[root@localhost ~]# docker cp ./airflow.cfg airflow_webserver:/opt/airflow/airflow.cfg
[root@localhost ~]# docker cp ./airflow.cfg airflow_scheduler:/opt/airflow/airflow.cfg
[root@localhost ~]# docker cp ./airflow.cfg airflow_flower:/opt/airflow/airflow.cfg
[root@localhost ~]# docker cp ./airflow.cfg airflow_worker1:/opt/airflow/airflow.cfg
[root@localhost ~]# docker cp ./airflow.cfg airflow_worker2:/opt/airflow/airflow.cfg

删除之前部署单机版时产生的数据表,然后重新执行数据库的初始化:

[root@localhost ~]# airflow db init

由于删除了之前的数据,所以需要重新创建airflow的管理员用户:

[root@localhost ~]# airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@superhero.org

然后重启各个节点:

[root@localhost ~]# docker restart airflow_webserver
[root@localhost ~]# docker restart airflow_scheduler
[root@localhost ~]# docker restart airflow_flower
[root@localhost ~]# docker restart airflow_worker1
[root@localhost ~]# docker restart airflow_worker2

通过docker ps确认各个节点都启动成功后,访问flower的web界面,可以查看在线的worker信息,以确认worker的存活状态:

image.png

然后访问webserver的web界面,确认能正常访问:


image.png

由于容器内的/opt/airflow/dags目录下没有任何文件,所以webserver的界面是空的。现在我们将之前编写的dag文件拷贝到容器内。注意,dag文件需要同步到所有的scheduler和worker节点,并且要保证airflow对该文件有足够的权限。如下示例:

[root@localhost ~]# chmod 777 /usr/local/airflow/dags/my_dag_example.py  # 为了避免权限问题,这里直接放开所有权限
[root@localhost ~]# docker cp /usr/local/airflow/dags/my_dag_example.py airflow_worker1:/opt/airflow/dags/my_dag_example.py    # 先拷贝到worker节点,如果先拷贝到scheduler节点会触发调度,此时worker节点没相应的dag文件就会报错
[root@localhost ~]# docker cp /usr/local/airflow/dags/my_dag_example.py airflow_worker2:/opt/airflow/dags/my_dag_example.py
[root@localhost ~]# docker cp /usr/local/airflow/dags/my_dag_example.py airflow_scheduler:/opt/airflow/dags/my_dag_example.py

同步完dag文件后,等待一会可以看到任务被调度起来了:


image.png

运行成功:


image.png

进入graph view界面查看各个节点的状态:


image.png

查看first节点的日志信息,看看是否被正确调度到worker上了。可以看到,该节点被调度到了airflow_worker2上:

image.png

middle节点则被调度到了airflow_worker1上:

image.png

至此,我们就完成了airflow分布式环境的搭建和验证。但是还有一些不完美,就是在这个架构下webserver和scheduler有单点故障问题,不具备高可用性。不过在较新的版本中这个问题也比较好解决,webserver和scheduler都启动多个节点就好了,不像在老版本中为了让scheduler节点高可用还要做额外的特殊处理。关于scheduler的高可用说明可以参考官方文档:

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容