今天这篇文章简单介绍,图论在风控中的反查应用。
在风控的日常工作中,经常遇到团伙作案,他们拥有比较多的设备、手机号码、银行卡等。我们通过一些列的反查识别策略,将这些团伙尽可能的抓住。
举个例子,抓到一个欺诈店铺A,我们就可以查找这个设备上使用过的店铺B;通过B查到C,C查到D。。。直至遍历所有店铺。
上图就是这个graph,注意,这应该是个无向图。
在业务中,我们经常通过写一段比较长的代码,查到共用同一设备的店铺,如下面的sql:
select distinct shop_id from pay_order_info d
where device_id in (
select distinct device_id from pay_order_info a
where shop_id = A
)
limit200
这个结果显示,只能通过A查到B,没法查到C和D。
要解决上面的问题,先利用networkx构建全图,然后从graph中查关联的nodes,示例如下(python):
import networkx as nx
G = nx.Graph()
nodes = ['A','B','C','D','E']
edges = [('A','B'),('B','C'),('C','D')]
G.add_nodes_from(nodes)
G.add_edges_from(edges)
def git_antinodes(shop_id):
list = []
try:
list.append(shop_id)
for i in list:
for l in G.neighbors(i):
if l not in list:
list.append(l)
return list
except Exception, e:
return "Not in the graph"
def git_antilinks(shop_id):
antilinks = []
try:
G.neighbors(shop_id)
antilinks =[e for e in G.edges_iter(git_antinodes(shop_id))]
return antilinks
except Exception, e:
return "Not in the graph"
上面定义了两个函数
git_antinodes:获取关联的店铺
git_antilinks:获取链路
git_antinodes['A']
输出结果:['A', 'B', 'C', 'D']
git_antilinks['A']
输出结果:[('A', 'B'), ('B', 'C'), ('C', 'D')]
好了,上面示例满足基本需求,下面就要把所有的nodes和edges存储下来
这期我们这边的应用在B端,所以一个node是指的一个店铺,edge是共用设备的关系。数据存储在mysql上,nodes的设计和存储比较简单,不在赘述,下面简单说下edges的存储。
左边的存储会有大量的重复,因为A-B之间无指向关系,为了节约存储空间,我这边采用右边的存储方案,edges的伪代码。
for linen in datanode:
shop_id = linen[0]
batchSelect ="""
select shop_id,count(distinct shop_id) from risk_cashstore_links_mid d
where device_id in (
select distinct device_id from risk_cashstore_links_mid a
where shop_id = %d
)
group by shop_id
"""% (shop_id)
data = execute(batchSelect)
datalink = fetchmany(data)
listlink = []
batchInsertlink ='''
INSERT INTO cashstore_links (src, target, weight)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE
target = VALUES(target),
weight = VALUES(weight);
'''
src = shop_id
print "src:%d"% (src)
for linel in datalink:
if linel[0] <= src:
continue
target = linel[0]
weight = linel[1]
ell = []
ell.append(src)
ell.append(target)
ell.append(weight)
listlink.append(ell)
try:
currcSta.executemany(batchInsertlink,listlink)
connrcSta.commit()
exceptMySQLdb.Error,e:
printstr(datetime.datetime.now()) +" Mysql Error %d: %s"% (e.args[0],e.args[1])
connrcSta.rollback()
这部分是用pyspark完成的。每天的定时任务,全量更新。
上面第二段已经讲过获取相关联的订单,下面就实现一个接口。这边我写了个post的接口,具体代码不讲了,直接上结果。
curl -d 'shop_id=19736647' http://0.0.0.0:5063/cash/cashstoreantishop/list
输出:{"msg": "ok", "code": 0, "data": {"nodes": [19736647, 19737903, 19745957, 20173401], "edges": [[19736647, 19737903], [19737903, 19745957], [19745957, 20173401]]}}(hadoop_env)
反查工具一期,基本完成了,二期就是可视化了,先去学习echarts了,see u next month!