java
package fm.lizhi.audio.rtc.common.data.analysis.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
/**
 * Small Feishu client that uploads an image and then posts it to a bot webhook.
 *
 * <p>The constructor requests a {@code tenant_access_token} using the configured
 * app credentials; {@link #uploadImageByBinarySource(byte[])} uploads the image
 * bytes, reads the returned {@code image_key}, and sends an image message to the
 * configured webhook URL.
 *
 * <p>Failures are reported on {@code System.err} rather than thrown, matching the
 * best-effort behaviour of the original implementation.
 */
public class FeishuTalk {
    // Credentials used for pushing images; they have to be applied for beforehand.
    private static final String APP_ID = "";
    private static final String APP_SECRET = "";
    // Bot webhook URL.
    private static final String WEBHOOK_URL = "";

    private static final String TOKEN_URL =
            "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal";
    private static final String IMAGE_UPLOAD_URL = "https://open.feishu.cn/open-apis/im/v1/images";
    // Fallback image used by main() when no path is given on the command line.
    private static final String DEFAULT_IMAGE_PATH =
            "/Users/lishuaikai/Documents/myProject/rtc_common_data_analysis/rtc_common_data_analysis-provider/src/main/java/fm/lizhi/audio/rtc/common/data/analysis/utils/test.png";

    /** Tenant access token; stays {@code null} when the token request failed. */
    private String token;

    /**
     * Requests a tenant access token with the configured app credentials.
     *
     * <p>If the request fails or the response carries no token, the problem is
     * reported on {@code System.err} and the instance is left without a token.
     */
    public FeishuTalk() {
        // Build the payload with a JSON object so the values are escaped correctly.
        JSONObject credentials = new JSONObject();
        credentials.put("app_id", APP_ID);
        credentials.put("app_secret", APP_SECRET);

        HttpPost tokenRequest = new HttpPost(TOKEN_URL);
        // APPLICATION_JSON encodes the body as UTF-8 and sets the matching Content-Type.
        tokenRequest.setEntity(
                new StringEntity(credentials.toJSONString(), ContentType.APPLICATION_JSON));

        try (CloseableHttpClient httpClient = HttpClientBuilder.create().build()) {
            String responseBody = postForBody(httpClient, tokenRequest);
            JSONObject json = JSON.parseObject(responseBody);
            this.token = json == null ? null : json.getString("tenant_access_token");
            if (this.token == null) {
                System.err.println("Feishu token response contained no tenant_access_token: "
                        + responseBody);
            }
        } catch (IOException e) {
            System.err.println("Failed to obtain Feishu tenant access token");
            e.printStackTrace();
        }
    }

    /**
     * Uploads the given image bytes and pushes the resulting image to the webhook.
     *
     * <p>Prints the {@code image_key} and the webhook response body to standard
     * output. I/O failures and unusable responses are reported on
     * {@code System.err}; nothing is sent to the webhook in that case.
     *
     * @param imageData raw bytes of the image to upload
     * @throws IllegalArgumentException if {@code imageData} is {@code null}
     */
    public void uploadImageByBinarySource(byte[] imageData) {
        if (imageData == null) {
            throw new IllegalArgumentException("imageData must not be null");
        }
        if (this.token == null) {
            // Without a token both requests would only be rejected by the server.
            System.err.println("No Feishu access token available; skipping image upload");
            return;
        }

        try (CloseableHttpClient httpClient = HttpClientBuilder.create().build()) {
            String imageKey = uploadImage(httpClient, imageData);
            if (imageKey == null) {
                return;
            }
            System.out.println("image_key: " + imageKey);

            String webhookResponseBody = sendImageMessage(httpClient, imageKey);
            System.out.println("Webhook Response: " + webhookResponseBody);
        } catch (IOException e) {
            System.err.println("Failed to push image to Feishu");
            e.printStackTrace();
        }
    }

    /** Uploads the image as multipart form data; returns its image_key or null if absent. */
    private String uploadImage(HttpClient httpClient, byte[] imageData) throws IOException {
        MultipartEntityBuilder builder = MultipartEntityBuilder.create();
        builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
        builder.addTextBody("image_type", "message");
        builder.addBinaryBody("image", imageData, ContentType.DEFAULT_BINARY, "image.jpg");

        HttpPost uploadRequest = new HttpPost(IMAGE_UPLOAD_URL);
        uploadRequest.setEntity(builder.build());
        uploadRequest.setHeader("Authorization", "Bearer " + this.token);

        String responseBody = postForBody(httpClient, uploadRequest);
        JSONObject json = JSON.parseObject(responseBody);
        JSONObject data = json == null ? null : json.getJSONObject("data");
        String imageKey = data == null ? null : data.getString("image_key");
        if (imageKey == null) {
            // Error responses carry no "data" object; report them instead of failing with an NPE.
            System.err.println("Feishu image upload returned no image_key: " + responseBody);
        }
        return imageKey;
    }

    /** Posts an image message referencing imageKey to the webhook; returns the response body. */
    private String sendImageMessage(HttpClient httpClient, String imageKey) throws IOException {
        JSONObject content = new JSONObject();
        content.put("image_key", imageKey);
        JSONObject form = new JSONObject();
        form.put("msg_type", "image");
        form.put("content", content);

        HttpPost webhookPost = new HttpPost(WEBHOOK_URL);
        webhookPost.setHeader("Authorization", "Bearer " + this.token);
        webhookPost.setEntity(new StringEntity(form.toString(), ContentType.APPLICATION_JSON));
        return postForBody(httpClient, webhookPost);
    }

    /** Executes the request and returns the response body decoded as UTF-8, or null if empty. */
    private static String postForBody(HttpClient httpClient, HttpPost request) throws IOException {
        HttpResponse response = httpClient.execute(request);
        HttpEntity entity = response.getEntity();
        if (entity == null) {
            return null;
        }
        // UTF-8 is only the fallback; a charset declared by the response still wins.
        return EntityUtils.toString(entity, StandardCharsets.UTF_8);
    }

    /**
     * Reads an image file and pushes it through {@link FeishuTalk}.
     *
     * @param args optional; {@code args[0]} is the path of the image to send. When
     *     omitted, the built-in default test image path is used.
     * @throws IOException if the image file cannot be read
     */
    public static void main(String[] args) throws IOException {
        Path filePath = Paths.get(args.length > 0 ? args[0] : DEFAULT_IMAGE_PATH);
        // Read the file first so a missing image fails before any network call is made.
        byte[] fileData = Files.readAllBytes(filePath);
        FeishuTalk feishuTalk = new FeishuTalk();
        feishuTalk.uploadImageByBinarySource(fileData);
    }
}
python
class FeishuTalk:
    """Uploads an image to Feishu and posts it to a bot webhook.

    Creating an instance requests a ``tenant_access_token`` with the class-level
    app credentials and stores it on ``self.token``.

    NOTE(review): this class uses ``requests``, ``json`` and ``MultipartEncoder``;
    no import for them is visible here, so they are assumed to be imported
    elsewhere in the module -- confirm.
    """

    # Credentials used for pushing images.
    app_id = ""
    app_secret = ""
    # Bot webhook URL.
    webhook_url = ''
    # Seconds to wait on each HTTP call; without a timeout `requests` can block forever.
    request_timeout = 10

    def __init__(self):
        """Fetch the tenant access token.

        Raises:
            KeyError: if the response has no ``tenant_access_token`` field.
            requests.exceptions.RequestException: on connection errors or timeout.
        """
        url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
        headers = {
            "Content-Type": "application/json; charset=utf-8",
        }
        payload_data = {
            "app_id": self.app_id,
            "app_secret": self.app_secret
        }
        response = requests.post(
            url=url,
            data=json.dumps(payload_data),
            headers=headers,
            timeout=self.request_timeout,
        ).json()
        self.token = response['tenant_access_token']

    def upload_image_by_binary_source(self, image_data):
        """Upload ``image_data`` and send it to the webhook as an image message.

        Prints the multipart content type and the returned ``image_key``.

        Args:
            image_data: the image payload placed in the ``image`` form field
                (presumably raw bytes -- confirm against callers).

        Returns:
            The decoded JSON body of the webhook response.

        Raises:
            KeyError: if the upload response has no ``data.image_key``.
            requests.exceptions.RequestException: on connection errors or timeout.
        """
        image_key_headers = {
            'Authorization': 'Bearer ' + self.token,
        }
        get_image_key_url = "https://open.feishu.cn/open-apis/im/v1/images"
        form = {'image_type': 'message',
                'image': image_data}
        multi_form = MultipartEncoder(form)
        print(multi_form.content_type)
        # The encoder generates the boundary, so its content type must be sent as-is.
        image_key_headers['Content-Type'] = multi_form.content_type
        response = requests.request(
            "POST",
            get_image_key_url,
            headers=image_key_headers,
            data=multi_form,
            timeout=self.request_timeout,
        ).json()
        # For debugging or on-call: the X-Tt-Logid header lives on the raw response
        # object, so it has to be read before .json() is called.
        image_key = response['data']['image_key']
        print("image_key:", image_key)

        url = self.webhook_url
        form = {'msg_type': 'image',
                'content':
                    {"image_key": image_key}
                }
        headers = {
            'Authorization': 'Bearer ' + self.token
        }
        response = requests.post(
            url=url,
            data=json.dumps(form),
            headers=headers,
            timeout=self.request_timeout,
        )
        return response.json()