Configuration file
spring:
  kafka:
    listener:
      # Whether to consume in batches; default single (one record per call), batch (a list of records)
      type: single
    # Cluster address
    #bootstrap-servers: kafka:XXX
    bootstrap-servers: 192.168.XXX.XXX:XXX
    # Producer configuration
    producer:
      # Number of retries
      retries: 3
      # Acknowledgment level
      # acks=0   the send is considered successful as soon as the message is handed to Kafka
      # acks=1   the send is considered successful once the leader partition has written the message to disk
      # acks=all the send is considered successful only after the leader's follower replicas have also replicated the message
      acks: all
      # Maximum batch size, in bytes
      batch-size: 4096
      # Total memory the producer may use to buffer records waiting to be sent, in bytes
      buffer-memory: 33554432
      # Client ID
      client-id: hello-kafka
      # Key serializer class
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer class
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Message compression: none, lz4, gzip, snappy; default none
      compression-type: gzip
      properties:
        partitioner:
          # Custom partitioner class
          class: com.rd.gateway.config.MyPartitioner
        linger:
          # Send delay: the producer submits accumulated messages to Kafka once it has
          # batch-size bytes or linger.ms has elapsed, whichever comes first
          ms: 1000
        max:
          block:
            # Maximum time KafkaProducer.send() and partitionsFor() may block, in ms
            ms: 6000
    # Consumer configuration
    consumer:
      # Default consumer group
      group-id: GatewayGroup
      # Auto-commit offsets; default true
      enable-auto-commit: false
      # Auto-commit interval, in ms
      auto-commit-interval: 1000
      # Maximum number of records per poll (batch consumption)
      max-poll-records: 100
      # Key deserializer class
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer class
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # What to do when Kafka has no initial offset, or the committed offset is out of range:
      # earliest: reset to the smallest offset in the partition
      # latest:   reset to the newest offset in the partition (consume only data produced from now on)
      # none:     throw an exception if any partition has no committed offset
      auto-offset-reset: latest
      # properties:
      #   interceptor:
      #     classes: com.example.demo.service.MyConsumerInterceptor
      #   session:
      #     timeout:
      #       # Session timeout: if the consumer sends no heartbeat within this window, a rebalance is triggered
      #       ms: 120000
      #   request:
      #     timeout:
      #       # Request timeout
      #       ms: 120000

# Point logback at the config file explicitly; because of lookup precedence this avoids
# another dependency's logback file shadowing the custom one
logging:
  config: classpath:logback.xml
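With this configuration in place, the log entries published to Kafka can be read back with a standard spring-kafka listener. A minimal sketch follows; the class name is illustrative, the gatewayLogs topic matches the logback setup below, and since enable-auto-commit is false the listener container commits offsets after the method returns under the default ack mode:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class GatewayLogConsumer {

    // listener.type is single, so the method receives one record per call
    @KafkaListener(topics = "gatewayLogs", groupId = "GatewayGroup")
    public void onMessage(ConsumerRecord<String, String> record) {
        log.info("received gateway log, partition={}, offset={}, value={}",
                record.partition(), record.offset(), record.value());
    }
}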
logback.xml file
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Log file path -->
    <property name="logPath" value="C://Users//wangw//Desktop//aliyun-tts//"/>
    <!-- Log file name -->
    <property name="logName" value="sp-ipage-test"/>
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level --- [%thread] %logger Line:%-3L - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>
    <!-- debug log file -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Path and name of the log file currently being written -->
        <file>${logPath}${logName}.log</file>
        <!-- Log file output format -->
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level --- [%thread] %logger Line:%-3L - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <!-- Rolling policy: roll by date and by size -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Archive naming pattern -->
            <fileNamePattern>${logPath}${logName}-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!-- Days of log history to keep -->
            <maxHistory>10</maxHistory>
        </rollingPolicy>
    </appender>
    <appender name="KAFKA_APPENDER" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder">
            <layout class="net.logstash.logback.layout.LogstashLayout">
                <!-- When enabled, includes logback context info such as the hostname -->
                <includeContext>true</includeContext>
                <!-- Whether to include the log call site -->
                <includeCallerData>true</includeCallerData>
                <fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>
            </layout>
            <charset>UTF-8</charset>
        </encoder>
        <!-- Kafka topic; must match the topic used by the consumers, otherwise the messages are not picked up -->
        <topic>gatewayLogs</topic>
        <!-- Key/partitioning strategy -->
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy"/>
        <!-- Delivery strategy; logback-kafka-appender provides two:
             asynchronous delivery (AsynchronousDeliveryStrategy)
             blocking delivery (BlockingDeliveryStrategy)
        -->
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
        <!-- bootstrap.servers is the Kafka broker address; use the real IP address, not localhost -->
        <producerConfig>bootstrap.servers=192.168.XXX.XXX:XXX</producerConfig>
    </appender>
    <appender name="kafkaAppenderAsync" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="KAFKA_APPENDER"/>
    </appender>
    <!-- Route behavior logs to Kafka -->
    <logger name="KafkaPipeline" level="INFO">
        <appender-ref ref="kafkaAppenderAsync"/>
    </logger>
    <!-- dev/test environments: override log levels per package -->
    <springProfile name="dev,test">
        <logger name="org.springframework.web" level="ERROR"/>
        <logger name="org.springboot.sample" level="ERROR"/>
        <logger name="com.ipage.work" level="INFO"/>
    </springProfile>
    <!-- prod environment -->
    <springProfile name="prod">
        <logger name="org.springframework.web" level="ERROR"/>
        <logger name="org.springboot.sample" level="ERROR"/>
        <logger name="com.ipage.work" level="INFO"/>
    </springProfile>
    <logger name="org.springframework.web" level="INFO"/>
    <logger name="org.springboot.sample" level="TRACE"/>
    <!-- Base log level -->
    <root level="INFO">
        <appender-ref ref="FILE"/>
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="kafkaAppenderAsync"/>
    </root>
</configuration>
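Anything logged through the KafkaPipeline logger is routed via kafkaAppenderAsync to KAFKA_APPENDER and published to the gatewayLogs topic; note that the root logger also references kafkaAppenderAsync, so the gateway filters' INFO logs reach the topic as well. A minimal usage sketch (the class name is illustrative):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BehaviorLogDemo {

    // The logger name must match <logger name="KafkaPipeline"> in logback.xml
    private static final Logger KAFKA_LOG = LoggerFactory.getLogger("KafkaPipeline");

    public static void main(String[] args) {
        // Routed through kafkaAppenderAsync -> KAFKA_APPENDER -> topic gatewayLogs
        KAFKA_LOG.info("behavior log line pushed to kafka");
    }
}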
Common constants
// Log prefix, referenced below as CommonConstant.GATEWAY_LOG_CS
String GATEWAY_LOG_CS = "gatewayLog参数#:";
Entity class
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

@Data
public class LogUserInfo {
    @ApiModelProperty("User ID")
    private Long userId;
    @ApiModelProperty("Entry type: request/response")
    private String type;
    @ApiModelProperty("Request ID")
    private String requestId;
    @ApiModelProperty("HTTP method: get/post")
    private String method;
    @ApiModelProperty("Request path")
    private String url;
    @ApiModelProperty("Payload body")
    private String content;
}
HttpRequestFilter
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rd.common.constant.CommonConstant;
import com.rd.common.constant.Constant;
import com.rd.common.constant.TokenConstants;
import com.rd.common.model.LogUserInfo;
import com.rd.common.model.ResultInfoEnum;
import com.rd.common.utils.JwtUtils;
import io.jsonwebtoken.Claims;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;

/**
 * @author
 * @date 2023/2/3 - 10:54
 * Logs request parameters passing through the gateway.
 */
@Component
@Slf4j
@AllArgsConstructor
public class HttpRequestFilter implements GlobalFilter, Ordered {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String token = request.getHeaders().getFirst(TokenConstants.AUTHENTICATION);
        // Strip the prefix and check that the token is well formed
        if (StringUtils.isNotEmpty(token) && token.startsWith(TokenConstants.PREFIX)) {
            token = token.replaceFirst(TokenConstants.PREFIX, StringUtils.EMPTY);
        }
        if (StringUtils.isBlank(token)) {
            return unauthorizedResponse(exchange, "Token must not be empty");
        }
        // Parse the token
        Claims claims = JwtUtils.parseToken(token);
        if (claims == null) {
            return unauthorizedResponse(exchange, "Token expired or invalid!");
        }
        // Extract the user ID
        String userid = JwtUtils.getUserId(claims);
        String method = request.getMethodValue();
        String contentType = request.getHeaders().getFirst("Content-Type");
        String url = JSONObject.toJSONString(request.getURI());
        log.info("Gateway request path: " + url);
        if ("POST".equals(method)) {
            return DataBufferUtils.join(exchange.getRequest().getBody())
                    .flatMap(dataBuffer -> {
                        byte[] bytes = new byte[dataBuffer.readableByteCount()];
                        dataBuffer.read(bytes);
                        String bodyString = new String(bytes, StandardCharsets.UTF_8);
                        LogUserInfo logUserInfo = new LogUserInfo();
                        logUserInfo.setUserId(Long.valueOf(userid));
                        logUserInfo.setUrl(url);
                        logUserInfo.setType("request");
                        logUserInfo.setRequestId(request.getId());
                        logUserInfo.setMethod("post");
                        logUserInfo.setContent(bodyString);
                        log.info(CommonConstant.GATEWAY_LOG_CS + JSON.toJSONString(logUserInfo));
                        exchange.getAttributes().put("POST_BODY", bodyString);
                        DataBufferUtils.release(dataBuffer);
                        // Re-wrap the consumed body so downstream filters can read it again
                        Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
                            DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                            return Mono.just(buffer);
                        });
                        ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                            @Override
                            public Flux<DataBuffer> getBody() {
                                return cachedFlux;
                            }
                        };
                        return chain.filter(exchange.mutate().request(mutatedRequest).build());
                    });
        } else if ("GET".equals(method)) {
            MultiValueMap<String, String> queryParams = request.getQueryParams();
            LogUserInfo logUserInfo = new LogUserInfo();
            logUserInfo.setUserId(Long.valueOf(userid));
            logUserInfo.setUrl(url);
            logUserInfo.setType("request");
            logUserInfo.setRequestId(request.getId());
            logUserInfo.setMethod("get");
            logUserInfo.setContent(JSON.toJSONString(queryParams));
            log.info(CommonConstant.GATEWAY_LOG_CS + JSONObject.toJSONString(logUserInfo));
            log.info("****************************************************************************\n");
            return chain.filter(exchange);
        }
        // Other request types: parameters are not captured here
        log.info("****************************************************************************\n");
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return -200;
    }

    private Mono<Void> unauthorizedResponse(ServerWebExchange exchange, String msg) {
        log.error("[auth error] request path: {}", exchange.getRequest().getPath());
        return webFluxResponseWriter(exchange.getResponse(), msg, HttpStatus.UNAUTHORIZED);
    }

    public static Mono<Void> webFluxResponseWriter(ServerHttpResponse response, String msg, HttpStatus status) {
        // Set the status code and content type before writing the body
        response.setStatusCode(status);
        response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
        return response.writeWith(Mono.just(getDataBuffer(response, msg)));
    }

    public static DataBuffer getDataBuffer(ServerHttpResponse response, String value) {
        JSONObject json = new JSONObject();
        json.put("code", ResultInfoEnum.USER_INFO_ERROR.getCode());
        json.put("msg", value);
        byte[] bytes = json.toJSONString().getBytes(StandardCharsets.UTF_8);
        return response.bufferFactory().wrap(bytes);
    }
}
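Because the filter logs GATEWAY_LOG_CS followed by the JSON form of LogUserInfo, a downstream consumer can recover the structured entry by stripping the prefix and deserializing. A small helper sketch (this class is hypothetical, not part of the original project):

import com.alibaba.fastjson.JSON;
import com.rd.common.model.LogUserInfo;

public class GatewayLogParser {

    // Same prefix as CommonConstant.GATEWAY_LOG_CS
    private static final String PREFIX = "gatewayLog参数#:";

    // Returns null when the line is not a gateway log entry
    public static LogUserInfo parse(String line) {
        int idx = line.indexOf(PREFIX);
        if (idx < 0) {
            return null;
        }
        return JSON.parseObject(line.substring(idx + PREFIX.length()), LogUserInfo.class);
    }
}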
WrapperResponseGlobalFilter
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.shaded.com.google.common.base.Joiner;
import com.alibaba.nacos.shaded.com.google.common.base.Throwables;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import com.rd.common.constant.CommonConstant;
import com.rd.common.constant.TokenConstants;
import com.rd.common.model.LogUserInfo;
import com.rd.common.model.ResultInfoEnum;
import com.rd.common.utils.JwtUtils;
import com.rd.common.utils.StringUtils;
import io.jsonwebtoken.Claims;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;
import java.util.List;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR;

/**
 * @author
 * @date 2023/2/3 - 10:54
 * Logs response parameters passing through the gateway.
 */
@Slf4j
@Component
public class WrapperResponseGlobalFilter implements GlobalFilter, Ordered {

    @Override
    public int getOrder() {
        // -1 is the response write filter; this must run before it
        return -2;
    }

    private static final Joiner joiner = Joiner.on("");

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse originalResponse = exchange.getResponse();
        DataBufferFactory bufferFactory = originalResponse.bufferFactory();
        String token = request.getHeaders().getFirst(TokenConstants.AUTHENTICATION);
        String url = JSONObject.toJSONString(request.getURI());
        // Strip the prefix and check that the token is well formed
        if (org.apache.commons.lang3.StringUtils.isNotEmpty(token) && token.startsWith(TokenConstants.PREFIX)) {
            token = token.replaceFirst(TokenConstants.PREFIX, org.apache.commons.lang3.StringUtils.EMPTY);
        }
        if (org.apache.commons.lang3.StringUtils.isBlank(token)) {
            return unauthorizedResponse(exchange, "Token must not be empty");
        }
        // Parse the token
        Claims claims = JwtUtils.parseToken(token);
        if (claims == null) {
            return unauthorizedResponse(exchange, "Token expired or invalid!");
        }
        // Extract the user ID
        String userid = JwtUtils.getUserId(claims);
        ServerHttpResponseDecorator response = new ServerHttpResponseDecorator(originalResponse) {
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                if (HttpStatus.OK.equals(getStatusCode()) && body instanceof Flux) {
                    // Check the Content-Type and only intercept JSON responses
                    String originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
                    if (StringUtils.isNotBlank(originalResponseContentType) && originalResponseContentType.contains("application/json")) {
                        Flux<? extends DataBuffer> fluxBody = Flux.from(body);
                        // Large response bodies are delivered in several chunks by default;
                        // buffer() collects all of them so the full payload can be re-assembled
                        return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
                            List<String> list = Lists.newArrayList();
                            dataBuffers.forEach(dataBuffer -> {
                                try {
                                    byte[] content = new byte[dataBuffer.readableByteCount()];
                                    dataBuffer.read(content);
                                    DataBufferUtils.release(dataBuffer);
                                    list.add(new String(content, StandardCharsets.UTF_8));
                                } catch (Exception e) {
                                    log.info("Failed to read the response byte stream: {}", Throwables.getStackTraceAsString(e));
                                }
                            });
                            String responseData = joiner.join(list);
                            LogUserInfo logUserInfo = new LogUserInfo();
                            logUserInfo.setUserId(Long.valueOf(userid));
                            logUserInfo.setUrl(url);
                            logUserInfo.setType("response");
                            logUserInfo.setRequestId(request.getId());
                            logUserInfo.setContent(responseData);
                            log.info(CommonConstant.GATEWAY_LOG_CS + JSON.toJSONString(logUserInfo));
                            byte[] uppedContent = responseData.getBytes(StandardCharsets.UTF_8);
                            originalResponse.getHeaders().setContentLength(uppedContent.length);
                            return bufferFactory.wrap(uppedContent);
                        }));
                    }
                }
                return super.writeWith(body);
            }

            @Override
            public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
                return writeWith(Flux.from(body).flatMapSequential(p -> p));
            }
        };
        return chain.filter(exchange.mutate().response(response).build());
    }

    private Mono<Void> unauthorizedResponse(ServerWebExchange exchange, String msg) {
        log.error("[auth error] request path: {}", exchange.getRequest().getPath());
        return webFluxResponseWriter(exchange.getResponse(), msg, HttpStatus.UNAUTHORIZED);
    }

    public static Mono<Void> webFluxResponseWriter(ServerHttpResponse response, String msg, HttpStatus status) {
        // Set the status code and content type before writing the body
        response.setStatusCode(status);
        response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
        return response.writeWith(Mono.just(getDataBuffer(response, msg)));
    }

    public static DataBuffer getDataBuffer(ServerHttpResponse response, String value) {
        JSONObject json = new JSONObject();
        json.put("code", ResultInfoEnum.USER_INFO_ERROR.getCode());
        json.put("msg", value);
        byte[] bytes = json.toJSONString().getBytes(StandardCharsets.UTF_8);
        return response.bufferFactory().wrap(bytes);
    }
}
MyPartitioner
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * Custom partitioner.
 *
 * @author zzkk
 * @create 2021/5/26 13:40
 **/
public class MyPartitioner implements Partitioner {

    /**
     * Core partitioning method.
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Partitioning logic goes here; this example sends everything to partition 0
        return 0;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
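Sending everything to partition 0 preserves ordering but leaves the other partitions idle. A common alternative, sketched below as an assumption rather than part of the original project, is to hash the record key the way Kafka's default partitioner does; the class name is illustrative, and it would be wired in via spring.kafka.producer.properties.partitioner.class:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;

public class KeyHashPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // No key: fall back to partition 0
            return 0;
        }
        // murmur2 is the hash Kafka's default partitioner uses for keyed records
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}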
Dependencies
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- logstash integration with logback -->
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>4.11</version>
    <exclusions>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- logback integration with kafka -->
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.1.0</version>
</dependency>
<!-- lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<!-- logback -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.2.11</version>
</dependency>