spark程序中调用shell脚本

scala直接调用shell脚本是不行的,但是可以利用java调用shell脚本然后在spark代码中引入java代码实现。

  • 参考:java代码调用shell脚本
  • shell脚本必须在spark的driver端调用,在worker端只能处理数据。因此必须在spark的DAG引擎开始或者结束以及两个job之间调用shell脚本。
  • 根据以上前提,spark执行shell脚本代码只能在以下几个位置:
  1. SparkContext创建之前和创建之后但是创建RDD之前。
  2. 每一个job的Spark的action函数执行之后以及下一个job的transformation函数执行之前。
  3. 最后一个job的action函数执行结束之后。

demo

  • 先写一个java类,拥有一个调用shell脚本的方法。
public class MyJavaClass {

    public void executeShell(String shpath) {
        try {
            Process ps = Runtime.getRuntime().exec(shpath);
            ps.waitFor();
            BufferedReader br = new BufferedReader(new InputStreamReader(ps.getInputStream()));
            StringBuffer sb = new StringBuffer();
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line).append("\n");
            }
            String result = sb.toString();
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • spark程序的主类
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("wordCount").setMaster("yarn")
    val sc=new SparkContext(conf)

    val rdd=sc.textFile("hdfs:///test/Hamlet.txt")
    val rdd1=rdd.flatMap(x=>x.split(" "))
      .filter(x=>x.size>1)
      .map(x=>(x,1))
      .reduceByKey(_+_)
      .map{case(x,y)=>(y,x)}
    rdd1.sortByKey(false)
      .map{case(a,b)=>(b,a)}
      .saveAsTextFile("hdfs:///test/output.txt")//saveAsTextFile是个action,真正开始提交job,
    // 调用shell脚本
    val shpath = "/data/spark-runshell.sh";
    val javaClass = new MyJavaClass()
    val addResult = javaClass.executeShell(shpath)
    println(addResult);
  }
}
  • 注意:在maven工程中,scala包里不能写java类,否则编译不通过,必须将java类放在java包里才能编译通过。


    maven工程.png
  • jar包提交到spark集群
#!/bin/bash
spark-submit \
--master yarn \
--class com.kouyy.test.SparkWordCount /data/sparkdemo-1.0-SNAPSHOT.jar
服务器上spark程序Jar包及运行脚本.png
  • spark-runshell.sh内容
#!/bin/bash
echo 'spark调用shell脚本运行成功'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'hahahahahhaha'
echo 'spark调用shell脚本运行成功'
echo 'spark调用shell脚本运行成功'
echo 'spark调用shell脚本运行成功'
echo 'spark调用shell脚本运行成功'
echo 'spark调用shell脚本运行成功'
echo 'spark调用shell脚本运行成功'
echo 'spark调用shell脚本运行成功'
  • 运行spark程序结果


    运行成功.png
调用shell脚本成功且在driver端运行.png
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容