实体类:

图片.png
package bean;
public class WaterSensor {
private String id;
private Long ts;
private Integer vc;
public WaterSensor(String id, Long ts, Integer vc) {
this.id = id;
this.ts = ts;
this.vc = vc;
}
@Override
public String toString() {
return "WaterSensor{" +
"id='" + id + '\'' +
", ts=" + ts +
", vc=" + vc +
'}';
}
}
测试代码:
import bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
public class Source_Custom {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.addSource(new MySocketSource("hadoop162", 9999)).print();
env.execute();
}
static class MySocketSource implements SourceFunction<WaterSensor> {
private String host;
private int port;
private boolean isCanceled;
public MySocketSource(String host, int port) {
this.host = host;
this.port = port;
this.isCanceled = false;
}
@Override
public void run(SourceContext<WaterSensor> ctx) throws Exception {
Socket socket = new Socket(host, port);
InputStream is = socket.getInputStream();
InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
BufferedReader br = new BufferedReader(isr);
String line = br.readLine();
while (line != null && !isCanceled) {
String[] arr = line.split(",");
WaterSensor sensor = new WaterSensor(arr[0], Long.valueOf(arr[1]), Integer.valueOf(arr[2]));
ctx.collect(sensor);
line = br.readLine();
}
}
@Override
public void cancel() {
isCanceled = true;
}
}
}
输入输出:

图片.png