kolla项目地址:https://github.com/openstack/kolla
kolla项目做什么的
主要是用来制作openstack的docker镜像,因为支持了很多openstack组件,所以镜像很多,也很大(每个镜像几百M),要是每次都去从远程获取是件很耗时间的事,所以kolla项目诞生,意在你可以在本地build出你需要的镜像,还可以进行对镜像定制化;
kolla源码
1. 主要流程
kolla的主要实现代码在kolla/image/build.py中,网上有很多解析,抄了别人的函数流程,加了些自己的理解注释
cmd/build.py main() ## 入口
-> image/build.py build.run_build()
-> common/config.py common_config.parse(...) ## 命令行,配置文件参数的定义配置注册
-> kolla=KollaWorker() ## build的任务对象,把命令行,配置文件的参数拿出来给对象。
->self.dc = docker.APIClient() ## 创建docker api client 用于调用docker 接口
-> kolla.setup_working_dir() ## 新建个放dockerfile和构建的工作目录,把docker目录的文件拷贝过来
-> kolla.find_dockerfiles() ## 检查下工作目录下的dokerfile.j2文件,放到self.docker_build_paths里了格式是(root, dirs, names )
-> kolla.create_dockerfiles() ## 做镜像文件,里边的变量时kollaWorker类的__init__通过conf拿过来的
-> jinja2.Environment() ## jinja2的使用流程 见jinja2 介绍
-> jinja2.FileSystemLoader()
-> template.render() ## 所有的*j2模板重新渲染,后面写入新建dockerfie文件
->queue=kolla.build_queue() ## 创建多线程放到队列里来执行build操作
-> self.build_image_list() ## 根据配置生成所有image列表,已经依赖关系
-> self.find_parents() ## 整理出image的父image
-> self.filter_images() ## 根据配置的regex和profile生成需要编译的iamge列表
-> queue.put(BuildTask(self.conf, image, push_queue))
->for x in six.moves.range(conf.threads): #在线程数内开启n个线程
->worker=WorkThread(conf, queue)
->worker.start()
-> WorkThread().run()
-> task.run()
-> self.builder()
-> make_an_archive ## 需要下载plugin和additions
-> self.dc.build(*args) ## 来自docker task, 调用 docker api 创建image,这里的build方法来自docker API client()的build方法
-> 如果push:PushIntoQueueTask(参数,self.image,参数)
-> queue.put(task.followups) ##将镜像的子镜像放入队列
# push_queue,如果需要push的话才会执行
->for x in six.moves.range(conf.push_threads)
->WorkerThread(conf, push_queue)
->worker.start()
-> WorkThread().run()
-> task.run()
->PushTask.run()
-> self.push_image(image)
-> self.dc.push ## 调用 docker api push image到仓库
->kolla.summary() #返回结果汇总和处理
主要不是很明白queue是怎么处理带有依赖层级的镜像的,上边流程图queue以后比较详细,方便我理解;
2.从入口开始解析
run_build()
def run_build():
...
kolla = KollaWorker(conf) #注册kolla实例
kolla.setup_working_dir() #创建工作目录
kolla.find_dockerfiles() #查找dockerfile
kolla.create_dockerfiles() #创建dockerfile
#template_only参数是只生成dockerfile,不做其他的build镜像操作
if conf.template_only:
LOG.info('Dockerfiles are generated in %s', kolla.working_dir)
return
......
###重点解析下面这段代码的实现
push_queue = six.moves.queue.Queue()
queue = kolla.build_queue(push_queue)
workers = []
with join_many(workers):
try:
for x in six.moves.range(conf.threads):
worker = WorkerThread(conf, queue)
worker.setDaemon(True)
worker.start()
workers.append(worker)
for x in six.moves.range(conf.push_threads):
worker = WorkerThread(conf, push_queue)
worker.setDaemon(True)
worker.start()
workers.append(worker)
# sleep until queue is empty
while queue.unfinished_tasks or push_queue.unfinished_tasks:
time.sleep(3)
# ensure all threads exited happily
push_queue.put(WorkerThread.tombstone)
queue.put(WorkerThread.tombstone)
except KeyboardInterrupt:
for w in workers:
w.should_stop = True
push_queue.put(WorkerThread.tombstone)
queue.put(WorkerThread.tombstone)
raise
join_many()解析
首先join_many() 代码如下,带上了上下文装饰器,yield字段以上是代表enter()方法,以下是exit()方法,函数主要做的就是exit()方法,主要处理线程人为使用ctrl+c退出,当ctrl+c两次,强制退出;
@contextlib.contextmanager
def join_many(threads):
try:
yield
for t in threads:
t.join()
except KeyboardInterrupt:
try:
LOG.info('Waiting for daemon threads exit. Push Ctrl + c again to'
' force exit')
for t in threads:
if t.is_alive():
LOG.debug('Waiting thread %s to exit', t.name)
# NOTE(Jeffrey4l): Python Bug: When join without timeout,
# KeyboardInterrupt is never sent.
t.join(0xffff)
LOG.debug('Thread %s exits', t.name)
except KeyboardInterrupt:
LOG.warning('Force exits')
build_queue()
queue 队列的定义
six包主要是用来兼容py2和py3的,six.moves里的函数是py3里对模块做了重新规划,改变了位置和名字,跟py2对不上了,所以就有了moves,
有了moves以后,这样我们的代码可以py2下也可以在py3下顺利的执行,不然写起来很丑陋
举例:
six.moves中的html_parser,对应了py2中的HTMLParser,py3中的html.parser
在这我们用的queue:
在six.moves.queue 对应py2里的Queue, py3里的queue
(在github的kolla master分支上,已经不再支持py2, 所以也没用six)
在这里我们初始化了一个push_queue, 然后又创建了一个queue,为啥
push_queue = six.moves.queue.Queue()
queue = kolla.build_queue(push_queue)
build_queue()函数返回一个queue,主要做的几件事:
- 列出要build的镜像列表,
- 判断镜像依赖关系对self.images重新整理,
- 使用配置文件带的正则或者profiles里的内容过滤镜像列表;
- 将那些没有父镜像的元素加入到队列
- queue中put的对象的内容是:BuildTask(self.conf, image, push_queue);
def build_queue(self, push_queue):
"""Organizes Queue list.
Return a list of Queues that have been organized into a hierarchy
based on dependencies
"""
self.build_image_list()
self.find_parents()
self.filter_images()
queue = six.moves.queue.Queue()
for image in self.images:
if image.status in (STATUS_UNMATCHED, STATUS_SKIPPED):
# Don't bother queuing up build tasks for things that
# were not matched in the first place... (not worth the
# effort to run them, if they won't be used anyway).
continue
if image.parent is None:
queue.put(BuildTask(self.conf, image, push_queue))
LOG.info('Added image %s to queue', image.name)
return queue
下面开始从队列里取这些放进去的BuildTask放到线程里去build
先看这里:
定义conf.threads个线程,从queue里取出BuildTask任务放到后台
with join_many(workers):
try:
for x in six.moves.range(conf.threads): #conf.threads是开启的线程数量
worker = WorkerThread(conf, queue)
worker.setDaemon(True) #设置守护线程,主线程挂了,会把守护线程杀了
worker.start() #线程开始,一个线程只能调一次,执行内容在run方法里
workers.append(worker)
WorkerThread(conf, queue).run()
我们去看下WorkerThread(conf, queue)里的run()方法
- 总之就是在self.conf.retries次数内开始执行task.run(), 这里的run()是BuildTask.run()
2.执行完后将task.followups的任务追加到队列中(这里是我主要不理解要往下挖的地方)
def run(self):
while not self.should_stop:
task = self.queue.get()
if task is self.tombstone:
# Ensure any other threads also get the tombstone.
self.queue.put(task)
break
try:
for attempt in six.moves.range(self.conf.retries + 1):
if self.should_stop:
break
LOG.info("Attempt number: %s to run task: %s ",
attempt + 1, task.name)
try:
task.run()
if task.success:
break
except Exception:
LOG.exception('Unhandled error when running %s',
task.name)
# try again...
task.reset()
if task.success and not self.should_stop:
for next_task in task.followups:
LOG.info('Added next task %s to queue',
next_task.name)
self.queue.put(next_task)
finally:
self.queue.task_done()
BuildTask.run() -->self.builder(self.image)
直接看builder方法里有什么
- make_an_archives()
- build镜像之前的操作
- 使用self.dc.build(**)开始build镜像
def builder(self, image):
def make_an_archive(items, arcname, item_child_path=None):
....主要实现archives和plugins...
...一系列的条件处理...
buildargs = self.update_buildargs()
try:
for stream in self.dc.build(path=image.path,
tag=image.canonical_name,
nocache=not self.conf.cache,
rm=True,
decode=True,
network_mode=self.conf.network_mode,
pull=pull,
forcerm=self.forcerm,
buildargs=buildargs):
if 'stream' in stream:
for line in stream['stream'].split('\n'):
if line:
self.logger.info('%s', line)
if 'errorDetail' in stream:
image.status = STATUS_ERROR
self.logger.error('Error\'d with the following message')
for line in stream['errorDetail']['message'].split('\n'):
if line:
self.logger.error('%s', line)
return
if image.status != STATUS_ERROR and self.conf.squash:
self.squash()
except docker.errors.DockerException:
......
else:
image.status = STATUS_BUILT
self.logger.info('Built')
看看WorkerThread里写往queue里添加BuildTask()的那段代码
1.顺便把task.run()的部分贴了出来
- 把task.followups加到队列中,这里的followups是BuildTask的一个属性
class WorkerThread(threading.Thread):
def run(self):
.....
for attempt in six.moves.range(self.conf.retries + 1):
try:
task.run()
if task.success:
break
except Exception:
......
if task.success and not self.should_stop:
for next_task in task.followups:
LOG.info('Added next task %s to queue',
next_task.name)
self.queue.put(next_task)
BuildTask().followups()
1.当配置文件里需要push镜像,self.conf.push为True,并且镜像build成功
放BuildTask(self.image)放进followups里
2.如果镜像有子镜像(self.image.children),并且镜像build成功,把镜像放进followups[]里
tips:
我看了下这里的followups.extend([BuildTask(..),])和 followups.append(BuildTask(..))没啥区别,??
class BuildTask(DockerTask):
......
def run(self):
self.builder(self.image)
if self.image.status in (STATUS_BUILT, STATUS_SKIPPED):
self.success = True
@property
def followups(self):
followups = []
if self.conf.push and self.success:
followups.extend([
# If we are supposed to push the image into a docker
# repository, then make sure we do that...
PushIntoQueueTask(
PushTask(self.conf, self.image),
self.push_queue),
])
if self.image.children and self.success:
for image in self.image.children:
if image.status == STATUS_UNMATCHED:
continue
followups.append(BuildTask(self.conf, image, self.push_queue))
return followups
1.能不能有个pdb出的结果,验证下;
镜像的依赖关系:
{
"base": [
{
"ceph-base": [
"ceph-mds",
"ceph-mgr",
"ceph-mon",
"ceph-nfs",
"ceph-osd",
"ceph-rgw",
"cephfs-fuse"
]
},
"certmonger",
"chrony",
"crane",
"cron",
"dnsmasq",
"elasticsearch",
"etcd",
"fluentd",
"grafana",
"haproxy",
"helm-repository",
"influxdb",
"iscsid",
"kafka",
"keepalived",
"kibana",
"kolla-toolbox",
{
"nova-base": [
"nova-api",
"nova-compute-ironic",
"nova-compute",
"nova-conductor",
"nova-consoleauth",
"nova-novncproxy",
"nova-placement-api",
"nova-scheduler",
"nova-serialproxy",
"nova-spicehtml5proxy",
"nova-ssh"
]
},
{
"novajoin-base": [
"novajoin-notifier",
"novajoin-server"
]
},
{
"octavia-base": [
"octavia-api",
"octavia-health-manager",
"octavia-housekeeping",
"octavia-worker"
]
},
....省略...
}