When doing batch processing with Spark, you sometimes need to submit Spark jobs on a schedule. There are two common ways to do this: a shell script or Java code. This post covers the Java approach.
- The scheduler
@Scheduled(cron = "0 20 0 * * *") // fires every day at 00:20
public void tnvDSumSchd() {
    try {
        // first element is the task type, the remaining elements are the input files
        List<String> taskType_files_list = new ArrayList<>();
        taskType_files_list.add("TESTIN_VIDEO_D_SUMMAY");
        String legalDir = tools.getLegalDir(testinHdfsDir);
        taskType_files_list.addAll(tools.fileLists(legalDir,
                new TestInFilterPath(FileType.VIDEO.getType() + getDay())));
        String[] sparkParam = new String[4];
        sparkParam[0] = "1g"; // spark.driver.memory
        sparkParam[1] = "2g"; // spark.executor.memory
        sparkParam[2] = "2";  // spark.executor.cores
        sparkParam[3] = "1";  // spark.driver.cores
        // only submit when at least one input file was found
        if (taskType_files_list.size() > 1) {
            submitLancher.submit(taskType_files_list, sparkParam, "TESTIN_VIDEO_D_SUMMAY");
        } else {
            log.info("{} does not have any testin video file", legalDir);
        }
    } catch (InterruptedException e) {
        log.error(e.getMessage(), e);
    } catch (IOException e) {
        log.error(e.getMessage(), e);
    }
}
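The method above depends on a few collaborators that are not shown: submitLancher, tools, and testinHdfsDir are fields of the enclosing Spring component, and @Scheduled only fires when scheduling is enabled for the application. A minimal sketch of how that wiring might look, assuming hypothetical names for the enclosing class, the helper bean, and the property key:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;

@Configuration
@EnableScheduling // needed once per application so @Scheduled methods are picked up
class SchedulingConfig { }

@Component
@Slf4j
public class TnvDSumScheduler { // illustrative name for the enclosing component

    @Autowired
    private SubmitLancher submitLancher; // the launcher component shown in the next section

    @Autowired
    private HdfsTools tools; // hypothetical helper that lists HDFS files

    @Value("${testin.hdfs.dir}")
    private String testinHdfsDir; // assumed property key for the HDFS input root

    // ... the @Scheduled tnvDSumSchd() method shown above lives here ...
}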
- The class that submits the Spark job
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class SubmitLancher {

    @Value("${spark.home}")
    private String sparkHome; // path to the Spark installation (SPARK_HOME)

    @Value("${spark.submit.appResource}")
    private String submitAppResource; // the application jar submitted to Spark

    public void submit(List<String> taskType_files_list, String[] sparkParam, String appName)
            throws IOException, InterruptedException {
        long start = System.currentTimeMillis();
        String[] taskType_files = taskType_files_list.toArray(new String[taskType_files_list.size()]);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        new SparkLauncher()
                .setSparkHome(sparkHome)
                .setMaster("yarn")
                .setAppName(appName)
                .setConf("spark.driver.memory", sparkParam[0])
                .setConf("spark.executor.memory", sparkParam[1])
                .setConf("spark.executor.cores", sparkParam[2])
                .setConf("spark.driver.cores", sparkParam[3])
                .setAppResource(submitAppResource)
                .setMainClass("submit.SubmitMain") // main entry point of the Spark application
                .setDeployMode("cluster")
                .addAppArgs(taskType_files)
                .startApplication(new SparkAppHandle.Listener() {
                    @Override
                    public void stateChanged(SparkAppHandle handle) {
                        // release the latch once the application reaches a final state
                        if (handle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                    }

                    @Override
                    public void infoChanged(SparkAppHandle handle) {
                    }
                });
        // block until the Spark application finishes
        countDownLatch.await();
        long end = System.currentTimeMillis();
        log.info("{} summary finished, used time: {} ms", taskType_files_list.get(0), end - start);
    }
}
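For completeness, submit.SubmitMain is the class that actually runs on the YARN cluster; it receives taskType_files as its program arguments (the task type first, then the file paths). The original post does not show it, so the following is only a hedged sketch of what such an entry point could look like with plain Spark 1.6 APIs and no Spring (see the notes below); the class body and job logic here are placeholders:

package submit;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical entry point matching setMainClass("submit.SubmitMain"); the real job logic is not shown in the post.
public class SubmitMain {
    public static void main(String[] args) {
        String taskType = args[0];                                 // e.g. TESTIN_VIDEO_D_SUMMAY
        String[] files = Arrays.copyOfRange(args, 1, args.length); // HDFS files collected by the scheduler

        // master, deploy mode and app name are already set by SparkLauncher, so only extra conf goes here
        JavaSparkContext sc = new JavaSparkContext(new SparkConf());
        try {
            // placeholder: read all input files and compute the daily summary for this task type
            long lines = sc.textFile(String.join(",", files)).count();
            System.out.println(taskType + " processed " + lines + " input lines");
        } finally {
            sc.stop();
        }
    }
}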
- Notes
- countDownLatch.await(); blocks until the latch's count reaches 0. This prevents the job from being terminated abnormally when the Spark tasks are still running on the executors but the driver-side submitting code has already exited.
- When scheduling with Java code, try to avoid the Spring Boot framework inside the Spark job itself, because Spark distributes tasks from the master to the executors and a Spring container cannot be created on the executors.
- Since the Spark dependencies come from the live cluster's Spark environment, the launcher program needs the SparkLauncher class at startup, so spark-launcher_2.10-1.6.2.jar must be submitted along with the application as a dependency.