原理
- shell bolt 将数据和命令序列化成 json 后输出到shell 进程中,shell 进程接收 json 解析后将结果输出到 stdout 中,shell bolt 获取 stdout 的输出进行解析然后调用相应的 api 进行后续操作(emit,ack,fail 等)
- 相关协议见:http://storm.apache.org/releases/1.1.1/Multilang-protocol.html
实现:
- SimpleShellBolt
public class SimpleShellBolt extends ShellBolt implements IRichBolt {
public SimpleShellBolt() {
// 划重点:此处注意传参的方式 不要写成一个 string, 比如"sh start.sh"
// start.sh 作用是对二进制文件 加执行权限后进行调用,否则会提示没执行权限
super("sh", "start.sh");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
- shell 脚本
- shell 脚本需要放置到 src/main/resources/resources 目录中
- submit 时 storm 会将 jar 包中的 resources 目录拷贝到 storm 工作目录下的resources 目录下(比如:storm-local/supervisor/stormdist/kafka-2-shell-topic-consumer-38-1513754394/resources/)
- shell bolt 运行 shell 进程时会 cd 到storm 工作目录下的resources进行启动
踩过的坑
-
使用的 storm cpp 程序中对 heart beat 的判断不准(估计是 shellbolt 代码升级所致)导致抛出『Anchored onto * after ack/fail』 的异常
- stormcpp 源码https://github.com/honeyligo/StormCpp
- 修改
vim https://github.com/honeyligo/StormCpp/blob/master/public/Storm.h bool IsHeartbeatTuple() { // if (this->_task == -1 && this->_stream.compare("__heartbeat") == 0) if (this->_stream.compare("__heartbeat") == 0) return true; return false; }
-
shellbolt 调用 shell 命令时遇到 no such file 的异常
- 原因1.:二进制和脚本放在src/main/resources/ 打包后不会上传到集群中,解决方案为放到src/main/resources/resources目录下
- 原因2:使用 super("sh start.sh"); 整个字符串是作为一个文件路径进行查找的,实际应为 super("sh", "start.sh");