flink自定义数据源

实体类:


图片.png
package bean;


public class WaterSensor {
    private String id;
    private Long ts;
    private Integer vc;

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }
}


测试代码:

import bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class Source_Custom {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(new MySocketSource("hadoop162", 9999)).print();

        env.execute();
    }

    static class MySocketSource implements SourceFunction<WaterSensor> {

        private String host;
        private int port;
        private boolean isCanceled;

        public MySocketSource(String host, int port) {
            this.host = host;
            this.port = port;
            this.isCanceled = false;
        }

        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            Socket socket = new Socket(host, port);
            InputStream is = socket.getInputStream();
            InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
            BufferedReader br = new BufferedReader(isr);

            String line = br.readLine();
            while (line != null && !isCanceled) {
                String[] arr = line.split(",");
                WaterSensor sensor = new WaterSensor(arr[0], Long.valueOf(arr[1]), Integer.valueOf(arr[2]));
                ctx.collect(sensor);

                line = br.readLine();
            }

        }

        @Override
        public void cancel() {
            isCanceled = true;
        }
    }
}

输入输出:


图片.png
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容