with_columns
log_df = log_df.with_columns(
    pl.struct(pl.all())
    .map_elements(lambda row: gen_correct_sql(row, rule_id_correction_dict), return_dtype=pl.String)
    .alias("operate_sql")
)
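gen_correct_sql and rule_id_correction_dict live elsewhere in this project; a minimal self-contained sketch of the same struct + map_elements row-wise pattern, with a made-up build_sql stand-in:

import polars as pl

# Stand-in for gen_correct_sql (hypothetical): builds one SQL statement per row dict.
def build_sql(row: dict, table: str = "t") -> str:
    return f"UPDATE {table} SET flag = 1 WHERE id = {row['id']}"

df = pl.DataFrame({"id": [1, 2], "name": ["a", "b"]})

# pl.struct(pl.all()) packs every column into one struct, so map_elements sees each row as a dict.
df = df.with_columns(
    pl.struct(pl.all())
    .map_elements(build_sql, return_dtype=pl.String)
    .alias("operate_sql")
)
print(df)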
udf map_elements map_batches
import time

import duckdb
import polars as pl

conn = duckdb.connect()


def udf_concurrency():
    # Scalar UDF: called once per value.
    def udf2(a: int) -> str:
        time.sleep(1)
        return f"{a}_{a}"

    # Batch UDF for map_batches: receives the whole Series at once.
    def udf_batch(series: pl.Series) -> pl.Series:
        print("xx" * 100, len(series))
        res = []
        for a in series:
            time.sleep(1)
            res.append(f"{a}_{a}")
        return pl.Series(res)

    # Register udf2 as a DuckDB scalar function (types inferred from the annotations).
    conn.create_function("udf2", udf2,
                         null_handling="special", exception_handling="return_null")

    a = [{"a": 1, "b": 1, "c": 'c1', "d": 'd1'},
         {"a": 1, "b": 1, "c": 'c2', "d": 'd2'},
         {"a": 2, "b": 2, "c": 'c3', "d": 'd3'},
         {"a": 2, "b": 2, "c": 'c4', "d": 'd4'},
         {"a": 3, "b": 3, "c": 'c5', "d": 'd5'}]
    df = pl.from_dicts(a)

    # 1) DuckDB runs the UDF inside SQL; PRAGMA threads sets its worker count.
    start = time.time()
    conn.execute("PRAGMA threads=4;")
    conn.sql("select a, udf2(b) from df").show()
    print(f"{time.time() - start}")

    # 2) Plain Python loop over the same column as a baseline.
    start = time.time()
    for i in a:
        udf2(i["b"])
    print(f"{time.time() - start}")

    # 3) Polars map_elements: the Python function is called once per element.
    start = time.time()
    df2 = df.with_columns(
        pl.col("b").map_elements(udf2, return_dtype=pl.String).alias("bb")
    )
    print(df2.head())
    print(f"{time.time() - start}")

    # 4) Polars map_batches: the function gets the whole Series in one call.
    start = time.time()
    df2 = df.with_columns(
        pl.col("b").map_batches(udf_batch).alias("bb")
    )
    print(df2.head())
    print(f"{time.time() - start}")
window functions
In [67]: df.select(pl.col('names', 'id', 'distance').sort_by("distance", descending=False).over('id'))
Out[67]:
shape: (5, 3)
┌───────┬─────┬──────────┐
│ names ┆ id ┆ distance │
│ --- ┆ --- ┆ --- │
│ str ┆ str ┆ f64 │
╞═══════╪═════╪══════════╡
│ ham ┆ A ┆ 0.074971 │
│ foo ┆ A ┆ 0.664746 │
│ cc ┆ B ┆ 0.151077 │
│ egg ┆ C ┆ 0.355057 │
│ spam ┆ B ┆ 0.308573 │
└───────┴─────┴──────────┘
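The df above is built earlier in these notes; a self-contained version (values reconstructed from the printed output, original row order guessed) showing the same sort-within-window pattern:

import polars as pl

df = pl.DataFrame({
    "names": ["foo", "ham", "spam", "egg", "cc"],
    "id": ["A", "A", "B", "C", "B"],
    "distance": [0.664746, 0.074971, 0.308573, 0.355057, 0.151077],
})

# sort_by(...).over('id') sorts the selected columns inside each id group
# while leaving each group's rows in their original positions.
out = df.select(
    pl.col("names", "id", "distance").sort_by("distance", descending=False).over("id")
)
print(out)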
select
In [199]: out = filtered.select(
     ...:     pl.col(["Name", 'Type 1', 'Speed']).sort_by("Speed", descending=True)
     ...:         .over("Type 1", mapping_strategy='group_to_rows'),
     ...:     pl.col('Name').cumcount().sort_by("Speed", descending=False)
     ...:         .over("Type 1", mapping_strategy='group_to_rows')
     ...:         .alias('rn')
     ...: ).sort('Type 1', descending=False)
     ...: print(out)
shape: (7, 4)
┌─────────────────────┬────────┬───────┬─────┐
│ Name ┆ Type 1 ┆ Speed ┆ rn │
│ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ i64 ┆ u32 │
╞═════════════════════╪════════╪═══════╪═════╡
│ Exeggutor ┆ Grass ┆ 55 ┆ 0 │
│ Exeggcute ┆ Grass ┆ 40 ┆ 1 │
│ Jynx ┆ Ice ┆ 95 ┆ 0 │
│ Starmie ┆ Water ┆ 115 ┆ 0 │
│ Slowbro ┆ Water ┆ 30 ┆ 1 │
│ SlowbroMega Slowbro ┆ Water ┆ 30 ┆ 2 │
│ Slowpoke ┆ Water ┆ 15 ┆ 3 │
└─────────────────────┴────────┴───────┴─────┘
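filtered is the Pokemon frame used for this example; a stand-in sketch (rows reconstructed from the printed output, SlowbroMega Slowbro omitted) of the same "order by Speed inside each Type 1 group and number the rows" idea, using pl.int_range over a pre-sorted frame instead of the cumcount trick above (newer Polars spells cumcount as cum_count):

import polars as pl

# Stand-in for `filtered`.
filtered = pl.DataFrame({
    "Name":   ["Slowpoke", "Starmie", "Slowbro", "Jynx", "Exeggcute", "Exeggutor"],
    "Type 1": ["Water", "Water", "Water", "Ice", "Grass", "Grass"],
    "Speed":  [15, 115, 30, 95, 40, 55],
})

# Sort fastest-first, then number rows 0..n-1 inside each Type 1 group.
out = (
    filtered.sort("Speed", descending=True)
    .with_columns(pl.int_range(0, pl.len()).over("Type 1").alias("rn"))
    .sort("Type 1")
)
print(out)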
group_by
In [84]: df.group_by('id').agg(pl.col('names').sort_by('distance', descending=False),
    ...:                       pl.col('distance').sort_by('distance', descending=False))
Out[84]:
shape: (3, 3)
┌─────┬────────────────┬──────────────────────┐
│ id ┆ names ┆ distance │
│ --- ┆ --- ┆ --- │
│ str ┆ list[str] ┆ list[f64] │
╞═════╪════════════════╪══════════════════════╡
│ B ┆ ["cc", "spam"] ┆ [0.151077, 0.308573] │
│ C ┆ ["egg"] ┆ [0.355057] │
│ A ┆ ["ham", "foo"] ┆ [0.074971, 0.664746] │
└─────┴────────────────┴──────────────────────┘
In [77]: df.sort("distance",descending=False).groupby("id").agg(pl.col('names')
...: ,pl.col('distance'))
Out[77]:
shape: (3, 3)
┌─────┬────────────────┬──────────────────────┐
│ id ┆ names ┆ distance │
│ --- ┆ --- ┆ --- │
│ str ┆ list[str] ┆ list[f64] │
╞═════╪════════════════╪══════════════════════╡
│ B ┆ ["cc", "spam"] ┆ [0.151077, 0.308573] │
│ A ┆ ["ham", "foo"] ┆ [0.074971, 0.664746] │
│ C ┆ ["egg"] ┆ [0.355057] │
└─────┴────────────────┴──────────────────────┘
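Both forms give the same within-group ordering; a self-contained sketch (data reconstructed from the printed output) that adds maintain_order=True so the groups themselves also come back in a stable order:

import polars as pl

df = pl.DataFrame({
    "names": ["foo", "ham", "spam", "egg", "cc"],
    "id": ["A", "A", "B", "C", "B"],
    "distance": [0.664746, 0.074971, 0.308573, 0.355057, 0.151077],
})

# Sorting first means each group's list is already in ascending-distance order;
# maintain_order=True keeps the groups in the order they first appear after the sort.
out = (
    df.sort("distance")
    .group_by("id", maintain_order=True)
    .agg(pl.col("names"), pl.col("distance"))
)
print(out)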
drop rename replace_strict
logs_df = logs_df.drop(['geom_wkt', 'check_field_val'])

# Map the wk_mod_flag codes to their display labels
# (0: 未操作 / untouched, 1: 需要修改 / needs fix, 2: 误报 / false positive, 3: 暂不修改 / leave for now).
wk_mod_flag_map = {0: "未操作", 1: "需要修改", 2: "误报", 3: "暂不修改"}
logs_df = logs_df.with_columns(pl.col("wk_mod_flag").replace_strict(wk_mod_flag_map).alias("wk_mod_flag"))

# Rename the raw columns to the Chinese report headers.
df_renamed = logs_df.rename(
    {"check_id": "检查id", "task_id": "检查任务id", "mesh_id": "图幅", "rule_id": "检查项id",
     "rule_logic": "检查逻辑", "log_msg": "错误描述", "log_level": "错误等级", "error_type": "错误类型",
     "check_layer": "检查图层", "id_col_val": "要素id", "create_time": "创建时间", "update_time": "更新时间",
     "wk_mod_flag": "标记结果", "wk_operator": "作业员", "wk_mod_msg": "作业员备注信息",
     "data_task_id": "pms任务id"})