最近工作中遇到了一个问题:将mysql的数据同步到elasticsearch中,现在有很多方案, logstash-jdbc , elasticsearch-jdbc , go-mysql-elasticsearch,本来原来是使用logstash-jdbc的,但是由于其配置文件是基于ruby语法的,导致遇到问题需要查很多资料,加上logstash调试困难(很可能是我用的姿势不对。。。),所以决定手动在elasticsearch中建表,然后写脚本定期更新数据,那么问题就来了:第一次插入需要一次性插入以前的所有数据,以前使用pymysql时用的都是DictCursor游标,原理是一次性讲数据加载到内存中,但是现有表中有几张有数千万行,几个G大小,一次性读到内存中很不明智,google后,发现一篇blog不错,翻译共享一下(渣英语,不要笑,原文链接在文末)。
当使用sql查询的结果有非常多行时,如果使用默认的cursor,你的程序在接受数据的的时候很可能卡住或者被杀死,原因是mysql客户端(Java,Pyhton)默认在内存里缓存下所有行然后再处理,如果内存溢出后,你的程序就会被杀死。
解决方式是实用流式游标,在Python中,你可以使用pymysql.cursors.SSCursor(或者SSDictCursor)来解决这个问题
import pymysql
conn = pymysql.connect(...)
cursor = pymysql.cursors.SSCursor(conn)
cursor.execute(...)
while True:
row = cursor.fetchone()
if not row:
break
...
这里有两点需要注意下:
- 使用pymysql.cursors.SSCursor代替默认的cursor。可以使用以上代码,或者这样写:conn.cursor(pymysql.cursors.SSCursor)
- 使用fetchone去每次只获得一行,别使用fetchall。也可以使用fetchmay,但是这样其实是多次调用fetchone。
对于SSCursor有一个错误的理解,就是SSCursor是服务端一次性读出所有数据然后一条一条返给客户端,其实不是这样的,这个cursor实际上没有缓存下来任何数据,它不会读取所有所有到内存中,它的做法是从储存块中读取记录,并且一条一条返回给你。这里有一个更适合的名字:流式游标。
因为SSCursor是没有缓存的游标,这里有几条约束:
- 这个connection只能读完所有行之后才能处理其他sql。如果你需要并行执行sql,在另外一个connection中执行,否则你会遇到 error 2014 , "Commands out of sync; you can't run this command now."
- 必须一次性读完所有行,每次读取后处理数据要快,不能超过60s,否则mysql将会断开这次连接( error2013 , “Lost connection to MySQL server during query),也可以修改 SET NET_WRITE_TIMEOUT = xx 来增加超时间隔。
参考:Techualization: Retrieving million of rows from MySQL(原文更加详细)