redis实现普通消息队列与延迟消息队列

x33g5p2x  于2021-10-10 转载在 Redis  
字(6.5k)|赞(0)|评价(0)|浏览(514)

1.redis实现普通消息队列

1.1 实现原理

利用lpush/rpush和lpop/blop(阻塞式)/rpop/brpop(阻塞式)来实现!

1.2 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>distribute-lock</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--json依赖-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.3</version>
        </dependency>
        <!--jedis依赖-->
        <dependency>
               <groupId>redis.clients</groupId>
               <artifactId>jedis</artifactId>
               <version>3.2.0</version>
               <type>jar</type>
               <scope>compile</scope>
        </dependency>
    </dependencies>

</project>

1.3 JedisUtils工具类

package com.yl;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class JedisUtils {
    private static JedisPool jedisPool = null;

    public static Jedis getJedisObject() {
        if (jedisPool == null) {
            GenericObjectPoolConfig config = new GenericObjectPoolConfig();
            //最大空闲数
            config.setMaxIdle(400);
            //最大连接数
            config.setMaxTotal(2000);
            //连接最大等待时间,-1代表没有限制
            config.setMaxWaitMillis(300000);
            /** * 配置连接池的地址,端口号,超时时间,密码 */
            jedisPool = new JedisPool(config,"192.168.244.129",6379,30000,"root123");
        }
        try {
            //通过连接池获取jedis对象
            Jedis jedis = jedisPool.getResource();
            jedis.auth("root123");
            return jedis;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

1.4 消息类

package com.yl;

/** * 消息对象 */
public class Message {
    private String id;
    private Object object;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Object getObject() {
        return object;
    }

    public void setObject(Object object) {
        this.object = object;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id='" + id + '\'' +
                ", object=" + object +
                '}';
    }
}

1.4 消息队列类

package com.yl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;

import java.util.*;

/** * 消息队列 */
public class MessageQueue {
    private Jedis jedis;
    private String queue;

    public MessageQueue(Jedis jedis, String queue) {
        this.jedis = jedis;
        this.queue = queue;
    }

    /** * 消息入队 * @param list 要发送的消息集合 */
    public void queue(List<Message> list) {
        List<String> strs = new ArrayList<>();
        for (Message message : list) {
            //构造消息对象
            message.setId(UUID.randomUUID().toString());
            //序列化
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                String str = objectMapper.writeValueAsString(message);
                strs.add(str);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
        String[] arrs = null;
        if (!strs.isEmpty()) {
            arrs = new String[strs.size()];
           for (int i = 0; i < strs.size(); i++) {
               arrs[i] = strs.get(i);
           }
        }
        if (arrs != null) {
            System.out.println("message start send..."+new Date());
            //发送消息
            jedis.lpush(queue,arrs);
        }
    }

    /** * 消息消费 */
    public void cosume() {
        //如果当前线程没有被打断,就一直读
        while (true) {
            //会一直读取
            //String s = jedis.lpop(queue);
            //阻塞式弹出元素,如果没有读取到数据,会睡眠指定时间,如果达到了睡眠时间后,list中任然没有数据进来,则会直接抛出!
            List<String> list = jedis.blpop(5, queue);
            System.out.println("receive message"+list.toString());
        }
    }

}

1.5 消息入队测试

package com.yl;

import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.List;

public class Test1 {
    public static void main(String[] args) {
        Jedis jedis = JedisUtils.getJedisObject();
        MessageQueue queue = new MessageQueue(jedis,"myQueue");
        //生产者
        new Thread(){
            @Override
            public void run() {
                List<Message> list = new ArrayList<>();
                for (int i = 0; i < 10; i++) {
                    Message message = new Message();
                    message.setObject("yl===message"+i);
                    list.add(message);
                }
                queue.queue(list);
            }
        }.start();
    }
}

结果

1.5 消息出队测试

package com.yl;

import redis.clients.jedis.Jedis;

public class Test2 {
    public static void main(String[] args) {
        Jedis jedis = JedisUtils.getJedisObject();
        MessageQueue queue = new MessageQueue(jedis,"myQueue");
        //消费者
        new Thread(){
            @Override
            public void run() {
                queue.cosume();
            }
        }.start();
    }
}

结果

2.redis实现延迟消息队列

2.1 实现原理

通过zset,使用当前时间戳作为score可以实现延迟消息队列,读取数据的时候会延迟读取!

2.2 pom.xml

同上

2.2 JedisUtils工具类

同上

2.3 消息类

同上

2.4 延迟消息队列类

package com.yl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;

import java.util.Date;
import java.util.Set;
import java.util.UUID;

/** * 延迟消息队列 */
public class DelayMessageQueue {
    private Jedis jedis;
    private String queue;

    public DelayMessageQueue(Jedis jedis, String queue) {
        this.jedis = jedis;
        this.queue = queue;
    }

    /** * 消息入队 * @param object 要发送的消息 */
    public void queue(Object object) {
        //构造消息对象
        Message message = (Message)object;
        message.setId(UUID.randomUUID().toString());
        //序列化
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            String str = objectMapper.writeValueAsString(message);
            System.out.println("message start send..."+new Date());
            //发送消息,score延迟五秒
            jedis.zadd(queue,System.currentTimeMillis() + 5000,str);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    /** * 消息消费 */
    public void cosume() {
        //如果当前线程没有被打断,就一直读
        while (!Thread.interrupted()) {
            //读取score在0到当前时间戳之间的消息,每次读取一条数据出来
            Set<String> zset = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1);
            if (zset.isEmpty()) {
                try {
                    //如果消息是空的,则睡眠一段时间继续读取
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
                continue;
            }
            //如果读取到了消息,直接显示出来
            String next = zset.iterator().next();
            //从zset中移除消息,并且打印出来
            if (jedis.zrem(queue,next) > 0) {
                //抢到了,就在下面处理业务
                try {
                    ObjectMapper objectMapper = new ObjectMapper();
                    Message message = objectMapper.readValue(next, Message.class);
                    System.out.println("receive message:"+message + new Date());
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

2.5 消息入队测试

package com.yl;

import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.List;

public class Test3 {
    public static void main(String[] args) {
        Jedis jedis = JedisUtils.getJedisObject();
        DelayMessageQueue queue = new DelayMessageQueue(jedis,"myDelayQueue");
        //生产者
        new Thread(){
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    Message message = new Message();
                    message.setObject("yl===message"+i);
                    queue.queue(message);
                };
            }
        }.start();
    }
}

结果

2.6 消息出队测试

package com.yl;

import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.List;

public class Test4 {
    public static void main(String[] args) {
        Jedis jedis = JedisUtils.getJedisObject();
        DelayMessageQueue queue = new DelayMessageQueue(jedis,"myDelayQueue");
        //生产者
        new Thread(){
            @Override
            public void run() {
                queue.cosume();
            }
        }.start();
    }
}

结果,5秒后才读取到消息

相关文章