【Flink源码分析】3. Flink1.19源码分析-精通动态代理

news/2025/2/8 17:53:26 标签: flink, 大数据

3.1 Java 动态代理

动态代理是一种设计模式,它允许在运行时创建代理对象,并将方法调用重定向到不同的实际对象。它使我们能够在不修改现有代码的情况下增加或改变某个对象的行为。

3.1.1 InvocationHandler接口:

这个接口定义了一个invoke方法,该方法在代理实例上的方法被调用时调用。

public interface InvocationHandler {
    public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable;
}

3.1.2 Proxy类:

这个类提供了创建动态代理类和实例的静态方法。

   public static Object newProxyInstance(ClassLoader loader,
                                          Class<?>[] interfaces,
                                          InvocationHandler h)
        throws IllegalArgumentException

3.1.2.1 ClassLoader loader:

这个类加载器用于定义代理类的类加载器。通常,我们可以使用被代理对象的类加载器,即targetObject.getClass().getClassLoader()。
代理类必须和它所表示的接口在同一个类加载器的命名空间中,以确保代理类能够访问被代理的接口。
总结:类加载器,targetObject.getClass().getClassLoader()。

3.1.2.2 Class<?>[] interfaces:

这是一个接口数组,表示代理类需要实现的接口。
代理类将实现这些接口,并可以在运行时动态地调用这些接口的方法。
总结:动态代理类会调用实现该接口的方法。

3.1.2.3 InvocationHandler h:

这是一个调用处理程序,它负责实现接口中的方法调用。
当代理类的方法被调用时,实际上会调用这个 InvocationHandler 的 invoke 方法。 invoke 方法会接受被调用的方法、方法的参数以及代理实例本身作为参数。
总结:动态代理类调用方法的时候,会流转到 invoke 方法中,在 invoke 方法中可以完成我们要做的操作,比如打印日志。

3.2 Java 动态代理Demo

3.2.1 ResourceManagerGateway 接口

package com.annn.fink.proxy;

/**
 * 模拟Flink为代理目标对象定义一个接口
 */
public interface ResourceManagerGateway {
    /**
     * 定义一个注册方法
     */
    void registerTaskExecutor();
}

3.2.2 ResourceManager 类

package com.annn.fink.proxy;

/**
 * 创建实现该接口的目标对象
 */
public class ResourceManager implements ResourceManagerGateway{

    /**
     * 实现方法中打印一句话
     */
    @Override
    public void registerTaskExecutor() {
        System.out.println("注册 registerTaskExecutor ");
    }
}

3.2.3 PekkoInvocationHandler 类

package com.annn.fink.proxy;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * 创建实现 InvocationHandler 接口的类。
 */
public class PekkoInvocationHandler implements InvocationHandler {

    private Object target; //被代理的对象

    public PekkoInvocationHandler(Object target) {
        this.target = target;
    }

    /**
     * 在 invoke 中调用内部方法 invokeRpc
     * @return
     * @throws Throwable
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        return invokeRpc(method,args);
    }

    /**
     * 在 invokeRpc 中实现自己的逻辑,比如向 ResourceManager 发送 Pekko 的请求
     * flink内部实现的时候会将调用的代理类和方法封装成 RpcInvocation 调用 ask 方法发送给 PekkoRpcActor 接收到消息
     * 内部调用 HandlerMessage 处理不同类型的请求然后通过 java 反射调用最终调用传递给 ResourceManager.registerTaskExecutor 方法
     *
     * @param method
     * @param args
     * @return
     * @throws Exception
     */
    private Object invokeRpc(Method method, Object[] args) throws Exception {
        System.out.println("调用pekko ask方法向 ResourceManager 发送调用的方法");
        Object invoke = method.invoke(target, args);
        System.out.println("结束调用");
        return invoke;
    }
}

3.2.4 Demo

package com.annn.fink.proxy;

import java.lang.reflect.Proxy;

public class Demo {
    public static void main(String[] args) {
        // 创建目标对象
        ResourceManager myObject = new ResourceManager();
        // 创建 InvocationHandler
        PekkoInvocationHandler handler = new PekkoInvocationHandler(myObject);
        // 调用 Proxy.newProxyInstance 静态方法创建动态代理类
        ResourceManagerGateway proxy = (ResourceManagerGateway) Proxy
                .newProxyInstance(myObject.getClass().getClassLoader(),
                        new Class<?>[] { ResourceManagerGateway.class },
                        handler);
        // 调用 registerTaskExecutor 注册方法最终会调用 PekkoInvocationHandler 的 invoke 方法
        proxy.registerTaskExecutor();
    }
}

3.2.5 运行结果

调用pekko ask方法向 ResourceManager 发送调用的方法
注册 registerTaskExecutor 
结束调用

3.2.6 总结动态代理

  1. 定义一个接口
  2. 定义接口实现类
  3. 定义 InvocationHandler
  4. 定义服务,在服务中调用 Proxy.newProxyInstance() 方法创建动态代理

3.3 Flink RPC中的动态代理详解

3.3.1 ResourceManagerGateway 接口

public interface ResourceManagerGateway
        extends FencedRpcGateway<ResourceManagerId>, ClusterPartitionManager, BlocklistListener {
    CompletableFuture<RegistrationResponse> registerTaskExecutor(
            TaskExecutorRegistration taskExecutorRegistration, @RpcTimeout Time timeout);
}

3.3.2 ResourceManager 实现类

public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
        extends FencedRpcEndpoint<ResourceManagerId>
        implements DelegationTokenManager.Listener, ResourceManagerGateway {
    @Override
    public CompletableFuture<RegistrationResponse> registerTaskExecutor(
            final TaskExecutorRegistration taskExecutorRegistration, final Time timeout) {
            
	}
}

3.3.3 PekkoInvocationHandler 类

class PekkoInvocationHandler implements InvocationHandler, PekkoBasedEndpoint, RpcServer {

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();

        Object result;

        if (declaringClass.equals(PekkoBasedEndpoint.class)
                || declaringClass.equals(Object.class)
                || declaringClass.equals(RpcGateway.class)
                || declaringClass.equals(StartStoppable.class)
                || declaringClass.equals(MainThreadExecutable.class)
                || declaringClass.equals(RpcServer.class)) {
            result = method.invoke(this, args);
        } else if (declaringClass.equals(FencedRpcGateway.class)) {
            throw new UnsupportedOperationException(
                    "InvocationHandler does not support the call FencedRpcGateway#"
                            + method.getName()
                            + ". This indicates that you retrieved a FencedRpcGateway without specifying a "
                            + "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to "
                            + "retrieve a properly FencedRpcGateway.");
        } else {
            result = invokeRpc(method, args);
        }

        return result;
    }
    
	private Object invokeRpc(Method method, Object[] args) throws Exception {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        final boolean isLocalRpcInvocation = method.getAnnotation(Local.class) != null;
        Annotation[][] parameterAnnotations = method.getParameterAnnotations();
        Duration futureTimeout =
                RpcGatewayUtils.extractRpcTimeout(parameterAnnotations, args, timeout);

        final RpcInvocation rpcInvocation =
                createRpcInvocationMessage(
                        method.getDeclaringClass().getSimpleName(),
                        methodName,
                        isLocalRpcInvocation,
                        parameterTypes,
                        args);

        Class<?> returnType = method.getReturnType();

        final Object result;

        if (Objects.equals(returnType, Void.TYPE)) {
            tell(rpcInvocation);

            result = null;
        } else {
            // Capture the call stack. It is significantly faster to do that via an exception than
            // via Thread.getStackTrace(), because exceptions lazily initialize the stack trace,
            // initially only
            // capture a lightweight native pointer, and convert that into the stack trace lazily
            // when needed.
            final Throwable callStackCapture = captureAskCallStack ? new Throwable() : null;

            // execute an asynchronous call
            final CompletableFuture<?> resultFuture =
                    ask(rpcInvocation, futureTimeout)
                            .thenApply(
                                    resultValue ->
                                            deserializeValueIfNeeded(
                                                    resultValue, method, flinkClassLoader));

            final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
            resultFuture.whenComplete(
                    (resultValue, failure) -> {
                        if (failure != null) {
                            completableFuture.completeExceptionally(
                                    resolveTimeoutException(
                                            ExceptionUtils.stripCompletionException(failure),
                                            callStackCapture,
                                            address,
                                            rpcInvocation));
                        } else {
                            completableFuture.complete(resultValue);
                        }
                    });

            if (Objects.equals(returnType, CompletableFuture.class)) {
                result = completableFuture;
            } else {
                try {
                    result = completableFuture.get(futureTimeout.toMillis(), TimeUnit.MILLISECONDS);
                } catch (ExecutionException ee) {
                    throw new RpcException(
                            "Failure while obtaining synchronous RPC result.",
                            ExceptionUtils.stripExecutionException(ee));
                }
            }
        }

        return result;
    }
}

3.3.4 PekkoRpcService 类

public class PekkoRpcService implements RpcService {


    @Override
    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
        checkNotNull(rpcEndpoint, "rpc endpoint");

        final SupervisorActor.ActorRegistration actorRegistration = registerRpcActor(rpcEndpoint);
        final ActorRef actorRef = actorRegistration.getActorRef();
        final CompletableFuture<Void> actorTerminationFuture =
                actorRegistration.getTerminationFuture();

        LOG.info(
                "Starting RPC endpoint for {} at {} .",
                rpcEndpoint.getClass().getName(),
                actorRef.path());

        final String address = PekkoUtils.getRpcURL(actorSystem, actorRef);
        final String hostname;
        Option<String> host = actorRef.path().address().host();
        if (host.isEmpty()) {
            hostname = "localhost";
        } else {
            hostname = host.get();
        }

        Set<Class<?>> implementedRpcGateways =
                new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));

        implementedRpcGateways.add(RpcServer.class);
        implementedRpcGateways.add(PekkoBasedEndpoint.class);

        final InvocationHandler invocationHandler;

        if (rpcEndpoint instanceof FencedRpcEndpoint) {
            // a FencedRpcEndpoint needs a FencedPekkoInvocationHandler
            invocationHandler =
                    new FencedPekkoInvocationHandler<>(
                            address,
                            hostname,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            actorTerminationFuture,
                            ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
                            captureAskCallstacks,
                            flinkClassLoader);
        } else {
            invocationHandler =
                    new PekkoInvocationHandler(
                            address,
                            hostname,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            actorTerminationFuture,
                            captureAskCallstacks,
                            flinkClassLoader);
        }

        // Rather than using the System ClassLoader directly, we derive the ClassLoader
        // from this class . That works better in cases where Flink runs embedded and all Flink
        // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
        ClassLoader classLoader = getClass().getClassLoader();

        @SuppressWarnings("unchecked")
        RpcServer server =
                (RpcServer)
                        Proxy.newProxyInstance(
                                classLoader,
                                implementedRpcGateways.toArray(
                                        new Class<?>[implementedRpcGateways.size()]),
                                invocationHandler);

        return server;
    }


}

3.3.5 PekkoRpcActor 类

PekkoInvocationHandler 中的远端调用invokeRpc 方法并没有直接调用 invoke 方法,而是将所需参数封装为 RpcInvocation 通过 tell 或 ask 发送到 PekkoRpcActor ,在该类中调用 invoke 方法(tell和ask是Pekko中的通信方式,后面会提到)。

class PekkoRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {


    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
                .match(ControlMessages.class, this::handleControlMessage)
                .matchAny(this::handleMessage)
                .build();
    }

    private void handleMessage(final Object message) {
        if (state.isRunning()) {
            mainThreadValidator.enterMainThread();

            try {
                handleRpcMessage(message);
            } finally {
                mainThreadValidator.exitMainThread();
            }
        } else {
            log.info(
                    "The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
                    rpcEndpoint.getClass().getName(),
                    message);

            sendErrorIfSender(
                    new EndpointNotStartedException(
                            String.format(
                                    "Discard message %s, because the rpc endpoint %s has not been started yet.",
                                    message, getSelf().path())));
        }
    }

    protected void handleRpcMessage(Object message) {
        if (message instanceof RunAsync) {
            handleRunAsync((RunAsync) message);
        } else if (message instanceof CallAsync) {
            handleCallAsync((CallAsync) message);
        } else if (message instanceof RpcInvocation) {
            handleRpcInvocation((RpcInvocation) message);
        } else {
            log.warn(
                    "Received message of unknown type {} with value {}. Dropping this message!",
                    message.getClass().getName(),
                    message);

            sendErrorIfSender(
                    new UnknownMessageException(
                            "Received unknown message "
                                    + message
                                    + " of type "
                                    + message.getClass().getSimpleName()
                                    + '.'));
        }
    }
   
	private void handleRpcInvocation(RpcInvocation rpcInvocation) {
        Method rpcMethod = null;

        try {
            String methodName = rpcInvocation.getMethodName();
            Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();

            rpcMethod = lookupRpcMethod(methodName, parameterTypes);
        } catch (final NoSuchMethodException e) {
            log.error("Could not find rpc method for rpc invocation.", e);

            RpcConnectionException rpcException =
                    new RpcConnectionException("Could not find rpc method for rpc invocation.", e);
            getSender().tell(new Status.Failure(rpcException), getSelf());
        }

        if (rpcMethod != null) {
            try {
                // this supports declaration of anonymous classes
                rpcMethod.setAccessible(true);

                final Method capturedRpcMethod = rpcMethod;
                if (rpcMethod.getReturnType().equals(Void.TYPE)) {
                    // No return value to send back
                    runWithContextClassLoader(
                            () -> capturedRpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()),
                            flinkClassLoader);
                } else {
                    final Object result;
                    try {
                        result =
                                runWithContextClassLoader(
                                        () ->
                                                capturedRpcMethod.invoke(
                                                        rpcEndpoint, rpcInvocation.getArgs()),
                                        flinkClassLoader);
                    } catch (InvocationTargetException e) {
                        log.debug(
                                "Reporting back error thrown in remote procedure {}", rpcMethod, e);

                        // tell the sender about the failure
                        getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
                        return;
                    }

                    final String methodName = rpcMethod.getName();
                    final boolean isLocalRpcInvocation =
                            rpcMethod.getAnnotation(Local.class) != null;

                    if (result instanceof CompletableFuture) {
                        final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;
                        sendAsyncResponse(responseFuture, methodName, isLocalRpcInvocation);
                    } else {
                        sendSyncResponse(result, methodName, isLocalRpcInvocation);
                    }
                }
            } catch (Throwable e) {
                log.error("Error while executing remote procedure call {}.", rpcMethod, e);
                // tell the sender about the failure
                getSender().tell(new Status.Failure(e), getSelf());
            }
        }
    }
}

3.4 Flink RPC 底层使用动态代理做什么

  1. 动态代理用到的所有类都是 RpcGateway 的实现,也就是说创建的的 RpcGateway 接口对应实现类的动态代理,比如 ResourceManagerGateway 类;
  2. PekkoInvocationHandler 类实现 invoke 方法,是将代理类的方法,参数类型,参数封装为 RpcInvocation 对象,之后通过 Pello.tell 、Pekko.ask 方法将 RpcInvocation 作为消息发送到代理接口所在的进程中;
  3. 代理接口所在的进程中,接收到消息以后会调用对应的方法。

http://www.niftyadmin.cn/n/5845170.html

相关文章

【LeetCode-27】移除元素

目录 一、算法&#xff1a;移除元素问题 二、题目描述 三、相关知识 数组 &#xff08;1&#xff09;数组的特点 &#xff08;2&#xff09;数组的操作 时间复杂度与空间复杂度 &#xff08;1&#xff09;时间复杂度 &#xff08;2&#xff09;空间复杂度 &#xff0…

【论文阅读】Comment on the Security of “VOSA“

Comment on the Security of Verifiable and Oblivious Secure Aggregation for Privacy-Preserving Federated Learning -- 关于隐私保护联邦中可验证与遗忘的安全聚合的安全性 论文来源摘要Introduction回顾 VOSA 方案对VOSA不可伪造性的攻击对于类型 I 的攻击对于类型 II 的…

doris:MySQL 兼容性

Doris 高度兼容 MySQL 语法&#xff0c;支持标准 SQL。但是 Doris 与 MySQL 还是有很多不同的地方&#xff0c;下面给出了它们的差异点介绍。 数据类型​ 数字类型​ 类型MySQLDorisBoolean- 支持 - 范围&#xff1a;0 代表 false&#xff0c;1 代表 true- 支持 - 关键字&am…

计算机网络-SSH基本原理

最近年底都在忙&#xff0c;然后这两天好点抽空更新一下。前面基本把常见的VPN都学习了一遍&#xff0c;后面的内容应该又继续深入一点。 一、SSH简介 SSH&#xff08;Secure Shell&#xff0c;安全外壳协议&#xff09;是一种用于在不安全网络上进行安全远程登录和实现其他安…

电脑右下角小喇叭没反应怎么回事,快速解决方案

当电脑右下角的小喇叭&#xff08;音量图标&#xff09;没有反应时&#xff0c;可以尝试以下快速解决方案&#xff1a; 一、基础检查与操作 检查键盘音量键&#xff1a; 按下键盘上的音量增加或减少键&#xff0c;或尝试Fn音量键&#xff08;部分笔记本需组合键&#xff09;&a…

Docker build时apt update失败

配置好dockerfile后&#xff0c;编译镜像docker build -t my-debian .报错&#xff1a; E: Release file for http://mirrors.org/debian/dists/bookworm-updates/InRelease is not valid yet (invalid for another 3h 15min 21s). Updates for this repository will not be a…

elementui:el-table支持搜索、切换分页多选功能,以及数据回显

1、el-table相关代码&#xff0c;需注意:row-key"(row) > { return row.id }" 以及 :reserve-selection"true" <div class"boxList"><div class"search-form"><!-- 搜索表单 --><el-form :inline"true&q…

突破YOLOv11训练:用幽默的方式玩转自定义数据集与物体检测

前言 你是否曾在训练深度学习模型时,望着屏幕上那一堆堆的错误信息,差点觉得自己的大脑要冒烟?如果你也曾体验过这种“技术折磨”,恭喜,你找对地方了!今天,我们将带你踏入YOLOv11的神奇世界,用幽默的方式教你如何训练物体检测模型,处理自定义数据集。放心,这不仅仅是…