使用Apache Arrow和标准库做不规则的CSV文件对比

不规则CSV文件有两列[bdp.csv],第一列是std::string,第二列是一个json字符串["content"列],格式如下,

   {
       "Statement": [
            {"Resource": ["xxxx", "xxxx", "xxx"]  }
      ]
   }

另一个文件是一个规则的csv文件。
工程目录如下,


image.png

CMakeLists.txt文件如下,

cmake_minimum_required(VERSION 2.6)

if(APPLE)
    message(STATUS "This is Apple, do nothing.")
    set(CMAKE_MACOSX_RPATH 1)
    set(CMAKE_PREFIX_PATH /Users/aabjfzhu/software/vcpkg/ports/cppwork/vcpkg_installed/x64-osx/share )
elseif(UNIX)
    message(STATUS "This is linux, set CMAKE_PREFIX_PATH.")
    set(CMAKE_PREFIX_PATH /vcpkg/ports/cppwork/vcpkg_installed/x64-linux/share)
endif(APPLE)

project(bdp_dbr)

set(CMAKE_CXX_STANDARD 17)

add_definitions(-g)

find_package(ZLIB)

find_package(OpenCV REQUIRED )
find_package(Arrow CONFIG REQUIRED)

find_package(unofficial-brotli REQUIRED)
find_package(unofficial-utf8proc CONFIG REQUIRED)
find_package(Thrift CONFIG REQUIRED)

find_package(glog REQUIRED)

find_package(OpenSSL REQUIRED)

find_package(Boost REQUIRED COMPONENTS
    system
    filesystem
    serialization
    program_options
    thread
    )

find_package(DataFrame REQUIRED)

if(APPLE)
    MESSAGE(STATUS "This is APPLE, set INCLUDE_DIRS")
set(INCLUDE_DIRS ${Boost_INCLUDE_DIRS} /usr/local/include /usr/local/iODBC/include /opt/snowflake/snowflakeodbc/include/ ${CMAKE_CURRENT_SOURCE_DIR}/../include/ ${CMAKE_CURRENT_SOURCE_DIR}/../../../include)
elseif(UNIX)
    MESSAGE(STATUS "This is linux, set INCLUDE_DIRS")
    set(INCLUDE_DIRS ${Boost_INCLUDE_DIRS} /usr/local/include ${CMAKE_CURRENT_SOURCE_DIR}/../include/   ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/)
endif(APPLE)


if(APPLE)
    MESSAGE(STATUS "This is APPLE, set LINK_DIRS")
    set(LINK_DIRS /usr/local/lib /usr/local/iODBC/lib /opt/snowflake/snowflakeodbc/lib/universal)
elseif(UNIX)
    MESSAGE(STATUS "This is linux, set LINK_DIRS")
    set(LINK_DIRS ${Boost_INCLUDE_DIRS} /usr/local/lib /vcpkg/ports/cppwork/vcpkg_installed/x64-linux/lib)
endif(APPLE)

if(APPLE)
    MESSAGE(STATUS "This is APPLE, set ODBC_LIBS")
    set(ODBC_LIBS iodbc iodbcinst)
elseif(UNIX)
    MESSAGE(STATUS "This is linux, set LINK_DIRS")
    set(ODBC_LIBS odbc odbcinst ltdl)
endif(APPLE)

include_directories(${INCLUDE_DIRS})
LINK_DIRECTORIES(${LINK_DIRS})

file( GLOB test_file_list ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp) 

file( GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/../impl/utils/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../include/utils/*.h ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/arr_/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/http/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/yaml/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/df/impl/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../../include/death_handler/impl/*.cpp)

add_library(${PROJECT_NAME}_lib SHARED ${APP_SOURCES} ${test_file})
target_link_libraries(${PROJECT_NAME}_lib ${Boost_LIBRARIES} ZLIB::ZLIB glog::glog DataFrame::DataFrame ${OpenCV_LIBS})
target_link_libraries(${PROJECT_NAME}_lib OpenSSL::SSL OpenSSL::Crypto libgtest.a pystring libyaml-cpp.a libgmock.a ${ODBC_LIBS} libnanodbc.a pthread dl backtrace libzstd.a libbz2.a libsnappy.a re2::re2 parquet lz4 unofficial::brotli::brotlidec-static unofficial::brotli::brotlienc-static unofficial::brotli::brotlicommon-static utf8proc thrift::thrift  arrow arrow_dataset)

foreach( test_file ${test_file_list} )
    file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${test_file})
    string(REPLACE ".cpp" "" file ${filename})
    add_executable(${file}  ${test_file})
    target_link_libraries(${file} ${PROJECT_NAME}_lib)
endforeach( test_file ${test_file_list})

include/utils/d_op.h

#ifndef _FREDRIC_D_OP_H_
#define _FREDRIC_D_OP_H_
#include <vector>
#include <string>

struct d_op {
    static std::vector<std::string> get_bdp_bucket_list();
    static std::vector<std::string> get_databricks_bucket_list();
    static std::vector<std::string> get_missing_bucket_list(std::vector<std::string> const& bdp_final, std::vector<std::string> const& db_final);
    static std::vector<std::string> fx_missing_results(std::vector<std::string> const& missing_results);

    static bool write_to_csv(std::vector<std::string> const& missing_final);
};

#endif

impl/utils/d_op.cpp

#include "utils/d_op.h"
#include "utils/bdp_dbr_cfg.h"

#include "glog/logging.h"

#include "json/json.hpp"
#include "df/df.h"
#include "arr_/arr_.h"
#include "pystring/pystring.h"

#include <algorithm>
#include <fstream>
#include <set>


using json = nlohmann::json;

std::vector<std::string> d_op::get_bdp_bucket_list() {
    std::shared_ptr<arrow::Table> tb{};
    ArrowUtil::read_csv(bdp_csv_path.c_str(), tb);
    auto const& tb_ = *tb;  
    auto const& policy_conts_chunk_arr = tb_.GetColumnByName("content");
    auto const& policies_js = ArrowUtil::chunked_array_to_str_vector(policy_conts_chunk_arr);
    
    std::vector<std::string> policies{};
    for(auto const& policy_js: policies_js) {
        auto const& policy_ob = json::parse(policy_js);
        auto const& stats = policy_ob["Statement"];
        for(auto const& stat: stats) {
            auto eles = stat["Resource"];
            for(auto ele: eles) {
                policies.emplace_back(ele.get<std::string>());
            }
        }
    }
    return policies;
}

std::vector<std::string> d_op::get_databricks_bucket_list() {
    std::vector<std::string> res_policies{};
    std::shared_ptr<arrow::Table> tb{};
    ArrowUtil::read_csv(db_csv_path.c_str(), tb);
    auto const& tb_ = *tb;  
    auto const& db_bulkets_chunk_arr = tb_.GetColumnByName("S3 bucket list");  
    auto policies_js = ArrowUtil::chunked_array_to_str_vector(db_bulkets_chunk_arr);
    for(auto const& policy: policies_js) {
        if(policy != "") {
            res_policies.emplace_back(std::move(policy));
        }
    }
    return res_policies;
}

std::vector<std::string> d_op::get_missing_bucket_list(std::vector<std::string> const& bdp_final, std::vector<std::string> const& db_final) {
    std::vector<std::string> missing_buckets;
    for(auto const& bdp_bucket: bdp_final) {
        bool is_in_db {false};
        for(auto const& db_bucket: db_final) {
            if(bdp_bucket.find(db_bucket)!=std::string::npos) {
                is_in_db = true;
                break;
            }
        }
        if(!is_in_db) {
            missing_buckets.emplace_back(bdp_bucket);
        }
    }
    return missing_buckets;
}

std::vector<std::string> d_op::fx_missing_results(std::vector<std::string> const& missing_results) {
    // std::set insert自动去重
    std::set<std::string> final_missing_results;
    for(auto const& missing_bucket: missing_results) {
        auto tmp = pystring::replace(missing_bucket, "arn:aws:s3:::", "");
        if(tmp.find("/*") == tmp.size()-2) {
            tmp = tmp.substr(0, tmp.size()-2);    
        }
        final_missing_results.insert(tmp);
    }

    std::cout << "SIZE: " << final_missing_results.size() << "\n";
    std::vector<std::string> results{};
    results.resize(final_missing_results.size());

    std::transform(final_missing_results.begin(), final_missing_results.end(), results.begin(), [](auto const& str){
        return str;
    });
    return results;
}

bool d_op::write_to_csv(std::vector<std::string> const& missing_final) {
     CLDataFrame df;
    std::vector<unsigned long> idxes{};
    idxes.reserve(missing_final.size());
    for(unsigned long i=1; i<=missing_final.size(); ++i) {
        idxes.emplace_back(std::move(i));
    }
    
    df.load_data(std::move(idxes),
        std::make_pair("missing_bulkets", std::move(missing_final)));

    std::fstream fs {result_csv_path, std::ios::out | std::ios::trunc};
    if(!fs.is_open()) {
        LOG(ERROR) << "Open file failed" << "\n";
        return false;
    }
    
    df.write<std::ostream, std::string>(fs, hmdf::io_format::csv2, true);
    fs.close();
    return true;
}

include/utils/bdp_dbr_cfg.h

#ifndef _FREDRIC_BDP_DBR_CFG_H_
#define _FREDRIC_BDP_DBR_CFG_H_

#include <string>

extern std::string bdp_csv_path;

extern std::string db_csv_path;

extern std::string result_csv_path;
#endif

impl/utils/bdp_dbr_cfg.cpp

#include "utils/bdp_dbr_cfg.h"

std::string bdp_csv_path = "../datas/bdp.csv";

std::string db_csv_path = "../datas/databricks.csv";

std::string result_csv_path = "../datas/missing.csv";

test/bdp_dbr_test.cpp


#include "death_handler/death_handler.h"
#include "json/json.hpp"
#include <glog/logging.h>

#include <gtest/gtest.h>
#include "df/df.h"

#include "utils/d_op.h"


using json = nlohmann::json;

int main(int argc, char** argv) {
    FLAGS_log_dir = "./";
    FLAGS_alsologtostderr = true;
    // 日志级别 INFO, WARNING, ERROR, FATAL 的值分别为0、1、2、3
    FLAGS_minloglevel = 0;

    Debug::DeathHandler dh;

    google::InitGoogleLogging("./logs.log");
    testing::InitGoogleTest(&argc, argv);
    int ret = RUN_ALL_TESTS();
    return ret;
}

GTEST_TEST(BDPDbrTests, GetDbrMissingBulkets) {
    auto bdp_bucket_ls = d_op::get_bdp_bucket_list();
    auto db_bucket_ls = d_op::get_databricks_bucket_list();
    auto missing_bucket_ls = d_op::get_missing_bucket_list(bdp_bucket_ls, db_bucket_ls);
    std::cout << "MISSING: " << missing_bucket_ls.size() << "\n";

    auto missing_final = d_op::fx_missing_results(missing_bucket_ls);
    
    auto write_res = d_op::write_to_csv(missing_final);
    ASSERT_TRUE(write_res);
}

程序会输出一个对比完成的csv文件。
会输出Dbr比BDP少多少数据。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容