Hystrix 源码解析 —— 命令合并执行

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-collapser-execute/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本


������关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言 将得到 认真 回复。 甚至不知道如何读源码也可以请教噢
  4. 新的 源码解析文章 实时 收到通知。 每周更新一篇左右
  5. 认真的 源码交流微信群。

1. 概述

本文主要分享 Hystrix 命令合并执行

《【翻译】Hystrix文档-实现原理》「请求合并」 中,对 Hystrix 命令合并执行的 概念原理使用场景优缺点 已经做了非常详细透彻的分享,所以胖友可以先认真阅读学习下。

命令合并执行整体流程如下图 :

FROM 《【翻译】Hystrix文档-实现原理》「请求合并」


  • 第一步,提交 单个 命令请求到请求队列( RequestQueue )
  • 第二部,定时任务( TimerTask ) 固定周期 从请求队列获取 多个 命令执行,合并执行。

在官方提供的示例中,我们通过 CommandCollapserGetValueForKey 熟悉命令合并执行的使用。

推荐 Spring Cloud 书籍:

2. HystrixCollapser

com.netflix.hystrix.HystrixCollapser命令 合并器 抽象父类

NOTE : com.netflix.hystrix.HystrixObservableCollapser另一种 命令合并器 抽象父类 ,本文暂不解析。

2.1 构造方法

HystrixCollapser 构造方法 ,代码如下 :

public abstract class HystrixCollapser
        implements HystrixExecutable, HystrixObservable {

    private final RequestCollapserFactory collapserFactory;
    private final HystrixRequestCache requestCache;
    private final HystrixCollapserBridge collapserInstanceWrapper;
    private final HystrixCollapserMetrics metrics;
    
    /* package for tests */ HystrixCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) {
        if (collapserKey == null || collapserKey.name().trim().equals("")) {
            String defaultKeyName = getDefaultNameFromClass(getClass());
            collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName);
        }

        HystrixCollapserProperties properties = HystrixPropertiesFactory.getCollapserProperties(collapserKey, propertiesBuilder);
        this.collapserFactory = new RequestCollapserFactory(collapserKey, scope, timer, properties);
        this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());

        if (metrics == null) {
            this.metrics = HystrixCollapserMetrics.getInstance(collapserKey, properties);
        } else {
            this.metrics = metrics;
        }

        final HystrixCollapser self = this;

         /* strategy: HystrixMetricsPublisherCollapser */
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCollapser(collapserKey, this.metrics, properties);

        /**
         * Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class.
         */
        collapserInstanceWrapper = new HystrixCollapserBridge() {

            @Override
            public Collection<Collection<CollapsedRequest>> shardRequests(Collection<CollapsedRequest> requests) {
                Collection<Collection<CollapsedRequest>> shards = self.shardRequests(requests);
                self.metrics.markShards(shards.size());
                return shards;
            }

            @Override
            public Observable createObservableCommand(Collection<CollapsedRequest> requests) {
                final HystrixCommand command = self.createCommand(requests);

                command.markAsCollapsedCommand(this.getCollapserKey(), requests.size());
                self.metrics.markBatch(requests.size());

                return command.toObservable();
            }

            @Override
            public Observable mapResponseToRequests(Observable batchResponse, final Collection<CollapsedRequest> requests) {
                return batchResponse.single().doOnNext(new Action1() {
                    @Override
                    public void call(BatchReturnType batchReturnType) {
                        // this is a blocking call in HystrixCollapser
                        self.mapResponseToRequests(batchReturnType, requests);
                    }
                }).ignoreElements().cast(Void.class);
            }

            @Override
            public HystrixCollapserKey getCollapserKey() {
                return self.getCollapserKey();
            }

        };
    }
    
}
  • BatchReturnType 泛型多个 命令合并执行返回结果类型。
  • ResponseType 泛型单个 命令执行返回结果类型。
  • RequestArgumentType 泛型单个 命令参数类型。
  • collapserFactory 属性,RequestCollapser 工厂 ,在「3. RequestCollapserFactory」详细解析。
  • requestCache 属性,TODO 【2012】【请求上下文】
  • collapserInstanceWrapper 属性, 命令 合并器包装器。
    • com.netflix.hystrix.collapser.HystrixCollapserBridge 接口 ,点击 链接 查看代码。
    • HystrixCollapserBridge ,为 RequestBatch 透明 调用 HystrixCollapser 或 HystrixObservableCollapser 的方法不同的实现。参见《桥接模式》 。
  • metrics 属性,TODO 【2002】【metrics】

2.2 执行命令方式

《Hystrix 源码解析 —— 执行命令方式》 中,我们已经看了 HystrixCommand 提供的 四种 执行命令方式。

HystrixCollapser 类似于 HystrixCommand ,也提供 四种 相同的执行命令方式,其中如下三种方式代码基本 类似 ,我们就给下 传送门 ,就不重复啰嗦了 :

下面一起来看看 #toObservable() 方法的实现,代码如下 :

1: public Observable toObservable() {
  2:     // when we callback with the data we want to do the work
  3:     // on a separate thread than the one giving us the callback
  4:     return toObservable(Schedulers.computation());
  5: }
  6: 
  7: public Observable toObservable(Scheduler observeOn) {
  8:     return Observable.defer(new Func0<Observable>() {
  9:         @Override
 10:         public Observable call() {
 11:             // // 缓存开关、缓存KEY
 12:             final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get();
 13:             final String cacheKey = getCacheKey();
 14: 
 15:             // 优先从缓存中获取
 16:             /* try from cache first */
 17:             if (isRequestCacheEnabled) {
 18:                 HystrixCachedObservable fromCache = requestCache.get(cacheKey);
 19:                 if (fromCache != null) {
 20:                     metrics.markResponseFromCache();
 21:                     return fromCache.toObservable();
 22:                 }
 23:             }
 24: 
 25:             // 获得 RequestCollapser
 26:             RequestCollapser requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
 27: 
 28:             // 提交 命令请求
 29:             Observable response = requestCollapser.submitRequest(getRequestArgument());
 30: 
 31:             // 获得 缓存Observable
 32:             if (isRequestCacheEnabled && cacheKey != null) {
 33:                 HystrixCachedObservable toCache = HystrixCachedObservable.from(response);
 34:                 HystrixCachedObservable fromCache = requestCache.putIfAbsent(cacheKey, toCache);
 35:                 if (fromCache == null) {
 36:                     return toCache.toObservable();
 37:                 } else {
 38:                     toCache.unsubscribe(); // 取消订阅
 39:                     return fromCache.toObservable();
 40:                 }
 41:             }
 42: 
 43:             // 获得 非缓存Observable
 44:             return response;
 45:         }
 46:     });
 47: }
  • observeOn 方法参数,实际方法暂未用到,跳过无视。
  • 第 11 至 13 行 :缓存存开关、KEY 。
  • 反向 】第 32 至 41 行 :获得【缓存 Observable】。这块代码和 AbstractCommand#toObservavle(...) 类似,在 《Hystrix 源码解析 —— 执行结果缓存》「4. AbstractCommand#toObservavle(…)」 有详细解析。
  • 反向 】第 44 行 :获得【非缓存 Observable】。
  • 注意 :返回的 Observable ,很可能命令实际并未执行,或者说并未执行完成,此时在 #queue() / #execute() 方法,通过 BlockingObservable 阻塞 等待执行完成。BlockingObservable 在 《RxJava 源码解析 —— BlockingObservable》 有详细解析。
  • 第 26 行 :调用 RequestCollapserFactory#getRequestCollapser() ,获得 RequestCollapser 。在「3. RequestCollapserFactory」详细解析。
  • 第 29 行 :提交 单个 命令请求到请求队列( RequestQueue ),即 命令合并执行整体流程第一步 。在「4. RequestCollapser」详细解析。

2.3 核心方法

  • #getRequestArgument(...) 抽象 方法,获得 单个 命令参数。代码如下 :

    public abstract RequestArgumentType getRequestArgument();
  • #createCommand(...) 抽象 方法,将 多个 命令请求 合并 ,创建 一个 HystrixCommand 。代码如下 :

    protected abstract HystrixCommand createCommand(Collection<CollapsedRequest> requests);
  • #mapResponseToRequests(...) 抽象 方法,将 一个 HystrixCommand 的执行结果, 映射 回对应的命令请求们。

    protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest> requests);
  • #shardRequests(...) 方法,将 多个 命令请求 分片N 个【 多个 命令请求】。默认实现下,不进行分片。代码如下 :

    protected Collection<Collection<CollapsedRequest>> shardRequests(Collection<CollapsedRequest> requests) {
        return Collections.singletonList(requests);
    }
  • 未重写 #shardRequests(...) 的情况下,整体方法流程如下 :


  • 重写 #shardRequests(...) 的情况下,整体方法流程如下 :


    • 本图中命令请求分片仅仅是例子,实际根据重写的逻辑不同而不同。

3. RequestCollapserFactory

com.netflix.hystrix.collapser.RequestCollapserFactory ,RequestCollapser 工厂

public class RequestCollapserFactory {
    
    private final CollapserTimer timer;
    private final HystrixCollapserKey collapserKey;
    private final HystrixCollapserProperties properties;
    private final HystrixConcurrencyStrategy concurrencyStrategy;
    private final Scope scope;
    
    public RequestCollapserFactory(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties properties) {
         /* strategy: ConcurrencyStrategy */
        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        this.timer = timer;
        this.scope = scope;
        this.collapserKey = collapserKey;
        this.properties = properties;
    }

调用 #getRequestCollapser() 方法,获得 RequestCollapser 。代码如下 :

public RequestCollapser getRequestCollapser(HystrixCollapserBridge commandCollapser) {
   if (Scopes.REQUEST == Scopes.valueOf(getScope().name())) {
       return getCollapserForUserRequest(commandCollapser);
   } else if (Scopes.GLOBAL == Scopes.valueOf(getScope().name())) {
       return getCollapserForGlobalScope(commandCollapser);
   } else {
       logger.warn("Invalid Scope: {}  Defaulting to REQUEST scope.", getScope());
       return getCollapserForUserRequest(commandCollapser);
   }
}
  • 根据 scope 不同,调用两个不同方法,获得 RequestCollapser 。这两个方法大体逻辑相同,优先从 缓存 中查找满足条件的 RequestCollapser 返回;若不存在,则创建满足条件的 RequestCollapser 添加到 缓存 并返回。
    • REQUEST :调用 #getCollapserForUserRequest() 方法,TODO 【2012】【请求上下文】。
    • GLOBAL :调用 #getCollapserForGlobalScope() 方法,点击 链接 查看 中文注释 的代码。

4. RequestCollapser

com.netflix.hystrix.collapser.RequestCollapser命令请求 合并器。主要用于 :

  • 提交 单个 命令请求到请求队列( RequestQueue )。
  • 接收 来自定时任务 提交的 多个 命令,合并执行。

4.1 构造方法

RequestCollapser 构造方法 ,代码如下 :

public class RequestCollapser {

    private final HystrixCollapserBridge commandCollapser;
    // batch can be null once shutdown
    private final AtomicReference<RequestBatch> batch = new AtomicReference<RequestBatch>();
    private final AtomicReference<Reference> timerListenerReference = new AtomicReference<Reference>();
    private final AtomicBoolean timerListenerRegistered = new AtomicBoolean();
    private final CollapserTimer timer;
    private final HystrixCollapserProperties properties;
    private final HystrixConcurrencyStrategy concurrencyStrategy;
    
    RequestCollapser(HystrixCollapserBridge commandCollapser, HystrixCollapserProperties properties, CollapserTimer timer, HystrixConcurrencyStrategy concurrencyStrategy) {
        this.commandCollapser = commandCollapser; // the command with implementation of abstract methods we need 
        this.concurrencyStrategy = concurrencyStrategy;
        this.properties = properties;
        this.timer = timer;
        batch.set(new RequestBatch(properties, commandCollapser, properties.maxRequestsInBatch().get()));
    }

}
  • commandCollapser 属性, 命令 合并器包装器。
  • batch 属性,RequestBatch, 即是本文一直说的请求队列 。在也会详细解析。
  • timerListenerReference 属性, 注册 在命令合并器的定时器的监听器。每个 RequestCollapser 独有一个 监听器。该监听器( 实际上会使用该监听器创建定时任务 ) 固定周期 从请求队列获取 多个 命令执行,提交 RequestCollapser 合并执行。在也会详细解析。
  • timerListenerRegistered 属性, timerListenerReference 是否已经注册。
  • timer 属性,命令合并器的定时器。
  • properties 属性,命令合并器属性配置。
  • concurrencyStrategy 属性,并发策略。

4.2 RequestBatch

com.netflix.hystrix.collapser.RequestBatch ,命令请求队列。提供如下功能 :

  • 命令请求的添加
  • 命令请求的移除
  • 命令请求的 批量执行 。笔者把 RequestBatch 解释成 “命令请求队列”,主要方便大家理解。
    • 那可能有胖友有疑问,为啥该功能不在 RequestCollapser 直接实现,这样 RequestBatch 成为纯粹的队列呢?在「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」详细解析。

RequestBatch 构造方法 ,代码如下 :

public class RequestBatch {

    private final HystrixCollapserBridge commandCollapser;
    private final int maxBatchSize;
    private final AtomicBoolean batchStarted = new AtomicBoolean();

    private final ConcurrentMap<RequestArgumentType, CollapsedRequest> argumentMap =
            new ConcurrentHashMap<RequestArgumentType, CollapsedRequest>();
    private final HystrixCollapserProperties properties;

    private ReentrantReadWriteLock batchLock = new ReentrantReadWriteLock();

    public RequestBatch(HystrixCollapserProperties properties, HystrixCollapserBridge commandCollapser, int maxBatchSize) {
        this.properties = properties;
        this.commandCollapser = commandCollapser;
        this.maxBatchSize = maxBatchSize;
    }
}
  • commandCollapser 属性, 命令 合并器包装器。
  • maxBatchSize 属性,队列最大长度。
  • batchStarted 属性,执行是否开始。
  • argumentMap 属性,命令请求参数映射( 队列 )。
  • properties 属性,命令合并器属性配置。
  • batchLock 属性, argumentMap 操作的 读写锁

RequestBatch 实现队列具体的操作方法,在「4.3 #submitRequest(arg)」/「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」一起解析。

4.3 #submitRequest(arg)

#toObservable() 方法里,调用 #submitRequest(arg) 方法,提交 单个 命令请求到 RequestBatch 。代码如下 :

1: public Observable submitRequest(final RequestArgumentType arg) {
  2:     /*
  3:      * We only want the timer ticking if there are actually things to do so we register it the first time something is added.
  4:      */
  5:     if (!timerListenerRegistered.get() && timerListenerRegistered.compareAndSet(false, true)) {
  6:         /* schedule the collapsing task to be executed every x milliseconds (x defined inside CollapsedTask) */
  7:         timerListenerReference.set(timer.addListener(new CollapsedTask()));
  8:     }
  9: 
 10:     // loop until succeed (compare-and-set spin-loop)
 11:     while (true) {
 12:         // 获得 RequestBatch
 13:         final RequestBatch b = batch.get();
 14:         if (b == null) {
 15:             return Observable.error(new IllegalStateException("Submitting requests after collapser is shutdown"));
 16:         }
 17: 
 18:         // 添加到 RequestBatch
 19:         final Observable response;
 20:         if (arg != null) {
 21:             response = b.offer(arg);
 22:         } else {
 23:             response = b.offer( (RequestArgumentType) NULL_SENTINEL);
 24:         }
 25: 
 26:         // 添加成功,返回 Observable
 27:         // it will always get an Observable unless we hit the max batch size
 28:         if (response != null) {
 29:             return response;
 30:         } else {
 31:             // 添加失败,执行 RequestBatch ,并创建新的 RequestBatch
 32:             // this batch can't accept requests so create a new one and set it if another thread doesn't beat us
 33:             createNewBatchAndExecutePreviousIfNeeded(b);
 34:         }
 35:     }
 36: }
  • 第 5 至 8 行 :当 RequestCollapser 的监听任务( CollapsedTask )还未创建,进行初始化。
  • 第 11 至 35 行 : 死循环 ,直到提交 单个 命令请求到 RequestBatch 成功
    • 第 13 至 16 行 :获得 RequestBatch 。从目前代码看下来,除非 RequestCollapser 被 #shutdown() 后才会出现为 null 的情况。
    • 第 19 至 24 行 :调动 RequestBatch#offer(...) 方法,提交 单个 命令请求到 RequestBatch ,并获得 Observable 。这里对 arg == null 做了特殊处理,因为 RequestBatch.argumentMap 是 ConcurrentHashMap ,不允许值为 null 。另外, RequestBatch#offer(...) 方法的实现代码,在结束了当前方法,详细解析。
    • 第 28 至 29 行 :添加成功,返回 Observable 。
    • 第 30 至 34 行 :添加失败,执行当前 RequestBatch 的 多个 命令合并执行,并创建 新的 RequestBatch 。在「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」详细解析。

RequestBatch#offer(...) 方法,代码如下 :

1: public Observable  offer(RequestArgumentType arg) {
  2:     // 执行已经开始,添加失败
  3:     /* short-cut - if the batch is started we reject the offer */
  4:     if (batchStarted.get()) {
  5:         return null;
  6:     }
  7: 
  8:     /*
  9:      * The 'read' just means non-exclusive even though we are writing.
 10:      */
 11:     if (batchLock.readLock().tryLock()) {
 12:         try {
 13:             // 执行已经开始,添加失败
 14:             /* double-check now that we have the lock - if the batch is started we reject the offer */
 15:             if (batchStarted.get()) {
 16:                 return null;
 17:             }
 18: 
 19:             // 超过队列最大长度,添加失败
 20:             if (argumentMap.size() >= maxBatchSize) {
 21:                 return null;
 22:             } else {
 23:                 // 创建 CollapsedRequestSubject ,并添加到队列
 24:                 CollapsedRequestSubject collapsedRequest = new CollapsedRequestSubject(arg, this);
 25:                 final CollapsedRequestSubject existing = (CollapsedRequestSubject) argumentMap.putIfAbsent(arg, collapsedRequest);
 26:                 /**
 27:                  * If the argument already exists in the batch, then there are 2 options:
 28:                  * A) If request caching is ON (the default): only keep 1 argument in the batch and let all responses
 29:                  * be hooked up to that argument
 30:                  * B) If request caching is OFF: return an error to all duplicate argument requests
 31:                  *
 32:                  * This maintains the invariant that each batch has no duplicate arguments.  This prevents the impossible
 33:                  * logic (in a user-provided mapResponseToRequests for HystrixCollapser and the internals of HystrixObservableCollapser)
 34:                  * of trying to figure out which argument of a set of duplicates should get attached to a response.
 35:                  *
 36:                  * See https://github.com/Netflix/Hystrix/pull/1176 for further discussion.
 37:                  */
 38:                 if (existing != null) {
 39:                     boolean requestCachingEnabled = properties.requestCacheEnabled().get();
 40:                     if (requestCachingEnabled) {
 41:                         return existing.toObservable();
 42:                     } else {
 43:                         return Observable.error(new IllegalArgumentException("Duplicate argument in collapser batch : [" + arg + "]  This is not supported.  Please turn request-caching on for HystrixCollapser:" + commandCollapser.getCollapserKey().name() + " or prevent duplicates from making it into the batch!"));
 44:                     }
 45:                 } else {
 46:                     return collapsedRequest.toObservable();
 47:                 }
 48: 
 49:             }
 50:         } finally {
 51:             batchLock.readLock().unlock();
 52:         }
 53:     } else {
 54:         return null;
 55:     }
 56: }
  • 第 4 至 6 行 :执行已经开始,添加失败。在 RequestBatch#executeBatchIfNotAlreadyStarted(...) 方法的开头,优先 CAS 使 batchStarted = true

  • 第 11 行 :获得 读锁The 'read' just means non-exclusive even though we are writing. ,即使该方法实际在做**”写操作”**,不排他,线程安全,所以可以使用读锁。

  • 第 15 至 17 行 : double-check ,执行已经开始,添加失败。在 RequestBatch#executeBatchIfNotAlreadyStarted(...) 方法,优先 CAS 使 batchStarted = true ,再获取 写锁 ,所以会出现该情况。

  • 第 20 至 21 行 :超过队列最大长度,添加失败。

  • 第 24 至 25 行 :创建 com.netflix.hystrix.collapser.CollapsedRequestSubject ,并将 添加到队列( argumentMap ) 。

    • CollapsedRequestSubject 实现 com.netflix.hystrix.HystrixCollapser.CollapsedRequest 接口 ,定义了批量命令执行的 请求 ,不仅限于获得请求参数( #getArgument() 方法 ),也包括对批量命令执行结束后,每个 请求 的结果设置( #setResponse(...) / #emitResponse(...) / #setException(...) / #setComplete() 方法 ),点击 链接 查看该接口的代码。

    • CollapsedRequestSubject 构造方法 ,代码如下:

      /* package */class CollapsedRequestSubject implements CollapsedRequest {
      
          /**
           * 参数
           */
          private final R argument;
      
          /**
           * 结果( response ) 是否设置
           */
          private AtomicBoolean valueSet = new AtomicBoolean(false);
          /**
           * 可回放的 ReplaySubject
           */
          private final ReplaySubject subject = ReplaySubject.create();
          /**
           * 带订阅数量的 ReplaySubject
           */
          private final Observable subjectWithAccounting;
      
          /**
           * 订阅数量
           */
          private volatile int outstandingSubscriptions = 0;
      
          public CollapsedRequestSubject(final R arg, final RequestBatch containingBatch) {
              // 设置 argument
              if (arg == RequestCollapser.NULL_SENTINEL) {
                  this.argument = null;
              } else {
                  this.argument = arg;
              }
              // 设置 带订阅数量的 ReplaySubject
              this.subjectWithAccounting = subject
                      .doOnSubscribe(new Action0() {
                          @Override
                          public void call() {
                              outstandingSubscriptions++;
                          }
                      })
                      .doOnUnsubscribe(new Action0() {
                          @Override
                          public void call() {
                              outstandingSubscriptions--;
                              if (outstandingSubscriptions == 0) {
                                  containingBatch.remove(arg);
                              }
                          }
                      });
          }
      }
      • argument 属性, 单个 命令请求参数。
      • valueSet 属性,结果( Response ) 是否设置,通过 #setResponse() / #emitResponse() 方法设置。
      • subject 属性, 可回放执行结果 的 Subject 。此处使用 ReplaySubject 的主要目的,当 HystrixCollapser 开启 缓存 功能时,通过回放执行结果,在 《Hystrix 源码解析 —— 执行结果缓存》「5. HystrixCachedObservable」 也有相同的实现。另外,这里有一点要注意下,ReplaySubject 并 没有 向任何 Observable 订阅结果, 而是通过 #setResponse() / #emitResponse() 方法设置结果
      • outstandingSubscriptions 属性,订阅数量。
      • subjectWithAccounting 属性,带订阅数量的 ReplaySubject 。当取消订阅时,调用 RequestBatch#remove(arg) 方法,移除 单个 命令请求。
  • 第 38 至 47 行 :返回 Observable 。

    • argumentMap 已经存在 arg 对应的 Observable 时,必须开启缓存 ( HystrixCollapserProperties.requestCachingEnabled = true ) 功能。原因是,如果在 相同的 arg ,并且未开启缓存,同时 第 43 行 实现的是 collapsedRequest.toObservable() ,那么 相同的 arg 将有 多个 Observable 执行命令,此时 HystrixCollapserBridge#mapResponseToRequests(...) 方法无法将执行( Response )赋值到 arg 对应的命令请求( CollapsedRequestSubject ) 。更多讨论,见 github.com/Netflix/Hys…
    • 回过头看 HystrixCollapser#toObservable() 方法的 第 32 至 41 行的代码 ,这里也有对 缓存 功能,是不是 重复 了呢? argumentMap 针对的是 RequestBatch 级的缓存,HystrixCollapser : RequestCollapser : RequestBatch 是 1 : 1 : N 的关系,通过 HystrixCollapser#toObservable() 对缓存的处理逻辑,保证 RequestBatch 切换后, 依然有缓存

RequestBatch#remove() 方法,代码如下 :

/* package-private */ void remove(RequestArgumentType arg) {
    if (batchStarted.get()) {
        //nothing we can do
        return;
    }

    if (batchLock.readLock().tryLock()) {
        try {
            /* double-check now that we have the lock - if the batch is started, deleting is useless */
            if (batchStarted.get()) {
                return;
            }

            argumentMap.remove(arg);
        } finally {
            batchLock.readLock().unlock();
        }
    }
}
  • 当 RequestBatch 开始执行,不允许移除 单个 命令请求。

4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)

本小节建议在后,再回过头看。

#createNewBatchAndExecutePreviousIfNeeded(previousBatch) 方法,代码如下 :

1: private void createNewBatchAndExecutePreviousIfNeeded(RequestBatch previousBatch) {
  2:     if (previousBatch == null) {
  3:         throw new IllegalStateException("Trying to start null batch which means it was shutdown already.");
  4:     }
  5:     if (batch.compareAndSet(previousBatch, new RequestBatch(properties, commandCollapser, properties.maxRequestsInBatch().get()))) {
  6:         // this thread won so trigger the previous batch
  7:         previousBatch.executeBatchIfNotAlreadyStarted();
  8:     }
  9: }
  • 第 5 行 :通过 CAS 修改 batch ,保证并发情况下的线程安全。同时注意,此处也进行了 新的 RequestBatch ,切换掉 老的 RequestBatch 。
  • 第 6 行 :使用 老的 RequestBatch ,调用 RequestBatch#executeBatchIfNotAlreadyStarted() 方法,命令合并执行。

RequestBatch#executeBatchIfNotAlreadyStarted() 方法,代码如下 :

1: public void executeBatchIfNotAlreadyStarted() {
  2:     /*
  3:      * - check that we only execute once since there's multiple paths to do so (timer, waiting thread or max batch size hit)
  4:      * - close the gate so 'offer' can no longer be invoked and we turn those threads away so they create a new batch
  5:      */
  6:     // 设置 执行已经开始
  7:     if (batchStarted.compareAndSet(false, true)) {
  8:         // 获得 写锁
  9:         /* wait for 'offer'/'remove' threads to finish before executing the batch so 'requests' is complete */
 10:         batchLock.writeLock().lock();
 11: 
 12:         try {
 13:             // 将多个命令请求分片成 N 个【多个命令请求】。
 14:             // shard batches
 15:             Collection<Collection<CollapsedRequest>> shards = commandCollapser.shardRequests(argumentMap.values());
 16:             // for each shard execute its requests 
 17:             for (final Collection<CollapsedRequest> shardRequests : shards) {
 18:                 try {
 19:                     // 将多个命令请求合并,创建一个 HystrixCommand
 20:                     // create a new command to handle this batch of requests
 21:                     Observable o = commandCollapser.createObservableCommand(shardRequests);
 22: 
 23:                     // 将一个 HystrixCommand 的执行结果,映射回对应的命令请求们
 24:                     commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1() {
 25: 
 26:                         /**
 27:                          * This handles failed completions
 28:                          */
 29:                         @Override
 30:                         public void call(Throwable e) {
 31:                             // handle Throwable in case anything is thrown so we don't block Observers waiting for onError/onCompleted
 32:                             Exception ee;
 33:                             if (e instanceof Exception) {
 34:                                 ee = (Exception) e;
 35:                             } else {
 36:                                 ee = new RuntimeException("Throwable caught while executing batch and mapping responses.", e);
 37:                             }
 38:                             logger.debug("Exception mapping responses to requests.", e);
 39:                             // if a failure occurs we want to pass that exception to all of the Futures that we've returned
 40:                             for (CollapsedRequest request : argumentMap.values()) {
 41:                                 try {
 42:                                     ((CollapsedRequestSubject) request).setExceptionIfResponseNotReceived(ee);
 43:                                 } catch (IllegalStateException e2) {
 44:                                     // if we have partial responses set in mapResponseToRequests
 45:                                     // then we may get IllegalStateException as we loop over them
 46:                                     // so we'll log but continue to the rest
 47:                                     logger.error("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception. Continuing ... ", e2);
 48:                                 }
 49:                             }
 50:                         }
 51: 
 52:                     }).doOnCompleted(new Action0() {
 53: 
 54:                         /**
 55:                          * This handles successful completions
 56:                          */
 57:                         @Override
 58:                         public void call() {
 59:                             // check that all requests had setResponse or setException invoked in case 'mapResponseToRequests' was implemented poorly
 60:                             Exception e = null;
 61:                             for (CollapsedRequest request : shardRequests) {
 62:                                 try {
 63:                                    e = ((CollapsedRequestSubject) request).setExceptionIfResponseNotReceived(e,"No response set by " + commandCollapser.getCollapserKey().name() + " 'mapResponseToRequests' implementation.");
 64:                                 } catch (IllegalStateException e2) {
 65:                                     logger.debug("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting 'No response set' Exception. Continuing ... ", e2);
 66:                                 }
 67:                             }
 68:                         }
 69: 
 70:                     }).subscribe();
 71:                     
 72:                 } catch (Exception e) {
 73:                     // 异常
 74:                     logger.error("Exception while creating and queueing command with batch.", e);
 75:                     // if a failure occurs we want to pass that exception to all of the Futures that we've returned
 76:                     for (CollapsedRequest request : shardRequests) {
 77:                         try {
 78:                             request.setException(e);
 79:                         } catch (IllegalStateException e2) {
 80:                             logger.debug("Failed trying to setException on CollapsedRequest", e2);
 81:                         }
 82:                     }
 83:                 }
 84:             }
 85: 
 86:         } catch (Exception e) {
 87:             // 异常
 88:             logger.error("Exception while sharding requests.", e);
 89:             // same error handling as we do around the shards, but this is a wider net in case the shardRequest method fails
 90:             for (CollapsedRequest request : argumentMap.values()) {
 91:                 try {
 92:                     request.setException(e);
 93:                 } catch (IllegalStateException e2) {
 94:                     logger.debug("Failed trying to setException on CollapsedRequest", e2);
 95:                 }
 96:             }
 97:         } finally {
 98:             batchLock.writeLock().unlock();
 99:         }
100:     }
101: }
  • 代码看起来是有点长哈,请对照着官方示例 CommandCollapserGetValueForKey 一起看,临门一脚了,胖友!
  • 第 7 行 :通过 CAS 修改 batchStarted ,保证并发情况下的线程安全。
  • 第 10 行 :获得 写锁 。等待调用 #offer(...) / #remove(...) 方法的线程执行完成,以保证命令合并执行时,不再有新的请求添加或移除。
  • 第 15 行 :调用 HystrixCollapserBridge#shardRequests(...) 方法,将 多个 命令请求 分片N 个【 多个 命令请求】。默认实现下,不进行分片。点击 链接 查看代码。
  • 第 17 行 :循环 N 个【 多个 命令请求】。
  • 第 21 行 :调用 HystrixCollapserBridge#createObservableCommand(...) 方法,将 多个 命令请求 合并 ,创建 一个 HystrixCommand 。点击 链接 查看代码。
  • 第 24 行 :调用 HystrixCollapserBridge#mapResponseToRequests(...) 方法,将 一个 HystrixCommand 的执行结果, 映射 回对应的命令请求们。点击 链接 查看代码。
    • Observable#single() 方法,如果 Observable 终止时只发射了一个值,返回那个值,否则抛出异常。在 《ReactiveX文档中文翻译》「single」 有相关分享。
    • Observable#ignoreElements() 方法,抑制原始 Observable 发射的所有数据,只允许它的终止通知( #onError()#onCompleted() )通过。在 《ReactiveX文档中文翻译》「IgnoreElements」 有相关分享。也推荐点击 rx.internal.operators.OperatorIgnoreElements 看下源码,可能更加易懂。
    • Observable#cast() 方法,将原始 Observable 发射的每一项数据都强制转换为一个指定的类型,然后再发射数据,它是 map 的一个特殊版本。在 《ReactiveX文档中文翻译》「cast」 有相关分享。也推荐点击 rx.internal.operators.OperatorCast 看下源码,可能更加易懂。
    • 使用 Observable#ignoreElements() / Observable#cast() 方法,用于将 Observable 变成不再继续向下发射数据项,只给现有方法里 Observable#doNext() 处理数据项,调用 HystrixCollapser#mapResponseToRequests(...) 方法。
    • 点击 链接 ,查看 CollapsedRequestSubject#setResponse(response) 方法的代码。
  • 第 24 至 50 行 :调用 Observable#doError(Action1) 方法,当命令合并执行发生异常时,设置 每个 CollapsedRequestSubject 的执行结果为异常。
    • 点击 链接 ,查看 CollapsedRequestSubject#setResponse(response) 方法的代码。
  • 第 52 至 68 行 :调用 Observable#doOnCompleted(Action0) 方法,当命令合并执行完成时,检查 每个 CollapsedRequestSubject 是否都有返回结果。设置没有返回结果的 CollapsedRequestSubject 的执行结果为异常。一般情况下,是用户实现 HystrixCollapser#mapResponseToRequests(...) 方法存在 BUG 。另外,如果不设置,将导致无结果的 单个 命令请求 无限阻塞
  • 第 70 行 :调用 Observable#subscribe() 方法, 触发 HystrixCommand 执行。
  • 第 72 至 96 行 :发生异常,设置 每个 CollapsedRequestSubject 的执行结果为异常。
    • 点击 链接 ,查看 CollapsedRequestSubject#setException(response) 方法的代码。
  • 第 97 至 99 行 :释放 写锁

5. CollapserTimer

com.netflix.hystrix.collapser.CollapserTimer ,命令合并器的定时器 接口 ,定义了 提交定时监听器,生成定时任务 的接口方法,代码如下 :

public interface CollapserTimer {

    Reference addListener(TimerListener collapseTask);
}

5.1 RealCollapserTimer

com.netflix.hystrix.collapser.RealCollapserTimer ,命令合并器的定时器 实现类 ,代码如下 :

public class RealCollapserTimer implements CollapserTimer {
    /* single global timer that all collapsers will schedule their tasks on */
    private final static HystrixTimer timer = HystrixTimer.getInstance();

    @Override
    public Reference addListener(TimerListener collapseTask) {
        return timer.addTimerListener(collapseTask);
    }

}

5.2 CollapsedTask

com.netflix.hystrix.collapser.RequestCollapser.CollapsedTask ,定时任务,固定周期( 可配,默认 HystrixCollapserProperties.timerDelayInMilliseconds = 10ms ) 轮询其对应的 一个 RequestCollapser 当前 RequestBatch 。若有命令需要执行,则提交 RequestCollapser 合并执行。

代码比较简单,点击 链接 直接看代码。

稀土掘金稿源:稀土掘金 (源链) | 关于 | 阅读提示

本站遵循[CC BY-NC-SA 4.0]。如您有版权、意见投诉等问题,请通过eMail联系我们处理。
酷辣虫 » 后端存储 » Hystrix 源码解析 —— 命令合并执行

喜欢 (0)or分享给?

专业 x 专注 x 聚合 x 分享 CC BY-NC-SA 4.0

使用声明 | 英豪名录