动态代理方式实现 Active Object

x33g5p2x  于2022-05-06 转载在 其他  
字(6.9k)|赞(0)|评价(0)|浏览(179)

一 代码

1 ActiveDaemonThread

package concurrent.activeobject3;

/**
 * Daemon worker thread: it repeatedly takes an {@link ActiveMessage} from the
 * queue it was given and runs that message's {@code execute} method.
 *
 * <p>Originally written 2022/5/4.
 *
 * @author cakin
 */
public class ActiveDaemonThread extends Thread {
    private final ActiveMessageQueue queue;

    /**
     * Creates the worker (named "ActiveDaemonThread") and marks it as a daemon.
     * The thread is not started here.
     *
     * @param queue the queue this worker takes messages from
     */
    public ActiveDaemonThread(ActiveMessageQueue queue) {
        super("ActiveDaemonThread");
        // Daemon status: this thread alone does not keep the JVM alive.
        setDaemon(true);
        this.queue = queue;
    }

    /**
     * Endless consume loop: take the next message, then execute it.
     */
    @Override
    public void run() {
        for (;;) {
            ActiveMessage next = queue.take();
            next.execute();
        }
    }
}

2 ActiveMessage

package concurrent.activeobject3;

import concurrent.activeobject2.ActiveFuture;
import concurrent.future.Future;

import java.lang.reflect.Method;

/**
 * Generic message that can represent a call to any Active Object interface
 * method: it records the method, its arguments, the target service and, for
 * methods that produce a result, the future through which that result is
 * handed back to the caller.
 *
 * <p>Originally written 2022/5/5.
 *
 * @author 贝医
 */
class ActiveMessage {
    // Arguments of the interface method call
    private final Object[] objects;
    // Interface method to invoke reflectively
    private final Method method;
    // Future handed to the caller for methods with a result; null when no result is expected
    private final ActiveFuture<Object> future;
    // Concrete service instance the method is invoked on
    private final Object service;

    /**
     * Copies the values collected by the builder.
     *
     * @param builder the builder holding method, arguments, future and service
     */
    public ActiveMessage(Builder builder) {
        this.objects = builder.objects;
        this.method = builder.method;
        this.future = builder.future;
        this.service = builder.service;
    }

    /**
     * Invokes the recorded method on the service via reflection.
     *
     * <p>When a future was supplied, the value returned by the service is cast
     * to {@code Future}, its {@code get()} result is awaited, and that result is
     * passed to {@code future.finish}. Any exception along the way completes the
     * future with {@code null}; when no future was supplied the exception is
     * deliberately ignored.
     */
    public void execute() {
        try {
            Object result = method.invoke(service, objects);
            if (future == null) {
                // No result expected by the caller: nothing to hand back.
                return;
            }
            // The service itself answers with a Future; wait for its real value.
            Future<?> realFuture = (Future<?>) result;
            future.finish(realFuture.get());
        } catch (Exception e) {
            // Best effort by design: a failed call yields null for the caller's future.
            if (future != null) {
                future.finish(null);
            }
        }
    }

    /**
     * Fluent builder for {@link ActiveMessage}.
     */
    static class Builder {
        private Object[] objects;
        private Method method;
        // Future handed to the caller for methods with a result; may stay null
        private ActiveFuture<Object> future;
        // Concrete service instance
        private Object service;

        /**
         * Sets the interface method to invoke.
         *
         * @param method the method to invoke
         * @return this builder
         */
        public Builder useMethod(Method method) {
            this.method = method;
            return this;
        }

        /**
         * Misspelled alias of {@link #useMethod(Method)}, kept so existing
         * callers continue to compile.
         *
         * @param method the method to invoke
         * @return this builder
         */
        public Builder uesMethod(Method method) {
            return useMethod(method);
        }

        /**
         * Sets the future through which the result is delivered.
         *
         * @param future the future to complete, or {@code null} for none
         * @return this builder
         */
        public Builder returnFuture(ActiveFuture<Object> future) {
            this.future = future;
            return this;
        }

        /**
         * Sets the arguments passed to the method.
         *
         * @param objects the call arguments
         * @return this builder
         */
        public Builder withObjects(Object[] objects) {
            this.objects = objects;
            return this;
        }

        /**
         * Sets the service instance the method is invoked on.
         *
         * @param service the target service
         * @return this builder
         */
        public Builder forService(Object service) {
            this.service = service;
            return this;
        }

        /**
         * Builds the {@link ActiveMessage} from the collected values.
         *
         * @return a new message
         */
        public ActiveMessage build() {
            return new ActiveMessage(this);
        }
    }
}

3 ActiveMessageQueue

package concurrent.activeobject3;

import java.util.LinkedList;

/**
 * FIFO queue of submitted {@link ActiveMessage}s, guarded by this object's
 * monitor. Constructing a queue also starts the worker thread that drains it.
 */
public class ActiveMessageQueue {
    // Pending messages, oldest first
    private final LinkedList<ActiveMessage> messages = new LinkedList<>();

    /**
     * Creates the queue and starts an {@link ActiveDaemonThread} bound to it.
     */
    public ActiveMessageQueue() {
        ActiveDaemonThread worker = new ActiveDaemonThread(this);
        worker.start();
    }

    /**
     * Appends a message and wakes one waiting thread.
     *
     * @param methodMessage the message to enqueue
     */
    public synchronized void offer(ActiveMessage methodMessage) {
        messages.add(methodMessage);
        // The constructor starts a single worker to take messages, so notify()
        // is used instead of notifyAll().
        notify();
    }

    /**
     * Removes and returns the oldest message, blocking while the queue is empty.
     * An interrupt during the wait is printed and the wait is resumed.
     *
     * @return the oldest queued message
     */
    protected synchronized ActiveMessage take() {
        while (messages.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return messages.removeFirst();
    }
}

4 ActiveMethod

package concurrent.activeobject3;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marker annotation that designates a method as an Active Method.
 * It is retained at runtime and may only be placed on methods.
 *
 * <p>Originally written 2022/5/5.
 *
 * @author 贝医
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ActiveMethod {
}

5 ActiveServiceFactory

package concurrent.activeobject3;

import concurrent.activeobject2.ActiveFuture;
import concurrent.future.Future;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * Creates dynamic-proxy wrappers around service instances and converts calls
 * to {@link ActiveMethod}-annotated methods into {@link ActiveMessage}s that
 * are placed on a shared {@link ActiveMessageQueue}.
 *
 * <p>Originally written 2022/5/5.
 *
 * @author 贝医
 */
public class ActiveServiceFactory {
    // Queue that stores the ActiveMessages; shared by every proxy this factory creates.
    private final static ActiveMessageQueue queue = new ActiveMessageQueue();

    /**
     * Wraps {@code instance} in a JDK dynamic proxy implementing the interfaces
     * directly declared by the instance's class.
     *
     * <p>The returned object is a proxy, not an instance of the concrete class,
     * so it should be referenced through one of those interfaces.
     *
     * @param instance the service to wrap
     * @param <T> the type the caller refers to the service by
     * @return the proxy
     */
    @SuppressWarnings("unchecked") // the proxy implements the instance's interfaces; see Javadoc
    public static <T> T active(T instance) {
        Class<?> type = instance.getClass();
        Object proxy = Proxy.newProxyInstance(
                type.getClassLoader(),
                type.getInterfaces(),
                new ActiveInvocationHandler<>(instance));
        return (T) proxy;
    }

    /**
     * Invocation handler behind every proxy: enqueues active methods and
     * passes all other calls straight through to the wrapped instance.
     */
    private static class ActiveInvocationHandler<T> implements InvocationHandler {
        private final T instance;

        private ActiveInvocationHandler(T instance) {
            this.instance = instance;
        }

        /**
         * Dispatches a proxied call.
         *
         * <p>Active methods are validated, packaged as an {@link ActiveMessage}
         * and queued; the caller immediately receives a new {@code ActiveFuture}
         * (for Future-returning methods) or {@code null} (for void methods).
         * Any other method is invoked directly on the wrapped instance.
         *
         * @throws IllegalActiveMethod if an active method returns neither void nor Future
         * @throws Throwable whatever the wrapped instance throws on the direct path
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (!isActiveMethod(method)) {
                // Ordinary method: run it synchronously on the caller's thread.
                try {
                    return method.invoke(instance, args);
                } catch (InvocationTargetException e) {
                    // Surface the service's own exception instead of the reflective wrapper.
                    throw e.getCause();
                }
            }

            // Verify the method's return type before queueing anything.
            checkMethod(method);
            ActiveMessage.Builder builder = new ActiveMessage.Builder();
            builder.uesMethod(method).withObjects(args).forService(instance);

            ActiveFuture<Object> future = null;
            if (isReturnFutureType(method)) {
                future = new ActiveFuture<>();
                builder.returnFuture(future);
            }
            // Hand the message to the queue.
            queue.offer(builder.build());
            return future;
        }

        /**
         * Tells whether the call should be treated as an active method.
         *
         * <p>The {@code Method} handed to an invocation handler is the interface
         * method, and method annotations are not inherited, so the annotation is
         * looked for on the interface method and on the matching public method
         * of the wrapped instance's class.
         */
        private boolean isActiveMethod(Method method) {
            if (method.isAnnotationPresent(ActiveMethod.class)) {
                return true;
            }
            try {
                Method implementation =
                        instance.getClass().getMethod(method.getName(), method.getParameterTypes());
                return implementation.isAnnotationPresent(ActiveMethod.class);
            } catch (NoSuchMethodException e) {
                // No matching public method on the implementation: not an active method.
                return false;
            }
        }

        /**
         * Rejects active methods whose return type is neither void nor Future.
         *
         * @throws IllegalActiveMethod if the return type is not allowed
         */
        private void checkMethod(Method method) throws IllegalActiveMethod {
            if (!isReturnVoidType(method) && !isReturnFutureType(method)) {
                throw new IllegalActiveMethod(
                        "the method [" + method.getName() + "] return type must be void/Future");
            }
        }

        // True when the declared return type is Future or a supertype of it.
        // NOTE(review): presumably chosen so the ActiveFuture returned by invoke
        // is assignable to the declared type; confirm against ActiveFuture.
        private boolean isReturnFutureType(Method method) {
            return method.getReturnType().isAssignableFrom(Future.class);
        }

        // True when the method is declared to return void.
        private boolean isReturnVoidType(Method method) {
            return method.getReturnType().equals(Void.TYPE);
        }
    }
}

6 IllegalActiveMethod

package concurrent.activeobject3;

/**
 * Checked exception carrying a message that describes why a method is not a
 * legal active method.
 */
public class IllegalActiveMethod extends Exception {
    /**
     * Creates the exception with the given detail message.
     *
     * @param message the detail message
     */
    public IllegalActiveMethod(String message) {
        super(message);
    }
}

7 OrderServiceImpl

package concurrent.activeobject3;

import concurrent.activeobject2.OrderService;
import concurrent.future.Future;
import concurrent.future.FutureService;

import java.util.concurrent.TimeUnit;

/**
 * {@code OrderService} implementation whose methods are marked with
 * {@link ActiveMethod}; described by its author as the class that is used in
 * the execution thread. Both methods simulate slow work by sleeping first.
 *
 * <p>Originally written 2022/5/4.
 *
 * @author cakin
 */
public class OrderServiceImpl implements OrderService {
    /**
     * Submits a task to a new {@code FutureService}; the task sleeps, prints
     * the order id and yields a fixed details string.
     *
     * @param orderId the order to look up
     * @return the future produced by {@code FutureService.submit}
     */
    @ActiveMethod
    @Override
    public Future<String> findOrderDetails(long orderId) {
        return FutureService.<Long, String>newService().submit(ignoredInput -> {
            if (pause()) {
                System.out.println("Process the orderId->" + orderId);
            }
            return "The order Details Information";
        }, orderId, null);
    }

    /**
     * Sleeps, then prints the account and order id being processed.
     *
     * @param account the account placing the order
     * @param orderId the order id
     */
    @ActiveMethod
    @Override
    public void order(String account, long orderId) {
        if (pause()) {
            System.out.println("Process the order for account " + account + ",orderId " + orderId);
        }
    }

    // Sleeps for ten seconds; returns false (after printing the stack trace)
    // when the sleep was interrupted, so the caller skips its progress output.
    private static boolean pause() {
        try {
            TimeUnit.SECONDS.sleep(10);
            return true;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        }
    }
}

8 Test

package concurrent.activeobject3;

import concurrent.activeobject2.OrderService;
import concurrent.activeobject2.OrderServiceFactory;
import concurrent.future.Future;

/**
 * Demo entry point: places an order, requests the order details and prints
 * the result once it is available.
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        // A concrete OrderService implementation must be supplied.
        // NOTE(review): the proxy is built with activeobject2.OrderServiceFactory,
        // not with ActiveServiceFactory.active from this package; confirm intended.
        OrderService service = OrderServiceFactory.toActiveObject(new OrderServiceImpl());
        service.order("hello", 434543);
        Future<String> pendingDetails = service.findOrderDetails(434543);
        // Blocks until the details are available.
        System.out.println(pendingDetails.get());
    }
}

二 测试

Process the order for account hello,orderId 434543

Process the orderId->434543

The order Details Information

相关文章