Javascript微服务的断路器模式

Javascript微服务的断路器模式

2019-09-13 21:28:47发布 浏览数:1.1万
概述:断路器是执行代理,可帮助应用程序以更安全,更可预测的方式处理故障。 通常,断路器被定义为“自动操作的电气系统,其被设计用于保护电路免受由过载或短路的过电流引起的损坏”。 在我们的例子中,电路将是请求和响应周期。断路器模式可以帮助我们确定上游服务(API,数据库等)的可用性。当服务的错误或超时计数超过预定义阈值时,它应该跳闸以停止对从属服务的应用程序请求一段时间。


他们是如何工作的?

断路器状态


CLOSED:资源尚未尝试或已经过尝试和可用

OPEN:资源被尝试并且不可用,断路器跳闸

HALF OPEN:满足等待阈值,再次尝试资源


滚动窗口(概念)


它在幕后有一个滚动窗口的概念(滚动窗口基于Netflix的Hystrix,),这就是断路器跟踪成功和失败的方式。所以我们这里有一个这个窗口的虚线框,它是一个可配置的时间段,通常为10到15秒,它显示右侧“Now”下面的当前桶,然后是前面的X个桶,它们被限制在左侧。在其他语言中,存储桶是线程池,但在我们的情况下(节点js),这些是给定时间段的请求的集合。然后我们还有铲斗跨度,即铲斗在其加盖前打开多长时间,并在右侧窗口中添加一个新的空铲斗,并将左侧的最后一个铲斗放入垃圾箱。此外,我要强调的最后一件事是容量,它是允许的并发请求数,因此如果铲斗跨度中的总请求数达到了容量,则断路器将跳闸。


class NodeCircuitBreaker {
 constructor(opts){
  this.windowDuration = opts.windowDuration  || 10000
  this.numberOfBuckets = opts.numberOfBuckets || 10
  this.timeout = opts.timeout || 3000
  this.errorThreshold  = opts.errorThreshold  || 50
  this.volumeThreshold = opts.volumeThreshold || 5
  this.onCircuitOpen   = opts.onCircuitOpen   || function() {}
  this.onCircuitClose  = opts.onCircuitClose  || function() {}
  this.buckets = [this.createBucket()]
  this.state = CircuitBreaker.CLOSED
  this.start()
 }
}


创建断路器的新实例。接受以下配置选项:

windowDuration:滚动窗口的持续时间(以毫秒为单位)。

numberOfBuckets:滚动窗口被分解的桶数。

timeout:命令超时后的时间(以毫秒为单位)

errorThreshold:电路应跳闸并开始短路请求回退逻辑的错误百分比

volumeThreshold:在跳闸电路之前所需的滚动窗口中的最小请求数


static OPEN = 0
static CLOSED = 2
static HALF_OPEN = 1
createBucket(){
    return {
        failures: 0,
        successes: 0,
        timeouts: 0,
        shortCircuits: 0
    }
}
isOpen(){
    return this.state === CircuitBreaker.OPEN
}
destroy(){
    clearInterval(this.interval)
}
forceClose(){
    this.forced = this.state
    this.state = CircuitBreaker.CLOSED
}
forceOpen(){
    this.forced = this.state
    this.state = CircuitBreaker.OPEN
}
unforce(){
    this.state = this.forced
    this.forced = null
}
lastBucket(){
    return this.buckets[this.buckets.length-1]
}


isOpen:检查断路器当前是否正在接受请求

forceOpen:强制电路打开

forceClose:强制关闭电路

unforce:将电路返回到上一个非强制状态


tick(){
    let bucketIdx = 0

    if (this.buckets.length > this.numOfBuckets){
        this.buckets.shift()
    }

    bucketIdx++

    if (bucketIdx > this.numOfBuckets){
        bucketIdx = 0
        this.state = this.isOpen() && CircuitBreaker.HALF_OPEN
    }
    this.buckets.push(this.createBucket())
}
start(){
    const bucketDuration = this.windowDuration/this.numOfBuckets
    this.interval = setInterval(this.tick, bucketDuration)
}
exec(command, fallback) {
    if (this.isOpen()){
       this.executeFallback(fallback)
    }else {
        this.executeCommand(command)
    }
}


如果电路关闭则执行命令,否则默认为回退(如果提供)。使用成功和失败处理程序调用该命令,您需要在命令的适当位置调用该命令。


increment(prop, timeout){
    return () => {
        if (!timeout){
            return
        }
        const bucket = this.lastBucket()
        bucket[prop]++

        if (this.forced === null){
            this.updateState()
        }
        clearTimeout(timeout)
        timeout = null
    }
}
executeCommand(command){
    let timeout
    timeout = setTimeout(this.increment('timeouts', timeout), this.timeoutDuration)
    command(this.increment('successes', timeout), this.increment('failure', timeout))
}

executeFallback(fallback){
    fallback()
    const bucket = this.lastBucket()
    bucket.shortCircuits++
}


根据滚动窗口上的请求计算指标


calculateMetrics(){
    let totalCount = 0, errorCount = 0, errorPercentage

    for(let i =0; i<this.buckets.length; i++){
        errorCount += this.buckets[i].failures + this.buckets[i].timeouts
        totalCount += errorCount+this.buckets[i].successes
    }
    errorPercentage = (errorCount/(totalCount > 0 ? totalCount : 1))*100

    return {totalCount, errorCount, errorPercentage}
}


如果没有强制执行状态更改,则更新状态


updateState(){
    const metrics = this.calculateMetrics()
    if (this.state === CircuitBreaker.HALF_OPEN){
        const lastCommandFailed = !this.lastBucket().successes && metrics.errorCount > 0
        if (lastCommandFailed){
            this.state = CircuitBreaker.OPEN
            this.onCircuitOpen(metrics)
        }else{
            this.state = CircuitBreaker.CLOSED
            this.onCircuitClose(metrics)
        }
    }else {
        const overErrThreshold = metrics.errorPercentage > this.errorThreshold
        const overVolThreshold = metrics.errorCount > this.volumeThreshold

        if (overErrThreshold && overVolThreshold){
            this.state = CircuitBreaker.OPEN
            this.onCircuitOpen(metrics)
        }
    }
}


onCircuitOpen(metrics):电路打开时运行的函数。

onCircuitClose(metrics):电路关闭时运行的函数。

用法:


const axios = require('axios')
const cb = new CircuitBreaker()

async function getUser() {
 try {
  const response = await axios.get('/user?ID=12345');
  console.log(response);
 } catch (error) {
  console.error(error);
 }
}
const command = (success, failed)=> {
  getUser()
    .then(success)
    .catch(failed);
}
cb.exec(command, fallback)








请先
登录
后评论
1 条评论
评论于 · 2020-01-23 16:34:55

膜拜大神!

最新文章
更多