Flink 使用介绍相关文档目录
前言
Flink 2.1.0新增了AI功能,标志着实时数据引擎向统一 Data + AI 的里程碑式跨越。
目前Flink 2.1.x版本实现了基于OpenAI接口的流式推理任务。支持使用函数调用LLM,实现chat_competion功能。
本篇为大家带来 Flink + AI 功能的使用演示。
环境配置和使用
环境信息:
- Flink 2.1.1
- JDK17
Flink 2.x以上版本不再提供JDK 8 的支持,请各位用户将JDK升级为JDK 17。
使用如下命令下载Flink,并解压到任意目录:
wget https://dlcdn.apache.org/flink/flink-2.1.1/flink-2.1.1-bin-scala_2.12.tgz
然后配置JDK 17 JAVA_HOME:
export JAVA_HOME=/path/to/jdk17
接下来下载依赖:Maven Repository: org.apache.flink » flink-model-openai » 2.1.1。放入Flink安装目录的lib中。
为了演示方便,建议使用Flink的Standalone模式运行。配置方法如下:
- 编辑
FLINK_HOME中conf/masters和conf/workers文件,指定需要启动的JobManager,TaskManager的位置(哪个host上)和个数。 - 执行
FLINK_HOME/bin/start-cluster.sh,启动集群。 - 集群启动成功之后,执行
FLINK_HOME/bin/sql-client.sh embedded,启动Flink SQL client。
注意:如果想要在Yarn上运行Flink 2.x版本,因为当前Yarn只支持JDK 8,需要在JDK 8 的Yarn上运行JDK 17的Flink。具体配置方法请参考Flink 使用之配置与调优中的
Flink on Yarn 模式配置JDK一节。
使用Flink SQL调用大语言模型的步骤:
- 创建Model。需要指明模型提供方和系统提示词。
- 调用ML_PREDICT函数。指定Model和作为提示词传入的字段。
官网示例如下:
-- 创建Model
-- 修改endpoint,api-key和model与真实环境对应后执行
CREATE MODEL ai_analyze_sentiment
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
'provider'='openai',
'endpoint'='http://ip:10000/v1/chat/completions',
'api-key' = 'your-api-key',
'model'='qwen3',
'system-prompt' = 'Classify the text below into one of the following labels: [positive, negative, neutral, mixed]. Output only the label./no_think'
);
-- 示例数据,一个只有一行数据的临时view
-- 真实使用时可以是Kafka表,Hudi表等等
CREATE TEMPORARY VIEW movie_comment(id, movie_name, user_comment, actual_label) AS VALUES
(1, 'Good Stuff', 'The part where children guess the sounds is my favorite. It is a very romantic narrative compared to other movies I have seen. Very gentle and full of love.', 'positive');
-- 执行推理,显示推理结果
SELECT id, movie_name, content as predicit_label, actual_label
FROM ML_PREDICT(
TABLE movie_comment,
MODEL ai_analyze_sentiment,
DESCRIPTOR(user_comment));
配置参数
以下参数摘自官网列出的常规配置参数。这些参数为接入LLM的常规配置项。不再一一解释。
Common #
Chat Completions #
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| system-prompt | optional | "You are a helpful assistant." | String | The input message for the system role. |
| temperature | optional | null | Double | Controls randomness of output, range [0.0, 1.0]. See temperature
|
| top-p | optional | null | Double | Probability cutoff for token selection (used instead of temperature). See top_p |
| stop | optional | null | String | Stop sequences, comma-separated list. See stop |
| max-tokens | optional | null | Long | Maximum number of tokens to generate. See max tokens |
此外,Flink官方还提供了Async predict(异步推理支持)。能够平衡吞吐量和调用延迟。
使用示例如下:
SELECT * FROM ML_PREDICT(TABLE input, MODEL mdl, descriptor(f1, f2), MAP['async', 'true']);
官方提供配置项主要是和请求的并行度,以及请求超时和失败重试机制相关。示例配置项如下所示:
table.exec.async-ml-predict.max-concurrent-operations: 10
table.exec.async-ml-predict.timeout: 30s
table.exec.async-ml-predict.retry-strategy: FIXED_DELAY
table.exec.async-ml-predict.fixed-delay: 10s
table.exec.async-ml-predict.max-attempts: 3
社区未来规划
FLIP-548: Introduce Tool and Agent in Flink SQL - Apache Flink - Apache Software Foundation:提供tools和agent管理调用功能。
FLIP-540: Support VECTOR_SEARCH in Flink SQL - Apache Flink - Apache Software Foundation:提供向量检索功能。
FLIP-525: Model ML_PREDICT, ML_EVALUATE Implementation Design - Apache Flink - Apache Software Foundation:完善LM_PREDICT和ML_EVALUATE。