首先python读取csv格式文件的包是csv,直接使用import csv
思想:
1.首先遍历存放csv文件的文件夹,然后拼接每一个文件的路径地址
2.然后读取csv文件,使用csv.reader(open(file_path,"r",encoding="utf-8"))方法
3.读取的结果是一个二维列表,该列表存放所有的行,每一行是一个列表,但需要注意的是
在处理csv文件的时候先把每一行打印出来,看看行的格式是否是我们需要的格式.
如果格式不匹配需要重新使用字符串和列表的方法转换为我们想要的格式.
下面的例子是我读取csv文件,然后存储到数据库的一个例子
for root,dirs,files in os.walk("./data"):
for file in files:
# ------------------组合路径并取出文件名-----------
file_path = os.path.join(root,file)
feed_name = re_compile.match(file).group("filename")
# ------------------读取csv文件----------------
csv_reader = csv.reader(io.open(file_path,"r",encoding="UTF-8"))
i = 0
pub_time_index = 0
title_index = 0
url_index = 0
source_index = 0
keywords_index = 0
summary_index = 0
try:
for row in csv_reader:
row = ",".join(row).split("\t")
if i == 0 :
# ---------------取数据在文件中的索引位置------------
try:
pub_time_index = row.index("Date")
title_index = row.index("Headline")
url_index = row.index("URL")
source_index = row.index("Source")
keywords_index = row.index("Keywords")
summary_index = row.index("Opening Text")
except Exception as e:
error_dict = {"pass_filename":str(feed_name)+".csv"}
error_dict = json.dumps(error_dict,ensure_ascii=False)
pass_file_list.append(error_dict)
logger_root.error("文件{}数据不全,自动跳过该文件的插入,请检查文件的参数".format(feed_name))
break
else:
# ----------------处理数据转换数据-----------
time_str = row[pub_time_index]
pub_time = convert_time(time_str)
title = row[title_index]
url = row[url_index]
source = row[source_index]
keywords = json.dumps(row[keywords_index].replace("\\n","").split(","),ensure_ascii=False)
summary = row[summary_index]
#------------------执行数据库插入---------------
sql_str = "insert into test_csv(pub_time,title,url,source,keywords,summary,feed_name) values (%s,%s,%s,%s,%s,%s,%s)"
sql_params = (pub_time,title,url,source,keywords,summary,feed_name)
print(sql_params)
res, err = mysql_conn.execute_write(sql_str=sql_str,sql_params=sql_params)
if res is None:
logger_root.error("执行sql语句{}插入的时候出现错误,错误的原因是{}".format(sql_str%sql_params,err))
row_info = {"filename":feed_name,"row_num":i,"error_account":err}
row_info= json.dumps(row_info,ensure_ascii=False)
insert_row_error_list.append(row_info)
logger_root.info("执行插入文件:{}的第{}行成功".format(feed_name,i))
i += 1
except Exception as e:
open_file_error_list.append(json.dumps({"filename":feed_name},ensure_ascii=False))
logger_root.error("执行文件{}打开失败,错误的原因是{}".format(feed_name,e))
continue