skywalking09 - Resuming the trace across asynchronous threads (Part 2)

x33g5p2x, reposted on 2021-12-21 in Other

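In the previous part we saw that multithreading can break a trace, and that there are three ways to join it back together. But why does the trace break in the first place, and how does it get reconnected?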

Why the trace breaks

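To understand why the trace breaks, we first need to know how it works in the normal case, that is, how several Spans get linked to one another. A good entry point is the @Trace annotation from part four of this series, which adds one Span to the trace.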

How @Trace adds a Span in the normal case

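If you know the SkyWalking source a little, you know that a plugin definition declares two strings when it enhances a class: the fully qualified name of the target to enhance, and the fully qualified name of the interceptor class that does the enhancing. Searching for those leads us to TraceAnnotationActivation: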

/**
 * {@link TraceAnnotationActivation} enhance all method that annotated with
 * <code>org.apache.skywalking.apm.toolkit.trace.annotation.Trace</code>
 * by <code>TraceAnnotationMethodInterceptor</code>.
 */
public class TraceAnnotationActivation extends ClassInstanceMethodsEnhancePluginDefine {
    // The interceptor class that performs the enhancement
    public static final String TRACE_ANNOTATION_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.toolkit.activation.trace.TraceAnnotationMethodInterceptor";
    // The annotation whose methods get enhanced
    public static final String TRACE_ANNOTATION = "org.apache.skywalking.apm.toolkit.trace.Trace";

    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[] {
            new DeclaredInstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return isAnnotatedWith(named(TRACE_ANNOTATION));
                }

                @Override
                public String getMethodsInterceptor() {
                    return TRACE_ANNOTATION_METHOD_INTERCEPTOR;
                }

                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }

    @Override
    protected ClassMatch enhanceClass() {
        return MethodAnnotationMatch.byMethodAnnotationMatch(TRACE_ANNOTATION);
    }
}

Next, we look at TraceAnnotationMethodInterceptor:

/**
 * {@link TraceAnnotationMethodInterceptor} create a local span and set the operation name which fetch from
 * <code>org.apache.skywalking.apm.toolkit.trace.annotation.Trace.operationName</code>. if the fetch value is blank
 * string, and the operation name will be the method name.
 */
public class TraceAnnotationMethodInterceptor implements InstanceMethodsAroundInterceptor {
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
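        Trace trace = method.getAnnotation(Trace.class);
        // The operation name comes from the annotation; the fallback to the method name
        // for a blank value is omitted in this trimmed excerpt.
        String operationName = trace.operationName();
        final AbstractSpan localSpan = ContextManager.createLocalSpan(operationName);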
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {
   
        ContextManager.stopSpan();
        
        return ret;
    }

}

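The code has been trimmed. In beforeMethod() it uses ContextManager to create a local Span, and in afterMethod() it uses ContextManager to stop that Span. ContextManager is a core class of SkyWalking's trace management, so the break must come from how it creates a Span: in a new thread, the new Span is not attached to the existing trace.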
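To make the create/stop pairing concrete, here is a toy model of how Spans on one thread can be linked to each other. It is only a sketch, not SkyWalking's code: `SpanStackSketch`, `ACTIVE_SPANS`, and `LINKS` are hypothetical names, and a Span is reduced to its operation name. The idea is that each new Span takes whatever Span is currently active on the same thread as its parent.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SpanStackSketch {
    // Hypothetical stand-in for the per-thread tracing context: a stack of active span names.
    private static final ThreadLocal<Deque<String>> ACTIVE_SPANS =
        ThreadLocal.withInitial(ArrayDeque::new);

    // Records "child <- parent" links so the result can be inspected (single-threaded use only).
    static final List<String> LINKS = new ArrayList<>();

    // Plays the role of beforeMethod(): the parent is the span currently active on this thread.
    static void createLocalSpan(String operationName) {
        Deque<String> stack = ACTIVE_SPANS.get();
        String parent = stack.isEmpty() ? "(root)" : stack.peek();
        LINKS.add(operationName + " <- " + parent);
        stack.push(operationName);
    }

    // Plays the role of afterMethod(): finish the innermost active span.
    static void stopSpan() {
        ACTIVE_SPANS.get().pop();
    }

    static void inner() {
        createLocalSpan("inner");
        try {
            // business logic would run here
        } finally {
            stopSpan();
        }
    }

    static void outer() {
        createLocalSpan("outer");
        try {
            inner();
        } finally {
            stopSpan();
        }
    }

    public static void main(String[] args) {
        outer();
        System.out.println(LINKS); // prints [outer <- (root), inner <- outer]
    }
}
```

Because the stack lives in a ThreadLocal, the parent lookup only works for calls made on the same thread.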

The core of how ContextManager creates a Span

/**
 * {@link ContextManager} controls the whole context of {@link TraceSegment}. Any {@link TraceSegment} relates to
 * single-thread, so this context use {@link ThreadLocal} to maintain the context, and make sure, since a {@link
 * TraceSegment} starts, all ChildOf spans are in the same context. <p> What is 'ChildOf'?
 * https://github.com/opentracing/specification/blob/master/specification.md#references-between-spans
 *
 * <p> Also, {@link ContextManager} delegates to all {@link AbstractTracerContext}'s major methods.
 */
public class ContextManager implements BootService {
    private static final String EMPTY_TRACE_CONTEXT_ID = "N/A";
    private static final ILog LOGGER = LogManager.getLogger(ContextManager.class);
    private static ThreadLocal<AbstractTracerContext> CONTEXT = new ThreadLocal<AbstractTracerContext>();
    private static ThreadLocal<RuntimeContext> RUNTIME_CONTEXT = new ThreadLocal<RuntimeContext>();
    private static ContextManagerExtendService EXTEND_SERVICE;

    private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) {
        AbstractTracerContext context = CONTEXT.get();
        if (context == null) {
            if (StringUtil.isEmpty(operationName)) {
                if (LOGGER.isDebugEnable()) {
                    LOGGER.debug("No operation name, ignore this trace.");
                }
                context = new IgnoredTracerContext();
            } else {
                if (EXTEND_SERVICE == null) {
                    EXTEND_SERVICE = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
                }
                context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);

            }
            CONTEXT.set(context);
        }
        return context;
    }

    private static AbstractTracerContext get() {
        return CONTEXT.get();
    }
    
    public static AbstractSpan createLocalSpan(String operationName) {
        operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
        AbstractTracerContext context = getOrCreate(operationName, false);
        return context.createLocalSpan(operationName);
    }
}
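  • Note the class comment: “Any TraceSegment relates to single-thread, so this context use ThreadLocal to maintain the context, and make sure, since a TraceSegment starts, all ChildOf spans are in the same context.” Each TraceSegment belongs to a single thread, so the context is kept in a ThreadLocal, which guarantees that all child Spans of a segment share the same context.
  • The variable ThreadLocal<AbstractTracerContext> CONTEXT holds the tracing context, and with it the active Spans. That explains the break: in a new thread CONTEXT.get() returns null, so getOrCreate() builds a brand-new context that knows nothing about the parent thread.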
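The ThreadLocal behaviour behind the break can be reproduced without SkyWalking at all. The following is a minimal sketch in which `ThreadLocalGapDemo` and its `CONTEXT` field are hypothetical stand-ins for ContextManager and its CONTEXT, with a plain string in place of the tracing context:

```java
public class ThreadLocalGapDemo {
    // Hypothetical stand-in for ContextManager.CONTEXT; the value is just a string here.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Reads CONTEXT from a freshly started thread and returns what that thread saw.
    static String readFromNewThread() {
        final String[] seen = new String[1];
        Thread child = new Thread(() -> seen[0] = CONTEXT.get());
        child.start();
        try {
            child.join(); // after join(), the child's write to seen[0] is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        CONTEXT.set("trace-context-of-parent");
        System.out.println("parent sees: " + CONTEXT.get());         // trace-context-of-parent
        System.out.println("child sees:  " + readFromNewThread());   // null
    }
}
```

The value set by the parent thread is simply not there for the child, which is the situation getOrCreate() faces when it runs in a new thread.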

How the trace is resumed

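We now know that the trace breaks because CONTEXT lives in a ThreadLocal, so a new thread has no context. It follows that if we hand the parent thread's context over to the new thread, the trace can be resumed. Let's see how SkyWalking does this, using @TraceCrossThread as the example; the other approaches follow the same general idea.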

/**
 * {@link CallableOrRunnableActivation} presents that skywalking intercepts all Class with annotation
 * "org.skywalking.apm.toolkit.trace.TraceCrossThread" and method named "call" or "run".
 */
public class CallableOrRunnableActivation extends ClassInstanceMethodsEnhancePluginDefine {

    public static final String ANNOTATION_NAME = "org.apache.skywalking.apm.toolkit.trace.TraceCrossThread";
    private static final String INIT_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.toolkit.activation.trace.CallableOrRunnableConstructInterceptor";
    private static final String CALL_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.toolkit.activation.trace.CallableOrRunnableInvokeInterceptor";
    private static final String CALL_METHOD_NAME = "call";
    private static final String RUN_METHOD_NAME = "run";
    private static final String GET_METHOD_NAME = "get";

    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[] {
            new ConstructorInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getConstructorMatcher() {
                    return any();
                }

                @Override
                public String getConstructorInterceptor() {
                    return INIT_METHOD_INTERCEPTOR;
                }
            }
        };
    }

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[] {
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named(CALL_METHOD_NAME)
                        .and(takesArguments(0))
                        .or(named(RUN_METHOD_NAME).and(takesArguments(0)))
                        .or(named(GET_METHOD_NAME).and(takesArguments(0)));
                }

                @Override
                public String getMethodsInterceptor() {
                    return CALL_METHOD_INTERCEPTOR;
                }

                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }

    @Override
    protected ClassMatch enhanceClass() {
        return byClassAnnotationMatch(new String[] {ANNOTATION_NAME});
    }

}

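A global search turns up CallableOrRunnableActivation. It enhances every class annotated with org.apache.skywalking.apm.toolkit.trace.TraceCrossThread (the value of ANNOTATION_NAME) in two places: the constructors, and the zero-argument methods named call, run, or get.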

CallableOrRunnableConstructInterceptor

public class CallableOrRunnableConstructInterceptor implements InstanceConstructorInterceptor {
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        if (ContextManager.isActive()) {
            objInst.setSkyWalkingDynamicField(ContextManager.capture());
        }
    }

}

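At construction time, ContextManager takes a snapshot of the current context and stores it in the skyWalkingDynamicField dynamic field, where the child thread can pick it up later. The snapshot is only taken if a context is active, that is, if the object is created inside an ongoing trace.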

CallableOrRunnableInvokeInterceptor

public class CallableOrRunnableInvokeInterceptor implements InstanceMethodsAroundInterceptor {
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
        ContextManager.createLocalSpan("Thread/" + objInst.getClass().getName() + "/" + method.getName());
        ContextSnapshot cachedObjects = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
        if (cachedObjects != null) {
            ContextManager.continued(cachedObjects);
        }
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable {
        ContextManager.stopSpan();
        // clear ContextSnapshot
        objInst.setSkyWalkingDynamicField(null);
        return ret;
    }

}

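This class creates a local Span for the thread, reads the snapshot back out of the skyWalkingDynamicField dynamic field, and resumes the trace with ContextManager.continued(cachedObjects). Finally, afterMethod() stops the Span and clears the snapshot.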
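The two interceptors together amount to a capture-then-continue handoff, which can be sketched in plain Java. This is a simplified model under loud assumptions: `SnapshotHandoffSketch`, `TracedTask`, and `runOnWorker` are hypothetical names, the context is reduced to a trace ID string, and the snapshot is stored in an ordinary field instead of an agent-injected dynamic field. The real continued() does not copy the context; it adds a reference from the new segment to the parent segment.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SnapshotHandoffSketch {
    // Hypothetical stand-in for the per-thread tracing context (holds only a trace id).
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Plays the role of a class annotated with @TraceCrossThread.
    static class TracedTask implements Runnable {
        private final String snapshot;    // plays the role of the dynamic field
        volatile String traceIdSeenInRun; // recorded so the caller can inspect it

        TracedTask() {
            // The constructor runs on the parent thread, so the parent's context is visible here.
            this.snapshot = CONTEXT.get();
        }

        @Override
        public void run() {
            // run() executes on the worker thread, whose CONTEXT starts out empty.
            if (snapshot != null) {
                CONTEXT.set(snapshot);    // "continue" from the parent's snapshot
            }
            try {
                traceIdSeenInRun = CONTEXT.get();
            } finally {
                CONTEXT.remove();         // clean up so pooled threads do not leak context
            }
        }
    }

    // Sets a trace id on the calling thread, runs a task on a worker, returns what the task saw.
    static String runOnWorker(String parentTraceId) {
        CONTEXT.set(parentTraceId);
        TracedTask task = new TracedTask();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.execute(task);
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
        }
        return task.traceIdSeenInRun;
    }

    public static void main(String[] args) {
        System.out.println(runOnWorker("trace-1")); // prints trace-1
    }
}
```

The important detail is where each step runs: the capture happens in the constructor on the parent thread, and the continue happens inside run() on the worker thread. A task constructed on a thread that has no context carries no snapshot and starts its own trace.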

TracingContext#continued

    /**
     * Continue the context from the given snapshot of parent thread.
     *
     * @param snapshot from {@link #capture()} in the parent thread. Ref to {@link AbstractTracerContext#continued(ContextSnapshot)}
     */
    @Override
    public void continued(ContextSnapshot snapshot) {
        if (snapshot.isValid()) {
            TraceSegmentRef segmentRef = new TraceSegmentRef(snapshot);
            this.segment.ref(segmentRef);
            this.activeSpan().ref(segmentRef);
            this.segment.relatedGlobalTraces(snapshot.getTraceId());
            this.correlationContext.continued(snapshot);
            this.extensionContext.continued(snapshot);
            this.extensionContext.handle(this.activeSpan());
        }
    }

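The comment states it plainly: the context is continued from the parent thread's snapshot. Concretely, the method builds a TraceSegmentRef from the snapshot, attaches it to both the current segment and the active Span, and associates the segment with the parent's trace ID, so the child thread's segment becomes part of the same trace.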

Summary

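Step one: the constructor of the task object is enhanced so that a snapshot of the trace context is stored on the object as a dynamic field, ready for the child thread. Step two: before the asynchronous method starts in the child thread, a new Span is created and the snapshot is used to resume the trace; when the method finishes, the Span is stopped.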
