开发之前
在之前的分享中,我们已经完成了公众号的基本框架的搭建,也完成了基于NLP知识的图文检索功能,可以说之前的内容都是原生开发,无论是公众号基础能力建设还是图文检索能力,本章分享,将会在之前的基础上,通过云服务商为我们提供的AI能力,将智能聊天接入其中。
首先假设一个场景:用户关注这个公众号之后,他给公众号发送文本消息,我们首先进行图文检索,如果没找到合适的结果,我们就默认进入“聊天功能”;如果用户发送了语音,我们同样先进行图文检索,如果没有找得到相似图文,则通过语音进入“聊天功能”,这样看来是不是整个功能变得非常有趣?
功能预览
开始开发
聊天功能增加
聊天功能我们可以借助云厂商提供的聊天机器人服务:
开通和使用这个服务,可以为我们创建一个简单的机器人:
创建完成机器人,我们可以通过云API对其进行代码的编写,云API代码比较难写也不怕,有API Explorer:
系统会为我们自动编写好基本的代码,我们只需要稍加修改,就可以复制到项目中:
在最外层进行相关初始化:
tbpClient = tbp_client.TbpClient(credential.Credential(secret_id, secret_key), region)
初始化完成,增加聊天机器人函数:
def chatBot(user, content):
'''
开发文档:https://cloud.tencent.com/document/product/1060/37438
:param user: 用户id
:param content: 聊天内容
:return: 返回机器人说的话,如果出现故障返回None
'''
try:
req = tbp_models.TextProcessRequest()
params = '{"BotId":"%s","BotEnv":"release","TerminalId":"%s","InputText":"%s"}' % (
bot_id, user, content
)
req.from_json_string(params)
resp = tbpClient.TextProcess(req)
return json.loads(resp.to_json_string())['ResponseMessage']['GroupList'][0]['Content']
except Exception as e:
print(e)
return None
文本转音频功能增加
同样的方法,这不过是使用的另一个产品:
同样通过Explorer编写代码,然后初始化:
ttsClient = tts_client.TtsClient(credential.Credential(secret_id, secret_key), region)
增加相关的方法实现文本到函数的转换:
def text2Voice(text):
'''
文档地址:https://cloud.tencent.com/document/product/1073/37995
:param text: 带转换的文本
:return: 返回转换后的文件地址
'''
try:
req = tts_models.TextToVoiceRequest()
params = '{"Text":"%s","SessionId":"%s","ModelType":1,"VoiceType":1002}' % (
text, "".join(random.sample('zyxwvutsrqponmlkjihgfedcba', 7)))
req.from_json_string(params)
resp = ttsClient.TextToVoice(req)
file = '/tmp/' + "".join(random.sample('zyxwvutsrqponmlkjihgfedcba', 7)) + ".wav"
with open(file, 'wb') as f:
f.write(base64.b64decode(json.loads(resp.to_json_string())["Audio"]))
return file
except Exception as e:
print(e)
return None
增加微信的素材相关逻辑
由于我的账号是未认证的订阅号,所以可以使用的功能有限。在这里我需要先将生成的语音素材上传到公众号后台作为永久素材。因为语音类素材最大量为1000个,所以我还要顺便删除多余的素材。
此处我的做法很简单,先上传素材,然后获得素材总数,接下来根据素材中的时间戳:
{'
media_id': 'HQOG98Gpaa4KcvU1L0MPEW4Zvngs4kBqOyTRzNWBNME',
'name': 'ljpmybc.wav',
'update_time': 1582896372,
'tags': []
}
就是update_time
这个参数,和现在的时间进行判断,超过60S则认为这个素材已经过期,就可以删除,这样保证我们的素材数量不会溢出:
增加永久素材:
def addingOtherPermanentAssets(file, fileType):
'''
文档地址:https://developers.weixin.qq.com/doc/offiaccount/Asset_Management/Adding_Permanent_Assets.html
返回结果:{
"media_id":"HQOG98Gpaa4KcvU1L0MPEcyy31LSuHhRi8gD3pvebhI",
"url":"http:\/\/mmbiz.qpic.cn\/sz_mmbiz_png\/icxY5TTGTBibSyZPfLAEZmeaicUczsoGUpqLgBlRbNxeic4R8r94j60BiaxDLEZTAK7I7qubG3Ik808P8jYLdFJTcOA\/0?wx_fmt=png",
"item":[]
}
:param file:
:return:
'''
typeDict = {
"voice": "wav"
}
url = "https://api.weixin.qq.com/cgi-bin/material/add_material?access_token=%s&type=%s" % (
getAccessToken(), fileType)
boundary = '----WebKitFormBoundary7MA4YWxk%s' % "".join(random.sample('zyxwvutsrqponmlkjihgfedcba', 7))
with open(file, 'rb') as f:
fileData = f.read()
data = {'media': (os.path.split(file)[1], fileData, typeDict[fileType])}
headers = {
"Content-Type": "multipart/form-data; boundary=%s" % boundary,
"User-Agent": "okhttp/3.10.0"
}
reqAttr = urllib.request.Request(url=url,
data=encode_multipart_formdata(data, boundary=boundary)[0],
headers=headers)
responseData = json.loads(urllib.request.urlopen(reqAttr).read().decode("utf-8"))
try:
for eveVoice in getMaterialsList("voice", getTheTotalOfAllMaterials()['voice_count']):
try:
if int(time.time()) - int(eveVoice["update_time"]) > 60:
deletingPermanentAssets(eveVoice['media_id'])
except:
pass
except:
pass
return responseData['media_id'] if "media_id" in responseData else None
删除素材:
def deletingPermanentAssets(media_id):
'''
文档地址:https://developers.weixin.qq.com/doc/offiaccount/Asset_Management/Deleting_Permanent_Assets.html
:return:
'''
url = 'https://api.weixin.qq.com/cgi-bin/material/del_material?access_token=%s' % (getAccessToken())
data = {
"media_id": media_id
}
postData = json.dumps(data).encode("utf-8")
reqAttr = urllib.request.Request(url=url, data=postData)
print(urllib.request.urlopen(reqAttr).read())
至此,基础代码已经完成,剩下的逻辑就是在main_handler中进行组合:
文本消息部分的组合逻辑:
media_id = searchNews(event["Content"])
result = getNewsResult(media_id, event)
if not result:
chatBotResponse = chatBot(event["FromUserName"], event["Content"])
result = textXML({"msg": chatBotResponse if chatBotResponse else "目前还没有类似的文章被发布在这个公众号上"}, event)
return response(body=result)
语音消息部分组合逻辑:
media_id = searchNews(event["Recognition"])
result = getNewsResult(media_id, event)
if not result:
chatBotResponse = chatBot(event["FromUserName"], event["Recognition"])
if chatBotResponse:
voiceFile = text2Voice(chatBotResponse)
if voiceFile:
uploadResult = addingOtherPermanentAssets(voiceFile, 'voice')
if uploadResult:
result = voiceXML({"media_id": uploadResult}, event)
if not result:
result = textXML({"msg": "目前还没有类似的文章被发布在这个公众号上"}, event)
return response(body=result)
老规矩,今日份的代码同样更新到Github上,欢迎大家三连击: