Spring Cloud Netflix Hystrix
断路器论文图:
服务短路(CircuitBreaker)
QPS: Query Per Second
TPS: Transaction Per Second
QPS: 经过全链路压测,计算单机极限QPS,集群 QPS = 单机 QPS * 集群机器数量 * 可靠性比率
全链路压测 除了压 极限QPS,还有错误数量
全链路:一个完整的业务流程操作
JMeter:可调整型比较灵活
Spring Cloud Hystrix Client
官网:https://github.com/Netflix/Hystrix
Reactive Java 框架:
- Java 9 Flow API
- Reactor
- RxJava(Reactive X)
注意:Hystrix 可以是服务端实现,也可以是客户端实现,类似于 AOP 封装:正常逻辑、容错处理。
激活 Hystrix
通过 @EnableHystrix
激活
Hystrix 配置信息wiki:https://github.com/Netflix/Hystrix/wiki/Configuration
Hystrix
注解方式(Annotation)
@RestController
public class HystrixDemoController {
private final static Random random = new Random();
/**
* 当{@link #helloWorld} 方法调用超时或失败时,
* fallback 方法{@link #errorContent()}作为替代返回
*
* @return
*/
@GetMapping("hello-world")
@HystrixCommand(
fallbackMethod = "errorContent",
commandProperties = {
// Hystrix配置
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",
value = "100")
}
)
public String helloWorld() throws Exception {
// 如果随机时间 大于 100 ,那么触发容错
int value = random.nextInt(200);
System.out.println("helloWorld() costs "+value+" ms.");
Thread.sleep(value);
return "Hello,World";
}
public String errorContent() {
return "Fault";
}
}
编程方式
/**
* 当{@link #helloWorld} 方法调用超时或失败时,
* fallback 方法{@link #errorContent()}作为替代返回
*
* @return
*/
@GetMapping("hello-world-2")
public String helloWorld2() {
return new HelloWorldCommand().execute();
}
/**
* 编程方式
*/
private class HelloWorldCommand extends com.netflix.hystrix.HystrixCommand<String> {
protected HelloWorldCommand() {
super(HystrixCommandGroupKey.Factory.asKey("HelloWorld"),
100);
}
@Override
protected String run() throws Exception {
// 如果随机时间 大于 100 ,那么触发容错
int value = random.nextInt(200);
System.out.println("helloWorld() costs " + value + " ms.");
Thread.sleep(value);
return "Hello,World";
}
//容错执行
protected String getFallback() {
return HystrixDemoController.this.errorContent();
}
}
对比 其他 Java 执行方式:
Future
public class FutureDemo {
public static void main(String[] args) {
Random random = new Random();
ExecutorService service = Executors.newFixedThreadPool(1);
Future<String> future = service.submit(() -> { // 正常流程
// 如果随机时间 大于 100 ,那么触发容错
int value = random.nextInt(200);
System.out.println("helloWorld() costs " + value + " ms.");
Thread.sleep(value);
return "Hello,World";
});
try {
future.get(100, TimeUnit.MILLISECONDS);
} catch (Exception e) {
// 超时流程
System.out.println("超时保护!");
}
service.shutdown();
}
}
RxJava
public class RxJavaDemo {
public static void main(String[] args) {
Random random = new Random();
Single.just("Hello,World") // just 发布数据
.subscribeOn(Schedulers.immediate()) // 订阅的线程池 immediate = Thread.currentThread()
.subscribe(new Observer<String>() {
@Override
public void onCompleted() { // 正常结束流程
System.out.println("执行结束!");
}
@Override
public void onError(Throwable e) { // 异常流程(结束)
System.out.println("熔断保护!");
}
@Override
public void onNext(String s) { // 数据消费 s = "Hello,World"
// 如果随机时间 大于 100 ,那么触发容错
int value = random.nextInt(200);
if (value > 100) {
throw new RuntimeException("Timeout!");
}
System.out.println("helloWorld() costs " + value + " ms.");
}
});
}
}
Health Endpoint(/health
)
{
status: "UP",
diskSpace: {
status: "UP",
total: 500096983040,
free: 304113217536,
threshold: 10485760
},
refreshScope: {
status: "UP"
},
hystrix: {
status: "UP"
}
}
激活熔断保护
@EnableCircuitBreaker
激活 :@EnableHystrix
+ Spring Cloud 功能
@EnableHystrix
激活,没有一些 Spring Cloud 功能,如 /hystrix.stream
端点
Hystrix Endpoint(/hystrix.stream
)
data: {
"type": "HystrixThreadPool",
"name": "HystrixDemoController",
"currentTime": 1509545957972,
"currentActiveCount": 0,
"currentCompletedTaskCount": 14,
"currentCorePoolSize": 10,
"currentLargestPoolSize": 10,
"currentMaximumPoolSize": 10,
"currentPoolSize": 10,
"currentQueueSize": 0,
"currentTaskCount": 14,
"rollingCountThreadsExecuted": 5,
"rollingMaxActiveThreads": 1,
"rollingCountCommandRejections": 0,
"propertyValue_queueSizeRejectionThreshold": 5,
"propertyValue_metricsRollingStatisticalWindowInMilliseconds": 10000,
"reportingHosts": 1
}
Spring Cloud Hystrix Dashboard
激活
@EnableHystrixDashboard
@SpringBootApplication
@EnableHystrixDashboard
public class SpringCloudHystrixDashboardDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudHystrixDashboardDemoApplication.class, args);
}
}
Spring Cloud Feign
申明式 Web 服务客户端:Feign
申明式:接口声明、Annotation 驱动
Web 服务:HTTP 的方式作为通讯协议
客户端:用于服务调用的存根
Feign:原生并不是 Spring Web MVC的实现,基于JAX-RS(Java REST 规范)实现。Spring Cloud 封装了Feign ,使其支持 Spring Web MVC。RestTemplate
、HttpMessageConverter
RestTemplate
以及 Spring Web MVC 可以显示地自定义HttpMessageConverter
实现。
假设,有一个Java 接口 PersonService
, Feign 可以将其声明它是以 HTTP 方式调用的。
注册中心(Eureka Server):服务发现和注册
a. 应用名称:spring-cloud-eureka-server
b. 服务端口:12345
application.properties:
spring.application.name = spring-cloud-eureka-server
## Eureka 服务器端口
server.port =12345
### 取消服务器自我注册
eureka.client.register-with-eureka=false
### 注册中心的服务器,没有必要再去检索服务
eureka.client.fetch-registry = false
management.security.enabled = false
Feign 声明接口(契约):定义一种 Java 强类型接口
模块:person-api
PersonService
@FeignClient(value = "person-service") // 服务提供方应用的名称
public interface PersonService {
/**
* 保存
*
* @param person {@link Person}
* @return 如果成功,<code>true</code>
*/
@PostMapping(value = "/person/save")
boolean save(@RequestBody Person person);
/**
* 查找所有的服务
*
* @return
*/
@GetMapping(value = "/person/find/all")
Collection<Person> findAll();
}
Feign 客户(服务消费)端:调用Feign 申明接口
应用名称:person-client
依赖:person-api
创建客户端 Controller
@RestController
public class PersonClientController implements PersonService {
private final PersonService personService;
@Autowired
public PersonClientController(PersonService personService) {
this.personService = personService;
}
@Override
public boolean save(@RequestBody Person person) {
return personService.save(person);
}
@Override
public Collection<Person> findAll() {
return personService.findAll();
}
}
创建启动类
package com.gupao.spring.cloud.feign.client;
import com.gupao.spring.cloud.feign.api.service.PersonService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients(clients = PersonService.class)
public class PersonClientApplication {
public static void main(String[] args) {
SpringApplication.run(PersonClientApplication.class,args);
}
}
配置 application.properties
spring.application.name = person-client
server.port = 8080
## Eureka Server 服务 URL,用于客户端注册
eureka.client.serviceUrl.defaultZone=\
http://localhost:12345/eureka
management.security.enabled = false
Feign 服务(服务提供)端:不一定强制实现 Feign 申明接口
应用名称:person-service
依赖:person-api
创建 PersonServiceController
@RestController
public class PersonServiceProviderController {
private final Map<Long, Person> persons = new ConcurrentHashMap<>();
/**
* 保存
*
* @param person {@link Person}
* @return 如果成功,<code>true</code>
*/
@PostMapping(value = "/person/save")
public boolean savePerson(@RequestBody Person person) {
return persons.put(person.getId(), person) == null;
}
/**
* 查找所有的服务
*
* @return
*/
@GetMapping(value = "/person/find/all")
public Collection<Person> findAllPersons() {
return persons.values();
}
}
创建服务端应用
@SpringBootApplication
@EnableEurekaClient
public class PersonServiceProviderApplication {
public static void main(String[] args) {
SpringApplication.run(PersonServiceProviderApplication.class,args);
}
}
配置 application.properties
## 提供方的应用名称需要和 @FeignClient 声明对应
spring.application.name = person-service
## 提供方端口 9090
server.port = 9090
## Eureka Server 服务 URL,用于客户端注册
eureka.client.serviceUrl.defaultZone=\
http://localhost:12345/eureka
## 关闭管理安全
management.security.enabled = false
Feign 客户(服务消费)端、Feign 服务(服务提供)端 以及 Feign 声明接口(契约) 存放在同一个工程目录。
调用顺序
PostMan -> person-client -> person-service
person-api 定义了 @FeignClients(value="person-service") , person-service 实际是一个服务器提供方的应用名称。
person-client 和 person-service 两个应用注册到了Eureka Server
person-client 可以感知 person-service 应用存在的,并且 Spring Cloud 帮助解析 PersonService
中声明的应用名称:“person-service”,因此 person-client 在调用 ``PersonService服务时,实际就路由到 person-service 的 URL
整合 Netflix Ribbon
官方参考文档:http://cloud.spring.io/spring-cloud-static/Dalston.SR4/single/spring-cloud.html#spring-cloud-ribbon
关闭 Eureka 注册
Ribbon可以不使用eureka
调整 person-client 关闭 Eureka
ribbon.eureka.enabled = false
定义服务 ribbon 的服务列表(服务名称:person-service)
由于关闭了eureka,需要手动配置服务列表
person-service.ribbon.listOfServers = http://localhost:9090,http://localhost:9090,http://localhost:9090
完全取消 Eureka 注册
//@EnableEurekaClient //注释 @EnableEurekaClient
自定义 Ribbon 的规则
接口和 Netflix 内部实现
- IRule
- 随机规则:RandomRule
- 最可用规则:BestAvailableRule
- 轮训规则:RoundRobinRule
- 重试实现:RetryRule
- 客户端配置:ClientConfigEnabledRoundRobinRule
- 可用性过滤规则:AvailabilityFilteringRule
- RT权重规则:WeightedResponseTimeRule
- 规避区域规则:ZoneAvoidanceRule
实现 IRule
public class FirstServerForeverRule extends AbstractLoadBalancerRule {
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
@Override
public Server choose(Object key) {
ILoadBalancer loadBalancer = getLoadBalancer();
List<Server> allServers = loadBalancer.getAllServers();
return allServers.get(0);
}
}
暴露自定义实现为 Spring Bean
@Bean
public FirstServerForeverRule firstServerForeverRule(){
return new FirstServerForeverRule();
}
激活这个配置
@SpringBootApplication
//@EnableEurekaClient
@EnableFeignClients(clients = PersonService.class)
@RibbonClient(value = "person-service", configuration = PersonClientApplication.class)
public class PersonClientApplication {
public static void main(String[] args) {
SpringApplication.run(PersonClientApplication.class, args);
}
@Bean
public FirstServerForeverRule firstServerForeverRule() {
return new FirstServerForeverRule();
}
}
检验结果
通过调试可知:
ILoadBalancer loadBalancer = getLoadBalancer();
// 返回三个配置 Server,即:
// person-service.ribbon.listOfServers = \
// http://localhost:9090,http://localhost:9090,http://localhost:9090
List<Server> allServers = loadBalancer.getAllServers();
return allServers.get(0);
再次测试还原 Eureka注册的结果
注释掉关闭eruka的配置
注册三台服务提供方服务器:
PERSON-SERVICE n/a (3) (3) UP (3) - 192.168.1.103:person-service:9090 , 192.168.1.103:person-service:9094 , 192.168.1.103:person-service:9097
整合 Netflix Hystrix
调整 Feign 接口
@FeignClient(value = "person-service",fallback = PersonServiceFallback.class) // 服务提供方应用的名称
public interface PersonService {
/**
* 保存
*
* @param person {@link Person}
* @return 如果成功,<code>true</code>
*/
@PostMapping(value = "/person/save")
boolean save(@RequestBody Person person);
/**
* 查找所有的服务
*
* @return
*/
@GetMapping(value = "/person/find/all")
Collection<Person> findAll();
}
添加 Fallback 实现
public class PersonServiceFallback implements PersonService {
@Override
public boolean save(Person person) {
return false;
}
@Override
public Collection<Person> findAll() {
return Collections.emptyList();
}
}
调整客户端(激活Hystrix)
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients(clients = PersonService.class)
@EnableHystrix
//@RibbonClient(value = "person-service", configuration = PersonClientApplication.class)
public class PersonClientApplication {
public static void main(String[] args) {
SpringApplication.run(PersonClientApplication.class, args);
}
@Bean
public FirstServerForeverRule firstServerForeverRule() {
return new FirstServerForeverRule();
}
}
问答部分
-
能跟dubbo一样,消费端像调用本地接口方法一样调用服务端提供的服务么?还有就是远程调用方法参数对象不用实现序列化接口么?
答: FeignClient 类似 Dubbo,不过需要增加以下 @Annotation,和调用本地接口类似
-
Feign通过注释驱动弱化了调用Service细节,但是Feign的Api设定会暴露service地址,那还有实际使用价值么?
答:实际价值是存在的,Feign API 暴露 URI,比如:"/person/save"
-
整合ribbon不是一定要关闭注册中心吧?
答: Ribbon 对于 Eureka 是不强依赖,不过也不排除
-
生产环境上也都是feign的?
答:据我所知,不少的公司在用,需要 Spring Cloud 更多整合:
Feign 作为客户端
Ribbon 作为负载均衡
Eureka 作为注册中心
Zuul 作为网管
Security 作为安全 OAuth 2 认证
-
Ribbon直接配置在启动类上是作用所有的controller,那如果想作用在某个呢?
答:Ribbon 是控制全局的负载均衡,主要作用于客户端 Feign,Controller是调用 Feign 接口,可能让人感觉直接作用了 Controller。
-
其实eureka也有ribbon中简单的负载均衡吧
答:Eureka 也要 Ribbon 的实现,可以参考
com.netflix.ribbon:ribbon-eureka
-
如果服务提供方,没有接口,我客户端一般咋处理?要根据服务信息,自建feign接口?
答:当然可以,可是 Feign 的接口定义就是要求强制实现
-
无法连接注册中心的老服务,如何调用cloud服务
答:可以通过域名的配置 Ribbon 服务白名单
-
eureka 有时监控不到宕机的服务 正确的启动方式是什么
答:这可以调整的心跳检测的频率
Spring Cloud Zuul
Zuul 基本使用
@EnableEurekaClient
@EnableDiscoveryClient
Nginx + Lua
Lua:控制规则(A/B Test)
Spring Cloud 学习技巧:
善于定位应用:Feign、Config Server、Eureka、Zuul 、Ribbon定位应用,配置方式是不同
增加 @EnableZuulProxy
@SpringBootApplication
@EnableZuulProxy
public class SpringCloudZuulDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudZuulDemoApplication.class, args);
}
}
配置路由规则
基本模式:zuul.routes.${app-name} = /${app-url-prefix}/**
整合 Ribbon
启动应用
调用链路
zuul -> person-service
配置方式
## Zuul 服务端口
server.port = 7070
## Zuul 基本配置模式
# zuul.routes.${app-name}: /${app-url-prefix}/**
## Zuul 配置 person-service 服务调用
zuul.routes.person-service = /person-service/**
## Ribbon 取消 Eureka 整合
ribbon.eureka.enabled = false
## 配置 "person-service" 的负载均衡服务器列表
person-service.ribbon.listOfServers = \
http://localhost:9090
注意:http://localhost:7070/person-service/person/find/all
person-service 的 app-url-prefix : /person-service/
/person/find/all 是 person-service 具体的 URI
整合 Eureka
引入 spring-cloud-starter-eureka 依赖
<!-- 增加 Eureka 客户端的依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
激活服务注册、发现客户端
@SpringBootApplication
@EnableZuulProxy
@EnableDiscoveryClient
public class SpringCloudZuulDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudZuulDemoApplication.class, args);
}
}
配置服务注册、发现客户端
## 设置应用名称
spring.application.name = spring-cloud-zuul
## Zuul 服务端口
server.port = 7070
## Zuul 基本配置模式
# zuul.routes.${app-name}: /${app-url-prefix}/**
## Zuul 配置 person-service 服务调用
zuul.routes.person-service = /person-service/**
## 整合 Eureka
## 目标应用的serviceId = person-service
## Eureka Server 服务 URL,用于客户端注册
eureka.client.serviceUrl.defaultZone=\
http://localhost:12345/eureka
### Ribbon 取消 Eureka 整合
#ribbon.eureka.enabled = false
### 配置 "person-service" 的负载均衡服务器列表
#person-service.ribbon.listOfServers = \
# http://localhost:9090
整合 Hystrix
服务端提供方:person-service
激活 Hystrix
@SpringBootApplication
@EnableEurekaClient
@EnableHystrix
public class PersonServiceProviderApplication {
public static void main(String[] args) {
SpringApplication.run(PersonServiceProviderApplication.class,args);
}
}
配置 Hystrix 规则
@RestController
public class PersonServiceProviderController {
private final Map<Long, Person> persons = new ConcurrentHashMap<>();
private final static Random random = new Random();
/**
* 保存
*
* @param person {@link Person}
* @return 如果成功,<code>true</code>
*/
@PostMapping(value = "/person/save")
public boolean savePerson(@RequestBody Person person) {
return persons.put(person.getId(), person) == null;
}
/**
* 查找所有的服务
*
* @return
*/
@GetMapping(value = "/person/find/all")
@HystrixCommand(fallbackMethod = "fallbackForFindAllPersons",
commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",
value = "100")
}
)
public Collection<Person> findAllPersons() throws Exception {
// 如果随机时间 大于 100 ,那么触发容错
int value = random.nextInt(200);
System.out.println("findAllPersons() costs " + value + " ms.");
Thread.sleep(value);
return persons.values();
}
/**
* {@link #findAllPersons()} Fallback 方法
*
* @return 返回空集合
*/
public Collection<Person> fallbackForFindAllPersons() {
System.err.println("fallbackForFindAllPersons() is invoked!");
return Collections.emptyList();
}
}
整合 Feign
服务消费端:person-client
调用链路
spring-cloud-zuul -> person-client -> person-service
person-client 注册到 EurekaServer
端口信息:
spring-cloud-zuul 端口:7070
person-client 端口:8080
person-service 端口:9090
Eureka Server 端口:12345
spring.application.name = person-client
server.port = 8080
## Eureka Server 服务 URL,用于客户端注册
eureka.client.serviceUrl.defaultZone=\
http://localhost:12345/eureka
management.security.enabled = false
网关应用:spring-cloud-zuul
增加路由应用到 person-client
## Zuul 配置 person-client 服务调用
zuul.routes.person-client = /person-client/**
测试链路
http://localhost:7070/person-client/person/find/all
spring-cloud-zuul(7070) -> person-client(8080) -> person-service(9090)
等价的 Ribbon(不走注册中心)
## Ribbon 取消 Eureka 整合
ribbon.eureka.enabled = false
## 配置 "person-service" 的负载均衡服务器列表
person-service.ribbon.listOfServers = \
http://localhost:9090
## 配置 "person-client" 的负载均衡服务器列表
person-client.ribbon.listOfServers = \
http://localhost:8080
整合 Config Server
前面的例子展示 Zuul 、Hystrix、Eureka 以及 Ribbon 能力,可是配置相对是固定,真实线上环境需要一个动态路由,即需要动态配置。
配置服务器:spring-cloud-config-server
端口信息:
spring-cloud-zuul 端口:7070
person-client 端口:8080
person-service 端口:9090
Eureka Server 端口:12345
Config Server 端口:10000
调整配置项
### 配置服务器配置项
spring.application.name = spring-cloud-config-server
### 定义HTTP服务端口
server.port = 10000
### 本地仓库的GIT URI 配置
spring.cloud.config.server.git.uri = \
file:///${user.dir}/src/main/resources/configs
### 全局关闭 Actuator 安全
# management.security.enabled = false
### 细粒度的开放 Actuator Endpoints
### sensitive 关注是敏感性,安全
endpoints.env.sensitive = false
endpoints.health.sensitive = false
为 spring-cloud-zuul 增加配置文件
三个 profile 的配置文件:
- zuul.properties
- zuul-test.properties
- zuul-prod.properties
zuul.properties
## 应用 spring-cloud-zuul 默认配置项(profile 为空)
## Zuul 基本配置模式
# zuul.routes.${app-name}: /${app-url-prefix}/**
## Zuul 配置 person-service 服务调用
zuul.routes.person-service = /person-service/**
zuul-test.properties
## 应用 spring-cloud-zuul 默认配置项(profile == "test")
## Zuul 基本配置模式
# zuul.routes.${app-name}: /${app-url-prefix}/**
## Zuul 配置 person-client 服务调用
zuul.routes.person-client = /person-client/**
zuul-prod.properties
## 应用 spring-cloud-zuul 默认配置项(profile == "prod")
## Zuul 基本配置模式
# zuul.routes.${app-name}: /${app-url-prefix}/**
## Zuul 配置 person-service 服务调用
zuul.routes.person-service = /person-service/**
## Zuul 配置 person-client 服务调用
zuul.routes.person-client = /person-client/**
初始化 ${user.dir}/src/main/resources/configs 为 git 根目录
- 初始化
$ git init
Initialized empty Git repository in ${user.dir}/src/main/resources/configs/.git/
- 增加上述三个配置文件到 git 仓库
$ git add *.properties
- 提交到本地 git 仓库
$ git commit -m "Temp commit"
[master (root-commit) be85bb0] Temp commit
3 files changed, 21 insertions(+)
create mode 100644 zuul-prod.properties
create mode 100644 zuul-test.properties
create mode 100644 zuul.properties
以上操作为了让 Spring Cloud Git 配置服务器实现识别 Git 仓库,否则添加以上三个文件也没有效果。
注册到 Eureka 服务器
增加 spring-cloud-starter-eureka 依赖
<!-- 增加 Eureka 客户端的依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
激活服务注册、发现客户端
package com.gupao.springcloudconfigserverdemo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.cloud.context.refresh.ContextRefresher;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.Set;
@SpringBootApplication
@EnableConfigServer
@EnableDiscoveryClient
public class SpringCloudConfigServerDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudConfigServerDemoApplication.class, args);
}
}
调整配置项
## Eureka Server 服务 URL,用于客户端注册
eureka.client.serviceUrl.defaultZone=\
http://localhost:12345/eureka
测试配置
http://localhost:10000/zuul/default
http://localhost:10000/zuul/test
http://localhost:10000/zuul/prod
配置网关服务:spring-cloud-zuul
端口信息:
spring-cloud-zuul 端口:7070
person-client 端口:8080
person-service 端口:9090
Eureka Server 端口:12345
增加 spring-cloud-starter-config 依赖
将之前:
zuul.routes.person-service
zuul.routes.person-client
配置注释
<!-- 增加 配置客户端的依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
创建 bootstrap.properties
配置 config 客户端信息
## 整合 Eureka
## Eureka Server 服务 URL,用于客户端注册
## application.properties 会继承bootstrap.properties 属性
## 因此,application.properties 没有必要配置 eureka.client.serviceUrl.defaultZone
eureka.client.serviceUrl.defaultZone=\
http://localhost:12345/eureka
### bootstrap 上下文配置
# 配置客户端应用名称: zuul , 可当前应用是 spring-cloud-zuul
spring.cloud.config.name = zuul
# profile 是激活配置
spring.cloud.config.profile = prod
# label 在Git中指的分支名称
spring.cloud.config.label = master
# 采用 Discovery client 连接方式
## 激活 discovery 连接配置项的方式
spring.cloud.config.discovery.enabled=true
## 配置 config server 应用名称
spring.cloud.config.discovery.serviceId = spring-cloud-config-server
测试链路
http://localhost:7070/person-client/person/find/all
spring-cloud-zuul -> person-client -> person-service
http://localhost:7070/person-service/person/find/all
spring-cloud-zuul -> person-service
abc.acme.com -> abc
def.acme.com -> def
需要自定义实现 ZuulFilter
通过 Groovy 来实现动态配置规则
问答部分
-
看下来过程是:通过url去匹配zuul中配置的serviceId然后没整合ribbon时,直接去eureka中找服务实例去调用,如果整合了ribbon时,直接从listofService中取得一个实例,然后调用返回,对不?
答:大致上可以这么理解,不过对应的listOfServicers 不只是单个实例,而可能是一个集群,主要可以配置域名。
-
为什么要先调用client而不直接调用server,还是不太理解
答:这个只是一个演示程序,client 在正式使用场景中,并不是一简单的调用,它可能是一个聚合服务。
-
zuul 是不是更多的作为业务网关
答:是的,很多企业内部的服务通过 Zuul 做个服务网关
-
渡劫RequestContext已经存在ThreadLocal中了,为什么还要使用ConcurrentHashMap?
答:
ThreadLocal
只能管当前线程,不能管理子线程,子线程需要使用InheritableThreadLocal
。ConcurrentHashMap
实现一下,如果上下文处于多线程线程的环境,比如传递到子线程。比如:T1 在管理 RequestContext,但是 T1 又创建了多个线程(t1、t2),这个时候,把上下文传递到了子线程 t1 和 t2 .
Java 的进程所对应的线程 main 线程(group:main),main 线程是所有子线程的父线程,main线程 T1 ,T1 又可以创建 t1 和 t2 :
@Override public Object run() { RequestContext ctx = RequestContext.getCurrentContext(); // T1 线程 ServiceExecutor executor = ...; executor.submit(new MyRunnable(ctx){ public void run(){ ctx // t1 线程 } }); return null; }
-
ZuulServlet已经管理了RequestContext的生命周期了,为什么ContextLifecycleFilter还要在做一遍?
答:
ZuulServelt
最终也会清理掉RequestContext:} finally { RequestContext.getCurrentContext().unset(); }
为什么
ContextLifecycleFilter
也这么干?} finally { RequestContext.getCurrentContext().unset(); }
不要忽略了
ZuulServletFilter
,也有这个处理:finally { RequestContext.getCurrentContext().unset(); }
RequestContext
是 任何 Servlet 或者 Filter 都能处理,那么为了防止不正确的关闭,那么ContextLifecycleFilter
相当于兜底操作,就是防止ThreadLocal
没有被remove 掉。ThreadLocal
对应了一个 Thread,那么是不是意味着者Thread 处理完了,那么ThreadLocal
也随之 GC?所有 Servlet 均采用线程池,因此,不清空的话,可能会出现意想不到的情况。除非,每次都异常!(这种情况也要依赖于线程池的实现)。
Spring Cloud Stream(上)
Kafka
官方网页
主要用途
- 消息中间件
- 流式计算处理
- 日志
- 。。。
下载地址
执行脚本目录 /bin
windows 执行脚本 在其单独的目录
快速上手
下载并且解压 kafka 压缩包
运行服务
以 Windows 为例,首先打开 cmd
:
- 启动
zookeeper
:
第一次使用,需要复制 config/zoo_sampe.cfg ,并且重命名为"zoo.cfg"
bin/zkServer.cmd
- 启动
kafka
:
bin/windows/kafka-server-start.bat
创建主题
bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic gupao\
Created topic "gupao".
生产者发送消息
bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic gupao
>xiaomage
消费者:接受消息
bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gupao --from-beginning
xiaomage
同类产品比较
- ActiveMQ:JMS(Java Message Service) 规范实现
- RabbitMQ:AMQP(Advanced Message Queue Protocol)规范实现
- Kafka:并非某种规范实现,它灵活和性能相对是优势
使用 Kafka 标准 API
package com.gupao.springcloudstreamkafka.raw.api;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* Kafka Producer Demo(使用原始API)
*
* @author 小马哥 QQ 1191971402
* @copyright 咕泡学院出品
* @since 2017/11/12
*/
public class KafkaProducerDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 初始化配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());
// 创建 Kafka Producer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer(properties);
// 创建 Kakfa 消息 = ProducerRecord
String topic = "gupao";
Integer partition = 0;
Long timestamp = System.currentTimeMillis();
String key = "message-key";
String value = "gupao.com";
ProducerRecord<String, String> record =
new ProducerRecord<String, String>(topic, partition, timestamp, key, value);
// 发放 Kakfa 消息
Future<RecordMetadata> metadataFuture = kafkaProducer.send(record);
// 强制执行
metadataFuture.get();
}
}
Spring Kafka
官方文档
设计模式
Spring 社区对 data(spring-data
) 操作,有一个基本的模式, Template 模式:
- JDBC :
JdbcTemplate
- Redis :
RedisTemplate
- Kafka :
KafkaTemplate
- JMS :
JmsTemplate
- Rest:
RestTemplate
XXXTemplate 一定实现 XXXOpeations
KafkaTemplate 实现了 KafkaOperations
Maven 依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Spring Boot Kafka
Maven 依赖
自动装配器:KafkaAutoConfiguration
其中KafkaTemplate
会被自动装配:
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(
ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(
kafkaProducerFactory);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
创建生产者
增加生产者配置
application.properties
全局配置:
## Spring Kafka 配置信息 spring.kafka.bootstrapServers = localhost:9092
### Kafka 生产者配置
# spring.kafka.producer.bootstrapServers = localhost:9092
spring.kafka.producer.keySerializer =org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.valueSerializer =org.apache.kafka.common.serialization.StringSerializer
编写发送端实现
package com.gupao.springcloudstreamkafka.web.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* Kafka 生产者 Controller
*
* @author 小马哥 QQ 1191971402
* @copyright 咕泡学院出品
* @since 2017/11/12
*/
@RestController
public class KafkaProducerController {
private final KafkaTemplate<String, String> kafkaTemplate;
private final String topic;
@Autowired
public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,
@Value("${kafka.topic}") String topic) {
this.kafkaTemplate = kafkaTemplate;
this.topic = topic;
}
@PostMapping("/message/send")
public Boolean sendMessage(@RequestParam String message) {
kafkaTemplate.send(topic, message);
return true;
}
}
创建消费者
增加消费者配置
### Kafka 消费者配置
spring.kafka.consumer.groupId = gupao-1
spring.kafka.consumer.keyDeserializer =org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.valueDeserializer =org.apache.kafka.common.serialization.StringDeserializer
编写消费端实现
package com.gupao.springcloudstreamkafka.consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* Kafka 消费者监听器
*
* @author 小马哥 QQ 1191971402
* @copyright 咕泡学院出品
* @since 2017/11/12
*/
@Component
public class KafkaConsumerListener {
@KafkaListener(topics ="${kafka.topic}")
public void onMessage(String message) {
System.out.println("Kafka 消费者监听器,接受到消息:" + message);
}
}
Spring Cloud Stream
Spring Cloud Stream Binder : Kafka
问答部分
-
当使用Future时,异步调用都可以使用get()方式强制执行吗?
答:是的,get() 等待当前线程执行完毕,并且获取返回接口
-
@KafkaListener
和KafkaConsumer
有啥区别答:没有实质区别,主要是 编程模式。
@KafkaListener
采用注解驱动KafkaConsumer
采用接口编程 -
消费者接受消息的地方在哪?
答:订阅并且处理后,就消失。
-
在生产环境配置多个生产者和消费者只需要定义不同的group就可以了吗?
答:group 是一种,要看是不是相同 Topic
-
为了不丢失数据,消息队列的容错,和排错后的处理,如何实现的?
答:这个依赖于 zookeeper
-
异步接受除了打印还有什么办法处理消息吗
答:可以处理其他逻辑,比如存储数据库
-
kafka适合什么场景下使用?
答:高性能的 Stream 处理
-
Kafka消息一直都在,内存占用会很多吧,消息量不停产生消息咋办?
答:Kafka 还是会删除的,并不一致一直存在
-
怎么没看到 broker 配置?
答:Broker 不需要设置,它是单独启动
-
consumer 为什么要分组?
答:consumer 需要定义不同逻辑分组,以便于管理
Spring Cloud Stream(下)
RabbitMQ:AMQP、JMS 规范
Kafka : 相对松散的消息队列协议
《企业整合模式》: Enterprise Integration Patterns
Spring Cloud Stream
基本概念
Source:来源,近义词:Producer、Publisher
Sink:接收器,近义词:Consumer、Subscriber
Processor:对于上流而言是 Sink,对于下流而言是 Source
Reactive Streams :
- Publisher
- Subscriber
- Processor
Spring Cloud Stream Binder : Kafka
继续沿用 spring-cloud-stream-kakfa-demo 工程
启动 Zookeeper
启动 Kafka
消息大致分为两个部分:
- 消息头(Headers)
- 消息体(Body/Payload)
定义标准消息发送源
@Component
@EnableBinding({Source.class})
public class MessageProducerBean {
@Autowired
@Qualifier(Source.OUTPUT) // Bean 名称
private MessageChannel messageChannel;
@Autowired
private Source source;
}
自定义标准消息发送源
public interface MessageSource {
/**
* 消息来源的管道名称:"gupao"
*/
String OUTPUT = "gupao";
@Output(OUTPUT)
MessageChannel gupao();
}
@Component
@EnableBinding({Source.class,MessageSource.class})
public class MessageProducerBean {
@Autowired
@Qualifier(Source.OUTPUT) // Bean 名称
private MessageChannel messageChannel;
@Autowired
private Source source;
@Autowired
private MessageSource messageSource;
@Autowired
@Qualifier(MessageSource.OUTPUT) // Bean 名称
private MessageChannel gupaoMessageChannel;
/**
* 发送消息
* @param message 消息内容
*/
public void send(String message){
// 通过消息管道发送消息
// messageChannel.send(MessageBuilder.withPayload(message).build());
source.output().send(MessageBuilder.withPayload(message).build());
}
/**
* 发送消息到 Gupao
* @param message 消息内容
*/
public void sendToGupao(String message){
// 通过消息管道发送消息
gupaoMessageChannel.send(MessageBuilder.withPayload(message).build());
}
}
实现标准 Sink
监听
绑定 Sink
@Component
@EnableBinding({Sink.class})
public class MessageConsumerBean {
@Autowired
@Qualifier(Sink.INPUT) // Bean 名称
private SubscribableChannel subscribableChannel;
@Autowired
private Sink sink;
}
通过 SubscribableChannel
订阅消息
// 当字段注入完成后的回调
@PostConstruct
public void init(){
// 实现异步回调
subscribableChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
});
}
通过 @ServiceActivator
订阅消息
//通过@ServiceActivator
@ServiceActivator(inputChannel = Sink.INPUT)
public void onMessage(Object message) {
System.out.println("@ServiceActivator : " + message);
}
通过 @StreamListener
订阅消息
@StreamListener(Sink.INPUT)
public void onMessage(String message){
System.out.println("@StreamListener : " + message);
}
这三种方式每次只能有一个接收到消息
Spring Cloud Stream Binder : RabbitMQ
producer
@Component
@EnableBinding({Source.class,MessageSource.class})
public class MessageProducerBean {
@Autowired
@Qualifier(Source.OUTPUT) // Bean 名称
private MessageChannel messageChannel;
@Autowired
private Source source;
@Autowired
private MessageSource messageSource;
@Autowired
@Qualifier(MessageSource.NAME) // Bean 名称
private MessageChannel gupaoMessageChannel;
/**
* 发送消息
* @param message 消息内容
*/
public void send(String message){
// 通过消息管道发送消息
// messageChannel.send(MessageBuilder.withPayload(message).build());
source.output().send(MessageBuilder.withPayload(message).build());
}
/**
* 发送消息到 Gupao
* @param message 消息内容
*/
public void sendToGupao(String message){
// 通过消息管道发送消息
gupaoMessageChannel.send(MessageBuilder.withPayload(message).build());
}
}
consumer
@Component
@EnableBinding({Sink.class})
public class MessageConsumerBean {
@Autowired
@Qualifier(Sink.INPUT) // Bean 名称
private SubscribableChannel subscribableChannel;
@Autowired
private Sink sink;
// 当字段注入完成后的回调
@PostConstruct
public void init() {
// 实现异步回调
subscribableChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("subscribe : " + message.getPayload());
}
});
}
//通过@ServiceActivator
@ServiceActivator(inputChannel = Sink.INPUT)
public void onMessage(Object message) {
System.out.println("@ServiceActivator : " + message);
}
@StreamListener(Sink.INPUT)
public void onMessage(String message){
System.out.println("@StreamListener : " + message);
}
}
配置文件
## 定义应用的名称
spring.application.name = spring-cloud-stream-rabbitmq
## 配置 Web 服务端口
server.port = 8080
## 失效管理安全
management.security.enabled = false
## 配置需要的kafka 主题
kafka.topic = gupao
## 定义 Spring Cloud Stream Source 消息去向
### 针对 Kafka 而言,基本模式下
# spring.cloud.stream.bindings.${channel-name}.destination = ${kafka.topic}
spring.cloud.stream.bindings.output.destination = ${kafka.topic}
spring.cloud.stream.bindings.gupao.destination = test
spring.cloud.stream.bindings.input.destination = ${kafka.topic}
启动 RabbitMQ
重构工程,删除kafka 的强依赖的实现
问答部分
-
@EnableBinding
有什么用?答:
@EnableBinding
将Source
、Sink
以及Processor
提升成相应的代理 -
@Autorwired Source source这种写法是默认用官方的实现?
答:是官方的实现
-
这么多消息框架 各自优点是什么 怎么选取
答:RabbitMQ:AMQP、JMS 规范
Kafka : 相对松散的消息队列协议
ActiveMQ:AMQP、JMS 规范
AMQP v1.0 support
MQTT v3.1 support allowing for connections in an IoT environment.https://content.pivotal.io/rabbitmq/understanding-when-to-use-rabbitmq-or-apache-kafka
-
如果中间件如果有问题怎么办,我们只管用,不用维护吗。现在遇到的很多问题不是使用,而是维护,中间件一有问题,消息堵塞或丢失都傻眼了,都只有一键重启
答:消息中间件无法保证不丢消息,多数高一致性的消息背后还是有持久化的。
-
@EnableBinder, @EnableZuulProxy,@EnableDiscoverClient这些注解都是通过特定BeanPostProcessor实现的吗?
答:不完全对,主要处理接口在
@Import
:-
ImportSelector
实现类 -
ImportBeanDefinitionRegistrar
实现类 -
@Configuration
标注类 -
BeanPostProcessor
实现类
-
-
我对流式处理还是懵懵的 到底啥是流式处理 怎样才能称为流式处理 一般应用在什么场景?
答:Stream 处理简单地说,异步处理,消息是一种处理方式。
提交申请,机器生成,对于高密度提交任务,多数场景采用异步处理,Stream、Event-Driven。举例说明:审核流程,鉴别黄图。
-
如果是大量消息 怎么快速消费 用多线程吗?
答:确实是使用多线程,不过不一定奏效。依赖于处理具体内容,比如:一个线程使用了
25% CPU,四个线程就将CPU 耗尽。因此,并发 100个处理,实际上,还是 4个线程在处理。I/O 密集型、CPU 密集型。
-
如果是大量消息 怎么快速消费 用多线程吗
答:大多数是多线程,其实也单线程,流式非阻塞。
-
购物车的价格计算可以使用流式计算来处理么?能说下思路么?有没有什么高性能的方式推荐?
答:当商品添加到购物车的时候,就可以开始计算了。
Spring Cloud Sleuth
基础
Google Dapper
Spring Cloud Sleuth 整合
引入Maven 依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
日志发生的变化
当应用ClassPath 下存在org.springframework.cloud:spring-cloud-starter-sleuth
时候,日志会发生调整。
整体流程
spring-cloud-starter-sleuth
会自动装配一个名为TraceFilter
组件(在 Spring WebMVC DispatcherServlet
之前),它会增加一些 slf4j MDC
MDC : Mapped Diagnostic Context
org.springframework.cloud:spring-cloud-starter-sleuth
会调整当前日志系统(slf4j)的 MDC Slf4jSpanLogger#logContinuedSpan(Span)
:
@Override
public void logContinuedSpan(Span span) {
MDC.put(Span.SPAN_ID_NAME, Span.idToHex(span.getSpanId()));
MDC.put(Span.TRACE_ID_NAME, span.traceIdString());
MDC.put(Span.SPAN_EXPORT_NAME, String.valueOf(span.isExportable()));
setParentIdIfPresent(span);
log("Continued span: {}", span);
}
Zipkin 整合
创建 Spring Cloud Zipkin 服务器
增加 Maven 依赖
<!-- Zipkin 服务器依赖 -->
<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-server</artifactId>
</dependency>
<!-- Zipkin 服务器UI控制器 -->
<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-autoconfigure-ui</artifactId>
<scope>runtime</scope>
</dependency>
激活 Zipkin 服务器
package com.gupao.springcloudzipkinserverdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import zipkin.server.EnableZipkinServer;
@SpringBootApplication
@EnableZipkinServer
public class SpringCloudZipkinServerDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudZipkinServerDemoApplication.class, args);
}
}
HTTP 收集(HTTP 调用)
简单整合spring-cloud-sleuth
增加 Maven 依赖
<!-- Zipkin 客户端依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
Spring Cloud 服务大整合
端口信息
spring-cloud-zuul 端口:7070
person-client 端口:8080
person-service 端口:9090
Eureka Server 端口:12345
Zipkin Server 端口:23456
启动服务
- Zipkin Server
- Eureka Server
- spring-cloud-config-server
- person-service
- person-client
- spring-cloud-zuul
spring-cloud-sleuth-demo 改造
增加 Eureka 客户端依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
激活 Eureka 客户端
package com.gupao.springcloudsleuthdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@SpringBootApplication
@EnableDiscoveryClient
public class SpringCloudSleuthDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudSleuthDemoApplication.class, args);
}
}
调整配置
spring.application.name = spring-cloud-sleuth
## 服务端口
server.port = 6060
## Zipkin 服务器配置
zipkin.server.host = localhost
zipkin.server.port = 23456
## 增加 ZipKin 服务器地址
spring.zipkin.base-url = \
http://${zipkin.server.host}:${zipkin.server.port}/
## Eureka Server 服务 URL,用于客户端注册
eureka.client.serviceUrl.defaultZone=\
http://localhost:12345/eureka
调整代码连接 spring-cloud-zuul
/**
* 完整的调用链路:
* spring-cloud-sleuth
* -> spring-cloud-zuul
* -> person-client
* -> person-service
*
* @return
*/
@GetMapping("/to/zuul/person-client/person/find/all")
public Object toZuul() {
logger.info("spring-cloud-sleuth#toZuul()");
// spring-cloud-zuul : 7070
String url = "http://spring-cloud-zuul/person-client/person/find/all";
return restTemplate.getForObject(
url, Object.class);
}
spring-cloud-zuul
上报到 Zipkin 服务器
person-client
上报到 Zipkin 服务器
person-service
上报到 Zipkin 服务器
<!-- Zipkin 客户端依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
## Zipkin 服务器配置
zipkin.server.host = localhost
zipkin.server.port = 23456
## 增加 ZipKin 服务器地址
spring.zipkin.base-url = \
http://${zipkin.server.host}:${zipkin.server.port}/
Spring Cloud Stream 收集(消息)
调整spring-cloud-zipkin-server
通过 Stream 来收集
增加 Maven 依赖
<!-- Zipkin 服务器通过Stream 收集跟踪信息 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth-zipkin-stream</artifactId>
</dependency>
<!-- 使用 Kafka 作为 Stream 服务器 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
激活 Zipkin Stream
package com.gupao.springcloudzipkinserverdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.sleuth.zipkin.stream.EnableZipkinStreamServer;
import zipkin.server.EnableZipkinServer;
@SpringBootApplication
//@EnableZipkinServer
@EnableZipkinStreamServer
public class SpringCloudZipkinServerDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudZipkinServerDemoApplication.class, args);
}
}
启动 Zookeeper
启动 Kafka 服务器
启动spring-cloud-zipkin-server
调整 spring-cloud-zuul
增加依赖
<!-- 添加 sleuth Stream 收集方式 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<!-- Kafka Binder -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
注意把前面 HTTP 上报URL配置注释:
### 增加 ZipKin 服务器地址 #spring.zipkin.base-url = \ # http://${zipkin.server.host}:${zipkin.server.port}/
问答部分
-
只有第一个被访问的应用的pom引用了spring-cloud-starter-sleuth包么?后面的应用spanid是怎么生成的?上报是汇集到一起上报么?
答:每个应用都需要依赖
spring-cloud-starter-sleuth
。SpanId 是当前应用生成的,Trace ID 是有上个应用传递过来(Spring Cloud 通过请求头Headers) -
sleuth配合zipkin主要解决的是分布式系统下的请求的链路跟踪,这么理解没错吧?
答:可以这么理解,问题排查、性能、调用链路跟踪
-
sleuth 与 eureka 没关联吧?
答:没有直接关系,Sleuth 可以利用 Eureka 做服务发现
-
生产上类sleuth的log日志应该看哪些资料?opentsdb,也有调用链路等相关信息么?
答:OpenTsdb 主要是存储 JSON 格式,存放Metrics 信息
-
排查问题是根据traceid来查找有所日志吧
答:可以通过TraceId 查找调用链路,通过 Span Id 定位在什么环节
-
整合显示是sleuth做的,zipkin用来收集信息对吧?
答:zipkin 是一种用整合方式
-
问个题外话,两个系统对接需要通过json进行数据传输,但是两个系统之间的json数据格式是不同的,有没有好一点的开源项目可以解决这个问题,类似xslt转换xml的方式。
答:A -> B, data1 , B -> A data2, 接受方使用Map。JSON Path
{"id":1, abc:{ "name":"xiaomage", "age":32 } }
Spring Boot + Spring Cloud 整体回顾
Spring Boot
JPA
JPA
Java 持久化的标准
Java Persistence API
Hibernate Session#save
EntityManager#persit
EJB3.0 JPA 1.0
Hibernate
定义实体
@Entity
@Table(name = "persons")
public class Person {
@Id
@GeneratedValue
private Long id;
@Column
private String name;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
注入EntityManager
到服务层
@Service
public class PersonService {
/**
* 通过标准的JPA的方式注入
*/
@PersistenceContext
private EntityManager entityManager;
/**
* 存储 {@link Person}
* @param person
*/
public void save(Person person) {
entityManager.persist(person);
}
}
配置 JPA 数据源
## 增加 MySQL 数据源(DataSource)
spring.datasource.driverClassName = com.mysql.jdbc.Driver
spring.datasource.url = jdbc:mysql://localhost:3306/test
spring.datasource.username = root
spring.datasource.password = 123456
增加 MySQL JDBC 驱动依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
通过 JPA 自动创建表
## 配置 JPA 行为
spring.jpa.generateDdl = true
spring.jpa.showSql = true
定义 Person 仓储接口
@Repository
public interface PersonRepository extends PagingAndSortingRepository<Person,Long>{
}
实现 Person 分页算法
@GetMapping("/person/list")
public Page<Person> list(Pageable pageable){
return personRepository.findAll(pageable);
}
Spring Data JPA
Spring Boot JPA
Spring Cloud
监控
监控信息埋点:Sleuth
HTTP 上报
sleuth-zipkin
Spring Cloud Stream 上报
Rabbit MQ Binder
Kafka Binder
监控信息接受:Zipkin
通过 HTTP 收集
sleuth-zipkin
Spring Cloud Stream 收集
Rabbit MQ Binder
Kafka Binder
日志收集
ELK
日志格式调整,从普通单行日志,变成 JSON 格式(Base on ElasticSearch)。
限流
Hystrix
@HystrixCommand
HystrixCommand
管理平台:hystrix-dashboard
数据聚合:Turbine
分布式配置
Spring Cloud Config 客户端
直接连接方式
利用 Discovery Client
Spring Cloud Config 服务端
利用 Discovery Client 让其他配置客户端,发现服务端
Git Base 实现
Consul 实现
Zookeeper 实现
服务网关
Zuul
Zuul 类似于 Filter
或者 Servlet
服务治理
Eureka 客户端
Eureka 服务端
问答部分
-
springBoot 和springCound区别和联
答:Spring Framework,封装了大多数 Java EE 标准技术以及开源实现方法,提高生成力。不过 Spring 应用需要配置的相关组件,Spring Boot 帮助简化了配置步骤,采用默认的配置能够快速的启动或者构建一个Java EE 应用。Spring Boot 单机应用,Spring Cloud 是将 Spring Boot 变成云应用,那么里面需要增强一些组件,监控、限流、服务注册和发现等等。
-
springData-jpa和JTA持久化框架区别
答:JTA 主要关注分布式事务,事务操作背后可能资源是 数据库、消息、或者缓存等等。
从数据库角度,JTA 使用 JPA作为存储,但是可以使用 JDBC。JTA 还能操作 JMS。
-
feign 和 ribbon 区别?feign内部实现依靠的是ribbon的嘛?
答:Feign 作为声明式客户端调用,Ribbon 主要负责负载均衡。Feign 可以整合 Ribbon,但是不是强依赖。Spring Cloud 对 Feign 增强,Feign 原始不支持 Spring WebMVC,而是支持标准 JAX-RS(Rest 标准)
-
整合图,zuul 换成 nginx ,nginx应该怎么配置才能使用sleuth,从网关开始监控?
答:nginx 需要增加 HTTP 上报监控信息到 Zipkin Server
-
spring-data-jpa里面有个地方我觉得特别不好用,就是实现Repository的实现。比如写了一个接口,里面有方法findByFieldAAndFieldB(String fieldA, String fieldB),如果fieldA或者fieldB是null,data-jpa的实现是当你select * from table where field_a = fied_a and field_b is null。这在做查询的时候特别不好用,大部分情况下都是希望是select * from table where field_a = field_a。除了用JpaSpecificationExecutor有没有别的方法?因为去写Specification太麻烦了。
答:我采用Native SQL 处理。
-
SpringCloud 服务治理能和dubbo共存,或者替换成dubbo吗
答:这个问题在社区里面有人问题,目前暂时没有确定答复。
-
想和mybatis一样,可以根据条件生成不同的sql?
答:Spring Data JPA 不太好实现,满足90%的CRUD 需求。
-
记得在原生MyBatis的mapper.xml文件中可以使用标签来判断是不是为null。如果是null的话就会舍弃该查询字段。
答:MyBatis 里面 Mapper.xml 可以增加 if 或者for each 这样的语句
-
spring-data-jpa-reactive里面的实现为什么方法的返回值不能使用Page了?比如接口只能声明Flux<T> findAll(Pageable pageable)而不能使用Page<T> findAll(Pageable pageable)或者Mono<Page<T>> findAll(Pageable pageable)
答:Reactive 是推模式,所以被动更新。Page 是 Iterable 接口,它是拉的模式,如果在reactive 中返回Page ,那么违反了设计的初中。
-
spring boot 和spring cloud,生成环境怎么部署比较好,是直接java 运行还是放到tomcat中间件里统一启动?
答:直接通过 java 或者 jar 命令启动
-
刚才小马哥讲的JPA分页查询,他实现的是页面全部刷新还是局部刷新?类似于ajax做分页查询
答:全量更新,查询数据库。
-
使用spring cloud这一套框架,怎么进行API鉴权,认证成功的用户信息怎么保存,各个微服务的数据状态又怎么保存
答:Spring Security OAuth2 验证,Spring Session 管理用户状态,会话状态,不需要长期保存,在短时间内保存,比如 Spring Session + Redis(30分钟)。
-
Spring Security 为什么不能跨版本使用
答:Spring Security 兼容不是特别好,一般建议统一版本。
-
Shiro 和 Spring Security 的区别?
答:Shiro 是纯后端的安全校验和认证框架,Spring Security 是前端 + 后端安全校验和认证框架。
-
POJO可以和JPA解耦么,annotation往往还得引入hb?除了xml配置。耦合得有点闹心啊?
答:所以应该把 DTO 对象 和 Entity 解耦,不要继承也不要组合。
-
小马哥,说说你怎么区别spring(包括最近讲的)里大量注解的理解和使用?
答:作用域来区别,职责。
数据:JDBC、JPA、Cache、NoSQL、Message
安全:认证和鉴别
监控:数据埋点、数据收集
Web:Servlet、Spring WebMVC、JAX-RS、WebSocket、WebServices
配置:System Properties、Properties、YAML、启动参数、CSV、XML