def get_where_sql(fields_dict):
sub_sqls = []
for k, v in fields_dict.items():
if isinstance(v, str):
sub_sqls.append(f"{k}='{v}'")
elif isinstance(v, int):
sub_sqls.append(f"{k}={v}")
elif isinstance(v, list) and all(isinstance(item, str) for item in v):
sub_sqls.append(f"{k} in ({list_join_in_str(v, ',')})")
elif isinstance(v, list) and all(isinstance(item, int) for item in v):
sub_sqls.append(f"{k} in ({list_join_in_int(v, ',')})")
if sub_sqls:
return f"where {' and '.join(sub_sqls)} "
return ""
def get_and_sql(fields_dict):
sub_sqls = []
for k, v in fields_dict.items():
if isinstance(v, str):
sub_sqls.append(f"{k}='{v}'")
elif isinstance(v, int):
sub_sqls.append(f"{k}={v}")
elif isinstance(v, list) and all(isinstance(item, str) for item in v):
sub_sqls.append(f"{k} in ({list_join_in_str(v, ',')})")
elif isinstance(v, list) and all(isinstance(item, int) for item in v):
sub_sqls.append(f"{k} in ({list_join_in_int(v, ',')})")
if sub_sqls:
return f"and {' and '.join(sub_sqls)} "
return ""
@contextmanager
def get_cursor(dbcfg):
try:
conn = psycopg2.connect(database=dbcfg['database'], user=dbcfg['user'], password=dbcfg['password'],
host=dbcfg['host'], port=dbcfg['port'])
cursor = conn.cursor(cursor_factory=DictCursor)
yield cursor
conn.commit()
finally:
close_conn(cursor)
close_conn(conn)