重试利器之GuavaRetrying转载
原创-
- *
重试的使用场景
在许多业务场景中,为了消除系统中的各种不稳定因素以及逻辑错误,并以最大概率确保预期结果,重试机制是必不可少的。
特别是在调用远程服务时,在高并发场景下,很可能会因为服务器响应延迟或网络原因而得不到期望的结果或根本得不到响应。此时,优雅的重试调用机制使我们有更大的可能性获得预期的响应。
通常,我们会通过 定时任务 再试试。例如,如果操作失败,则会被记录下来。当定时任务再次启动时,数据将被放入定时任务的方法中并再次运行。最后,直到得到想要的结果。
无论是基于定时任务的重试机制,还是自行编写的简单重试设备,缺点都是重试机制过于单一,难以实现。
如何优雅地设计重试实现。。
一个完整的重试实施应该能很好地解决以下问题:
- 在什么情况下应该再次尝试
- 在什么情况下应该停下来
- 如何停止重试
- 别再试了,等多久
- 如何等待
- 请求时间限制
- 如何结束
- 如何监听整个重试过程。
而且,为了更好地封装,重试的实现一般分为两个步骤:
- 使用工厂模式构造重试。
- 执行重试方法并获得结果。
可以简单地指示完整的重试过程:
guava-retrying基础用法
guava-retrying基于Google的核心类库guava重试机制的实施可以说是重试的利器。
让我们快速了解一下它的用法。
1.Maven配置
com.github.rholder
guava-retrying
2.0.0
复制代码
需要注意的是,此版本依赖于27.0.1版本的guava。如果在您的项目中guava一些较低的版本是可以的,但太多的较低版本是不兼容的。此时,您需要升级您的项目guava版本,或简单地删除您自己的guava依赖性,使用guava-retrying跳过guava依赖。
2.实现Callable
Callable callable = new Callable() {
public Boolean call() throws Exception {
return true; // do something useful here
}
};
复制代码
Callable的call方法是您自己的实际业务调用。
-
通过RetryerBuilder构造Retryer
Retryer
retryer = RetryerBuilder. newBuilder() .retryIfResult(Predicates. isNull()) .retryIfExceptionOfType(IOException.class) .retryIfRuntimeException() .withStopStrategy(StopStrategies.stopAfterAttempt(3)) .build(); 复制代码 -
使用重试来执行您的业务。
retryer.call(callable); 复制代码
以下是完整的参考实现。
public Boolean test() throws Exception {
//定义重试机制
Retryer retryer = RetryerBuilder.newBuilder()
//retryIf 重试条件
.retryIfException()
.retryIfRuntimeException()
.retryIfExceptionOfType(Exception.class)
.retryIfException(Predicates.equalTo(new Exception()))
.retryIfResult(Predicates.equalTo(false))
//等待策略:每个请求间隔1s
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
//停止策略 : 尝试请求6次
.withStopStrategy(StopStrategies.stopAfterAttempt(6))
//时间限制 : 一个请求不能超过2s , 类似: TimeLimiter timeLimiter = new SimpleTimeLimiter();
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS))
.build();
//定义请求实现
Callable callable = new Callable() {
int times = 1;
@Override
public Boolean call() throws Exception {
log.info("call times={}", times);
times++;
if (times == 2) {
throw new NullPointerException();
} else if (times == 3) {
throw new Exception();
} else if (times == 4) {
throw new RuntimeException();
} else if (times == 5) {
return false;
} else {
return true;
}
}
};
//调用带有重试的请求
return retryer.call(callable);
}
复制代码
guava-retrying实现原理
guava-retrying的核心是Attempt类、Retryer类以及一些Strategy(策略)相关类。
- Attempt
Attempt它不仅仅是一个重试请求(call),它也是请求的结果,并记录当前请求的次数、是否包含异常以及请求的返回值。
/**
* An attempt of a call, which resulted either in a result returned by the call,
* or in a Throwable thrown by the call.
*
* @param The type returned by the wrapped callable.
* @author JB
*/
public interface Attempt
复制代码
- Retryer
Retryer通过RetryerBuilder这个工厂类是构造的。RetryerBuilder负责将定义的重试策略分配给Retryer对象中。
在Retryer执行call方法时,将逐个使用这些重试策略。
让我们来看看它Retryer的call该方法的具体实现。
/**
* Executes the given callable. If the rejection predicate
* accepts the attempt, the stop strategy is used to decide if a new attempt
* must be made. Then the wait strategy is used to decide how much time to sleep
* and a new attempt is made.
*
* @param callable the callable task to be executed
* @return the computed result of the given callable
* @throws ExecutionException if the given callable throws an exception, and the
* rejection predicate considers the attempt as successful. The original exception
* is wrapped into an ExecutionException.
* @throws RetryException if all the attempts failed before the stop strategy decided
* to abort, or the thread was interrupted. Note that if the thread is interrupted,
* this exception is thrown and the threads interrupt status is set.
*/
public V call(Callable callable) throws ExecutionException, RetryException {
long startTime = System.nanoTime();
//说明: 根据attemptNumber循环-也就是再试几次
for (int attemptNumber = 1; ; attemptNumber++) {
//注意:Entry方法不等待,立即执行一次。
Attempt attempt;
try {
//描述:执行callable特定业务在
//attemptTimeLimiter限制每次尝试的等待频率。
V result = attemptTimeLimiter.call(callable);
//用调用的结果构造新的。attempt
attempt = new ResultAttempt(result, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
} catch (Throwable t) {
attempt = new ExceptionAttempt(t, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}
//描述:遍历自定义监听程序
for (RetryListener listener : listeners) {
listener.onRetry(attempt);
}
//描述:判断是否满足重试条件,决定是否继续等待重试。
if (!rejectionPredicate.apply(attempt)) {
return attempt.get();
}
//注意:此时满足停止策略,因为没有获得期望的结果,抛出异常。
if (stopStrategy.shouldStop(attempt)) {
throw new RetryException(attemptNumber, attempt);
} else {
//描述:执行默认的停止策略——线程休眠
long sleepTime = waitStrategy.computeSleepTime(attempt);
try {
//注意:还可以实施定义的停止策略。
blockStrategy.block(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RetryException(attemptNumber, attempt);
}
}
}
}
复制代码
Retryer具体实施过程如下。
guava-retrying高级用法
基于guava-retrying我们可以根据实际业务确定重试策略。
下面以 数据同步
例如,该一般系统业务定制重试策略。
以下实现基于Spring Boot 2.1.2.RELEASE版本。
并使用Lombok简化Bean。
org.projectlombok
lombok
true
复制代码
业务描述
当商品被创造出来时,商品的价格需要单独设定。由于两个操作是由两个人执行的,所以存在商品没有创建,但价格数据已经建立的问题。在这种情况下,价格数据需要等待商品正常创建并继续完成同步。
我们通过了一个http请求创建物品,并使用计时器修改物品的价格。
商品不存在,或者商品数量较少1货物的价格不能定。需要成功创建的项目数更多0只有在商品的价格被成功设定的情况下。
实现过程
- 自定义重试阻止策略
默认的阻塞策略是线程休眠,它是使用旋转锁实现的,不会阻塞线程。
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy;
import com.github.rholder.retry.BlockStrategy;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.LocalDateTime;
/**
* 自旋锁的实现, 不响应线程中断
*/
@Slf4j
@NoArgsConstructor
public class SpinBlockStrategy implements BlockStrategy {
@Override
public void block(long sleepTime) throws InterruptedException {
LocalDateTime startTime = LocalDateTime.now();
long start = System.currentTimeMillis();
long end = start;
log.info("[SpinBlockStrategy]...begin wait.");
while (end - start <= sleepTime) {
end = System.currentTimeMillis();
}
//使用Java8新增的Duration计算时间间隔
Duration duration = Duration.between(startTime, LocalDateTime.now());
log.info("[SpinBlockStrategy]...end wait.duration={}", duration.toMillis());
}
}
复制代码
- 自定义重试监听程序
RetryListener可以监控多个重试程序,并可以使用 attempt
做一些额外的事情。
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryListener;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RetryLogListener implements RetryListener {
@Override
public void onRetry(Attempt attempt) {
// 多少次重试,(注意:第一次重试实际上是第一次调用)
log.info("retry time : [{}]", attempt.getAttemptNumber());
// 第一次重试的延迟
log.info("retry delay : [{}]", attempt.getDelaySinceFirstAttempt());
// 重试结果: 是否异常终止, 或正常回报
log.info("hasException={}", attempt.hasException());
log.info("hasResult={}", attempt.hasResult());
// 是什么导致了这种异常?
if (attempt.hasException()) {
log.info("causeBy={}" , attempt.getExceptionCause().toString());
} else {
// 正常回报的结果
log.info("result={}" , attempt.getResult());
}
log.info("log listen over.");
}
}
复制代码
- 自定义Exception
一些例外需要重试,而其他例外则不需要。
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception;
/**
* 当抛出此异常时,意味着您需要重试。
*/
public class NeedRetryException extends Exception {
public NeedRetryException(String message) {
super("NeedRetryException can retry."+message);
}
}
复制代码
- 实施具体的重试业务。Callable接口
使用call方法调用它自己的业务。
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.math.BigDecimal;
/**
* 商品model
*/
@Data
@AllArgsConstructor
public class Product {
private Long id;
private String name;
private Integer count;
private BigDecimal price;
}
复制代码
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product;
import org.springframework.stereotype.Repository;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* 商品DAO
*/
@Repository
public class ProductRepository {
private static ConcurrentHashMap products=new ConcurrentHashMap();
private static AtomicLong ids=new AtomicLong(0);
public List findAll(){
return new ArrayList<>(products.values());
}
public Product findById(Long id){
return products.get(id);
}
public Product updatePrice(Long id, BigDecimal price){
Product p=products.get(id);
if (null==p){
return p;
}
p.setPrice(price);
return p;
}
public Product addProduct(Product product){
Long id=ids.addAndGet(1);
product.setId(id);
products.put(id,product);
return product;
}
}
复制代码
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;
import lombok.extern.slf4j.Slf4j;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.model.Product;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.repository.ProductRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
/**
* 业务方法实施
*/
@Component
@Slf4j
public class ProductInformationHander implements Callable {
@Autowired
private ProductRepository pRepo;
private static Map prices = new HashMap<>();
static {
prices.put(1L, new BigDecimal(100));
prices.put(2L, new BigDecimal(200));
prices.put(3L, new BigDecimal(300));
prices.put(4L, new BigDecimal(400));
prices.put(8L, new BigDecimal(800));
prices.put(9L, new BigDecimal(900));
}
@Override
public Boolean call() throws Exception {
log.info("sync price begin,prices size={}", prices.size());
for (Long id : prices.keySet()) {
Product product = pRepo.findById(id);
if (null == product) {
throw new NeedRetryException("can not find product by id=" + id);
}
if (null == product.getCount() || product.getCount() < 1) {
throw new NeedRetryException("product count is less than 1, id=" + id);
}
Product updatedP = pRepo.updatePrice(id, prices.get(id));
if (null == updatedP) {
return false;
}
prices.remove(id);
}
log.info("sync price over,prices size={}", prices.size());
return true;
}
}
复制代码
- 构造重试的Retryer
使用上面的实现作为参数,构造Retryer。
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service;
import com.github.rholder.retry.*;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.exception.NeedRetryException;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.listener.RetryLogListener;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.strategy.SpinBlockStrategy;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 构造重试的
*/
@Component
public class ProductRetryerBuilder {
public Retryer build() {
//定义重试机制
Retryer retryer = RetryerBuilder.newBuilder()
//retryIf 重试条件
//.retryIfException()
//.retryIfRuntimeException()
//.retryIfExceptionOfType(Exception.class)
//.retryIfException(Predicates.equalTo(new Exception()))
//.retryIfResult(Predicates.equalTo(false))
.retryIfExceptionOfType(NeedRetryException.class)
//等待策略:每个请求间隔1s
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
//停止策略 : 尝试请求3次
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
//时间限制 : 一个请求不能超过2s , 类似: TimeLimiter timeLimiter = new SimpleTimeLimiter();
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(2, TimeUnit.SECONDS))
//默认拦截策略:线程休眠。
//.withBlockStrategy(BlockStrategies.threadSleepStrategy())
//自定义阻塞策略:自旋锁定
.withBlockStrategy(new SpinBlockStrategy())
//自定义重试监听程序
.withRetryListener(new RetryLogListener())
.build();
return retryer;
}
}
复制代码
- 与定时任务相结合Retryer
定时任务只需要运行一次,但实际上实现了所有重试策略。这大大简化了定时器的设计。
首先使用 @EnableScheduling
声明项支持计时器批注。
@SpringBootApplication
@EnableScheduling
public class DemoRetryerApplication {
public static void main(String[] args) {
SpringApplication.run(DemoRetryerApplication.class, args);
}
}
复制代码
package net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.task;
import com.github.rholder.retry.Retryer;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductInformationHander;
import net.ijiangtao.tech.framework.spring.ispringboot.demo.retryer.guava.service.ProductRetryerBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* 商品信息定时器
*/
@Component
public class ProductScheduledTasks {
@Autowired
private ProductRetryerBuilder builder;
@Autowired
private ProductInformationHander hander;
/**
* 同步大宗商品价格计时任务
* @Scheduled(fixedDelay = 30000) :在上次执行完成的时间点之后30秒再执行
*/
@Scheduled(fixedDelay = 30*1000)
public void syncPrice() throws Exception{
Retryer retryer=builder.build();
retryer.call(hander);
}
}
复制代码
执行结果:由于没有商品,重试后抛出异常。
2019-二月-28 14:37:52.667 INFO [scheduling-1] n.i.t.f.s.i.d.r.g.l.RetryLogListener - log listen over.
2019-二月-28 14:37:52.672 ERROR [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler - Unexpected error occurred in scheduled task.
com.github.rholder.retry.RetryException: Retrying failed to complete successfully after 3 attempts.
at com.github.rholder.retry.Retryer.call(Retryer.java:174)
复制代码
您也可以添加一些商品数据来查看重试成功的效果。
完整的样例代码在中。 这里 。
使用中遇到的问题
Guava版本冲突
由于该项目依赖于guava版本太低,启动项目时出现以下异常
java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor()Lcom/google/common/util/concurrent/ListeningExecutorService;
at org.apache.curator.framework.listen.ListenerContainer.addListener(ListenerContainer.java:41)
at com.bzn.curator.ZkOperator.getZkClient(ZkOperator.java:207)
at com.bzn.curator.ZkOperator.checkExists(ZkOperator.java:346)
at com.bzn.curator.watcher.AbstractWatcher.initListen(AbstractWatcher.java:87)
at com.bzn.web.listener.NebulaSystemInitListener.initZkWatcher(NebulaSystemInitListener.java:84)
at com.bzn.web.listener.NebulaSystemInitListener.contextInitialized(NebulaSystemInitListener.java:33)
at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4939)
at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5434)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150)
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1559)
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1549)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
复制代码
因此,要排除较低版本的项目。guava依赖。
com.google.guava
guava
复制代码
同时,由于Guava在新版本中删除 sameThreadExecutor
方法,但在当前项目中。ZK此方法是必需的,因此需要手动设置相应的guava版本。
果然,在19.0版本中MoreExecutors此方法仍然存在,只是标记为已过期。
@Deprecated
@GwtIncompatible("TODO")
public static ListeningExecutorService sameThreadExecutor() {
return new DirectExecutorService();
}
复制代码
声明从属关系guava版本改为19.0即可。
com.google.guava
guava
19.0
复制代码
动态调整重试策略
在实际使用过程中,有时需要调整重试次数、等待时间等重试策略。因此,可以动态调整重试策略的配置参数。
例如,在秒杀、双十一购物节等期间增加等待时间和重试次数,以确保错峰请求。在正常情况下,可以适当减少等待时间和重试次数。
对于系统关键业务,如果多次重试成功,则可以通过。RetryListener监控和报警。
关于『动态调整重试策略 下面提供了一个参考实现:
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.WaitStrategy;
/**
* 自定义等待策略:根据重试次数、第一次请求间隔动态调整等待时间。1s,第二个间隔10s,第三个及以后的20s。
*
*
* 在创建Retryer当通过时withWaitStrategy把等待政策付诸实施就行了。
*
* RetryerBuilder.newBuilder()
* .withWaitStrategy(new AlipayWaitStrategy())
*
* 类似的效果也可以定制。 BlockStrategy 要做到这一点,你可以把它写下来并尝试一下。
*
*/
public class AlipayWaitStrategy implements WaitStrategy {
@Override
public long computeSleepTime(Attempt failedAttempt) {
long number = failedAttempt.getAttemptNumber();
if (number==1){
return 1*1000;
}
if (number==2){
return 10*1000;
}
return 20*1000;
}
}
作者:西照
链接:https://juejin.im/post/5c77e3bcf265da2d914da410
来源:掘金
版权归作者所有。请联系作者以获得商业转载的授权,并注明非商业转载的来源。
版权声明
所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除