概述:断路器是执行代理,可帮助应用程序以更安全,更可预测的方式处理故障。 通常,断路器被定义为“自动操作的电气系统,其被设计用于保护电路免受由过载或短路的过电流引起的损坏”。 在我们的例子中,电路将是请求和响应周期。断路器模式可以帮助我们确定上游服务(API,数据库等)的可用性。当服务的错误或超时计数超过预定义阈值时,它应该跳闸以停止对从属服务的应用程序请求一段时间。
他们是如何工作的?
断路器状态
CLOSED:资源尚未尝试或已经过尝试和可用
OPEN:资源被尝试并且不可用,断路器跳闸
HALF OPEN:满足等待阈值,再次尝试资源
滚动窗口(概念)
它在幕后有一个滚动窗口的概念(滚动窗口基于Netflix的Hystrix,),这就是断路器跟踪成功和失败的方式。所以我们这里有一个这个窗口的虚线框,它是一个可配置的时间段,通常为10到15秒,它显示右侧“Now”下面的当前桶,然后是前面的X个桶,它们被限制在左侧。在其他语言中,存储桶是线程池,但在我们的情况下(节点js),这些是给定时间段的请求的集合。然后我们还有铲斗跨度,即铲斗在其加盖前打开多长时间,并在右侧窗口中添加一个新的空铲斗,并将左侧的最后一个铲斗放入垃圾箱。此外,我要强调的最后一件事是容量,它是允许的并发请求数,因此如果铲斗跨度中的总请求数达到了容量,则断路器将跳闸。
class NodeCircuitBreaker { constructor(opts){ this.windowDuration = opts.windowDuration || 10000 this.numberOfBuckets = opts.numberOfBuckets || 10 this.timeout = opts.timeout || 3000 this.errorThreshold = opts.errorThreshold || 50 this.volumeThreshold = opts.volumeThreshold || 5 this.onCircuitOpen = opts.onCircuitOpen || function() {} this.onCircuitClose = opts.onCircuitClose || function() {} this.buckets = [this.createBucket()] this.state = CircuitBreaker.CLOSED this.start() } }
创建断路器的新实例。接受以下配置选项:
windowDuration:滚动窗口的持续时间(以毫秒为单位)。
numberOfBuckets:滚动窗口被分解的桶数。
timeout:命令超时后的时间(以毫秒为单位)
errorThreshold:电路应跳闸并开始短路请求回退逻辑的错误百分比
volumeThreshold:在跳闸电路之前所需的滚动窗口中的最小请求数
static OPEN = 0 static CLOSED = 2 static HALF_OPEN = 1 createBucket(){ return { failures: 0, successes: 0, timeouts: 0, shortCircuits: 0 } } isOpen(){ return this.state === CircuitBreaker.OPEN } destroy(){ clearInterval(this.interval) } forceClose(){ this.forced = this.state this.state = CircuitBreaker.CLOSED } forceOpen(){ this.forced = this.state this.state = CircuitBreaker.OPEN } unforce(){ this.state = this.forced this.forced = null } lastBucket(){ return this.buckets[this.buckets.length-1] }
isOpen:检查断路器当前是否正在接受请求
forceOpen:强制电路打开
forceClose:强制关闭电路
unforce:将电路返回到上一个非强制状态
tick(){ let bucketIdx = 0 if (this.buckets.length > this.numOfBuckets){ this.buckets.shift() } bucketIdx++ if (bucketIdx > this.numOfBuckets){ bucketIdx = 0 this.state = this.isOpen() && CircuitBreaker.HALF_OPEN } this.buckets.push(this.createBucket()) } start(){ const bucketDuration = this.windowDuration/this.numOfBuckets this.interval = setInterval(this.tick, bucketDuration) } exec(command, fallback) { if (this.isOpen()){ this.executeFallback(fallback) }else { this.executeCommand(command) } }
如果电路关闭则执行命令,否则默认为回退(如果提供)。使用成功和失败处理程序调用该命令,您需要在命令的适当位置调用该命令。
increment(prop, timeout){ return () => { if (!timeout){ return } const bucket = this.lastBucket() bucket[prop]++ if (this.forced === null){ this.updateState() } clearTimeout(timeout) timeout = null } } executeCommand(command){ let timeout timeout = setTimeout(this.increment('timeouts', timeout), this.timeoutDuration) command(this.increment('successes', timeout), this.increment('failure', timeout)) } executeFallback(fallback){ fallback() const bucket = this.lastBucket() bucket.shortCircuits++ }
根据滚动窗口上的请求计算指标
calculateMetrics(){ let totalCount = 0, errorCount = 0, errorPercentage for(let i =0; i<this.buckets.length; i++){ errorCount += this.buckets[i].failures + this.buckets[i].timeouts totalCount += errorCount+this.buckets[i].successes } errorPercentage = (errorCount/(totalCount > 0 ? totalCount : 1))*100 return {totalCount, errorCount, errorPercentage} }
如果没有强制执行状态更改,则更新状态
updateState(){ const metrics = this.calculateMetrics() if (this.state === CircuitBreaker.HALF_OPEN){ const lastCommandFailed = !this.lastBucket().successes && metrics.errorCount > 0 if (lastCommandFailed){ this.state = CircuitBreaker.OPEN this.onCircuitOpen(metrics) }else{ this.state = CircuitBreaker.CLOSED this.onCircuitClose(metrics) } }else { const overErrThreshold = metrics.errorPercentage > this.errorThreshold const overVolThreshold = metrics.errorCount > this.volumeThreshold if (overErrThreshold && overVolThreshold){ this.state = CircuitBreaker.OPEN this.onCircuitOpen(metrics) } } }
onCircuitOpen(metrics):电路打开时运行的函数。
onCircuitClose(metrics):电路关闭时运行的函数。
用法:
const axios = require('axios') const cb = new CircuitBreaker() async function getUser() { try { const response = await axios.get('/user?ID=12345'); console.log(response); } catch (error) { console.error(error); } } const command = (success, failed)=> { getUser() .then(success) .catch(failed); } cb.exec(command, fallback)