作者:梁皓然
Xanthous Tech 创始人,前亚马逊全栈工程师。2016年回国创业,组建团队在全球范围内为大公司提供Chatbot咨询开发服务,应用RASA对话系统,并基于微信将Chatbot和MiniProgram进行了深度整合。
设想
一个聊天机器人(Chatbot)需要理解自然语言,并作出对应的回复。一个chatbot模块可以拆解成如下部分:
在开发者的世界里面,现在已经有不少开源的工具可以制作chatbot模块,各大云平台上也已经有各种各样的云服务来支持,对接到市面上的聊天平台上。在工作中,也 经常和Slack上面的机器人打交道,并且通过机器人在开发和运维流程里面做各种提醒和自动化。
现在各种各样的语音助手也开始出现在我们的身边,像小度和小爱,像Siri,还有Alexa和Google Home等设备。我还记得我买回来的第一个Amazon Echo, 尝试对着它说各种各样的话,看看怎么样回复,朋友也经常恶作剧,来到我家通过Echo给我在亚马逊下了各种各样的订单。手机上的Hey Siri和OK Google也非常 方便,尽管只是设一下闹钟或者是做一些功能。
作为一个开发者,和漫威电影的爱好者,我经常在想有没有办法做一个属于自己的语音助手,像钢铁侠电影里面的Jarvis和Friday一样。对于我来说,一个 voice chatbot可以拆解成下面的部分:
看起来,我只需要把每个部件连接起来,然后放到一个机器上面跑就可以了!但是想了一下,又想到了一个问题,这个语音助手需要像市面上的设备一样,需要唤醒。 如果没有唤醒步骤,一直做监听的话,对存储资源和网络连接的需求是非常大的。经过一番搜索之后,我找到了Snowboy。
Snowboy是kitt.ai制作的一个热词检测库 (Hotwords Detection Library)。通过训练热词之后,可以离线运行,并且 功耗很低,可以支持在树莓派等设备上运行。官方提供Python, Golang, NodeJS, iOS 和Android的wrapper可以整合到代码里面。
实践
于是我就拿出了尘封已久的树莓派,连上了麦克风和音箱,开始自己倒腾能不能做出来一个简单的能听懂我说话的小Jarvis。最近也入购了一个iPad Pro,所以 我准备直接通过iPad Pro连接树莓派进入ssh编程,顺便练一下vim,哈哈。
下面列举一下配置:
Board: NanoPi K1 Plus - 特别喜欢友善之臂的板子, 性价比高。这个板子有2G内存,有Wi-Fi + Ethernet(需要网线接口连接iPad),甚至带有板载麦克风。搭配的OS是UbuntuCore 16.04 LTS,可以通过apt安装绝大部分的依赖。
Microphone: Blue Snowball - 因为我主要在家办公,所以经常需要视频会议。 Blue的麦克风是USB连接的,在Linux下可以免驱直接使用。
根据上图Voice Chatbot的拆解,我决定把以下这几个服务连接起来测试一下完整流程:
Hotword Detection: Snowboy
Speech-to-Text: 科大讯飞语音听写
Chatbot: 图灵机器人
Text-to-Speech: 科大讯飞在线语音合成
机器启动之后安装nvm 用最新版的NodeJS v10 LTS。然后创建 package.json 并安装 snowboy nodejs wrapper:
npm init
npm install snowboy --save
需要详细读取文档安装所有Snowboy编译所需的依赖(TODO)。依赖安装完之后,我们参考一下Snowboy的sample代码:
// index.js
const record = require('node-record-lpcm16');
const Detector = require('snowboy').Detector;
const Models = require('snowboy').Models;
const models = new Models();
models.add({
file: 'resources/models/snowboy.umdl',
sensitivity: '0.5',
hotwords : 'snowboy'
});
const detector = new Detector({
resource: "resources/common.res",
models: models,
audioGain: 2.0,
applyFrontend: true
});
detector.on('silence', function () {
console.log('silence');
});
detector.on('sound', function (buffer) {
// <buffer> contains the last chunk of the audio that triggers the "sound"
// event. It could be written to a wav stream.
console.log('sound');
});
detector.on('error', function () {
console.log('error');
});
detector.on('hotword', function (index, hotword, buffer) {
// <buffer> contains the last chunk of the audio that triggers the "hotword"
// event. It could be written to a wav stream. You will have to use it
// together with the <buffer> in the "sound" event if you want to get audio
// data after the hotword.
console.log(buffer);
console.log('hotword', index, hotword);
});
const mic = record.start({
threshold: 0,
verbose: true
});
mic.pipe(detector);
因为这个sample没有指定node-record-lpcm16的版本号,经过一番调试发现新版1.x版本已经改了API,所以我这边翻了一下文档才发现API的改动:
// index.js
const { record } = require('node-record-lpcm16');
const mic = record({
sampleRate: 16000,
threshold: 0.5,
recorder: 'rec',
device: 'plughw:CARD=Snowball',
}).stream();
这里加了一些新的参数,首先是指定Snowball的硬件ID,这个硬件ID可以通过arecord -L命令找到。另外设置了16k的采样率, 因为Snowboy的model都是用16k采样率的音频来训练的,采样率不一致就识别不出来。另外把阈值调高了一些,阻挡一些噪音。
按照文档修改使用Jarvis的模型,并调整灵敏度参数:
// index.js
models.add({
file: 'snowboy/resources/models/jarvis.umdl',
sensitivity: '0.8,0.80',
hotwords : ['jarvis', 'jarvis'],
});
使用Jarvis模型测试之后发现已经可以识别Jarvis的hotword,并且触发hotword回调。这里我想了一下,我需要把音频流保存下来,然后传到讯飞进行听写获取文字。 所以当hotword事件触发的时候,需要把mic的流转移到一个fsWriteStream里面写入音频文件。Snowboy的Detector也有sound和silence的回调,所以我通过一个 简单的flag来实现了语音录制,并在说话结束的时候传到讯飞的听写API。
// index.js
const { xunfeiTranscriber } = require('./xunfei_stt');
let audios = 0;
let duplex;
let silenceCount;
let speaking;
const init = () => {
const filename = `audio${audios}.wav`;
duplex = fs.createWriteStream(filename, { binary: true });
silenceCount = 0;
speaking = false;
console.log(`initialized audio write stream to ${filename}`);
};
const transcribe = () => {
console.log('transcribing');
const filename = `audio${audios}.wav`;
xunfeiTranscriber.push(filename);
};
detector.on('silence', function () {
if (speaking) {
if (++silenceCount > MAX_SILENCE_COUNT) {
mic.unpipe(duplex);
duplex.destroy();
transcribe();
audios++;
init();
}
}
console.log('silence', speaking, silenceCount);
});
detector.on('sound', function (buffer) {
if (speaking) {
silenceCount = 0;
}
console.log('sound');
});
detector.on('hotword', function (index, hotword, buffer) {
if (!speaking) {
silenceCount = 0;
speaking = true;
mic.pipe(duplex);
}
console.log('hotword', index, hotword);
});
mic.pipe(detector);
init();
上面这段代码里面xunfeiTranscriber就是我们的讯飞听写模块。因为现在存的是一个音频文件,所以如果API是直接把整个音频传过去然后获得文字的话,是 最舒服的。但是很遗憾,讯飞弃用了REST API,而转用了基于WebSocket的流式听写API,所以只能老老实实手撸一个client。这里我用了EventEmitter来做 消息通信,这样可以比较快地和主程序互通信息。
// xunfei_stt.js
const EventEmitter = require('events');
const WebSocket = require('ws');
let ws;
let transcriptionBuffer = '';
class XunfeiTranscriber extends EventEmitter {
constructor() {
super();
this.ready = false;
this.on('ready', () => {
console.log('transcriber ready');
this.ready = true;
});
this.on('error', (err) => {
console.log(err);
});
this.on('result', () => {
cleanupWs();
this.ready = false;
init();
});
}
push(audioFile) {
if (!this.ready) {
console.log('transcriber not ready');
return;
}
this.emit('push', audioFile);
}
}
function init() {
const host = 'iat-api.xfyun.cn';
const path = '/v2/iat';
const xunfeiUrl = () => {
return `ws://${host}${path}?host=${host}&date=${encodeURIComponent(dateString)}&authorization=${authorization}`;
};
const url = xunfeiUrl();
console.log(url);
ws = new WebSocket(url);
ws.on('open', () => {
console.log('transcriber connection established');
xunfeiTranscriber.emit('ready');
});
ws.on('message', (data) => {
console.log('incoming xunfei transcription result');
const payload = JSON.parse(data);
if (payload.code !== 0) {
cleanupWs();
init();
xunfeiTranscriber.emit('error', payload);
return;
}
if (payload.data) {
transcriptionBuffer += payload.data.result.ws.reduce((acc, item) => {
return acc + item.cw.map(cw => cw.w);
}, '');
if (payload.data.status === 2) {
xunfeiTranscriber.emit('result', transcriptionBuffer);
}
}
});
ws.on('error', (error) => {
console.log(error);
cleanupWs();
});
ws.on('close', () => {
console.log('closed');
init();
});
}
const xunfeiTranscriber = new XunfeiTranscriber();
init();
module.exports = {
xunfeiTranscriber,
};
处理push事件这个地方比较棘手,经过测试发现,讯飞听写API只支持每条websocket消息发送13k的音频信息。音频信息是通过base64编码的,所以每条最多只能发大概9k字节。 这里需要根据讯飞API文档进行分批发送,并且在最后一定需要发end frame,不然API会超时导致关闭。返回的文字也是分段的,所以需要一个buffer来存储,等全部文字都返回之后再拼接输出。
// xunfei_stt.js
const fs = require('fs');
xunfeiTranscriber.on('push', function pushAudioFile(audioFile) {
transcriptionBuffer = '';
const audioPayload = (statusCode, audioBase64) => ({
common: statusCode === 0 ? {
app_id: process.env.XUNFEI_APPID,
} : undefined,
business: statusCode === 0 ? {
language: 'zh_cn',
domain: 'iat',
ptt: 0,
} : undefined,
data: {
status: statusCode,
format: 'audio/L16;rate=16000',
encoding: 'raw',
audio: audioBase64,
},
});
const chunkSize = 9000;
const buffer = new Buffer(chunkSize);
fs.open(audioFile, 'r', (err, fd) => {
if (err) {
throw err;
}
let i = 0;
function readNextChunk() {
fs.read(fd, buffer, 0, chunkSize, null, (errr, nread) => {
if (errr) {
throw errr;
}
if (nread === 0) {
console.log('sending end frame');
ws.send(JSON.stringify({
data: { status: 2 },
}));
return fs.close(fd, (err) => {
if (err) {
throw err;
}
});
}
let data;
if (nread < chunkSize) {
data = buffer.slice(0, nread);
} else {
data = buffer;
}
const audioBase64 = data.toString('base64');
console.log('chunk', i, 'size', audioBase64.length);
const payload = audioPayload(i >= 1 ? 1 : 0, audioBase64);
ws.send(JSON.stringify(payload));
i++;
readNextChunk();
});
}
readNextChunk();
});
});
细心的同学应该留意到有些重启逻辑在这段代码里面,这是因为测试过程中,发现讯飞这个API每个连接只支持发送一条消息,接受新的音频流需要重新连接API。。。 所以只好在每条消息发送完之后主动关闭WebSocket连接。
接下来是整合图灵机器人获取回复的部分了,xunfeiTranscriber提供一个result事件,所以这里通过监听result事件,把消息收到之后传入图灵机器人。
// index.js
const { tulingBot } = require('./tuling_bot');
xunfeiTranscriber.on('result', async (data) => {
console.log('transcriber result:', data);
const response = await tulingBot(data);
console.log(response);
});
// tuling_bot.js
const axios = require('axios');
const url = 'http://openapi.tuling123.com/openapi/api/v2';
async function tulingBot(text) {
const response = await axios.post(url, {
reqType: 0,
perception: {
inputText: {
text,
},
},
userInfo: {
apiKey: process.env.TULING_API_KEY,
userId: 'myUser',
},
});
console.log(JSON.stringify(response.data, null, 2));
return response.data;
}
module.exports = {
tulingBot,
};
对接完图灵机器人之后,我们需要把图灵机器人返回的文字进行语音合成。这里讯飞语音合成的WebAPI还是基于REST的,也已经有人做了对应的开源实现了,所以比较简单。
// index.js
const { xunfeiTTS } = require('./xunfei_tts');
xunfeiTranscriber.on('result', async (data) => {
console.log('transcriber result:', data);
const response = await tulingBot(data);
const playVoice = (filename) => {
return new Promise((resolve, reject) => {
const speaker = new Speaker({
channels: 1,
bitDepth: 16,
sampleRate: 16000,
});
const outStream = fs.createReadStream(filename);
// this is just to activate the speaker, 2s delay
speaker.write(Buffer.alloc(32000, 10));
outStream.pipe(speaker);
outStream.on('end', resolve);
});
};
for (let i = 0; i < response.results.length; i++) {
const result = response.results[i];
if (result.values && result.values.text) {
const outputFilename = await xunfeiTTS(result.values.text, `${audios-1}-${i}`);
if (outputFilename) {
await playVoice(outputFilename);
}
}
}
});
// xunfei_tts.js
const fs = require('fs');
const xunfei = require('xunfeisdk');
const { promisify } = require('util');
const writeFileAsync = promisify(fs.writeFile);
const client = new xunfei.Client(process.env.XUNFEI_APPID);
client.TTSAppKey = process.env.XUNFEI_TTS_KEY;
async function xunfeiTTS(text, audios) {
console.log('turning following text into speech:', text);
try {
const result = await client.TTS(
text,
xunfei.TTSAufType.L16_16K,
xunfei.TTSAueType.RAW,
xunfei.TTSVoiceName.XiaoYan,
);
console.log(result);
const filename = `response${audios}.wav`;
await writeFileAsync(filename, result.audio);
console.log(`response written to ${filename}`);
return filename;
} catch (err) {
console.log(err.response.status);
console.log(err.response.headers);
console.log(err.response.data);
return null;
}
}
module.exports = {
xunfeiTTS,
};
最后这个机器人就可以听懂我说的话啦!
下面附上完整代码
后记
我觉得整体的运行效果还是不错的,并且可以高度自定义。我希望后面再测试一下其他不同厂商的语音API,并且对接上Rasa和Wechaty,这样在家里就可以和机器人对话, 并且能够在微信里面获得一些图文的信息。讯飞的API整合出乎意料之外地复杂,并且有一个我觉得比较致命的问题是,讯飞的WebAPI连接延时特别严重,我一开始以为是 板子的问题,后面发现单独调用图灵API和讯飞API,发现图灵API的响应速度非常快,但是讯飞API就在连接上就花了很长时间,所以现在的STT模块需要预热,等连接准备 好才可以说话。后面我想换用其他厂商的API,看看能不能改善一下体验。
希望这个demo能够起到一个抛砖引玉的作用,在未来可以看到更多更酷炫的语音助手和机器人。