1. Environment
- Flink 1.14.2 (Scala 2.12)
- The Flink cluster runs in session mode on YARN
2. How to set programArgs
A job is started by calling Flink's REST API to run a previously uploaded jar.
Below is the official documentation for the run-jar endpoint:
/jars/:jarid/run
Verb: POST; Response code: 200 OK
Submits a job by running a jar previously uploaded via '/jars/upload'. Program arguments can be passed both via the JSON request (recommended) or query parameters.
Path parameters
- jarid - String value that identifies a jar. When uploading the jar a path is returned, where the filename is the ID. This value is equivalent to the id field in the list of uploaded jars (/jars).
Query parameters
- allowNonRestoredState (optional): Boolean value that specifies whether the job submission should be rejected if the savepoint contains state that cannot be mapped back to the job. |
- savepointPath (optional): String value that specifies the path of the savepoint to restore the job from. |
- program-args (optional): Deprecated, please use 'programArg' instead. String value that specifies the arguments for the program or plan |
- programArg (optional): Comma-separated list of program arguments. |
- entry-class (optional): String value that specifies the fully qualified name of the entry point class. Overrides the class defined in the jar file manifest. |
- parallelism (optional): Positive integer value that specifies the desired parallelism for the job. |
- Request
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunRequestBody",
"properties" : {
"allowNonRestoredState" : {
"type" : "boolean"
},
"entryClass" : {
"type" : "string"
},
"jobId" : {
"type" : "any"
},
"parallelism" : {
"type" : "integer"
},
"programArgs" : {
"type" : "string"
},
"programArgsList" : {
"type" : "array",
"items" : {
"type" : "string"
}
},
"savepointPath" : {
"type" : "string"
}
}
}
- Response
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunResponseBody",
"properties" : {
"jobid" : {
"type" : "any"
}
}
}
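To make the Response schema above concrete: a successful call returns a small JSON object whose only property is jobid. The sketch below pulls that value out with a regular expression. It assumes the jobid is serialized as a plain JSON string without escape sequences (the schema only says "any"); the class name JarRunResponse and the method extractJobId are hypothetical names introduced for illustration, not part of Flink. In real code a JSON library would be the more robust choice.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink.
final class JarRunResponse {

    // Matches "jobid" : "<value>". Assumption: the value is a plain JSON
    // string that contains no escaped quotes.
    private static final Pattern JOB_ID =
            Pattern.compile("\"jobid\"\\s*:\\s*\"([^\"]*)\"");

    // Returns the jobid from the response body, or empty if it is absent.
    public static Optional<String> extractJobId(String responseJson) {
        Matcher m = JOB_ID.matcher(responseJson);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // The id below is a made-up placeholder, not a real job id.
        String body = "{\"jobid\":\"0123456789abcdef0123456789abcdef\"}";
        System.out.println(extractJobId(body).orElse("<no jobid in response>"));
    }
}
```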
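Notes:
- The "Query parameters" and the Request body described in the documentation are two separate things: the former are URL query parameters, the latter is the format of the HTTP body.
- There are three ways to set programArgs: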
Method 1: set them with the programArg query parameter, as a comma-separated list. Example:
getClient().ask(Request.POST().path(sPOST_runJar , mMainJarId)
.queryParam("entry-class" , "org.sailboat.flink.Test")
.queryParam("programArg" , "--paramA,valueA,--paramB,valueB,--paramC,valueC")
) ;
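Method 2: set them with the programArgs property in the HTTP body. Note that its type is "string", so all arguments go into a single string. In the 1.14 handler code (JarHandlerUtils) this string is tokenized on whitespace, with quoted segments kept together, rather than on commas, so the arguments are separated by spaces here. Example:
getClient().ask(Request.POST().path(sPOST_runJar , mMainJarId)
.queryParam("entry-class" , "org.sailboat.flink.Test")
// programArgs is one whitespace-separated string
.setJsonEntity(new JSONObject().put("programArgs", "--paramA valueA --paramB valueB --paramC valueC"))
) ;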
Method 3: set them with the programArgsList property in the HTTP body. Note that its type is "array", with one element per argument. Example:
getClient().ask(Request.POST().path(sPOST_runJar , mMainJarId)
.queryParam("entry-class" , "org.sailboat.flink.Test")
.setJsonEntity(new JSONObject().put("programArgsList", new JSONArray()
.put("--paramA").put("valueA")
.put("--paramB").put("valueB")
.put("--paramC").put("valueC")))
) ;
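The snippets above rely on the author's own client wrapper (getClient(), Request, sPOST_runJar). For readers without that wrapper, here is a minimal sketch of Method 1 and Method 3 using only the JDK (Java 11+). The class RunJarRequests, its method names, the base URL http://localhost:8081, and the jar id are all hypothetical placeholders; the JSON is assembled by hand only to keep the example dependency-free, and the jar id is assumed to be URL-safe.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink.
final class RunJarRequests {

    // Minimal JSON string escaping: quotes, backslashes, control characters.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"') sb.append("\\\"");
            else if (c == '\\') sb.append("\\\\");
            else if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
            else sb.append(c);
        }
        return sb.append('"').toString();
    }

    // Method 3: HTTP body with entryClass and programArgsList (one element per argument).
    public static String buildBody(String entryClass, List<String> programArgs) {
        String list = programArgs.stream()
                .map(RunJarRequests::quote)
                .collect(Collectors.joining(","));
        return "{\"entryClass\":" + quote(entryClass)
                + ",\"programArgsList\":[" + list + "]}";
    }

    // Method 1: programArg query parameter, comma-separated and URL-encoded.
    // An argument that itself contains a comma cannot be expressed this way.
    public static URI buildQueryUri(String baseUrl, String jarId, List<String> programArgs) {
        String joined = String.join(",", programArgs);
        return URI.create(baseUrl + "/jars/" + jarId + "/run?programArg="
                + URLEncoder.encode(joined, StandardCharsets.UTF_8));
    }

    // POST request for /jars/:jarid/run carrying a JSON body.
    public static HttpRequest buildRequest(String baseUrl, String jarId, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        List<String> programArgs = List.of("--paramA", "valueA", "--paramB", "valueB");
        String body = buildBody("org.sailboat.flink.Test", programArgs);
        // Placeholder address and jar id; nothing is sent here.
        HttpRequest request = buildRequest("http://localhost:8081", "example.jar", body);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body);
    }
}
```

To actually submit the job, the built request would be passed to java.net.http.HttpClient.send(...) with a string body handler. Method 3 is usually the safest of the three because each argument is its own array element, so values containing spaces or commas need no special handling.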
3. Understanding the argument settings from the Flink client code
- org.apache.flink.client.cli.CliFrontend
- org.apache.flink.client.cli.ProgramOptions
4. Related Flink server-side classes
- org.apache.flink.runtime.webmonitor.handlers.JarRequestBody
- org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils
- org.apache.flink.runtime.webmonitor.handlers.JarRunHandler