不规则的 CSV 文件 bdp.csv 有两列:第一列是 std::string 类型,第二列("content" 列)是一个 JSON 字符串,其格式如下,
{
"Statement": [
{"Resource": ["xxxx", "xxxx", "xxx"] }
]
}
另一个文件是一个规则的csv文件。
工程目录如下,
image.png
CMakeLists.txt文件如下,
# CMAKE_CXX_STANDARD requires CMake >= 3.1; 2.6 (original) would silently
# ignore it.  3.10 also provides the imported targets used below.
cmake_minimum_required(VERSION 3.10)

if(APPLE)
    message(STATUS "This is Apple, do nothing.")
    set(CMAKE_MACOSX_RPATH 1)
    set(CMAKE_PREFIX_PATH /Users/aabjfzhu/software/vcpkg/ports/cppwork/vcpkg_installed/x64-osx/share )
elseif(UNIX)
    message(STATUS "This is linux, set CMAKE_PREFIX_PATH.")
    set(CMAKE_PREFIX_PATH /vcpkg/ports/cppwork/vcpkg_installed/x64-linux/share)
endif()

project(bdp_dbr)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# -g is a compile option, not a preprocessor definition; add_definitions(-g)
# in the original happened to work but is the wrong command.
add_compile_options(-g)

find_package(ZLIB)
find_package(OpenCV REQUIRED)
find_package(Arrow CONFIG REQUIRED)
find_package(unofficial-brotli REQUIRED)
find_package(unofficial-utf8proc CONFIG REQUIRED)
find_package(Thrift CONFIG REQUIRED)
find_package(glog REQUIRED)
find_package(OpenSSL REQUIRED)

find_package(Boost REQUIRED COMPONENTS
    system
    filesystem
    serialization
    program_options
    thread
    )

find_package(DataFrame REQUIRED)

if(APPLE)
    message(STATUS "This is APPLE, set INCLUDE_DIRS")
    set(INCLUDE_DIRS ${Boost_INCLUDE_DIRS} /usr/local/include /usr/local/iODBC/include /opt/snowflake/snowflakeodbc/include/ ${CMAKE_CURRENT_SOURCE_DIR}/../include/ ${CMAKE_CURRENT_SOURCE_DIR}/../../../include)
elseif(UNIX)
    message(STATUS "This is linux, set INCLUDE_DIRS")
    set(INCLUDE_DIRS ${Boost_INCLUDE_DIRS} /usr/local/include ${CMAKE_CURRENT_SOURCE_DIR}/../include/ ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/)
endif()

if(APPLE)
    message(STATUS "This is APPLE, set LINK_DIRS")
    set(LINK_DIRS /usr/local/lib /usr/local/iODBC/lib /opt/snowflake/snowflakeodbc/lib/universal)
elseif(UNIX)
    message(STATUS "This is linux, set LINK_DIRS")
    set(LINK_DIRS ${Boost_INCLUDE_DIRS} /usr/local/lib /vcpkg/ports/cppwork/vcpkg_installed/x64-linux/lib)
endif()

if(APPLE)
    message(STATUS "This is APPLE, set ODBC_LIBS")
    set(ODBC_LIBS iodbc iodbcinst)
elseif(UNIX)
    message(STATUS "This is linux, set ODBC_LIBS")
    set(ODBC_LIBS odbc odbcinst ltdl)
endif()

include_directories(${INCLUDE_DIRS})
link_directories(${LINK_DIRS})

# Every .cpp in this directory becomes its own test executable; shared code
# is compiled once into ${PROJECT_NAME}_lib.
file(GLOB test_file_list ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
file(GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/../impl/utils/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../include/utils/*.h ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/arr_/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/http/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/yaml/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/df/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/death_handler/impl/*.cpp)

# BUGFIX: the original appended ${test_file}, a variable that is undefined
# at this point (it is only the foreach loop variable below).
add_library(${PROJECT_NAME}_lib SHARED ${APP_SOURCES})
target_link_libraries(${PROJECT_NAME}_lib ${Boost_LIBRARIES} ZLIB::ZLIB glog::glog DataFrame::DataFrame ${OpenCV_LIBS})
target_link_libraries(${PROJECT_NAME}_lib OpenSSL::SSL OpenSSL::Crypto libgtest.a pystring libyaml-cpp.a libgmock.a ${ODBC_LIBS} libnanodbc.a pthread dl backtrace libzstd.a libbz2.a libsnappy.a re2::re2 parquet lz4 unofficial::brotli::brotlidec-static unofficial::brotli::brotlienc-static unofficial::brotli::brotlicommon-static utf8proc thrift::thrift arrow arrow_dataset)

foreach(test_file ${test_file_list})
    file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${test_file})
    # BUGFIX: original had the garbled "$(unknown)" here; strip the ".cpp"
    # suffix from the relative file name to form the target name.
    string(REPLACE ".cpp" "" file ${filename})
    add_executable(${file} ${test_file})
    target_link_libraries(${file} ${PROJECT_NAME}_lib)
endforeach()
include/utils/d_op.h
// Bucket-list comparison operations between BDP IAM policies and the
// Databricks bucket inventory.
//
// BUGFIX: the original guard _FREDRIC_D_OP_H_ (leading underscore followed
// by an uppercase letter) is an identifier reserved to the implementation
// by the C++ standard; renamed to a non-reserved spelling.
#ifndef FREDRIC_D_OP_H_
#define FREDRIC_D_OP_H_

#include <vector>
#include <string>

// Stateless helper; all operations are static.
struct d_op {
    // Parse bdp.csv's "content" JSON column and collect every
    // Statement/Resource entry as a flat list of strings.
    static std::vector<std::string> get_bdp_bucket_list();

    // Read the "S3 bucket list" column of the Databricks CSV, dropping
    // empty cells.
    static std::vector<std::string> get_databricks_bucket_list();

    // Return the BDP entries that do not contain any Databricks bucket
    // name as a substring.
    static std::vector<std::string> get_missing_bucket_list(std::vector<std::string> const& bdp_final, std::vector<std::string> const& db_final);

    // Strip the "arn:aws:s3:::" prefix and a trailing "/*", de-duplicate.
    static std::vector<std::string> fx_missing_results(std::vector<std::string> const& missing_results);

    // Write the final list to result_csv_path; false if the file cannot
    // be opened.
    static bool write_to_csv(std::vector<std::string> const& missing_final);
};

#endif  // FREDRIC_D_OP_H_
impl/utils/d_op.cpp
#include "utils/d_op.h"

#include "utils/bdp_dbr_cfg.h"
#include "glog/logging.h"
#include "json/json.hpp"
#include "df/df.h"
#include "arr_/arr_.h"
#include "pystring/pystring.h"

#include <algorithm>
#include <fstream>
#include <iostream>
#include <numeric>
#include <set>
using json = nlohmann::json;
// Parse bdp.csv and flatten every Statement/Resource string of the JSON
// "content" column into one vector (one entry per resource ARN).
std::vector<std::string> d_op::get_bdp_bucket_list() {
    std::shared_ptr<arrow::Table> tb{};
    ArrowUtil::read_csv(bdp_csv_path.c_str(), tb);
    auto const& tb_ = *tb;
    auto const& policy_conts_chunk_arr = tb_.GetColumnByName("content");
    auto const& policies_js = ArrowUtil::chunked_array_to_str_vector(policy_conts_chunk_arr);

    std::vector<std::string> policies{};
    for (auto const& policy_js : policies_js) {
        // Each cell looks like {"Statement":[{"Resource":[...]}, ...]}.
        auto const& policy_ob = json::parse(policy_js);
        auto const& stats = policy_ob["Statement"];
        for (auto const& stat : stats) {
            // PERF: bind by const& -- the original copied the whole
            // "Resource" json array, then copied each element again.
            auto const& eles = stat["Resource"];
            for (auto const& ele : eles) {
                policies.emplace_back(ele.get<std::string>());
            }
        }
    }
    return policies;
}
// Read the "S3 bucket list" column from the Databricks CSV, skipping blanks.
std::vector<std::string> d_op::get_databricks_bucket_list() {
    std::vector<std::string> res_policies{};
    std::shared_ptr<arrow::Table> tb{};
    ArrowUtil::read_csv(db_csv_path.c_str(), tb);
    auto const& tb_ = *tb;
    auto const& db_bulkets_chunk_arr = tb_.GetColumnByName("S3 bucket list");
    auto policies_js = ArrowUtil::chunked_array_to_str_vector(db_bulkets_chunk_arr);

    // BUGFIX: the original iterated by const& and then std::move'd the
    // element -- moving a const lvalue silently falls back to a copy.
    // policies_js is a local we own, so iterate by mutable reference and
    // let the move actually steal each string's buffer.
    for (auto& policy : policies_js) {
        if (!policy.empty()) {
            res_policies.emplace_back(std::move(policy));
        }
    }
    return res_policies;
}
// Return every BDP entry that is not "covered" by Databricks, where covered
// means some Databricks bucket name occurs as a substring of the BDP entry
// (BDP entries are full ARNs, possibly with a "/*" suffix).
std::vector<std::string> d_op::get_missing_bucket_list(std::vector<std::string> const& bdp_final, std::vector<std::string> const& db_final) {
    std::vector<std::string> missing_buckets;
    for (auto const& bdp_bucket : bdp_final) {
        // std::any_of replaces the hand-rolled flag-and-break inner loop.
        bool const is_in_db = std::any_of(
            db_final.begin(), db_final.end(),
            [&bdp_bucket](std::string const& db_bucket) {
                return bdp_bucket.find(db_bucket) != std::string::npos;
            });
        if (!is_in_db) {
            missing_buckets.emplace_back(bdp_bucket);
        }
    }
    return missing_buckets;
}
// Normalize the missing ARNs: strip the "arn:aws:s3:::" prefix and a
// trailing "/*" wildcard, de-duplicating via std::set (which also leaves
// the result sorted).
std::vector<std::string> d_op::fx_missing_results(std::vector<std::string> const& missing_results) {
    // std::set::insert de-duplicates automatically.
    std::set<std::string> final_missing_results;
    for (auto const& missing_bucket : missing_results) {
        auto tmp = pystring::replace(missing_bucket, "arn:aws:s3:::", "");
        // BUGFIX: the original tested find("/*") == size()-2, which checks
        // the FIRST occurrence and so fails to strip the suffix when "/*"
        // also appears earlier in the string.  Test the tail directly.
        if (tmp.size() >= 2 && tmp.compare(tmp.size() - 2, 2, "/*") == 0) {
            tmp = tmp.substr(0, tmp.size() - 2);
        }
        final_missing_results.insert(tmp);
    }
    std::cout << "SIZE: " << final_missing_results.size() << "\n";

    // Range-construct the vector instead of resize + identity std::transform.
    return std::vector<std::string>(final_missing_results.begin(),
                                    final_missing_results.end());
}
// Dump the final bucket list to result_csv_path as a one-column CSV with a
// 1-based row index.  Returns false if the output file cannot be opened.
bool d_op::write_to_csv(std::vector<std::string> const& missing_final) {
    CLDataFrame df;

    // 1-based index column; std::iota replaces the hand-rolled push loop
    // (which also std::move'd an unsigned long -- a meaningless no-op).
    std::vector<unsigned long> idxes(missing_final.size());
    std::iota(idxes.begin(), idxes.end(), 1UL);

    // NOTE: the original wrote std::move(missing_final), but moving a
    // const& is silently a copy; spell the copy out so the cost is visible.
    df.load_data(std::move(idxes),
                 std::make_pair("missing_bulkets",
                                std::vector<std::string>{missing_final}));

    std::fstream fs{result_csv_path, std::ios::out | std::ios::trunc};
    if (!fs.is_open()) {
        LOG(ERROR) << "Open file failed" << "\n";
        return false;
    }
    df.write<std::ostream, std::string>(fs, hmdf::io_format::csv2, true);
    fs.close();
    return true;
}
include/utils/bdp_dbr_cfg.h
// File-path configuration shared by the BDP/Databricks comparison tool.
//
// BUGFIX: _FREDRIC_BDP_DBR_CFG_H_ (leading underscore + uppercase) is an
// identifier reserved to the implementation; renamed the include guard.
#ifndef FREDRIC_BDP_DBR_CFG_H_
#define FREDRIC_BDP_DBR_CFG_H_

#include <string>

// Defined in impl/utils/bdp_dbr_cfg.cpp; paths are relative to the
// directory the binary is run from.
extern std::string bdp_csv_path;     // irregular BDP input CSV
extern std::string db_csv_path;      // regular Databricks input CSV
extern std::string result_csv_path;  // comparison-result output CSV

#endif  // FREDRIC_BDP_DBR_CFG_H_
impl/utils/bdp_dbr_cfg.cpp
#include "utils/bdp_dbr_cfg.h"
// Input/output locations, relative to the directory the binary runs from.
std::string bdp_csv_path = "../datas/bdp.csv";  // irregular BDP CSV (string + JSON "content" column)
std::string db_csv_path = "../datas/databricks.csv";  // regular Databricks CSV
std::string result_csv_path = "../datas/missing.csv";  // comparison result written by d_op::write_to_csv
test/bdp_dbr_test.cpp
#include "death_handler/death_handler.h"
#include "json/json.hpp"
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "df/df.h"
#include "utils/d_op.h"
using json = nlohmann::json;
// Test-runner entry point: configure glog, install the crash handler, and
// run all registered googletest cases.
int main(int argc, char** argv) {
    FLAGS_log_dir = "./";
    FLAGS_alsologtostderr = true;
    // Log levels INFO, WARNING, ERROR, FATAL map to 0, 1, 2, 3.
    FLAGS_minloglevel = 0;
    Debug::DeathHandler dh;
    // BUGFIX: InitGoogleLogging expects the program name (argv[0]), not a
    // log-file path; the log destination is already set via FLAGS_log_dir.
    google::InitGoogleLogging(argv[0]);
    testing::InitGoogleTest(&argc, argv);
    int ret = RUN_ALL_TESTS();
    return ret;
}
// End-to-end check: compute the BDP buckets missing from Databricks,
// normalize them, and verify the result CSV is written successfully.
GTEST_TEST(BDPDbrTests, GetDbrMissingBulkets) {
    auto const bdp_buckets = d_op::get_bdp_bucket_list();
    auto const db_buckets = d_op::get_databricks_bucket_list();

    auto const missing = d_op::get_missing_bucket_list(bdp_buckets, db_buckets);
    std::cout << "MISSING: " << missing.size() << "\n";

    auto const normalized = d_op::fx_missing_results(missing);
    ASSERT_TRUE(d_op::write_to_csv(normalized));
}
程序最终会输出一个对比结果 CSV 文件(datas/missing.csv),
并在控制台打印出 Databricks 比 BDP 缺少的数据条数。