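3.1 Java Dynamic Proxies
A dynamic proxy is Java's runtime implementation of the proxy pattern: the proxy object is created at runtime and forwards every method call to a handler, which can delegate the call to the real target object. This lets us add to or change an object's behavior without modifying existing code.
3.1.1 The InvocationHandler interface
This interface declares a single method, invoke, which is called whenever a method is invoked on a proxy instance.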
public interface InvocationHandler {
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable;
}
3.1.2 The Proxy class
This class provides static methods for creating dynamic proxy classes and instances.
public static Object newProxyInstance(ClassLoader loader,
Class<?>[] interfaces,
InvocationHandler h)
throws IllegalArgumentException
3.1.2.1 ClassLoader loader:
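The class loader used to define the proxy class. Usually the class loader of the proxied object is used, that is, targetObject.getClass().getClassLoader().
Every interface in the interfaces array must be visible by name through this class loader; otherwise newProxyInstance throws IllegalArgumentException.
Summary: the class loader, typically targetObject.getClass().getClassLoader().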
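The visibility rule can be checked with a small experiment. The following is a minimal sketch, not part of the original demo; the names Greeter and LoaderVisibilityDemo are hypothetical. It tries to create a proxy once with the loader that loaded the interface and once with null, which denotes the bootstrap loader and cannot see application classes.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical interface used only for this illustration.
interface Greeter {
    String greet(String name);
}

class LoaderVisibilityDemo {

    // Returns true if a proxy for Greeter can be created with the given class loader.
    static boolean canCreateProxy(ClassLoader loader) {
        InvocationHandler handler = (proxy, method, args) -> "hello " + args[0];
        try {
            Proxy.newProxyInstance(loader, new Class<?>[] {Greeter.class}, handler);
            return true;
        } catch (IllegalArgumentException e) {
            // Thrown when an interface is not visible through the given loader.
            return false;
        }
    }

    public static void main(String[] args) {
        // The loader that loaded Greeter can see it.
        System.out.println(canCreateProxy(Greeter.class.getClassLoader()));
        // null means the bootstrap loader, which cannot see application classes.
        System.out.println(canCreateProxy(null));
    }
}
```

This is why passing the target object's own class loader is the usual safe choice: it is guaranteed to see the interfaces that the target implements.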
3.1.2.2 Class<?>[] interfaces:
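An array of the interfaces that the proxy class must implement.
The proxy class implements these interfaces, so the proxy instance can be cast to any of them and their methods can be called on it at runtime.
Summary: the dynamic proxy class implements the listed interfaces, and calls to their methods go through the proxy.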
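As an illustration of the interfaces parameter, here is a minimal sketch (the class name MultiInterfaceProxyDemo is hypothetical) in which one proxy implements two JDK interfaces, Runnable and Callable. The handler only records which interface method was called.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class MultiInterfaceProxyDemo {

    // Names of the interface methods that reached the handler, in call order.
    static final List<String> calls = new ArrayList<>();

    static Object createProxy() {
        InvocationHandler handler = (proxy, method, args) -> {
            calls.add(method.getDeclaringClass().getSimpleName() + "#" + method.getName());
            return null; // acceptable for void run() and for Callable.call()
        };
        return Proxy.newProxyInstance(
                MultiInterfaceProxyDemo.class.getClassLoader(),
                new Class<?>[] {Runnable.class, Callable.class},
                handler);
    }

    public static void main(String[] args) throws Exception {
        Object proxy = createProxy();
        // The same object can be used through either interface.
        System.out.println(proxy instanceof Runnable);
        System.out.println(proxy instanceof Callable);
        System.out.println(Proxy.isProxyClass(proxy.getClass()));
        ((Runnable) proxy).run();
        ((Callable<?>) proxy).call();
        System.out.println(calls);
    }
}
```

The proxy is an instance of every interface in the array, which is the property Flink relies on later when it passes a whole set of gateway interfaces to newProxyInstance.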
3.1.2.3 InvocationHandler h:
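The invocation handler that receives the method calls made on the proxy.
When a method is called on the proxy instance, the call is dispatched to the invoke method of this InvocationHandler. invoke receives the proxy instance itself, the Method being called, and the call's arguments.
Summary: every method call on the dynamic proxy is routed to invoke, where we can do whatever we need, for example print a log message.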
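One detail worth seeing in code: toString, hashCode, and equals called on the proxy are also dispatched to invoke. The sketch below uses hypothetical names (Calculator, LoggingHandler, HandlerDemo). It answers those Object methods on the handler itself and forwards everything else to the target.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical interface used only for this illustration.
interface Calculator {
    int add(int a, int b);
}

class LoggingHandler implements InvocationHandler {
    private final Object target;
    // Names of the forwarded interface methods, separated by ';'.
    final StringBuilder log = new StringBuilder();

    LoggingHandler(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            // toString/hashCode/equals also arrive here; answer them on the handler.
            return method.invoke(this, args);
        }
        log.append(method.getName()).append(';');
        return method.invoke(target, args);
    }
}

class HandlerDemo {
    public static void main(String[] args) {
        Calculator real = (a, b) -> a + b;
        LoggingHandler handler = new LoggingHandler(real);
        Calculator proxy = (Calculator) Proxy.newProxyInstance(
                Calculator.class.getClassLoader(),
                new Class<?>[] {Calculator.class},
                handler);
        System.out.println(proxy.add(2, 3));
        // toString on the proxy is answered by the handler object.
        System.out.println(proxy.toString().equals(handler.toString()));
        System.out.println(handler.log);
    }
}
```

Handling Object methods locally is a design choice rather than a requirement; forwarding them to the target would work as well.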
3.2 Java Dynamic Proxy Demo
3.2.1 The ResourceManagerGateway interface
package com.annn.fink.proxy;
/**
 * An interface for the proxied target object, modeled on Flink.
 */
public interface ResourceManagerGateway {
    /**
     * Declares a registration method.
     */
    void registerTaskExecutor();
}
3.2.2 The ResourceManager class
package com.annn.fink.proxy;
/**
 * The target object, which implements the interface.
 */
public class ResourceManager implements ResourceManagerGateway {
    /**
     * The implementation just prints a message.
     */
    @Override
    public void registerTaskExecutor() {
        System.out.println("Registering via registerTaskExecutor");
    }
}
3.2.3 The PekkoInvocationHandler class
package com.annn.fink.proxy;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
/**
 * A class that implements the InvocationHandler interface.
 */
public class PekkoInvocationHandler implements InvocationHandler {
    private final Object target; // the proxied object
    public PekkoInvocationHandler(Object target) {
        this.target = target;
    }
    /**
     * Delegates to the internal method invokeRpc.
     * @return the value returned by the proxied method
     * @throws Throwable if the call fails
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        return invokeRpc(method, args);
    }
    /**
     * Custom logic goes here, for example sending a Pekko request to ResourceManager.
     * In Flink's real implementation the invoked method and its arguments are wrapped in an
     * RpcInvocation and sent with ask. PekkoRpcActor receives the message, handleMessage
     * dispatches on the message type, and the call finally reaches
     * ResourceManager.registerTaskExecutor through Java reflection.
     *
     * @param method the interface method that was called on the proxy
     * @param args the arguments of the call
     * @return the value returned by the target method
     * @throws Exception if the reflective call fails
     */
    private Object invokeRpc(Method method, Object[] args) throws Exception {
        System.out.println("Calling Pekko ask to send the invoked method to ResourceManager");
        Object invoke = method.invoke(target, args);
        System.out.println("Call finished");
        return invoke;
    }
}
3.2.4 Demo
package com.annn.fink.proxy;
import java.lang.reflect.Proxy;
public class Demo {
    public static void main(String[] args) {
        // Create the target object
        ResourceManager myObject = new ResourceManager();
        // Create the InvocationHandler
        PekkoInvocationHandler handler = new PekkoInvocationHandler(myObject);
        // Call the static method Proxy.newProxyInstance to create the dynamic proxy instance
        ResourceManagerGateway proxy = (ResourceManagerGateway) Proxy
                .newProxyInstance(myObject.getClass().getClassLoader(),
                        new Class<?>[] { ResourceManagerGateway.class },
                        handler);
        // Calling registerTaskExecutor ends up in PekkoInvocationHandler.invoke
        proxy.registerTaskExecutor();
    }
}
3.2.5 Output
Calling Pekko ask to send the invoked method to ResourceManager
Registering via registerTaskExecutor
Call finished
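3.2.6 Summary of dynamic proxies
- Define an interface
- Define a class that implements the interface
- Define an InvocationHandler
- Define the calling code, which creates the dynamic proxy with Proxy.newProxyInstance()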
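The four steps above can be folded into a small generic helper. This is only a sketch under assumptions: Proxies, forwarding, Parser, and ForwardingDemo are hypothetical names, and the helper assumes the interface's own class loader can define the proxy. Unlike the demo handler, it unwraps InvocationTargetException so that callers see the exception the target actually threw.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

class Proxies {
    // Creates a proxy for iface that forwards every call to target.
    static <T> T forwarding(Class<T> iface, T target) {
        Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[] {iface},
                (p, method, args) -> {
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        // Rethrow what the target threw, not the reflection wrapper.
                        throw e.getCause();
                    }
                });
        return iface.cast(proxy);
    }
}

// Hypothetical interface used only for this illustration.
interface Parser {
    int parse(String text);
}

class ForwardingDemo {
    public static void main(String[] args) {
        Parser parser = Proxies.forwarding(Parser.class, Integer::parseInt);
        System.out.println(parser.parse("42"));
        try {
            parser.parse("not a number");
        } catch (NumberFormatException e) {
            // The original exception type survives the trip through the proxy.
            System.out.println("caught " + e.getClass().getSimpleName());
        }
    }
}
```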
3.3 Dynamic Proxies in Flink RPC in Detail
3.3.1 The ResourceManagerGateway interface
public interface ResourceManagerGateway
        extends FencedRpcGateway<ResourceManagerId>, ClusterPartitionManager, BlocklistListener {
    CompletableFuture<RegistrationResponse> registerTaskExecutor(
            TaskExecutorRegistration taskExecutorRegistration, @RpcTimeout Time timeout);
}
3.3.2 The ResourceManager implementation class
public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
        extends FencedRpcEndpoint<ResourceManagerId>
        implements DelegationTokenManager.Listener, ResourceManagerGateway {
    @Override
    public CompletableFuture<RegistrationResponse> registerTaskExecutor(
            final TaskExecutorRegistration taskExecutorRegistration, final Time timeout) {
        // method body omitted in this excerpt
    }
}
3.3.3 The PekkoInvocationHandler class
class PekkoInvocationHandler implements InvocationHandler, PekkoBasedEndpoint, RpcServer {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();
        Object result;
        if (declaringClass.equals(PekkoBasedEndpoint.class)
                || declaringClass.equals(Object.class)
                || declaringClass.equals(RpcGateway.class)
                || declaringClass.equals(StartStoppable.class)
                || declaringClass.equals(MainThreadExecutable.class)
                || declaringClass.equals(RpcServer.class)) {
            result = method.invoke(this, args);
        } else if (declaringClass.equals(FencedRpcGateway.class)) {
            throw new UnsupportedOperationException(
                    "InvocationHandler does not support the call FencedRpcGateway#"
                            + method.getName()
                            + ". This indicates that you retrieved a FencedRpcGateway without specifying a "
                            + "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to "
                            + "retrieve a properly FencedRpcGateway.");
        } else {
            result = invokeRpc(method, args);
        }
        return result;
    }
    private Object invokeRpc(Method method, Object[] args) throws Exception {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        final boolean isLocalRpcInvocation = method.getAnnotation(Local.class) != null;
        Annotation[][] parameterAnnotations = method.getParameterAnnotations();
        Duration futureTimeout =
                RpcGatewayUtils.extractRpcTimeout(parameterAnnotations, args, timeout);
        final RpcInvocation rpcInvocation =
                createRpcInvocationMessage(
                        method.getDeclaringClass().getSimpleName(),
                        methodName,
                        isLocalRpcInvocation,
                        parameterTypes,
                        args);
        Class<?> returnType = method.getReturnType();
        final Object result;
        if (Objects.equals(returnType, Void.TYPE)) {
            tell(rpcInvocation);
            result = null;
        } else {
            // Capture the call stack. It is significantly faster to do that via an exception than
            // via Thread.getStackTrace(), because exceptions lazily initialize the stack trace,
            // initially only capture a lightweight native pointer, and convert that into the
            // stack trace lazily when needed.
            final Throwable callStackCapture = captureAskCallStack ? new Throwable() : null;
            // execute an asynchronous call
            final CompletableFuture<?> resultFuture =
                    ask(rpcInvocation, futureTimeout)
                            .thenApply(
                                    resultValue ->
                                            deserializeValueIfNeeded(
                                                    resultValue, method, flinkClassLoader));
            final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
            resultFuture.whenComplete(
                    (resultValue, failure) -> {
                        if (failure != null) {
                            completableFuture.completeExceptionally(
                                    resolveTimeoutException(
                                            ExceptionUtils.stripCompletionException(failure),
                                            callStackCapture,
                                            address,
                                            rpcInvocation));
                        } else {
                            completableFuture.complete(resultValue);
                        }
                    });
            if (Objects.equals(returnType, CompletableFuture.class)) {
                result = completableFuture;
            } else {
                try {
                    result = completableFuture.get(futureTimeout.toMillis(), TimeUnit.MILLISECONDS);
                } catch (ExecutionException ee) {
                    throw new RpcException(
                            "Failure while obtaining synchronous RPC result.",
                            ExceptionUtils.stripExecutionException(ee));
                }
            }
        }
        return result;
    }
}
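The return-type dispatch in invokeRpc can be hard to see among the timeout and serialization details. The sketch below is a simplified model, not Flink code: ToyGateway, ToyTransport, ToyInvocationHandler, and ToyRpcDemo are hypothetical names, and the transport answers immediately in memory instead of sending a Pekko message. It keeps only the three branches: void methods use tell, methods returning CompletableFuture hand back the ask future, and all other methods block on that future.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical gateway with one method per return-type category.
interface ToyGateway {
    void notifyHeartbeat(String from);                  // fire and forget
    CompletableFuture<String> register(String worker);  // asynchronous reply
    String status();                                    // blocks for the reply
}

// Hypothetical in-memory stand-in for the transport layer.
class ToyTransport {
    final List<String> told = new ArrayList<>();

    void tell(String methodName, Object[] args) {
        told.add(methodName);
    }

    CompletableFuture<Object> ask(String methodName, Object[] args) {
        // Replies immediately; a real transport would complete this later.
        return CompletableFuture.completedFuture(methodName + ":ok");
    }
}

class ToyInvocationHandler implements InvocationHandler {
    private final ToyTransport transport;
    private final long timeoutMillis;

    ToyInvocationHandler(ToyTransport transport, long timeoutMillis) {
        this.transport = transport;
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(this, args); // local call, no message is sent
        }
        Class<?> returnType = method.getReturnType();
        if (returnType == Void.TYPE) {
            transport.tell(method.getName(), args);
            return null;
        }
        CompletableFuture<Object> reply = transport.ask(method.getName(), args);
        if (returnType == CompletableFuture.class) {
            return reply; // the caller decides when to wait
        }
        return reply.get(timeoutMillis, TimeUnit.MILLISECONDS); // synchronous call
    }
}

class ToyRpcDemo {
    static ToyGateway connect(ToyTransport transport) {
        return (ToyGateway) Proxy.newProxyInstance(
                ToyGateway.class.getClassLoader(),
                new Class<?>[] {ToyGateway.class},
                new ToyInvocationHandler(transport, 1000L));
    }

    public static void main(String[] args) {
        ToyTransport transport = new ToyTransport();
        ToyGateway gateway = connect(transport);
        gateway.notifyHeartbeat("tm-1");
        System.out.println(transport.told);
        System.out.println(gateway.register("tm-1").join());
        System.out.println(gateway.status());
    }
}
```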
3.3.4 The PekkoRpcService class
public class PekkoRpcService implements RpcService {
    @Override
    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
        checkNotNull(rpcEndpoint, "rpc endpoint");
        final SupervisorActor.ActorRegistration actorRegistration = registerRpcActor(rpcEndpoint);
        final ActorRef actorRef = actorRegistration.getActorRef();
        final CompletableFuture<Void> actorTerminationFuture =
                actorRegistration.getTerminationFuture();
        LOG.info(
                "Starting RPC endpoint for {} at {} .",
                rpcEndpoint.getClass().getName(),
                actorRef.path());
        final String address = PekkoUtils.getRpcURL(actorSystem, actorRef);
        final String hostname;
        Option<String> host = actorRef.path().address().host();
        if (host.isEmpty()) {
            hostname = "localhost";
        } else {
            hostname = host.get();
        }
        Set<Class<?>> implementedRpcGateways =
                new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
        implementedRpcGateways.add(RpcServer.class);
        implementedRpcGateways.add(PekkoBasedEndpoint.class);
        final InvocationHandler invocationHandler;
        if (rpcEndpoint instanceof FencedRpcEndpoint) {
            // a FencedRpcEndpoint needs a FencedPekkoInvocationHandler
            invocationHandler =
                    new FencedPekkoInvocationHandler<>(
                            address,
                            hostname,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            actorTerminationFuture,
                            ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
                            captureAskCallstacks,
                            flinkClassLoader);
        } else {
            invocationHandler =
                    new PekkoInvocationHandler(
                            address,
                            hostname,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            actorTerminationFuture,
                            captureAskCallstacks,
                            flinkClassLoader);
        }
        // Rather than using the System ClassLoader directly, we derive the ClassLoader
        // from this class. That works better in cases where Flink runs embedded and all Flink
        // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
        ClassLoader classLoader = getClass().getClassLoader();
        @SuppressWarnings("unchecked")
        RpcServer server =
                (RpcServer)
                        Proxy.newProxyInstance(
                                classLoader,
                                implementedRpcGateways.toArray(
                                        new Class<?>[implementedRpcGateways.size()]),
                                invocationHandler);
        return server;
    }
}
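startServer builds the interface array for the proxy from the endpoint's class rather than listing interfaces by hand. How RpcUtils.extractImplementedRpcGateways works internally is not shown in this excerpt, so the following is only a sketch of the general idea under that assumption: walk the class hierarchy and keep the interfaces that extend a marker interface. All names (MarkerGateway, WorkerGateway, BaseEndpoint, ConcreteEndpoint, GatewayCollector) are hypothetical.

```java
import java.lang.reflect.Proxy;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical marker interface standing in for a gateway base type.
interface MarkerGateway {}

interface WorkerGateway extends MarkerGateway {
    String ping();
}

class BaseEndpoint implements WorkerGateway, Runnable {
    public String ping() {
        return "pong";
    }

    public void run() {}
}

class ConcreteEndpoint extends BaseEndpoint {}

class GatewayCollector {
    // Collects the gateway interfaces declared anywhere in the class hierarchy.
    static Set<Class<?>> gatewaysOf(Class<?> clazz) {
        Set<Class<?>> result = new LinkedHashSet<>();
        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
            for (Class<?> iface : c.getInterfaces()) {
                if (MarkerGateway.class.isAssignableFrom(iface)) {
                    result.add(iface);
                }
            }
        }
        return result;
    }

    // Creates a proxy that implements exactly the collected gateway interfaces.
    static Object proxyFor(Object endpoint) {
        Set<Class<?>> gateways = gatewaysOf(endpoint.getClass());
        return Proxy.newProxyInstance(
                endpoint.getClass().getClassLoader(),
                gateways.toArray(new Class<?>[0]),
                (proxy, method, args) -> method.invoke(endpoint, args));
    }

    public static void main(String[] args) {
        Object proxy = proxyFor(new ConcreteEndpoint());
        System.out.println(proxy instanceof WorkerGateway);
        System.out.println(proxy instanceof Runnable); // Runnable is not a gateway
        System.out.println(((WorkerGateway) proxy).ping());
    }
}
```

The proxy exposes only the gateway interfaces, so callers cannot reach other methods of the endpoint through it.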
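3.3.5 The PekkoRpcActor class
The remote-call method invokeRpc in PekkoInvocationHandler does not call Method.invoke on the target directly. Instead, it wraps the required parameters in an RpcInvocation and sends it to PekkoRpcActor with tell or ask; the reflective invoke call happens in that class (tell and ask are Pekko's messaging patterns and are covered later).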
class PekkoRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
                .match(ControlMessages.class, this::handleControlMessage)
                .matchAny(this::handleMessage)
                .build();
    }
    private void handleMessage(final Object message) {
        if (state.isRunning()) {
            mainThreadValidator.enterMainThread();
            try {
                handleRpcMessage(message);
            } finally {
                mainThreadValidator.exitMainThread();
            }
        } else {
            log.info(
                    "The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
                    rpcEndpoint.getClass().getName(),
                    message);
            sendErrorIfSender(
                    new EndpointNotStartedException(
                            String.format(
                                    "Discard message %s, because the rpc endpoint %s has not been started yet.",
                                    message, getSelf().path())));
        }
    }
    protected void handleRpcMessage(Object message) {
        if (message instanceof RunAsync) {
            handleRunAsync((RunAsync) message);
        } else if (message instanceof CallAsync) {
            handleCallAsync((CallAsync) message);
        } else if (message instanceof RpcInvocation) {
            handleRpcInvocation((RpcInvocation) message);
        } else {
            log.warn(
                    "Received message of unknown type {} with value {}. Dropping this message!",
                    message.getClass().getName(),
                    message);
            sendErrorIfSender(
                    new UnknownMessageException(
                            "Received unknown message "
                                    + message
                                    + " of type "
                                    + message.getClass().getSimpleName()
                                    + '.'));
        }
    }
    private void handleRpcInvocation(RpcInvocation rpcInvocation) {
        Method rpcMethod = null;
        try {
            String methodName = rpcInvocation.getMethodName();
            Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
            rpcMethod = lookupRpcMethod(methodName, parameterTypes);
        } catch (final NoSuchMethodException e) {
            log.error("Could not find rpc method for rpc invocation.", e);
            RpcConnectionException rpcException =
                    new RpcConnectionException("Could not find rpc method for rpc invocation.", e);
            getSender().tell(new Status.Failure(rpcException), getSelf());
        }
        if (rpcMethod != null) {
            try {
                // this supports declaration of anonymous classes
                rpcMethod.setAccessible(true);
                final Method capturedRpcMethod = rpcMethod;
                if (rpcMethod.getReturnType().equals(Void.TYPE)) {
                    // No return value to send back
                    runWithContextClassLoader(
                            () -> capturedRpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()),
                            flinkClassLoader);
                } else {
                    final Object result;
                    try {
                        result =
                                runWithContextClassLoader(
                                        () ->
                                                capturedRpcMethod.invoke(
                                                        rpcEndpoint, rpcInvocation.getArgs()),
                                        flinkClassLoader);
                    } catch (InvocationTargetException e) {
                        log.debug(
                                "Reporting back error thrown in remote procedure {}", rpcMethod, e);
                        // tell the sender about the failure
                        getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
                        return;
                    }
                    final String methodName = rpcMethod.getName();
                    final boolean isLocalRpcInvocation =
                            rpcMethod.getAnnotation(Local.class) != null;
                    if (result instanceof CompletableFuture) {
                        final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;
                        sendAsyncResponse(responseFuture, methodName, isLocalRpcInvocation);
                    } else {
                        sendSyncResponse(result, methodName, isLocalRpcInvocation);
                    }
                }
            } catch (Throwable e) {
                log.error("Error while executing remote procedure call {}.", rpcMethod, e);
                // tell the sender about the failure
                getSender().tell(new Status.Failure(e), getSelf());
            }
        }
    }
}
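The core of handleRpcInvocation is a reflective lookup by method name and parameter types, followed by Method.invoke on the endpoint. The sketch below models only that part, without actors: ToyInvocation, ToyEndpoint, and ToyReceiver are hypothetical names, and the reply is returned as a CompletableFuture instead of being sent back to a Pekko sender.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;

// Hypothetical message carrying the method name, parameter types, and arguments.
class ToyInvocation {
    final String methodName;
    final Class<?>[] parameterTypes;
    final Object[] args;

    ToyInvocation(String methodName, Class<?>[] parameterTypes, Object[] args) {
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.args = args;
    }
}

// Hypothetical endpoint playing the role of the RPC target.
class ToyEndpoint {
    public String registerTaskExecutor(String id) {
        return "registered " + id;
    }
}

class ToyReceiver {
    private final Object endpoint;

    ToyReceiver(Object endpoint) {
        this.endpoint = endpoint;
    }

    // Looks up the method, invokes it on the endpoint, and reports the outcome.
    CompletableFuture<Object> handle(ToyInvocation invocation) {
        CompletableFuture<Object> reply = new CompletableFuture<>();
        try {
            Method method =
                    endpoint.getClass()
                            .getMethod(invocation.methodName, invocation.parameterTypes);
            method.setAccessible(true);
            reply.complete(method.invoke(endpoint, invocation.args));
        } catch (NoSuchMethodException | IllegalAccessException e) {
            // Unknown or inaccessible method: report the lookup failure.
            reply.completeExceptionally(e);
        } catch (InvocationTargetException e) {
            // The endpoint method itself threw: report its exception.
            reply.completeExceptionally(e.getCause());
        }
        return reply;
    }

    public static void main(String[] args) {
        ToyReceiver receiver = new ToyReceiver(new ToyEndpoint());
        ToyInvocation ok =
                new ToyInvocation(
                        "registerTaskExecutor",
                        new Class<?>[] {String.class},
                        new Object[] {"tm-1"});
        System.out.println(receiver.handle(ok).join());
        ToyInvocation missing = new ToyInvocation("missing", new Class<?>[0], new Object[0]);
        System.out.println(receiver.handle(missing).isCompletedExceptionally());
    }
}
```

Sending the parameter types along with the method name is what allows the receiver to pick the right overload.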
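3.4 What Flink RPC Uses Dynamic Proxies For
- The interfaces the proxy implements are the RpcGateway sub-interfaces of the endpoint (startServer also adds RpcServer and PekkoBasedEndpoint); in other words, the proxy that gets created stands in for an RpcGateway interface such as ResourceManagerGateway;
- PekkoInvocationHandler implements invoke: it wraps the called method's name, parameter types, and arguments in an RpcInvocation object and then sends that object as a message, through its tell or ask method, to the process that hosts the endpoint behind the proxied interface;
- That process receives the message and invokes the corresponding method.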