Active Object 设计模式实战二

x33g5p2x  于2022-05-06 转载在 其他  
字(6.7k)|赞(0)|评价(0)|浏览(193)

一 点睛

本篇实现订单服务接受异步消息。

二 实战

1 订单接口

package concurrent.activeobject2;

import concurrent.future.Future;

/**
* @className: OrderService
* @description: 订单接口
* @date: 2022/5/4
* @author: cakin
*/
public interface OrderService {
    /**
     * 功能描述:根据订单编号查询订单明细,有入参也有返回值,但是返回值类型必须是 Future
     *
     * @param orderId 订单编号
     * @return Future<String>
     * @author cakin
     * @date 2022/5/4
     * @description: 因为方法的执行是在其他线程中进行的,势必不会立即得到正确的最终结果,通过 Future 可以立即得到返回
     */
    Future<String> findOrderDetails(long orderId);

    /**
     * 功能描述:提交订单,没有返回值
     *
     * @param account 用户信息
     * @param orderId 订单编号
     * @author cakin
     * @date 2022/5/4
     * @description: 它是一种无返回值的方法
     */
    void order(String account, long orderId);
}

2 订单接口具体实现

package concurrent.activeobject2;

import concurrent.future.Future;
import concurrent.future.FutureService;

import java.util.concurrent.TimeUnit;

/**
* @className: OrderServiceImpl
* @description: 在执行线程中将被使用的类
* @date: 2022/5/4
* @author: cakin
*/
public class OrderServiceImpl implements OrderService {
    @Override
    public Future<String> findOrderDetails(long orderId) {
        return FutureService.<Long, String>newService().submit(input -> {
            try {
                TimeUnit.SECONDS.sleep(10);
                System.out.println("Process the orderId->" + orderId);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "The order Details Information";
        }, orderId, null);
    }

    @Override
    public void order(String account, long orderId) {
        try {
            TimeUnit.SECONDS.sleep(10);
            System.out.println("Process the order for account " + account + ",orderId " + orderId);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3 订单接口代理

package concurrent.activeobject2;

import concurrent.future.Future;

import java.util.HashMap;
import java.util.Map;

public class OrderServiceProxy implements OrderService {
    private final OrderService orderService;
    private final ActiveMessageQueue activeMessageQueue;

    public OrderServiceProxy(OrderService orderService, ActiveMessageQueue activeMessageQueue) {
        this.orderService = orderService;
        this.activeMessageQueue = activeMessageQueue;
    }

    @Override
    public Future<String> findOrderDetails(long orderId) {
        // 定义一个 ActiveFuture,并且可支持立即返回
        final ActiveFuture<String> activeFuture = new ActiveFuture<>();
        // 收集方法入参以及返回的 ActiveFuture 封装成 MethodMessage
        Map<String, Object> params = new HashMap<>();
        params.put("orderId", orderId);
        params.put("activeFuture", activeFuture);
        MethodMessage message = new FindOrderDetailsMessage(params, orderService);
        // 将 MethodMessage 保存到 activeMessageQueue
        activeMessageQueue.offer(message);
        return activeFuture;
    }

    @Override
    public void order(String account, long orderId) {
        // 收集方法参数,并且封装成 MethodMessage,然后 offer 到队列中
        Map<String, Object> params = new HashMap<>();
        params.put("account", account);
        params.put("orderId", orderId);
        MethodMessage message = new OrderMessage(params, orderService);
        activeMessageQueue.offer(message);
    }
}

4 订单代理工厂类

package concurrent.activeobject2;

/**
* @className: OrderServiceFactory
* @description: 为了使 Proxy 的构造透明化,设计此工厂类
* @date: 2022/5/5
* @author: cakin
*/
public final class OrderServiceFactory {
    private final static ActiveMessageQueue activeMessageQueue = new ActiveMessageQueue();

    // 不允许外部通过 new 方式构造
    private OrderServiceFactory() {
    }

    // 返回 OrderServiceProxy
    public static OrderService toActiveObject(OrderService orderService) {
        return new OrderServiceProxy(orderService, activeMessageQueue);
    }
}

5 方法消息抽象类

package concurrent.activeobject2;

import java.util.Map;

public abstract class MethodMessage {
    // 用于收集方法参数,如果又返回 Future 类型,则一并收集
    protected final Map<String, Object> params;
    // 具体的接口实现
    protected final OrderService orderService;

    public MethodMessage(Map<String, Object> params, OrderService orderService) {
        this.params = params;
        this.orderService = orderService;
    }

    // 抽象方法,扮演 work thread 的说明书
    public abstract void execute();
}

6 处理订单消息

package concurrent.activeobject2;

import java.util.Map;

public class OrderMessage extends MethodMessage {
    public OrderMessage(Map<String, Object> params, OrderService orderService) {
        super(params, orderService);
    }

    @Override
    public void execute() {
        // 获取参数
        String account = (String) params.get("account");
        long orderId = (long) params.get("orderId");
        // 执行真正的 order 方法
        orderService.order(account, orderId);
    }
}

7 查找订单详细

package concurrent.activeobject2;

import concurrent.future.Future;

import java.util.Map;

public class FindOrderDetailsMessage extends MethodMessage {
    public FindOrderDetailsMessage(Map<String, Object> params, OrderService orderService) {
        super(params, orderService);
    }

    @Override
    public void execute() {
        // 执行 orderService 的 findOrderDetails 方法
        Future<String> realFuture = orderService.findOrderDetails((Long) params.get("orderId"));
        // 此方法会导致阻塞直到 findOrderDetails 方法完全执行结束
        ActiveFuture<String> activeFuture = (ActiveFuture<String>) params.get("activeFuture");
        // 当 findOrderDetails 执行结束时,将结果通过 finish 的方法传递给 activeFuture

        try {
            String result = realFuture.get();
            activeFuture.finish(result);
        } catch (InterruptedException e) {
            activeFuture.finish(null);
        }
    }
}

8 传递最终结果

package concurrent.activeobject2;

import concurrent.future.FutureTask;

/**
* @className: ActiveFuture
* @description: 执行线程完成任务之后传递最终结果
* @date: 2022/5/4
* @author: cakin
*/
public class ActiveFuture<T> extends FutureTask<T> {
    @Override
    public void finish(T result) {
        super.finish(result);
    }
}

9 消息队列

package concurrent.activeobject2;

import java.util.LinkedList;

public class ActiveMessageQueue {
    // 用于存放提交的 MethodMessage 消息
    private final LinkedList<MethodMessage> messages = new LinkedList<>();

    public ActiveMessageQueue() {
        // 启动 Worker 线程
        new ActiveDaemonThread(this).start();
    }

    public void offer(MethodMessage methodMessage) {
        synchronized (this) {
            messages.addLast(methodMessage);
            // 因为只有一个线程负责 take 数据,因此没有必要使用 notifyAll 方法
            this.notify();
        }
    }

    protected MethodMessage take() {
        synchronized (this) {
            // 当 MethodMessage 队列中没有 Message 的时候,执行线程进入阻塞
            while (messages.isEmpty()) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 获取其中一个 MethodMessage 并且从队列中移除
            return messages.removeFirst();
        }
    }
}

10 守护消息,处理订单消息

package concurrent.activeobject2;

/**
* @className: ActiveDaemonThread
* @description: 守护线程,主要是从 queue 中获取 Message,然后执行 execute 方法
* @date: 2022/5/4
* @author: cakin
*/
public class ActiveDaemonThread extends Thread {
    private final ActiveMessageQueue queue;

    public ActiveDaemonThread(ActiveMessageQueue queue) {
        super("ActiveDaemonThread");
        this.queue = queue;
        // ActiveDaemonThread 为守护线程
        setDaemon(true);
    }

    @Override
    public void run() {
        while (true) {
            // 从 MethodMessage 队列中获取一个 MethodMessage,然后执行 execute 方法
            MethodMessage methodMessage = this.queue.take();
            methodMessage.execute();
        }
    }
}

11 测试

package concurrent.activeobject2;

import concurrent.future.Future;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        // 需要传递 OrderService 的具体实现
        OrderService orderService = OrderServiceFactory.toActiveObject(new OrderServiceImpl());
        orderService.order("hello", 434543);
        Future<String> orderDetails = orderService.findOrderDetails(434543);
        String result = orderDetails.get();
        System.out.println(result);
    }
}

三 测试结果

Process the order for account hello,orderId 434543

Process the orderId->434543

The order Details Information

相关文章