polars

with_columns + map_elements(对整行 struct 逐行调用自定义函数)
    # Bundle all columns into a single struct column, call gen_correct_sql on
    # each struct element (together with rule_id_correction_dict), and store
    # the String result as a new column named "operate_sql".
    log_df = log_df.with_columns(pl.struct(pl.all()).map_elements(lambda row: gen_correct_sql(row, rule_id_correction_dict),
                                                                  return_dtype=pl.String).alias("operate_sql"))
with_columns + map_elements / map_batches(自定义函数耗时对比:DuckDB UDF、纯 Python 循环、polars)

def udf_concurrency():
    """Time a deliberately slow UDF through four different execution paths.

    The same "sleep one second, return ``f"{a}_{a}"``" function is applied to
    column ``b`` of a five-row frame via:

    1. a DuckDB scalar UDF registered on the module-level ``conn`` and called
       from SQL (after ``PRAGMA threads=4``),
    2. a plain Python loop over the source rows,
    3. polars ``map_elements`` (one Python call per value),
    4. polars ``map_batches`` (one Python call for the whole column).

    After each path the elapsed wall-clock seconds are printed. Nothing is
    returned; the function exists only for its printed output.
    """

    def udf2(a: int) -> str:
        # The annotations are kept as-is: create_function below is not given
        # explicit parameter/return types, so they are the only type
        # information available to it.
        time.sleep(1)
        return f"{a}_{a}"

    def udf_batch(series) -> pl.Series:
        # Receives the entire column in one call; the banner makes it easy to
        # spot in the output how many values arrived in that call.
        print("xx" * 100, len(series))
        res = []
        for a in series:
            time.sleep(1)
            res.append(f"{a}_{a}")
        return pl.Series(res)

    conn.create_function("udf2", udf2,
                         null_handling="special", exception_handling="return_null")
    rows = [{"a": 1, "b": 1, "c": 'c1', "d": 'd1'},
            {"a": 1, "b": 1, "c": 'c2', "d": 'd2'},
            {"a": 2, "b": 2, "c": 'c3', "d": 'd3'},
            {"a": 2, "b": 2, "c": 'c4', "d": 'd4'},
            {"a": 3, "b": 3, "c": 'c5', "d": 'd5'}]

    # NOTE: the SQL text below refers to this frame by the name "df", so the
    # local variable must keep exactly that name.
    df = pl.from_dicts(rows)

    # 1) DuckDB scalar UDF invoked from SQL.
    # perf_counter is monotonic, so the measured interval cannot be skewed by
    # system clock adjustments.
    start = time.perf_counter()
    conn.execute("PRAGMA threads=4;")
    conn.sql("select a,udf2(b) from df").show()
    print(f"{time.perf_counter() - start}")

    # 2) Pure-Python baseline. Pass the same "b" value the other paths use,
    # not the whole row dict, so the work being timed is comparable.
    start = time.perf_counter()
    for row in rows:
        udf2(row["b"])
    print(f"{time.perf_counter() - start}")

    # 3) polars map_elements: udf2 is called once per element of "b".
    start = time.perf_counter()
    df2 = df.with_columns(
        pl.col("b").map_elements(udf2, return_dtype=pl.String).alias("bb")
    )
    print(df2.head())
    print(f"{time.perf_counter() - start}")

    # 4) polars map_batches: udf_batch is called with the column as a Series.
    start = time.perf_counter()
    df2 = df.with_columns(
        pl.col("b").map_batches(udf_batch, return_dtype=pl.String).alias("bb")
    )
    print(df2.head())
    print(f"{time.perf_counter() - start}")

窗口函数
In [67]: df.select(pl.col('names','id','distance').sort_by("distance", descendin
    ...: g=False).over('id'))
Out[67]: 
shape: (5, 3)
┌───────┬─────┬──────────┐
│ names ┆ id  ┆ distance │
│ ---   ┆ --- ┆ ---      │
│ str   ┆ str ┆ f64      │
╞═══════╪═════╪══════════╡
│ ham   ┆ A   ┆ 0.074971 │
│ foo   ┆ A   ┆ 0.664746 │
│ cc    ┆ B   ┆ 0.151077 │
│ egg   ┆ C   ┆ 0.355057 │
│ spam  ┆ B   ┆ 0.308573 │
└───────┴─────┴──────────┘
select
In [199]: out = filtered.select(
     ...:     pl.col(["Name",'Type 1','Speed']).sort_by("Speed", descending=True
     ...: ).over("Type 1",
     ...:    mapping_strategy = 'group_to_rows'),
     ...:    pl.col('Name').cumcount().sort_by("Speed", descending=False).over("
     ...: Type 1",   mapping_strategy = 'group_to_rows')
     ...: .alias('rn')).sort('Type 1',descending=False)
     ...: print(out)
shape: (7, 4)
┌─────────────────────┬────────┬───────┬─────┐
│ Name                ┆ Type 1 ┆ Speed ┆ rn  │
│ ---                 ┆ ---    ┆ ---   ┆ --- │
│ str                 ┆ str    ┆ i64   ┆ u32 │
╞═════════════════════╪════════╪═══════╪═════╡
│ Exeggutor           ┆ Grass  ┆ 55    ┆ 0   │
│ Exeggcute           ┆ Grass  ┆ 40    ┆ 1   │
│ Jynx                ┆ Ice    ┆ 95    ┆ 0   │
│ Starmie             ┆ Water  ┆ 115   ┆ 0   │
│ Slowbro             ┆ Water  ┆ 30    ┆ 1   │
│ SlowbroMega Slowbro ┆ Water  ┆ 30    ┆ 2   │
│ Slowpoke            ┆ Water  ┆ 15    ┆ 3   │
└─────────────────────┴────────┴───────┴─────┘
group_by
In [84]: df.group_by('id').agg(pl.col('names').sort_by('distance',descending=Fals
    ...: e),pl.col('distance').sort_by('distance',descending=False))
Out[84]: 
shape: (3, 3)
┌─────┬────────────────┬──────────────────────┐
│ id  ┆ names          ┆ distance             │
│ --- ┆ ---            ┆ ---                  │
│ str ┆ list[str]      ┆ list[f64]            │
╞═════╪════════════════╪══════════════════════╡
│ B   ┆ ["cc", "spam"] ┆ [0.151077, 0.308573] │
│ C   ┆ ["egg"]        ┆ [0.355057]           │
│ A   ┆ ["ham", "foo"] ┆ [0.074971, 0.664746] │
└─────┴────────────────┴──────────────────────┘
In [77]:  df.sort("distance",descending=False).groupby("id").agg(pl.col('names')
    ...: ,pl.col('distance'))
Out[77]: 
shape: (3, 3)
┌─────┬────────────────┬──────────────────────┐
│ id  ┆ names          ┆ distance             │
│ --- ┆ ---            ┆ ---                  │
│ str ┆ list[str]      ┆ list[f64]            │
╞═════╪════════════════╪══════════════════════╡
│ B   ┆ ["cc", "spam"] ┆ [0.151077, 0.308573] │
│ A   ┆ ["ham", "foo"] ┆ [0.074971, 0.664746] │
│ C   ┆ ["egg"]        ┆ [0.355057]           │
└─────┴────────────────┴──────────────────────┘
drop / rename / replace_strict
    # Remove the two columns that are not carried into the renamed frame.
    logs_df = logs_df.drop(['geom_wkt', 'check_field_val'])
    # Integer flag code -> display label shown in place of the code.
    wk_mod_flag_map = {0: "未操作", 1: "需要修改", 2: "误报", 3: "暂不修改"}
    # Overwrite wk_mod_flag in place with its label.
    # NOTE(review): replace_strict is expected to raise for codes missing from
    # the mapping (no default is given) -- confirm against the polars docs.
    logs_df = logs_df.with_columns(pl.col("wk_mod_flag").replace_strict(wk_mod_flag_map).alias("wk_mod_flag"))
    # Rename the internal column names to Chinese display headers; the result
    # is bound to a new name, so logs_df keeps the original column names.
    df_renamed = logs_df.rename(
        {"check_id": "检查id", "task_id": "检查任务id", "mesh_id": "图幅", "rule_id": "检查项id",
         "rule_logic": "检查逻辑", "log_msg": "错误描述", "log_level": "错误等级", "error_type": "错误类型",
         "check_layer": "检查图层", "id_col_val": "要素id", "create_time": "创建时间", "update_time": "更新时间",
         "wk_mod_flag": "标记结果", "wk_operator": "作业员", "wk_mod_msg": "作业员备注信息",
         "data_task_id": "pms任务id"})
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容