Doris 的导入(Load)功能就是将用户的原始数据导入到 Doris 表中。Doris 底层实现了统一的流式导入框架,在这个框架之上,Doris 提供了非常丰富的导入方式,以适应不同的数据源和数据导入需求。Stream Load 是 Doris 用户最常用的数据导入方式之一,它是一种同步的导入方式,允许用户通过 HTTP 请求将 CSV 格式或 JSON 格式的数据批量导入 Doris,并返回数据导入的结果。用户可以直接通过 HTTP 请求的返回体判断数据导入是否成功,也可以通过在客户端执行查询 SQL 来查询历史任务的结果。另外,Doris 还为 Stream Load 提供了结果审计功能,可以通过审计日志对历史的 Stream Load 任务信息进行审计。
stream load
curl命令
- csv
test.csv
2,2,neo,10
// csv
curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "partitions: p1, p2" -T testData http://host:port/api/testDb/testTbl/_stream_load
curl --location-trusted -u user:pwd -T test.csv -H "label:test" -H "column_separator:," -XPUT http://ip:port/api/dbName/tbName/_stream_load --output log.txt
- json
test.json
[{"siteid":5,"city":"5","username":"neo5", "pv": 50},{"siteid":6,"city":"6","username":"neo6", "pv": 60}]
curl --location-trusted -u user:pwd -H "columns: siteid, city, username, pv" -H "label:test3" -H "format: json" -H "jsonpaths: [\"$.siteid\",\"$.city\",\"$.username\",\"$.pv\"]" -H "strip_outer_array: true" -T test.json -XPUT http://192.161.1.1:8030/api/test/site_visit/_stream_load --output log.txt
====>
{
"TxnId": 538641,
"Label": "test3",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 2,
"NumberLoadedRows": 2,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 106,
"LoadTimeMs": 113,
"BeginTxnTimeMs": 1,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 93,
"CommitAndPublishTimeMs": 15
}
Java代码
- DorisStreamLoad
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
/**
 * Minimal Doris Stream Load client built on {@link HttpURLConnection}.
 *
 * <p>Every call to {@link #loadBatch(Object)} issues one HTTP PUT against
 * {@code http://<hostPort>/api/<db>/<tbl>/_stream_load?} using HTTP basic authentication, a
 * freshly generated label and a comma as {@code column_separator}; the payload is therefore
 * expected to be comma-separated text.
 *
 * <p>The class is {@link Serializable} so that an instance can be handed to components that
 * serialize their fields.
 **/
public class DorisStreamLoad implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);

    /** Values of the response "Status" field that {@link #load(String)} accepts as success. */
    private static final List<String> DORIS_SUCCESS_STATUS =
            Collections.unmodifiableList(Arrays.asList("Success", "Publish Timeout"));

    /** Format arguments: host:port, database, table. */
    private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load?";

    /** Shared JSON parser; building an ObjectMapper per call is needlessly expensive. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final String user;
    private final String passwd;
    private String loadUrlStr;
    private String hostPort;
    private final String db;
    private final String tbl;
    /** Base64 of "user:passwd" (UTF-8), computed once and reused for every request. */
    private final String authEncoding;

    /**
     * Creates a client bound to one table.
     *
     * @param hostPort target in {@code host:port} form
     * @param db       database name
     * @param tbl      table name
     * @param user     user name for HTTP basic authentication
     * @param passwd   password for HTTP basic authentication
     */
    public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
        this.hostPort = hostPort;
        this.db = db;
        this.tbl = tbl;
        this.user = user;
        this.passwd = passwd;
        this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, tbl);
        this.authEncoding = Base64.getEncoder()
                .encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
    }

    /** @return the full stream load URL currently in use */
    public String getLoadUrlStr() {
        return loadUrlStr;
    }

    /** @return the current {@code host:port} target */
    public String getHostPort() {
        return hostPort;
    }

    /**
     * Switches the target host and rebuilds the load URL for the same database and table.
     *
     * @param hostPort new target in {@code host:port} form
     */
    public void setHostPort(String hostPort) {
        this.hostPort = hostPort;
        this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, this.db, this.tbl);
    }

    /**
     * Opens and configures (but does not send) a PUT connection for one load.
     *
     * @param urlStr stream load URL
     * @param label  value for the {@code label} request header
     * @return the configured connection, ready for its body to be written
     * @throws IOException if the URL is malformed or the connection cannot be opened
     */
    private HttpURLConnection getConnection(String urlStr, String label) throws IOException {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        // Redirects are not followed automatically; a redirect status is returned to the caller.
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.addRequestProperty("label", label);
        // CSV payload, comma separated.
        conn.addRequestProperty("column_separator", ",");
        conn.setDoOutput(true);
        conn.setDoInput(true);
        return conn;
    }

    /** Raw outcome of one stream load HTTP exchange. */
    public static class LoadResponse {
        /** HTTP status code, or -1 when the request failed with an exception. */
        public int status;
        /** HTTP reason phrase, or the exception message when {@code status} is -1. */
        public String respMsg;
        /** Response body, or a description of the failure when {@code status} is -1. */
        public String respContent;

        public LoadResponse(int status, String respMsg, String respContent) {
            this.status = status;
            this.respMsg = respMsg;
            this.respContent = respContent;
        }

        @Override
        public String toString() {
            return "status: " + status + ", resp msg: " + respMsg + ", resp content: " + respContent;
        }
    }

    /**
     * Loads one batch and fails loudly unless Doris reports success.
     *
     * @param value payload to send (comma-separated text)
     * @throws StreamLoadException if the HTTP status is not 200, the response body cannot be
     *                             parsed, or its status is not one of the accepted success values
     */
    public void load(String value) throws StreamLoadException {
        LoadResponse loadResponse = loadBatch(value);
        LOG.info("Streamload Response:{}", loadResponse);
        if (loadResponse.status != 200) {
            throw new StreamLoadException("stream load error: " + loadResponse.respContent);
        }

        RespContent respContent;
        try {
            respContent = OBJECT_MAPPER.readValue(loadResponse.respContent, RespContent.class);
        } catch (IOException e) {
            throw new StreamLoadException(e);
        }
        // A body consisting of the JSON literal null parses to a null object; treat it as a failure
        // instead of letting a NullPointerException escape.
        if (respContent == null) {
            throw new StreamLoadException("stream load error: " + loadResponse.respContent);
        }
        if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
            throw new StreamLoadException("stream load error: " + respContent.getMessage());
        }
    }

    /**
     * Sends {@code value.toString()} as the body of one stream load request.
     *
     * <p>Exceptions are caught and reported through the returned object (status -1) rather than
     * thrown. For HTTP error statuses (400 and above) the server's error body is returned together
     * with the real status code.
     *
     * @param value payload; its {@code toString()} is encoded as UTF-8
     * @return the HTTP status, reason phrase and body of the response
     */
    public LoadResponse loadBatch(Object value) {
        String label = newLabel();
        HttpURLConnection conn = null;
        try {
            conn = getConnection(loadUrlStr, label);

            // Encode explicitly as UTF-8 to match the declared Content-Type charset.
            try (OutputStream out = new BufferedOutputStream(conn.getOutputStream())) {
                out.write(value.toString().getBytes(StandardCharsets.UTF_8));
            }

            int status = conn.getResponseCode();
            String respMsg = conn.getResponseMessage();
            // getInputStream() throws for error statuses; the body is then on the error stream.
            InputStream body = status >= HttpURLConnection.HTTP_BAD_REQUEST
                    ? conn.getErrorStream()
                    : conn.getInputStream();
            return new LoadResponse(status, respMsg, readBody(body));
        } catch (Exception e) {
            String err = "failed to load audit via AuditLoader plugin with label: " + label;
            LOG.warn(err, e);
            return new LoadResponse(-1, e.getMessage(), err);
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

    /** Builds a label of the form {@code audit_yyyyMMdd_HHmmss_<uuid without dashes>}. */
    private static String newLabel() {
        Calendar calendar = Calendar.getInstance();
        return String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                UUID.randomUUID().toString().replace("-", ""));
    }

    /** Reads the stream as UTF-8 text, concatenating lines without separators; null yields "". */
    private static String readBody(InputStream body) throws IOException {
        if (body == null) {
            return "";
        }
        StringBuilder response = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(body, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                response.append(line);
            }
        }
        return response.toString();
    }

    /** Demo entry point: loads one mock CSV row into {@code test.site_visit}. */
    public static void main(String[] args) {
        // Port 8040 is used here; 9030 and 8030 were earlier attempts left commented out in the
        // original sample.
        String hostPort = "192.168.1.192:8040";
        String db = "test";
        String tbl = "site_visit";
        String user = "test";
        String passwd = "test";
        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, db, tbl, user, passwd);
        LoadResponse response = dorisStreamLoad.loadBatch(mockLoadData());
        LOG.info("Streamload Response:{}", response);
    }

    /**
     * Builds one CSV row for the demo.
     *
     * @return the text {@code 2,2,neo2,20}
     */
    public static String mockLoadData() {
        // Only CSV is produced: getConnection() configures the request with a comma column
        // separator and sets no JSON format header.
        StringBuilder sb = new StringBuilder();
        sb.append(2).append(",")
                .append(2).append(",")
                .append("neo2").append(",")
                .append(20);
        return sb.toString();
    }
}
- DorisSink
import com.alibaba.fastjson.JSON;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Flink sink that forwards every incoming record to Doris through
 * {@link DorisStreamLoad#loadBatch(Object)}, one stream load request per record.
 *
 * <p>Failed loads are logged at error level and otherwise ignored; {@link #invoke} does not
 * throw for a failed load, so the record is not retried by this class.
 */
public class DorisSink extends RichSinkFunction<Object> {
    private static final Logger log = LoggerFactory.getLogger(DorisSink.class);

    /** Values of the response "Status" field that count as a successful load. */
    private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));

    private final DorisStreamLoad dorisStreamLoad;
    // Stored but not read anywhere in this class; only the commented-out loadBatch overload in
    // invoke() would have used them.
    private final String columns;
    private final String jsonFormat;

    /**
     * @param dorisStreamLoad client used to perform the loads
     * @param columns         column list (currently unused by this class)
     * @param jsonFormat      JSON format setting (currently unused by this class)
     */
    public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.columns = columns;
        this.jsonFormat = jsonFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    /**
     * Decides whether a stream load succeeded.
     *
     * @param respContent parsed stream load response (JSON body)
     * @return {@code true} only if the status is an accepted success value and the number of
     *         loaded rows equals the total number of rows; {@code false} for a null response
     */
    public static Boolean checkStreamLoadStatus(RespContent respContent) {
        if (respContent == null) {
            return false;
        }
        // Objects.equals compares by value even if the getters return boxed numbers, where ==
        // would compare object identity. NOTE(review): assumes both getters return the same
        // numeric type - confirm against RespContent.
        return DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
                && java.util.Objects.equals(respContent.getNumberTotalRows(), respContent.getNumberLoadedRows());
    }

    /**
     * Loads one record and logs an error if the HTTP request or the load itself failed.
     *
     * @param value   record to load; passed unchanged to {@code loadBatch}
     * @param context sink context (unused)
     */
    @Override
    public void invoke(Object value, Context context) throws Exception {
        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value);
        if (loadResponse == null || loadResponse.status != 200) {
            log.error("Stream Load Request failed:{}", loadResponse);
            return;
        }
        RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
        if (!checkStreamLoadStatus(respContent)) {
            log.error("Stream Load fail:{}", loadResponse);
        }
    }
}
- httpClient + json
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Calendar;
import java.util.UUID;
/**
 * Stream Load example based on Apache HttpClient that sends JSON data, either from a local file
 * ({@link #loadJsonFile(File)}) or from an in-memory string ({@link #loadJson(String)}).
 *
 * <p>Both methods send the same headers: {@code format: json}, a freshly generated unique
 * {@code label}, {@code column_separator: ,} and {@code strip_outer_array: true}.
 */
public class DorisStreamLoader {
    // 1. For public-cloud users, fill in the Compute Node address and its HTTP port (8040).
    // 2. For open-source users, the FE address and the FE http_port may be used instead, but the
    //    client must still be able to reach the BE nodes.
    // The original sample read "191.168.1.192", which looks like a typo for the private address
    // used by the companion HttpURLConnection sample.
    private final static String HOST = "192.168.1.192";
    private final static int PORT = 8040;
    private final static String DATABASE = "test"; // database to load into
    private final static String TABLE = "site_visit"; // table to load into
    private final static String USER = "test"; // Doris user name
    private final static String PASSWD = "test"; // Doris password
    private final static String LOAD_FILE_NAME = "/Users/test.json"; // local file to load
    private final static String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
            HOST, PORT, DATABASE, TABLE);
    private final static HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    // When the target is an FE, the 307 redirect has to be followed.
                    return true;
                }
            });

    /**
     * Loads the content of a local JSON file.
     *
     * @param file file whose content is sent as the request body
     * @throws Exception if the request fails or the HTTP status is not 200
     */
    public void loadJsonFile(File file) throws Exception {
        HttpPut put = newJsonPut();
        put.setEntity(new FileEntity(file));
        execute(put);
    }

    /**
     * Loads JSON data held in memory.
     *
     * @param jsonData JSON text to send as the request body
     * @throws Exception if the request fails or the HTTP status is not 200
     */
    public void loadJson(String jsonData) throws Exception {
        HttpPut put = newJsonPut();
        // Encode as UTF-8 explicitly; the single-argument StringEntity constructor would use
        // ISO-8859-1 and corrupt non-Latin characters.
        put.setEntity(new StringEntity(jsonData, StandardCharsets.UTF_8));
        execute(put);
    }

    /** Creates a PUT request carrying the authentication and stream load headers, without a body. */
    private HttpPut newJsonPut() {
        HttpPut put = new HttpPut(loadUrl);
        put.setHeader(HttpHeaders.EXPECT, "100-continue");
        put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));
        // Stream load properties are passed as HTTP headers.
        put.setHeader("format", "json");
        // The generated label must be sent, not a fixed string; a constant label would be reused
        // by every request.
        put.setHeader("label", newLabel());
        put.setHeader("column_separator", ",");
        // strip_outer_array: boolean; true means the JSON data starts with an array whose elements
        // are flattened into rows. The default is false.
        put.setHeader("strip_outer_array", "true");
        return put;
    }

    /** Builds a label of the form {@code audit_yyyyMMdd_HHmmss_<uuid without dashes>}. */
    private static String newLabel() {
        Calendar calendar = Calendar.getInstance();
        return String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                UUID.randomUUID().toString().replace("-", ""));
    }

    /**
     * Sends the request, prints the response body and fails on any non-200 status.
     *
     * <p>NOTE(review): only the HTTP status is checked here; the load status reported inside the
     * response body is printed but not inspected.
     */
    private void execute(HttpPut put) throws IOException {
        try (CloseableHttpClient client = httpClientBuilder.build();
             CloseableHttpResponse response = client.execute(put)) {
            String loadResult = "";
            if (response.getEntity() != null) {
                // UTF-8 is the fallback when the response does not declare a charset.
                loadResult = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
            }
            final int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode != 200) {
                throw new IOException(
                        String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
            }
            System.out.println("Get load result: " + loadResult);
        }
    }

    /** Returns the value of an HTTP basic {@code Authorization} header for the given credentials. */
    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded, StandardCharsets.UTF_8);
    }

    /** Demo entry point: loads the file at {@code LOAD_FILE_NAME}. */
    public static void main(String[] args) throws Exception {
        DorisStreamLoader loader = new DorisStreamLoader();
        File file = new File(LOAD_FILE_NAME);
        loader.loadJsonFile(file);
    }
}
stream load导入中的问题
【注意】
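1. 端口号可能用错:9030 是 MySQL 协议端口,不能用于 Stream Load;Stream Load 走 HTTP 协议,8030 是 FE 的 HTTP 端口,8040 是 BE 的 HTTP 端口。
2. FE 和 BE 都支持通过 HTTP 协议发起 Stream Load;如果连接 FE 时出现重定向问题,可以改为直接连接 BE 的 HTTP 端口(8040)。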
参考
【1】Doris 数据导入- Stream Load 通过 http 协议进行流式数据导入:https://blog.csdn.net/weixin_43161811/article/details/107241337
【2】Apache Doris Stream load 数据导入方式:https://www.jianshu.com/p/01e47ae333d8
【3】https://www.kancloud.cn/dorisdb/dorisdb/2146002
【4】Apache Doris:https://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html
【5】 Apache Doris 官网 编译与部署
【6】【Doris全面解析】Doris Stream Load原理解析:https://zhuanlan.zhihu.com/p/390042763
【7】百度数据仓库 DORIS(百度公有云文档,含重定向问题解决)
【8】Apache Doris Stream load 数据导入方式:https://blog.csdn.net/hf200012/article/details/120459111 (最佳实践)
【9】Apache Doris 使用 stream load 导入本地 json 文件:https://blog.csdn.net/u011095039/article/details/120842693