在现在的微服务使用的过程中,经常会遇到依赖的服务不可用,那么如果依赖的服务不可用的话,会导致把自己的服务也会拖死,那么就产生了熔断,熔断顾名思义就是当服务处于不可用的时候采取半开关的状态,达到一定数量后就熔断器就打开。这就相当于家里边的保险丝,如果电压过高的话,保险丝就会断掉,起到保护电器的作用。
目前支持熔断,降级的就是Hystrix,当然还有resilience4j还有Sentinel。今天咱们以Hystrix为主吧。其他的大家可以自行研究。
Hystrix主要实现三个功能,接下来咱们继续展开。
1. 资源隔离
2. 熔断
3. 降级
资源隔离分为两种,一种是线程池隔离,一种是信号量semaphore隔离。线程池以请求的线程和执行的线程分为不同的线程执行,信号量是请求和执行采用相同的线程。
当然,涉及到线程池的话,那么就支持异步,支持异步Future的话也就支持get的时候支持超时获取。信号量这些功能不支持,但是信号量是支持熔断,限流。他们的区别如下:
线程切换 异步 超时 熔断 限流 资源开销
线程池 是 是 是 是 是 大
信号量 否 否 否 是 是 小
HystrixCommand的命令执行大致如下图:
依赖的pom如下:
<!-- 依赖版本 --> <hystrix.version>1.3.16</hystrix.version> <
hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version
> <dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-core</
artifactId> <version>${hystrix.version}</version> </dependency> <dependency> <
groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-metrics-event-stream</
artifactId> <version>${hystrix-metrics-event-stream.version}</version> </
dependency>
支持同步,异步,观察事件拦截,以及订阅方式,下面咱们直接看代码实现吧。大家一看就明白了:
import com.netflix.hystrix.HystrixCommand; import
com.netflix.hystrix.HystrixCommandGroupKey;import rx.Observable; import
rx.Subscriber;import rx.functions.Action1; import
java.util.concurrent.ExecutionException;import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import
java.util.concurrent.TimeoutException;/** * @author huangqingshi * @Date
2019-03-17*/ public class HelloWorldCommand extends HystrixCommand<String> {
private final String name; public HelloWorldCommand(String name) { //指定命令组名
super(HystrixCommandGroupKey.Factory.asKey("myGroup")); this.name = name; }
@Overrideprotected String run() throws Exception { //逻辑封装在run里边 return "Hello:"
+ name + " thread:" + Thread.currentThread().getName(); } public static void
main(String[] args)throws InterruptedException, ExecutionException,
TimeoutException {//每个Command只能调用一次,不能重复调用。重复调用会报异常。 HelloWorldCommand
helloWorldCommand =new HelloWorldCommand("Synchronous-hystrix"); //execute同步调用
等同于:helloWorldCommand.queue().get(); String result =
helloWorldCommand.execute(); System.out.println("result:" + result);
helloWorldCommand= new HelloWorldCommand("Asynchronous-hystrix"); //异步调用
Future<String> future = helloWorldCommand.queue(); //get可以指定获取的时间100毫秒,默认为1秒
result = future.get(100, TimeUnit.MILLISECONDS); System.out.println("result:" +
result); System.out.println("main thread:" + Thread.currentThread().getName());
testObserve(); }public static void testObserve() { //注册观察者事件拦截
Observable<String> observable =new HelloWorldCommand("observe").observe(); //
注册回调事件 observable.subscribe(new Action1<String>() { @Override public void
call(String result) {//result就是调用HelloWorldCommand的结果
System.out.println("callback:" + result); } }); //注册完成版的事件 observable.subscribe(
new Subscriber<String>() { @Override public void onCompleted() {
System.out.println("onCompleted调用:onNext : onError之后调用"); } @Override public
void onError(Throwable throwable) { //异常产生了之后会调用 System.out.println("onError:" +
throwable.getMessage()); } @Overridepublic void onNext(String s) { //获取结果后回调
System.out.println("onNext:" + s); } }); } }
执行结果如下:
result:Hello:Synchronous-hystrix thread:hystrix-myGroup-1
result:Hello:Asynchronous-hystrix thread:hystrix-myGroup-2
main thread:main
callback:Hello:observe thread:hystrix-myGroup-3
onNext:Hello:observe thread:hystrix-myGroup-3
onCompleted调用:onNext : onError之后调用
接下来是线程池隔离的例子:
import com.netflix.hystrix.*; /** * @author huangqingshi * @Date 2019-03-17 */
public class ThreadPoolCommand extends HystrixCommand<String> { private String
name;public ThreadPoolCommand(String name) { super
(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("threadPoolGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("threadPoolCommand"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withCircuitBreakerRequestVolumeThreshold(10) //至少10个请求,熔断器才进行错误计算 默认值20
.withCircuitBreakerSleepWindowInMilliseconds(5000)//熔断终端5秒后会进入半打开状态
.withCircuitBreakerErrorThresholdPercentage(50)//错误率达到50开启熔断保护
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
//10个核心线程
).andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10
)) );this.name = name; } @Override protected String run() throws Exception {
return "threadPoolCommand name:" + name; } public static void main(String[]
args) { ThreadPoolCommand threadPoolCommand= new ThreadPoolCommand("threadPool"
); String result= threadPoolCommand.execute(); System.out.println("result:" +
result); } } 执行结果: result:threadPoolCommand name:threadPool
信号量隔离例子:
/** * @author huangqingshi * @Date 2019-03-17 */ public class SemaphoreCommand
extends HystrixCommand<String> { private String name; public
SemaphoreCommand(String name) {super
(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("semaphoreGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("semaphoreCommand"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()//
至少10个请求,熔断器才会进行错误率的计算 默认值20 .withCircuitBreakerRequestVolumeThreshold(10) //
熔断器中断请求5秒后会自动进入半打开状态,放部分流量进行重试 默认值5000ms
.withCircuitBreakerSleepWindowInMilliseconds(5000) //错误率达到50开启熔断保护
.withCircuitBreakerErrorThresholdPercentage(50) //设置隔离策略
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
//最大并发量10 .withExecutionIsolationSemaphoreMaxConcurrentRequests(10) ) ); this
.name = name; } @Override protected String run() throws Exception { return
"semaphore success name:" + name; } @Override protected String getFallback() {
return "semaphore fallback name:" + name; } public static void main(String[]
args) { SemaphoreCommand semaphoreCommand= new
SemaphoreCommand("semaphoreCommand"); String result =
semaphoreCommand.execute(); System.out.println(result); } } 执行结果: semaphore
success name:semaphoreCommand
在执行的过程中,如果出现调用服务的时候出现错误的时候会先进行熔断,就是如果流量达到设置的量的时候进行统计,比如10个请求,然后如果出现错误率超过配置的错误率就会进行将熔断进行打开,打开之后会进行调用降级方法fallback。过了一段时间后,可以放行部分流量,如果流量正常了,则会将熔断器开关关闭。下图是来自官方文档截图,里边维护者一个bucket,每秒一个bucket,里边记录着成功,失败,超时,拒绝。这个周期是通过withCircuitBreakerSleepWindowInMilliseconds配置的。
接下来咱们看一下降级,也就是熔断器打开的时候,会走fallback方法,继续看例子。
import com.netflix.hystrix.*; /** * @author huangqingshi * @Date 2019-03-17 */
public class ThreadPoolCommand extends HystrixCommand<String> { private String
name;public ThreadPoolCommand(String name) { super
(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("threadPoolGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("threadPoolCommand"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withCircuitBreakerRequestVolumeThreshold(10) //至少10个请求,熔断器才进行错误计算 默认值20
.withCircuitBreakerSleepWindowInMilliseconds(5000)//熔断终端5秒后会进入半打开状态
.withCircuitBreakerErrorThresholdPercentage(50)//错误率达到50开启熔断保护
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
//10个核心线程
).andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10
)) );this.name = name; } @Override protected String run() throws Exception {
return "threadPoolCommand name:" + name; } public static void main(String[]
args) { ThreadPoolCommand threadPoolCommand= new ThreadPoolCommand("threadPool"
); String result= threadPoolCommand.execute(); System.out.println("result:" +
result); } } 执行结果: result:executed fallback 并且抛出超时异常。因为程序故意设计超时的。
当然,Hystrixcommand还支持primary或secondary的方式,可以先看看流程图:
是否执行primary是通过参数primarySecondary.userPrimary为true时执行。false的时候执行secondary方式。
/** * @author huangqingshi * @Date 2019-03-18 */ public class
PrimarySecondaryFacadeextends HystrixCommand<String> { private final static
DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().
getBooleanProperty("primarySecondary.usePrimary", true); private int id; public
PrimarySecondaryFacade(int id) { super
(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("primarySecondCommand")) //
此处采用信号量,primary、secondary采用线程池
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) ) );this.id =
id; } @Overrideprotected String run() throws Exception { if(usePrimary.get()) {
return new PrimaryCommand(id).execute(); } else { return new
SecondaryCommand(id).execute(); } } @Overrideprotected String getFallback() {
return "static-fallback-" + id; } @Override protected String getCacheKey() {
return String.valueOf(id); } private static class PrimaryCommand extends
HystrixCommand<String> { private final int id; private PrimaryCommand(int id) {
super(Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().
withExecutionIsolationThreadTimeoutInMilliseconds(600))); this.id = id; }
@Overrideprotected String run() { return "responseFromPrimary-" + id; } }
private static class SecondaryCommand extends HystrixCommand<String> { private
final int id; private SecondaryCommand(int id) { super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("facadeGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().
withExecutionIsolationThreadTimeoutInMilliseconds(600))); this.id = id; }
@Overrideprotected String run() { return "responseFromSecondary-" + id; } }
public static class UnitTest { @Test public void testPrimary() {
HystrixRequestContext context= HystrixRequestContext.initializeContext(); try {
ConfigurationManager.getConfigInstance().setProperty(
"primarySecondary.usePrimary",true); assertEquals("responseFromPrimary-100", new
PrimarySecondaryFacade(100).execute()); } finally { context.shutdown();
ConfigurationManager.getConfigInstance().clear(); } } @Testpublic void
testSecondary() { HystrixRequestContext context=
HystrixRequestContext.initializeContext();try {
ConfigurationManager.getConfigInstance().setProperty(
"primarySecondary.usePrimary",false); assertEquals("responseFromSecondary-100",
new PrimarySecondaryFacade(100).execute()); } finally { context.shutdown();
ConfigurationManager.getConfigInstance().clear(); } } } }
好了,这个基本上就是Hystrix的基本功能,但是有个问题就是Hystrix已经不维护了,但是目前的稳定版大家也都在使用,所以列出来了。当然也推荐大家使用Sentinel,功能比较强大,就是自适应限流功能等,功能也非常强大,后续研究之后再出相关文章吧。这个文章就当大家的一个敲门砖吧,有问题请及时告知,谢谢。
热门工具 换一换