在我们实际开发过程中经常会处理各种大批量数据入库,这个时候我们就会到队列,将数据先写入队列中,然后开启多个消费线程慢慢消费入库。从队列中消费数据有两种方式:
批量消费
我们分别来实现这两种消费方
存数据到队列
存数据相对比较简单,这里我推荐大家使用BlockingQueue,该队列为阻塞队列!
//创建队列
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1000000,true);
/**
* 向队列中存放数据
* @param message
*/
public void saveQueueData(String message){
//存放数据
blockingQueue.offer("test");
}
ArrayBlockingQueue参数说明
ArrayBlockingQueue一共有三个重载方法
int capacity
该参数表示当前定义队列的大小,也就是能存放多少条数据
boolean fair
该参数表示访问该队列的策略是否公平。true:按照 FIFO 顺序访问插入或移除时受阻塞线程的队列;false:访问顺序是不确定的
Collection<? extends E> c
该参数是一个集合,表示将一个集合的数据存入该阻塞队列,相当于给该队列一个初始数据
/**
* 从队列中单条消费数据
*/
public void consumerBySingle() {
while (true) {
try {
String take = blockingQueue.take();
log.info("消费到的数据是:{}", take);
} catch (Exception e) {
log.error("缓存队列单条消费异常:{}", e.getMessage());
}
}
}
为什么用while(true),这样不是一个死循环么,那不是一直都在执行?其实并不是这样的,这就是为什么我推荐大家用BlockingQueue的原因,他是一个阻塞队列,take()这个方法是阻塞的,一段队列中没有数据,那么就不会继续往下执行,而是阻塞到这个地方,等对队列中有数据的时候才会继续执行
/**
* 从队列中批量消费数据
*/
public void consumerByBatch() {
while (true) {
try {
List<String> list = new ArrayList<>();
Queues.drain(blockingQueue, list, 100, 1, TimeUnit.MINUTES);
log.info("批量消费到的数据是:{}", list);
} catch (Exception e) {
log.error("缓存队列批量消费异常:{}", e.getMessage());
}
}
}
这里面用到一个很重要的东西Guava的Queues
需要导入如Guava的包,maven项目只需要在pom文件中添加:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>
Queues.drain(blockingQueue, list, 100, 1, TimeUnit.MINUTES);这个方法一共有5个参数
第一个:传入你需要批量消费的队列
第二个:传入一个用来接收批量消费到的数据
第三个:批量消费数据的大小,这里我们给100,即意味着每次消费100条数据
第四个:批量消费的等待的最大间隔,什么意思呢?比如说,我先在队列中只有10条数据,它不到100条,那按道理就不会消费,但是这样显然不合理,所以需要指定当超多多长时间,即使当前队列中数据低于我们设定的阈值也会消费
第五个,这个就很好理解,就是指定第四个参数的单位,是秒是分钟还是小时等等
public static void main(String[] args) {
//创建队列
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1000000,true);
for (int i = 0 ;i < 789; i++){
//存放数据
blockingQueue.offer(i + "test");
}
while (true) {
try {
List<String> list = new ArrayList<>();
Queues.drain(blockingQueue, list, 100, 1, TimeUnit.MINUTES);
log.info("批量消费到的数据量是:{}, 数据是: {}", list.size(), list);
} catch (Exception e) {
log.error("缓存队列批量消费异常:{}", e.getMessage());
}
}
}
测试代码中可以看出:每次批量消费100条数据,如果队列当前数据不够100条,那么等待1分钟然后将数据全部消费
如果此篇文章有帮助到您, 希望打大佬们能关注、点赞、收藏、评论支持一波,非常感谢大家!
如果有不对的地方请指正!!!
「艾尔登法环」梅琳娜手办开订 立体手办▪
万代「艾尔登法环」白狼战鬼手办开订 立体手办▪
「夏目友人帐」猫咪老师粘土人开订 立体手办▪
「五等分的新娘∬」中野三玖·白无垢版手办开订 立体手办▪
「海贼王」乌索普Q版手办开订 立体手办▪
良笑社「初音未来」新手办开订 立体手办▪
「黑岩射手DAWN FALL」死亡主宰手办开订 立体手办▪
「盾之勇者成名录」菲洛手办登场 立体手办▪
「魔法少女小圆」美树沙耶香手办开订 立体手办▪
「咒术回战」七海建人粘土人登场 立体手办▪
「五等分的新娘」中野二乃白无垢手办开订 立体手办▪
「为美好的世界献上祝福!」芸芸粘土人开订 立体手办▪
「公主连结 与你重逢」六星可可萝手办开订 立体手办▪
「女神异闻录5」Joker雨宫莲手办开订 立体手办▪
「间谍过家家」约尔・福杰粘土人登场 立体手办▪
「街角魔族 2丁目」吉田优子手办开订 立体手办▪
「火影忍者 疾风传」旗木卡卡西·暗部版粘土人登场 立体手办▪
「佐佐木与宫野」宫野由美粘土人开订 立体手办▪
「盾之勇者成名录」第2季拉芙塔莉雅手办开订 立体手办▪
「咒术回战」两面宿傩Q版坐姿手办开订 立体手办▪
「DATE·A·BULLET」时崎狂三手办开订 立体手办▪
「狂赌之渊××」早乙女芽亚里粘土人开订 立体手办▪
「魔道祖师」魏无羨粘土人开订 立体手办▪
「新·奥特曼」奥特曼手办现已开订 立体手办▪