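Contents:
- Preface
- Atomic objects
- Multipart objects
- Appendix
Preface
At a high level, we know the following:
Uploading an object to rgw writes to several places (assuming the bi log and data log are enabled):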
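- An object named <bucket id>_<key> is added to the <zone>.rgw.buckets.data pool.
- An entry for the object is added to the omap of the bucket index object in the <zone>.rgw.buckets.index pool.
  - A bi log entry is also added to the omap of the same bucket index object.
- A data log entry is added to the omap of the data_log object in the <zone>.rgw.log pool.
Keeping these in mind gives a general sense of direction when reading the code.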
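As a reading aid, the writes listed above can be put into a small table-building function. This is only a sketch: WriteTarget, PoolNames and put_obj_write_targets are hypothetical names made up for illustration, not rgw types, and the pool names are passed in by the caller rather than looked up from a zone.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical record: one place that a single PUT touches.
struct WriteTarget {
  std::string pool;  // RADOS pool that receives the write
  std::string what;  // what is written there
};

// Hypothetical bundle of the three pool names involved.
struct PoolNames {
  std::string data;   // pool holding object data
  std::string index;  // pool holding the bucket index object
  std::string log;    // pool holding the data_log object
};

// Lists the writes described in the text for one uploaded object.
std::vector<WriteTarget> put_obj_write_targets(const PoolNames& pools,
                                               const std::string& bucket_id,
                                               const std::string& key) {
  return {
    {pools.data,  "object " + bucket_id + "_" + key},
    {pools.index, "omap entry for " + key + " on the bucket index object"},
    {pools.index, "bi log entry in the same omap"},
    {pools.log,   "data log entry in the data_log object's omap"},
  };
}
```

The last three entries are all omap updates; only the first one carries object data.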
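Atomic objects
An Atomic upload uploads one complete rgw object, whereas a single Multipart upload request uploads only one part of an rgw object.
The call flow below is the Atomic write path with compression enabled.
Many functions are involved, so only some key ones are listed here; what the others do is described in the code comments: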
- RGWPutObj::execute
- RGWPutObj_Compress::prepare
- RGWPutObjProcessor_Atomic::prepare
- put_data_and_throttle
- RGWPutObj_Compress::handle_data
- RGWPutObjProcessor_Atomic::handle_data
- RGWPutObjProcessor_Atomic::write_data
- RGWPutObjProcessor_Aio::handle_obj_data
- RGWRados::aio_put_obj_data
- librados aio API
- RGWRados::aio_put_obj_data
- RGWPutObjProcessor_Aio::handle_obj_data
- RGWPutObjProcessor_Atomic::write_data
- RGWPutObjProcessor_Atomic::handle_data
- RGWPutObjProcessor_Aio::throttle_data
- RGWPutObj_Compress::handle_data
- RGWPutObjProcessor::complete
- RGWPutObjProcessor_Atomic::complete
- RGWPutObj_Compress::prepare
RGWPutObj::execute
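The function that executes an object upload in rgw is RGWPutObj::execute(). The walkthrough of this function below may contain some mistakes and omissions.
Most of the explanation is given as comments in the code, so that it can be read alongside the source.
A few points to note:
- RGWPutObjProcessor *processor is a base-class pointer. Depending on how the object is transferred, it points to one of two objects, RGWPutObjProcessor_Atomic or RGWPutObjProcessor_Multipart, which handle Atomic and Multipart uploads respectively.
- RGWPutObjDataProcessor *filter is, as the name suggests, a piece of middleware. Its handle_data and throttle_data functions dispatch to different subclasses depending on whether sse and compression are configured, compressing or encrypting the data, and then pass the processed data to the corresponding function of RGWPutObjProcessor_Multipart or RGWPutObjProcessor_Atomic.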
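The relationship between filter and processor is essentially a chain of wrappers. The sketch below shows that pattern with simplified stand-in classes; DataProcessor, SinkProcessor, UppercaseFilter and upload are hypothetical names, and upper-casing is only a placeholder for real compression or encryption.

```cpp
#include <cassert>
#include <cctype>
#include <string>

// Simplified stand-in for RGWPutObjDataProcessor: one stage in the chain.
class DataProcessor {
public:
  virtual ~DataProcessor() = default;
  virtual int handle_data(const std::string& data) = 0;
};

// Stand-in for the Atomic/Multipart processor at the end of the chain.
// The real class writes to RADOS; this one only records what it received.
class SinkProcessor : public DataProcessor {
public:
  std::string stored;
  int handle_data(const std::string& data) override {
    stored += data;
    return 0;
  }
};

// Stand-in for a compression or encryption filter: transforms the data,
// then forwards it to the next stage.
class UppercaseFilter : public DataProcessor {
  DataProcessor* next;
public:
  explicit UppercaseFilter(DataProcessor* n) : next(n) {}
  int handle_data(const std::string& data) override {
    std::string out = data;
    for (char& c : out) {
      c = static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
    }
    return next->handle_data(out);
  }
};

// Mirrors the selection in execute(): no filter by default, optionally wrap.
int upload(const std::string& body, bool use_filter, SinkProcessor& sink) {
  DataProcessor* filter = &sink;
  UppercaseFilter wrapper(filter);
  if (use_filter) {
    filter = &wrapper;
  }
  return filter->handle_data(body);
}
```

The caller only ever talks to filter, so it does not need to know whether a transformation stage sits in front of the processor.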
void RGWPutObj::execute()
{
//---------------------------------------------------------
// Variable definitions and initialization
//---------------------------------------------------------
RGWPutObjProcessor *processor = NULL;
// filter processes the data, e.g. encryption and compression
RGWPutObjDataProcessor *filter = nullptr;
std::unique_ptr<RGWPutObjDataProcessor> encrypt;
// buffers for the client-supplied md5 and the computed md5
char supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1];
char supplied_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
MD5 hash;
bufferlist bl, aclbl, bs;
int len;
map<string, string>::iterator iter;
bool multipart;
// used for copy source range
off_t fst;
off_t lst;
// choose the object's compression type from the zone configuration: none, or the name of a compression plugin
// http://docs.ceph.com/docs/kraken/radosgw/compression/
const auto& compression_type = store->get_zone_params().get_compression_type(
s->bucket_info.placement_rule);
CompressorRef plugin;
boost::optional<RGWPutObj_Compress> compressor;
bool need_calc_md5 = (dlo_manifest == NULL) && (slo_info == NULL);
perfcounter->inc(l_rgw_put);
op_ret = -EINVAL;
//---------------------------------------------------------
// Parse the request parameters and check that they are complete and valid
//---------------------------------------------------------
// check that the requested object name, bucket name, etc. are valid
if (s->object.empty()) {
goto done;
}
if (!s->bucket_exists) {
op_ret = -ERR_NO_SUCH_BUCKET;
return;
}
// parse and validate the HTTP request parameters: copy obj, tagging and versioning cases, plus basic objname and bucketname parsing
op_ret = get_params();
if (op_ret < 0) {
ldout(s->cct, 20) << "get_params() returned ret=" << op_ret << dendl;
goto done;
}
op_ret = get_system_versioning_params(s, &olh_epoch, &version_id);
if (op_ret < 0) {
ldout(s->cct, 20) << "get_system_versioning_params() returned ret="
<< op_ret << dendl;
goto done;
}
// if the request supplied an md5 for checking the integrity of the request, decode it
if (supplied_md5_b64) {
need_calc_md5 = true;
ldout(s->cct, 15) << "supplied_md5_b64=" << supplied_md5_b64 << dendl;
op_ret = ceph_unarmor(supplied_md5_bin, &supplied_md5_bin[CEPH_CRYPTO_MD5_DIGESTSIZE + 1],
supplied_md5_b64, supplied_md5_b64 + strlen(supplied_md5_b64));
ldout(s->cct, 15) << "ceph_armor ret=" << op_ret << dendl;
if (op_ret != CEPH_CRYPTO_MD5_DIGESTSIZE) {
op_ret = -ERR_INVALID_DIGEST;
goto done;
}
buf_to_hex((const unsigned char *)supplied_md5_bin, CEPH_CRYPTO_MD5_DIGESTSIZE, supplied_md5);
ldout(s->cct, 15) << "supplied_md5=" << supplied_md5 << dendl;
}
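// Check whether the HTTP transfer is chunked. If it is not, quota can be checked up front from the content length; otherwise the check has to wait until all chunks have been received
if (!chunked_upload) { /* with chunked upload we don't know how big is the upload.
we also check sizes at the end anyway */
// check the user and bucket quota constraints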
op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
user_quota, bucket_quota, s->content_length);
if (op_ret < 0) {
ldout(s->cct, 20) << "check_quota() returned ret=" << op_ret << dendl;
goto done;
}
// check the constraint on the bucket index shards
op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
if (op_ret < 0) {
ldout(s->cct, 20) << "check_bucket_shards() returned ret=" << op_ret << dendl;
goto done;
}
}
// if the client supplied an etag, keep a copy of it
// it is compared with the computed etag further down
if (supplied_etag) {
strncpy(supplied_md5, supplied_etag, sizeof(supplied_md5) - 1);
supplied_md5[sizeof(supplied_md5) - 1] = '\0';
}
// Determine whether this is a multipart upload and return the matching processor: RGWPutObjProcessor_Atomic or RGWPutObjProcessor_Multipart
// the bool multipart records whether it is multipart
processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx), &multipart);
// no filters by default
filter = processor;
/* Handle object versioning of Swift API. */
if (! multipart) {
rgw_obj obj(s->bucket, s->object);
op_ret = store->swift_versioning_copy(*static_cast<RGWObjectCtx *>(s->obj_ctx),
s->bucket_owner.get_id(),
s->bucket_info,
obj);
if (op_ret < 0) {
goto done;
}
}
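// Call prepare on RGWPutObjProcessor_Atomic or RGWPutObjProcessor_Multipart:
// RGWPutObjProcessor_Atomic:
// preparation before writing: generate the object name prefix, set the placement rules,
// create the corresponding objects in memory, set the sizes used to split head and tail objects, and so on
// RGWPutObjProcessor_Multipart:
// compared with Atomic, additionally handles uploadId and partNumber
// After its own work, each one calls RGWPutObjProcessor_Aio's prepare:
// sets the aio window size from the user configuration
// which in turn calls RGWPutObjProcessor's prepare:
// sets the RGWPutObjProcessor's store pointer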
op_ret = processor->prepare(store, NULL);
if (op_ret < 0) {
ldout(s->cct, 20) << "processor->prepare() returned ret=" << op_ret
<< dendl;
goto done;
}
// for a copy source range operation, get the start and end offsets in the source object
fst = copy_source_range_fst;
lst = copy_source_range_lst;
// sse: if the user enabled sse, prepare for encryption
op_ret = get_encrypt_filter(&encrypt, filter);
if (op_ret < 0) {
goto done;
}
// when encryption is needed, filter encrypts the data
if (encrypt != nullptr) {
filter = encrypt.get();
} else {
//no encryption, we can try compression
if (compression_type != "none") {
// when no encryption is needed and compression_type is set, filter compresses the data
plugin = get_compressor_plugin(s, compression_type);
if (!plugin) {
ldout(s->cct, 1) << "Cannot load plugin for compression type "
<< compression_type << dendl;
} else {
// everything is in place, construct the compressor
compressor.emplace(s->cct, plugin, filter);
filter = &*compressor;
}
}
}
//-------------------------------------------------------------------------
// Parameter parsing, tool setup, head obj initialization and write ctx initialization are done
// Next, read data from the req, process it and store it in rados
//-------------------------------------------------------------------------
do {
bufferlist data;
if (fst > lst)
break;
if (!copy_source) {
// not a copy, i.e. a normal put
// read up to rgw_max_chunk_size bytes of the request body into data
/* About rgw_max_chunk_size:
Option("rgw_max_chunk_size", Option::TYPE_INT, Option::LEVEL_ADVANCED)
.set_default(4_M)
.set_description("Set RGW max chunk size")
.set_long_description(
"The chunk size is the size of RADOS I/O requests that RGW sends when accessing "
"data objects. RGW read and write operation will never request more than this amount "
"in a single request. This also defines the rgw object head size, as head operations "
"need to be atomic, and anything larger than this would require more than a single "
"operation."),
*/
len = get_data(data);
} else {
// otherwise, read from the other (source) object
uint64_t cur_lst = min(fst + s->cct->_conf->rgw_max_chunk_size - 1, lst);
op_ret = get_data(fst, cur_lst, data);
if (op_ret < 0)
goto done;
len = data.length();
s->content_length += len;
fst += len;
}
if (len < 0) {
op_ret = len;
ldout(s->cct, 20) << "get_data() returned ret=" << op_ret << dendl;
goto done;
}
// update the md5 with this chunk of data
if (need_calc_md5) {
hash.Update((const byte *)data.c_str(), data.length());
}
/* update torrrent */
torrent.update(data);
/* do we need this operation to be synchronous? if we're dealing with an object with immutable
* head, e.g., multipart object we need to make sure we're the first one writing to this object
*/
bool need_to_wait = (ofs == 0) && multipart;
bufferlist orig_data;
if (need_to_wait) {
orig_data = data;
}
// First RGWPutObj_Compress::handle_data compresses the data
// (or the data is encrypted, or left untouched)
// Then `RGWPutObjProcessor_Atomic::handle_data` is called
// to split the processed data into one head and multiple tail objects
// handle_data eventually calls `store->aio_put_obj_data` to write the object to rados
// An asynchronous librados write first calls aio_create_completion, which
// returns a rados_completion_t object representing the state of the write
// rados_completion_t: Represents the state of an asynchronous operation
// - it contains the return value once the operation completes,
// and can be used to block until the operation is complete or safe.
// put_data_and_throttle passes a pointer to this object (handle) to throttle_data
// If this is the first chunk of a Multipart upload, need_to_wait is true
// need_to_wait == true means the call returns only after the chunk has been written to rados (it becomes a synchronous write)
op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait);
if (op_ret < 0) {
if (!need_to_wait || op_ret != -EEXIST) {
ldout(s->cct, 20) << "processor->thottle_data() returned ret="
<< op_ret << dendl;
goto done;
}
/* need_to_wait == true and op_ret == -EEXIST */
ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;
/* restore original data */
data.swap(orig_data);
/* restart processing with different oid suffix */
dispose_processor(processor);
processor = select_processor(*static_cast<RGWObjectCtx *>(s->obj_ctx), &multipart);
filter = processor;
string oid_rand;
char buf[33];
gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
oid_rand.append(buf);
op_ret = processor->prepare(store, &oid_rand);
if (op_ret < 0) {
ldout(s->cct, 0) << "ERROR: processor->prepare() returned "
<< op_ret << dendl;
goto done;
}
op_ret = get_encrypt_filter(&encrypt, filter);
if (op_ret < 0) {
goto done;
}
if (encrypt != nullptr) {
filter = encrypt.get();
} else {
if (compressor) {
compressor.emplace(s->cct, plugin, filter);
filter = &*compressor;
}
}
op_ret = put_data_and_throttle(filter, data, ofs, false);
if (op_ret < 0) {
goto done;
}
}
// ofs is the amount of data read from the request body so far
ofs += len;
// len == 0 means all object data has been read
} while (len > 0);
{
// flush the buffer
bufferlist flush;
op_ret = put_data_and_throttle(filter, flush, ofs, false);
if (op_ret < 0) {
goto done;
}
}
// if this is not a chunked upload and the amount of data received differs from the content length, the transfer went wrong
if (!chunked_upload && ofs != s->content_length) {
op_ret = -ERR_REQUEST_TIMEOUT;
goto done;
}
s->obj_size = ofs;
perfcounter->inc(l_rgw_put_b, s->obj_size);
// as the function name says: complete the aws4 auth
op_ret = do_aws4_auth_completion();
if (op_ret < 0) {
goto done;
}
// check whether the quota limits are exceeded
op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
user_quota, bucket_quota, s->obj_size);
if (op_ret < 0) {
ldout(s->cct, 20) << "second check_quota() returned op_ret=" << op_ret << dendl;
goto done;
}
// check whether the maximum number of objs in a bucket index shard is exceeded
op_ret = store->check_bucket_shards(s->bucket_info, s->bucket, bucket_quota);
if (op_ret < 0) {
ldout(s->cct, 20) << "check_bucket_shards() returned ret=" << op_ret << dendl;
goto done;
}
hash.Final(m);
// add the compression info to attrs
if (compressor && compressor->is_compressed()) {
bufferlist tmp;
RGWCompressionInfo cs_info;
cs_info.compression_type = plugin->get_type_name();
cs_info.orig_size = s->obj_size;
cs_info.blocks = move(compressor->get_compression_blocks());
::encode(cs_info, tmp);
attrs[RGW_ATTR_COMPRESSION] = tmp;
ldout(s->cct, 20) << "storing " << RGW_ATTR_COMPRESSION
<< " with type=" << cs_info.compression_type
<< ", orig_size=" << cs_info.orig_size
<< ", blocks=" << cs_info.blocks.size() << dendl;
}
buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
etag = calc_md5;
// check that the md5 of the data matches the one supplied
if (supplied_md5_b64 && strcmp(calc_md5, supplied_md5)) {
op_ret = -ERR_BAD_DIGEST;
goto done;
}
// store the acl in an xattr
policy.encode(aclbl);
emplace_attr(RGW_ATTR_ACL, std::move(aclbl));
// dlo and slo are not covered here; skipped for now
if (dlo_manifest) {
op_ret = encode_dlo_manifest_attr(dlo_manifest, attrs);
if (op_ret < 0) {
ldout(s->cct, 0) << "bad user manifest: " << dlo_manifest << dendl;
goto done;
}
complete_etag(hash, &etag);
ldout(s->cct, 10) << __func__ << ": calculated md5 for user manifest: " << etag << dendl;
}
if (slo_info) {
bufferlist manifest_bl;
::encode(*slo_info, manifest_bl);
emplace_attr(RGW_ATTR_SLO_MANIFEST, std::move(manifest_bl));
hash.Update((byte *)slo_info->raw_data, slo_info->raw_data_len);
complete_etag(hash, &etag);
ldout(s->cct, 10) << __func__ << ": calculated md5 for user manifest: " << etag << dendl;
}
// etag handling
if (supplied_etag && etag.compare(supplied_etag) != 0) {
op_ret = -ERR_UNPROCESSABLE_ENTITY;
goto done;
}
bl.append(etag.c_str(), etag.size() + 1);
emplace_attr(RGW_ATTR_ETAG, std::move(bl));
// store the remaining attrs (those taken from the HTTP request that the object needs) as xattrs
populate_with_generic_attrs(s, attrs);
op_ret = rgw_get_request_metadata(s->cct, s->info, attrs);
if (op_ret < 0) {
goto done;
}
encode_delete_at_attr(delete_at, attrs);
encode_obj_tags_attr(obj_tags.get(), attrs);
/* Add a custom metadata to expose the information whether an object
* is an SLO or not. Appending the attribute must be performed AFTER
* processing any input from user in order to prohibit overwriting. */
if (slo_info) {
bufferlist slo_userindicator_bl;
slo_userindicator_bl.append("True", 4);
emplace_attr(RGW_ATTR_SLO_UINDICATOR, std::move(slo_userindicator_bl));
}
// finish the head and tail writes still outstanding and set the xattrs on head
op_ret = processor->complete(s->obj_size, etag, &mtime, real_time(), attrs,
(delete_at ? *delete_at : real_time()), if_match, if_nomatch,
(user_data.empty() ? nullptr : &user_data));
// only atomic upload will upate version_id here
if (!multipart)
version_id = (static_cast<RGWPutObjProcessor_Atomic *>(processor))->get_version_id();
/* produce torrent */
if (s->cct->_conf->rgw_torrent_flag && (ofs == torrent.get_data_len()))
{
torrent.init(s, store);
torrent.set_create_date(mtime);
op_ret = torrent.complete();
if (0 != op_ret)
{
ldout(s->cct, 0) << "ERROR: torrent.handle_data() returned " << op_ret << dendl;
goto done;
}
}
done:
// release the processor
dispose_processor(processor);
perfcounter->tinc(l_rgw_put_lat,
(ceph_clock_now() - s->time));
}
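In the copy branch of the loop above, each iteration reads the inclusive range [fst, cur_lst] with cur_lst = min(fst + rgw_max_chunk_size - 1, lst) and then advances fst by the number of bytes read. A minimal sketch of that arithmetic follows; split_range is a hypothetical helper, and it assumes max_chunk_size > 0 and that every read returns the full range requested.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Splits the inclusive byte range [fst, lst] into pieces of at most
// max_chunk_size bytes, the way the copy branch advances fst.
// Assumes max_chunk_size > 0.
std::vector<std::pair<int64_t, int64_t>> split_range(int64_t fst, int64_t lst,
                                                     int64_t max_chunk_size) {
  std::vector<std::pair<int64_t, int64_t>> pieces;
  while (fst <= lst) {  // execute() breaks out of its loop when fst > lst
    int64_t cur_lst = std::min(fst + max_chunk_size - 1, lst);
    pieces.emplace_back(fst, cur_lst);
    fst = cur_lst + 1;  // in execute(): fst += len
  }
  return pieces;
}
```

For example, the range [0, 9] with a chunk size of 4 is read as [0, 3], [4, 7] and [8, 9].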
put_data_and_throttle
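Within execute, this function is called as follows:
do{
bufferlist data;
// read up to rgw_max_chunk_size bytes of the request body into data
len = get_data(data);
......
op_ret = put_data_and_throttle(filter, data, ofs, need_to_wait);
......
} while (len > 0);
Data is read from the request body repeatedly, passed through the filter, and handed to the Atomic or Multipart class for splitting and writing.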
static inline int put_data_and_throttle(RGWPutObjDataProcessor *processor,
bufferlist& data, off_t ofs,
bool need_to_wait)
{
bool again = false;
do {
void *handle = nullptr;
rgw_raw_obj obj;
uint64_t size = data.length();
// for the Aio part, see Appendix A
// handle points to the object returned by aio; it tells whether the aio has completed
int ret = processor->handle_data(data, ofs, &handle, &obj, &again);
if (ret < 0)
return ret;
if (handle != nullptr)
{
// wrap obj and handle and put them in the pending queue of the Aio class
// and limit the size of the pending queue according to window_size
ret = processor->throttle_data(handle, obj, size, need_to_wait);
if (ret < 0)
return ret;
}
else
break;
need_to_wait = false; /* the need to wait only applies to the first
* iteration */
} while (again);
return 0;
} /* put_data_and_throttle */
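The control flow of put_data_and_throttle is easier to see with the data arguments stripped away. The sketch below keeps only the loop: ScriptedProcessor is a hypothetical stand-in that pretends to issue a fixed number of writes, and drive repeats the loop structure of the function above.

```cpp
#include <cassert>
#include <vector>

// Hypothetical scripted processor: each handle_data call issues one "write"
// until writes_left reaches zero, and reports through again whether more remain.
struct ScriptedProcessor {
  int writes_left;
  std::vector<bool> waits;  // need_to_wait value seen by each throttle_data call
  int token = 0;            // dummy object whose address serves as the aio handle

  explicit ScriptedProcessor(int n) : writes_left(n) {}

  int handle_data(void** handle, bool* again) {
    if (writes_left == 0) {
      *handle = nullptr;    // nothing written this round
      *again = false;
      return 0;
    }
    --writes_left;
    *handle = &token;
    *again = (writes_left > 0);
    return 0;
  }

  int throttle_data(void* /*handle*/, bool need_to_wait) {
    waits.push_back(need_to_wait);
    return 0;
  }
};

// Same loop structure as put_data_and_throttle, minus the data arguments.
int drive(ScriptedProcessor& p, bool need_to_wait) {
  bool again = false;
  do {
    void* handle = nullptr;
    int ret = p.handle_data(&handle, &again);
    if (ret < 0) return ret;
    if (handle == nullptr) break;  // no write was started, nothing to throttle
    ret = p.throttle_data(handle, need_to_wait);
    if (ret < 0) return ret;
    need_to_wait = false;          // only the first write may be synchronous
  } while (again);
  return 0;
}
```

Two things stand out: a null handle ends the loop even if again is set, and need_to_wait is passed as true at most once per call.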
RGWPutObjProcessor_Atomic::handle_data
This function mainly splits one rgw object into one head object and multiple tail objects, then calls write_data to write them to rados asynchronously.
int RGWPutObjProcessor_Atomic::handle_data(bufferlist &bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again)
{
*phandle = NULL;
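// data_ofs is the amount of data for which writes have already been issued
// next_part_ofs is where the next rados object starts, i.e. where the rados object currently being written ends
// In other words, the data from cur_part_ofs up to next_part_ofs goes into the rados object that cur_obj refers to
// This is because one rgw object is split into several rados objects (one head, multiple tails), 4M each by default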
uint64_t max_write_size = std::min(max_chunk_size, (uint64_t)next_part_ofs - data_ofs);
// move the data in bl to the tail of pending_data_bl
pending_data_bl.claim_append(bl);
// if, even with the data from bl, the total is still below the write threshold (max_write_size), return and wait for the next handle_data call
if (pending_data_bl.length() < max_write_size)
{
*again = false;
return 0;
}
// move the first max_write_size bytes of pending_data_bl into bl
pending_data_bl.splice(0, max_write_size, &bl);
// check whether what remains in pending_data_bl still reaches the write threshold (max_chunk_size)
/* do we have enough data pending accumulated that needs to be written? */
*again = (pending_data_bl.length() >= max_chunk_size);
// If this is the head object and immutable_head() is false
// data_ofs == 0 means this is the first write
// immutable_head()
// returns false by default in RGWPutObjProcessor_Atomic
// but may be overridden by a subclass
if (!data_ofs && !immutable_head())
{
// move the data in bl into first_chunk
first_chunk.claim(bl);
obj_len = (uint64_t)first_chunk.length();
// update next_part_ofs and cur_part_ofs, and point cur_obj at the rados object to write next
int r = prepare_next_part(obj_len);
if (r < 0) return r;
// update data_ofs, the offset of all data written so far
data_ofs = obj_len;
return 0;
}
off_t write_ofs = data_ofs;
data_ofs = write_ofs + bl.length();
// for an object with an immutable head, flag the write of its head object so that it gets special handling later
bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
we could be racing with another upload, to the same
object and cleanup can be messy */
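// write_data first checks whether write_ofs has reached next_part_ofs
// if so, it calls prepare_next_part to update cur_obj, cur_part_ofs and next_part_ofs
// then it sets pobj to cur_obj
// and finally calls handle_obj_data for the next step
// handle_obj_data goes through aio_put_obj_data and ends up in the librados aio api, which writes the data to rados asynchronously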
int ret = write_data(bl, write_ofs, phandle, pobj, exclusive);
if (ret >= 0)
{ /* we might return, need to clear bl as it was already sent */
bl.clear();
}
return ret;
}
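The buffering in handle_data can be modelled in a few lines. The sketch below is a deliberately reduced model, not the rgw implementation: AtomicSketch is a hypothetical class, std::string stands in for bufferlist, part boundaries (next_part_ofs) and immutable heads are left out, and a tail "write" is just recorded as an (offset, length) pair.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Reduced model of the buffering in handle_data: every write is capped
// at max_chunk_size only, and the head is assumed to be mutable.
class AtomicSketch {
  uint64_t max_chunk_size;
  std::string pending;     // plays the role of pending_data_bl
  uint64_t data_ofs = 0;   // bytes already handed off (head or tail)
public:
  std::string first_chunk; // head data, kept in memory until complete
  std::vector<std::pair<uint64_t, uint64_t>> tail_writes;  // (offset, length)

  explicit AtomicSketch(uint64_t chunk) : max_chunk_size(chunk) {}

  // Consumes bl. Returns true if enough data is still pending for another
  // chunk, which corresponds to *again in the original.
  bool handle_data(std::string& bl) {
    pending += bl;
    bl.clear();
    if (pending.size() < max_chunk_size) {
      return false;        // keep accumulating
    }
    std::string chunk = pending.substr(0, max_chunk_size);
    pending.erase(0, max_chunk_size);
    bool again = pending.size() >= max_chunk_size;
    if (data_ofs == 0) {
      first_chunk = chunk; // the head is not written at this point
    } else {
      tail_writes.emplace_back(data_ofs, chunk.size());
    }
    data_ofs += chunk.size();
    return again;
  }

  uint64_t buffered() const { return pending.size(); }
};
```

In the original code the head case returns with a null handle, so the loop in put_data_and_throttle stops there even when again is true, and the remaining buffered data is picked up by the next handle_data call. Data left in the buffer at the end (2 bytes if 10 bytes are fed with a chunk size of 4) is not written by handle_data at all.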
RGWPutObjProcessor_Atomic::do_complete
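complete simply calls the corresponding do_complete function and contains little logic of its own, so we go straight to do_complete.
This function does the wrap-up work. The asynchronous writes were started earlier in put_data_and_throttle; at wrap-up time it first waits for all of them to finish, then writes the attrs of the uploaded rgw object into the xattrs of the head object and writes the bi log and bucket index, which completes the upload.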
int RGWPutObjProcessor_Atomic::do_complete(size_t accounted_size, const string &etag,
real_time *mtime, real_time set_mtime,
map<string, bufferlist> &attrs,
real_time delete_at,
const char *if_match,
const char *if_nomatch, const string *user_data,
rgw_zone_set *zones_trace)
{
// wait for all asynchronous writes of this rgw object to finish
int r = complete_writing_data();
if (r < 0)
return r;
// mark this object as an Atomic object
obj_ctx.obj.set_atomic(head_obj);
// write the attrs of the rgw object into the xattrs of the head object
RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj);
/* some object types shouldn't be versioned, e.g., multipart parts */
op_target.set_versioning_disabled(!versioned_object);
RGWRados::Object::Write obj_op(&op_target);
obj_op.meta.data = &first_chunk;
obj_op.meta.manifest = &manifest;
obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
obj_op.meta.if_match = if_match;
obj_op.meta.if_nomatch = if_nomatch;
obj_op.meta.mtime = mtime;
obj_op.meta.set_mtime = set_mtime;
obj_op.meta.owner = bucket_info.owner;
obj_op.meta.flags = PUT_OBJ_CREATE;
obj_op.meta.olh_epoch = olh_epoch;
obj_op.meta.delete_at = delete_at;
obj_op.meta.user_data = user_data;
obj_op.meta.zones_trace = zones_trace;
obj_op.meta.modify_tail = true;
r = obj_op.write_meta(obj_len, accounted_size, attrs);
if (r < 0)
{
return r;
}
canceled = obj_op.meta.canceled;
return 0;
}
Multipart objects
- RGWPutObj::execute
- RGWPutObj_Compress::prepare
- RGWPutObjProcessor_Multipart::prepare
- put_data_and_throttle
- RGWPutObj_Compress::handle_data
- RGWPutObjProcessor_Atomic::handle_data
- RGWPutObjProcessor_Atomic::write_data
- RGWPutObjProcessor_Aio::handle_obj_data
- RGWRados::aio_put_obj_data
- librados aio API
- RGWRados::aio_put_obj_data
- RGWPutObjProcessor_Aio::handle_obj_data
- RGWPutObjProcessor_Atomic::write_data
- RGWPutObjProcessor_Atomic::handle_data
- RGWPutObjProcessor_Aio::throttle_data
- RGWPutObj_Compress::handle_data
- RGWPutObjProcessor::complete
- RGWPutObjProcessor_Multipart::do_complete
- RGWPutObj_Compress::prepare
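Unlike an Atomic object, which is uploaded in one go, a Multipart object is uploaded as multiple parts, possibly over a very long period of time, so it is handled slightly differently. For how Multipart works in detail, see: Multipart Upload
For a Multipart upload the data write path is exactly the same. The difference lies in the preparation done by processor->prepare(store, NULL): the prepare stage mainly determines the write target, building the name prefix of the objects stored in rados from the bucket, oid and related information.
An Atomic upload writes a whole rgw object at once, while a Multipart upload writes only one part of the corresponding rgw object, so the naming and the surrounding metadata differ; these differences are handled in the prepare stage.
The data upload after that is identical. Multipart does not even implement handle_data and throttle_data; it inherits them directly from the Atomic class.
One thing worth mentioning: each part of a Multipart object is uploaded under the compression policy in effect at that moment, so different parts may end up using different compression algorithms. This is dealt with when the user sends the RGWCompleteMultipart request after uploading all parts, and is not covered here.
The other difference is in the final wrap-up, processor->complete. It first waits for all asynchronous writes to finish, then writes the attrs into the xattrs of the head object of the rgw object.
In addition, a Multipart upload creates an object of class RGWUploadPartInfo, which holds the information about this part, and stores it in the omap of a multipart_meta_obj object.
Completing the Multipart object is not part of put obj; it is a separate request and is not covered here.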
RGWPutObjProcessor_Multipart::do_complete
int RGWPutObjProcessor_Multipart::do_complete(size_t accounted_size,
const string& etag,
real_time *mtime, real_time set_mtime,
map<string, bufferlist>& attrs,
real_time delete_at,
const char *if_match,
const char *if_nomatch, const string *user_data, rgw_zone_set *zones_trace)
{
// wait for the asynchronous writes to finish
complete_writing_data();
// write attrs into the xattrs of the head object
RGWRados::Object op_target(store, s->bucket_info, obj_ctx, head_obj);
op_target.set_versioning_disabled(true);
RGWRados::Object::Write head_obj_op(&op_target);
head_obj_op.meta.set_mtime = set_mtime;
head_obj_op.meta.mtime = mtime;
head_obj_op.meta.owner = s->owner.get_id();
head_obj_op.meta.delete_at = delete_at;
head_obj_op.meta.zones_trace = zones_trace;
head_obj_op.meta.modify_tail = true;
int r = head_obj_op.write_meta(obj_len, accounted_size, attrs);
if (r < 0)
return r;
// pack the information about this part into an RGWUploadPartInfo object
// then encode it and write it into the omap of the multipart_meta_obj object
bufferlist bl;
RGWUploadPartInfo info;
string p = "part.";
bool sorted_omap = is_v2_upload_id(upload_id);
// build the omap key
if (sorted_omap) {
string err;
int part_num_int = strict_strtol(part_num.c_str(), 10, &err);
if (!err.empty()) {
dout(10) << "bad part number specified: " << part_num << dendl;
return -EINVAL;
}
char buf[32];
snprintf(buf, sizeof(buf), "%08d", part_num_int);
p.append(buf);
} else {
p.append(part_num);
}
info.num = atoi(part_num.c_str());
info.etag = etag;
info.size = obj_len;
info.accounted_size = accounted_size;
info.modified = real_clock::now();
info.manifest = manifest;
// take the compression info of this part from attrs and store it in info as well
bool compressed;
r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info);
if (r < 0) {
dout(1) << "cannot get compression info" << dendl;
return r;
}
::encode(info, bl);
string multipart_meta_obj = mp.get_meta();
rgw_obj meta_obj;
meta_obj.init_ns(bucket, multipart_meta_obj, mp_ns);
meta_obj.set_in_extra_data(true);
rgw_raw_obj raw_meta_obj;
store->obj_to_raw(s->bucket_info.placement_rule, meta_obj, &raw_meta_obj);
r = store->omap_set(raw_meta_obj, p, bl);
return r;
}
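The omap key built in do_complete has two forms, depending on is_v2_upload_id. The sketch below reproduces just the key formatting; part_omap_key is a hypothetical helper, its sorted parameter stands for the result of is_v2_upload_id(upload_id), and unlike the original it does not validate part_num (the original uses strict_strtol and returns -EINVAL on bad input).

```cpp
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <string>

// Builds the omap key for a part, following the two branches in do_complete.
// No validation of part_num is done in this sketch.
std::string part_omap_key(const std::string& part_num, bool sorted) {
  std::string key = "part.";
  if (sorted) {
    char buf[32];
    // zero-pad to 8 digits, as the snprintf("%08d") call in the original does
    std::snprintf(buf, sizeof(buf), "%08d", std::atoi(part_num.c_str()));
    key.append(buf);
  } else {
    key.append(part_num);
  }
  return key;
}
```

With zero padding, comparing the keys as strings gives the same order as comparing the part numbers, which is presumably why the variable is called sorted_omap: "part.00000009" sorts before "part.00000010", whereas the unpadded "part.10" sorts before "part.9".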
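Appendix
A: RGWPutObjProcessor_Aio
This class is a thin wrapper around the librados aio operations. Getting an overall picture of it before reading the put obj code makes that code easier to follow.
The class maintains a pending queue of the objs currently being written via aio. pending_size is the total size of the data being written in that queue, and window_size limits the size of the queue; the aim is to keep pending_size <= window_size as far as possible.
When an upper-layer class wants to write an object, it calls handle_obj_data to start the aio and gets back a handle for querying the aio's state. It must then call throttle_data, which wraps that obj and its handle and adds them to the pending queue, pops the entries whose writes have already completed, and then compares pending_size with window_size and acts accordingly.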
struct put_obj_aio_info {
void *handle;
rgw_raw_obj obj;
uint64_t size;
};
#define RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT (16 * 1024 * 1024)
class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
{
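// state of all aio operations in flight; the handle member tells whether an aio has completed
list<struct put_obj_aio_info> pending;
// limit on the amount of data allowed to be pending; the aim is to keep pending_size <= window_size
uint64_t window_size{RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT};
// total amount of data currently being written asynchronously
uint64_t pending_size{0};
// pop and return the first info in pending, subtracting its size from pending_size
struct put_obj_aio_info pop_pending();
// pop the first info in pending and wait for its asynchronous write to complete
// then add the written obj to written_objs and return
int wait_pending_front();
// check whether the asynchronous write of the first info in pending has completed
// relies on the librados "safe" state (cf. wait_for_safe); safe means the data has been written to all replicas
bool pending_has_completed();
// the last obj that finished writing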
rgw_raw_obj last_written_obj;
protected:
uint64_t obj_len{0};
// objs whose asynchronous write has completed
set<rgw_raw_obj> written_objs;
// information about the head object of this rgw object
rgw_obj head_obj;
void add_written_obj(const rgw_raw_obj& obj) {
written_objs.insert(obj);
}
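// pop every entry in pending and wait for its asynchronous write; returns only when all are done
int drain_pending();
// start an asynchronous write of the data in bl to offset ofs of obj, and point phandle at the handle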
int handle_obj_data(rgw_raw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
public:
int prepare(RGWRados *store, string *oid_rand) override;
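// wrap the handle returned by handle_obj_data together with its obj and put them in the pending queue
// adjust window_size according to how the underlying rados is keeping up
// if pending_size > window_size, call wait_pending_front once to reduce pending_size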
int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) override;
RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info) {}
~RGWPutObjProcessor_Aio() override;
}; /* RGWPutObjProcessor_Aio */
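To make the window mechanism concrete, here is a minimal model of the pending queue. It is a sketch under strong simplifications: AioWindowSketch is a hypothetical class, only sizes are tracked, window_size is fixed rather than adjusted, and "waiting" for the front entry simply pops it, since there is no real asynchronous I/O here.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Simplified model of the pending queue in RGWPutObjProcessor_Aio.
class AioWindowSketch {
  std::deque<uint64_t> pending;  // sizes of in-flight writes, oldest first
  uint64_t window_size;
  uint64_t pending_size = 0;
public:
  int waits = 0;                 // how many times the front entry was waited on

  explicit AioWindowSketch(uint64_t window) : window_size(window) {}

  // Models wait_pending_front: retire the oldest in-flight write.
  void wait_pending_front() {
    pending_size -= pending.front();
    pending.pop_front();
    ++waits;
  }

  // Queue one write of size bytes; if the window is exceeded, wait once
  // on the oldest write, as the comment on throttle_data describes.
  void throttle_data(uint64_t size) {
    pending.push_back(size);
    pending_size += size;
    if (pending_size > window_size) {
      wait_pending_front();
    }
  }

  // Models drain_pending: wait for everything still in flight.
  void drain_pending() {
    while (!pending.empty()) {
      wait_pending_front();
    }
  }

  uint64_t in_flight() const { return pending_size; }
};
```

With a window of 8 and writes of 4 bytes each, the first two writes are queued without blocking and the third forces one wait, so the amount of data in flight settles at the window size until drain_pending is called.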