public class K8sApplicationClient {
protected static final String KUBE_CONFIG_FILE = System.getProperty("user.dir") + "/river-flink/src/main/resources/k8s/config";
protected static final String JOB_MANAGER_SERVICE_ACCOUNT = "flink";
protected static final String NAMESPACE = "default";
protected static final String CLUSTER_ID = "my-first-application-cluster";
protected static final String CONTAINER_IMAGE = "flinkjobdemo:001";
protected static String JOB_JAR = "local:///opt/flink/usrlib/FlinkJobdemo.jar";
protected static final KubernetesConfigOptions.ImagePullPolicy CONTAINER_IMAGE_PULL_POLICY =
KubernetesConfigOptions.ImagePullPolicy.IfNotPresent;
protected static final int JOB_MANAGER_MEMORY = 768;
protected static final int TOTAL_FLINK_MEMORY = 768;
private static KubernetesClusterDescriptor descriptor;
private static ClusterSpecification clusterSpecification =
new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
private static ApplicationConfiguration appConfig =
new ApplicationConfiguration(new String[0], null);
private static Configuration flinkConfig = new Configuration();
private static File flinkConfDir = new File(System.getProperty("user.dir") + "/river-flink/flinkJobTemp/test-job001");
public static void main(String[] args) {
try {
setup();
submit();
} catch (Exception e) {
e.printStackTrace();
}
}
private static void submit () throws Exception {
KubernetesClusterClientFactory factory = new KubernetesClusterClientFactory();
descriptor = factory.createClusterDescriptor(flinkConfig);
descriptor.deployApplicationCluster(clusterSpecification, appConfig);
}
public static void setup() throws Exception {
setupFlinkConfig();
writeFlinkConfiguration();
}
protected static void setupFlinkConfig() {
// 作业镜像地址
flinkConfig.set(PipelineOptions.JARS, Collections.singletonList(JOB_JAR));
// 配置k8s
flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
// k8s config path
flinkConfig.setString(KubernetesConfigOptions.KUBE_CONFIG_FILE, KUBE_CONFIG_FILE);
// k8s 服务账号
flinkConfig.setString(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, JOB_MANAGER_SERVICE_ACCOUNT);
// namespace
flinkConfig.setString(KubernetesConfigOptions.NAMESPACE, NAMESPACE);
// cluster_id
flinkConfig.setString(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
// docker镜像
flinkConfig.setString(KubernetesConfigOptions.CONTAINER_IMAGE, CONTAINER_IMAGE);
// 镜像拉取策略
// flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, CONTAINER_IMAGE_PULL_POLICY);
// jobManager memory
flinkConfig.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(JOB_MANAGER_MEMORY));
// taskManager memory
flinkConfig.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.ofMebiBytes(TOTAL_FLINK_MEMORY));
// flinkConf 路径
flinkConfig.set(DeploymentOptionsInternal.CONF_DIR, flinkConfDir.toString());
}
protected static void writeFlinkConfiguration() throws IOException {
BootstrapTools.writeConfiguration(
flinkConfig, new File(flinkConfDir, "flink-conf.yaml"));
}
}
flink on k8s Application模式 java代码提交作业
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- flink1.13于5月4日发布,版本迭代之快,更是体现了其势不可挡的发展速度。社区活跃度逐步攀升,当然少不了与云...
- 生成 docker镜像 下载链接中的配置文件参考:https://github.com/apache/flink/...
- 0.背景 关于 Flink 的 Application Mode 自己还比较陌生,像 -d 等参数自己也不熟悉,决...
- 本文基于 Flink-1.13 介绍 Flink on Kubernetes 的部署模式,重点讲述 Session...