Flink 2.x 实时 Data + AI

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

前言

Flink 2.1.0新增了AI功能,标志着实时数据引擎向统一 Data + AI 的里程碑式跨越。

目前Flink 2.1.x版本实现了基于OpenAI接口的流式推理任务。支持使用函数调用LLM,实现chat_competion功能。

本篇为大家带来 Flink + AI 功能的使用演示。

环境配置和使用

环境信息:

  • Flink 2.1.1
  • JDK17

Flink 2.x以上版本不再提供JDK 8 的支持,请各位用户将JDK升级为JDK 17。

使用如下命令下载Flink,并解压到任意目录:

wget https://dlcdn.apache.org/flink/flink-2.1.1/flink-2.1.1-bin-scala_2.12.tgz

然后配置JDK 17 JAVA_HOME:

export JAVA_HOME=/path/to/jdk17

接下来下载依赖:Maven Repository: org.apache.flink » flink-model-openai » 2.1.1。放入Flink安装目录的lib中。

为了演示方便,建议使用Flink的Standalone模式运行。配置方法如下:

  1. 编辑FLINK_HOMEconf/mastersconf/workers文件,指定需要启动的JobManager,TaskManager的位置(哪个host上)和个数。
  2. 执行FLINK_HOME/bin/start-cluster.sh,启动集群。
  3. 集群启动成功之后,执行FLINK_HOME/bin/sql-client.sh embedded,启动Flink SQL client。

注意:如果想要在Yarn上运行Flink 2.x版本,因为当前Yarn只支持JDK 8,需要在JDK 8 的Yarn上运行JDK 17的Flink。具体配置方法请参考Flink 使用之配置与调优中的Flink on Yarn 模式配置JDK一节。

使用Flink SQL调用大语言模型的步骤:

  1. 创建Model。需要指明模型提供方和系统提示词。
  2. 调用ML_PREDICT函数。指定Model和作为提示词传入的字段。

官网示例如下:

-- 创建Model
-- 修改endpoint,api-key和model与真实环境对应后执行
CREATE MODEL ai_analyze_sentiment
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
    'provider'='openai',
    'endpoint'='http://ip:10000/v1/chat/completions',
    'api-key' = 'your-api-key',
    'model'='qwen3',
    'system-prompt' = 'Classify the text below into one of the following labels: [positive, negative, neutral, mixed]. Output only the label./no_think'
);

-- 示例数据,一个只有一行数据的临时view
-- 真实使用时可以是Kafka表,Hudi表等等
CREATE TEMPORARY VIEW movie_comment(id, movie_name,  user_comment, actual_label) AS VALUES
  (1, 'Good Stuff', 'The part where children guess the sounds is my favorite. It is a very romantic narrative compared to other movies I have seen. Very gentle and full of love.', 'positive');

-- 执行推理,显示推理结果
SELECT id, movie_name, content as predicit_label, actual_label
FROM ML_PREDICT(
  TABLE movie_comment,
  MODEL ai_analyze_sentiment,
  DESCRIPTOR(user_comment));

配置参数

以下参数摘自官网列出的常规配置参数。这些参数为接入LLM的常规配置项。不再一一解释。

Common #

Option Required Default Type Description
provider required (none) String Specifies the model function provider to use, must be 'openai'.
endpoint required (none) String Full URL of the OpenAI API endpoint, e.g. https://api.openai.com/v1/chat/completions or https://api.openai.com/v1/embeddings.
api-key required (none) String OpenAI API key for authentication.
model required (none) String Model name, e.g. gpt-3.5-turbo, text-embedding-ada-002.

Chat Completions #

Option Required Default Type Description
system-prompt optional "You are a helpful assistant." String The input message for the system role.
temperature optional null Double Controls randomness of output, range [0.0, 1.0]. See temperature
top-p optional null Double Probability cutoff for token selection (used instead of temperature). See top_p
stop optional null String Stop sequences, comma-separated list. See stop
max-tokens optional null Long Maximum number of tokens to generate. See max tokens

此外,Flink官方还提供了Async predict(异步推理支持)。能够平衡吞吐量和调用延迟。

使用示例如下:

SELECT * FROM ML_PREDICT(TABLE input, MODEL mdl, descriptor(f1, f2), MAP['async', 'true']);

官方提供配置项主要是和请求的并行度,以及请求超时和失败重试机制相关。示例配置项如下所示:

table.exec.async-ml-predict.max-concurrent-operations: 10
table.exec.async-ml-predict.timeout: 30s
table.exec.async-ml-predict.retry-strategy: FIXED_DELAY
table.exec.async-ml-predict.fixed-delay: 10s
table.exec.async-ml-predict.max-attempts: 3

参考文献:OpenAI | Apache Flink

社区未来规划

FLIP-548: Introduce Tool and Agent in Flink SQL - Apache Flink - Apache Software Foundation:提供tools和agent管理调用功能。
FLIP-540: Support VECTOR_SEARCH in Flink SQL - Apache Flink - Apache Software Foundation:提供向量检索功能。
FLIP-525: Model ML_PREDICT, ML_EVALUATE Implementation Design - Apache Flink - Apache Software Foundation:完善LM_PREDICT和ML_EVALUATE。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容