1. Environment
- Flink 1.14.2 (Scala 2.12)
- The Flink cluster runs in session mode on YARN
2. How to set programArgs
/jars/:jarid/run
Verb: POST, Response code: 200 OK
Submits a job by running a jar previously uploaded via '/jars/upload'. Program arguments can be passed both via the JSON request (recommended) or query parameters.
Path parameters
- jarid - String value that identifies a jar. When uploading the jar a path is returned, where the filename is the ID. This value is equivalent to the id field in the list of uploaded jars (/jars).
Query parameters
- allowNonRestoredState (optional): Boolean value that specifies whether the job submission should be rejected if the savepoint contains state that cannot be mapped back to the job.
- savepointPath (optional): String value that specifies the path of the savepoint to restore the job from.
- program-args (optional): Deprecated, please use 'programArg' instead. String value that specifies the arguments for the program or plan.
- programArg (optional): Comma-separated list of program arguments.
- entry-class (optional): String value that specifies the fully qualified name of the entry point class. Overrides the class defined in the jar file manifest.
- parallelism (optional): Positive integer value that specifies the desired parallelism for the job.
- Request
  {
    "type" : "object",
    "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunRequestBody",
    "properties" : {
      "allowNonRestoredState" : {
        "type" : "boolean"
      },
      "entryClass" : {
        "type" : "string"
      },
      "jobId" : {
        "type" : "any"
      },
      "parallelism" : {
        "type" : "integer"
      },
      "programArgs" : {
        "type" : "string"
      },
      "programArgsList" : {
        "type" : "array",
        "items" : {
          "type" : "string"
        }
      },
      "savepointPath" : {
        "type" : "string"
      }
    }
  }
- Response
  {
    "type" : "object",
    "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunResponseBody",
    "properties" : {
      "jobid" : {
        "type" : "any"
      }
    }
  }
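- The "Query parameters" and the Request body described in the documentation are two separate things: the former are URL query parameters, the latter is the format of the HTTP body.
- There are three ways to set programArgs: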
// Method 1: pass the arguments as the comma-separated "programArg" query parameter.
getClient().ask(Request.POST().path(sPOST_runJar, mMainJarId)
        .queryParam("entry-class", "org.sailboat.flink.Test")
        .queryParam("programArg", "--paramA,valueA,--paramB,valueB,--paramC,valueC")
);
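// Method 2: pass the arguments as the "programArgs" string in the JSON body.
// Flink tokenizes this string on whitespace, so the arguments are separated by spaces here, not commas.
getClient().ask(Request.POST().path(sPOST_runJar, mMainJarId)
        .queryParam("entry-class", "org.sailboat.flink.Test")
        .setJsonEntity(new JSONObject().put("programArgs", "--paramA valueA --paramB valueB --paramC valueC"))
);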
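// Method 3: pass the arguments as the "programArgsList" array in the JSON body, one element per argument.
getClient().ask(Request.POST().path(sPOST_runJar, mMainJarId)
        .queryParam("entry-class", "org.sailboat.flink.Test")
        .setJsonEntity(new JSONObject().put("programArgsList", new JSONArray()
                // The array elements are missing from the original snippet; the values below are
                // assumed by analogy with the other two methods, using an org.json-style put().
                .put("--paramA").put("valueA")
                .put("--paramB").put("valueB")
                .put("--paramC").put("valueC")))
);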
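The snippets above depend on a custom client wrapper. The same requests can be sketched with the JDK's own java.net.http API (Java 11+). This is only a minimal sketch under stated assumptions: BASE_URL, the class name, and the method names are hypothetical, the jar ID is assumed to be URL-safe, and the JSON is assembled by hand purely to avoid a library dependency. It builds the requests but does not send them.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

class RunJarRequestSketch {
    // Hypothetical JobManager REST address; replace with the real one.
    static final String BASE_URL = "http://localhost:8081";

    static String enc(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }

    // Query-parameter variant: the arguments are joined with commas into "programArg".
    static URI queryParamUri(String jarId, String entryClass, List<String> args) {
        String query = "entry-class=" + enc(entryClass)
                + "&programArg=" + enc(String.join(",", args));
        return URI.create(BASE_URL + "/jars/" + jarId + "/run?" + query);
    }

    // Minimal JSON string escaping (backslash and double quote only).
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Body variant: one "programArgsList" array element per argument.
    static String programArgsListBody(String entryClass, List<String> args) {
        String items = args.stream()
                .map(a -> "\"" + escape(a) + "\"")
                .collect(Collectors.joining(","));
        return "{\"entryClass\":\"" + escape(entryClass) + "\",\"programArgsList\":[" + items + "]}";
    }

    // POST request carrying the JSON body; sending it is left to the caller.
    static HttpRequest bodyRequest(String jarId, String json) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] unused) {
        List<String> args = List.of("--paramA", "valueA", "--paramB", "valueB");
        System.out.println(queryParamUri("example.jar", "org.sailboat.flink.Test", args));
        System.out.println(programArgsListBody("org.sailboat.flink.Test", args));
    }
}
```

The array form sidesteps separator questions entirely, since every argument is its own JSON string, whereas the comma-separated query parameter cannot easily carry an argument that itself contains a comma.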
3. Understanding the parameter handling from the Flink source code
- org.apache.flink.runtime.webmonitor.handlers.JarRequestBody
- org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils
- org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
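The classes listed above model the request body and resolve the program arguments. The following is a simplified illustration of the resolution logic, not Flink's actual code: the class name, method names, and regular expression are hypothetical, and the behavior shown (a single programArgs string is split on whitespace with quoted segments kept together, programArgsList is taken as-is, and supplying both is rejected) should be verified against the Flink version in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ProgramArgsSketch {
    // A token is a double-quoted run, a single-quoted run, or a run of non-whitespace.
    private static final Pattern TOKEN = Pattern.compile("\"([^\"]*)\"|'([^']*)'|(\\S+)");

    // Splits a single argument string into tokens; commas are NOT separators here.
    static List<String> tokenize(String args) {
        List<String> tokens = new ArrayList<>();
        if (args == null) {
            return tokens;
        }
        Matcher m = TOKEN.matcher(args);
        while (m.find()) {
            if (m.group(1) != null) {
                tokens.add(m.group(1));      // content of "..."
            } else if (m.group(2) != null) {
                tokens.add(m.group(2));      // content of '...'
            } else {
                tokens.add(m.group(3));      // plain token
            }
        }
        return tokens;
    }

    // Picks the effective argument list from the two alternative inputs.
    static List<String> resolve(String programArgs, List<String> programArgsList) {
        List<String> fromString = tokenize(programArgs);
        List<String> fromList = programArgsList == null ? List.of() : programArgsList;
        if (!fromList.isEmpty()) {
            if (!fromString.isEmpty()) {
                throw new IllegalArgumentException(
                        "specify only one of programArgs and programArgsList");
            }
            return fromList;
        }
        return fromString;
    }

    public static void main(String[] unused) {
        System.out.println(tokenize("--paramA valueA --paramB valueB"));
        System.out.println(tokenize("--paramA,valueA"));
    }
}
```

Under this model a comma-joined programArgs string reaches the job as one single argument, which is why the string form needs spaces while the programArg query parameter uses commas.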