使用 MDC 实现日志链路跟踪,包教包会!

戳上方蓝字“Java面试题精选”关注我!

1、背景简述

在技术运维过程中,很难从某服务庞杂的日志中,单独找寻出某次 API 调用的全部日志。

为提高排查问题的效率,在多个系统及应用内根据 统一的 TraceId 查找同一次请求链路上的日志,根据日志快速定位问题,同时需对业务代码无侵入,特别是在高频请求下,也可以方便的搜索此次请求的日志内容。

本此分享一个使用 MDC 实现日志链路跟踪,在微服务环境中,我们经常使用 Skywalking、Spring Cloud Sleut 等去实现整体请求链路的追踪,但是这个整体运维成本高,架构复杂,本次我们来使用 MDC 通过 Log 来实现一个轻量级的会话事务跟踪功能,需要的朋友可以参考一下。

1.1 应用效果图

我们知道了 MDC 的好处后,其实在用户从第一时间调用请求时候,我们其实可以将请求增加 traceid 一并返回,这样用户反馈时候,我们直接用 traceid 就可以全链路追踪到所有请求的情况了,做到信息的闭环。

请求效果图:

使用 MDC 实现日志链路跟踪,包教包会!

LOGBOOK 效果图:

使用 MDC 实现日志链路跟踪,包教包会!

2、关键思路

2.1 MDC

日志追踪目标是每次请求级别的,也就是说同一个接口的每次请求,都应该有不同的 traceId。每次接口请求,都是一个单独的线程,所以自然我们很容易考虑到通过 ThreadLocal 实现上述需求。考虑到 log4j 本身已经提供了类似的功能 MDC,所以直接使用 MDC 进行实现。

关于 MDC 的简述

MDC(Mapped Diagnostic Context)是一个映射,用于存储运行上下文的特定线程的上下文数据。因此,如果使用 log4j 进行日志记录,则每个线程都可以拥有自己的 MDC,该 MDC 对整个线程是全局的。属于该线程的任何代码都可以轻松访问线程的 MDC 中存在的值。

API 说明

  • clear() => 移除所有 MDC

  • get (String key) => 获取当前线程 MDC 中指定 key 的值

  • getContext() => 获取当前线程 MDC 的 MDC

  • put(String key, Object o) => 往当前线程的 MDC 中存入指定的键值对

  • remove(String key) => 删除当前线程 MDC 中指定的键值对

3、目标

  • 需要一个全服务唯一的 id,即 traceId,如何保证?
  • traceId 如何在服务内部传递?
  • traceId 如何在服务间传递?
  • traceId 如何在多线程中传递?

4、实现方式

4.1 需要一个全服务唯一的 id,即 traceId,如何保证?

使用最简单的 uuid 即可。复杂的话可以配置 Redis、雪花算法等方式。本次分享选最简单 uuid 生成 traceId 的方式。

4.2 traceId 如何在服务间传递?

1)在 XML 的日志格式中添加 %X{traceId} 配置。

<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} [%X{traceId}] [%p] %l[%t]%n%m%n" />
    </layout>
</appender>

2)新增拦截器,拦截所有请求,从 header 中获取 traceId 然后放到 MDC 中,如果没有获取到,则直接用 UUID 生成一个。

@Slf4j
@Component
public class LogInterceptor implements HandlerInterceptor {
    private static final String TRACE_ID = "traceId";

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler,
            Exception arg3)
 throws Exception 
{
    }

    @Override
    public void postHandle(HttpServletRequest request,
            HttpServletResponse response, Object handler, ModelAndView arg3)
 throws Exception 
{
    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
            throws Exception 
{
        String traceId = request.getHeader(TRACE_ID);
        if (StringUtils.isEmpty(traceId)) {
            MDC.put(TRACE_ID, UUID.randomUUID().toString());
        } else {
            MDC.put(TRACE_ID, traceId);
        }
        return true;
    }
}

3)配置拦截器

@Configuration
public class WebConfig implements WebMvcConfigurer {
    @Resource
    private LogInterceptor logInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(logInterceptor).addPathPatterns("/**");
    }
}

4.3 traceId 如何在服务间传递?

封装 HTTP 工具类,把 traceId 加入头中,带到下一个服务。

@Slf4j
public class HttpUtils {
    public static String get(String url) throws URISyntaxException {
        RestTemplate restTemplate = new RestTemplate();
        MultiValueMap<String, String> headers = new HttpHeaders();
        headers.add("traceId", MDC.get("traceId"));
        URI uri = new URI(url);
        RequestEntity<?> requestEntity = new RequestEntity<>(headers, HttpMethod.GET, uri);
        ResponseEntity exchange = restTemplate.exchange(requestEntity, String.class);
        if (exchange.getStatusCode().equals(HttpStatus.OK)) {
            log.info("send http request success");
        }
        return exchange.getBody();
    }
}

4.4 traceId 如何在多线程中传递?

Spring 项目也使用到了很多线程池,比如 @Async 异步调用,Zookeeper 线程池、 Kafka 线程池等。不管是哪种线程池都大都支持传入指定的线程池实现,拿 @Async 举例:

原理为:

MDC 底层使用 ThreadLocal 来实现,那根据 ThreadLocal 的特点,它是可以让我们在同一个线程中共享数据的,但是往往我们在业务方法中,会开启多线程来执行程序,这样的话 MDC 就无法传递到其他子线程了。这时,我们需要使用额外的方法来传递存在 ThreadLocal 里的值。

MDC 提供了一个叫 getCopyOfContextMap 的方法,很显然,该方法就是把当前线程 ThreadLocal 绑定的Map获取出来,之后就是把该 Map 绑定到子线程中的ThreadLocal 中了。

改造 Spring 的异步线程池,包装提交的任务。

@Slf4j
@Component
public class TraceAsyncConfigurer implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("async-pool-");
        executor.setTaskDecorator(new MdcTaskDecorator());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (throwable, method, params) -> log.error("asyc execute error, method={}, params={}", method.getName(),
                Arrays.toString(params));
    }

    public static class MdcTaskDecorator implements TaskDecorator {
        @Override
        public Runnable decorate(Runnable runnable) {
            Map<String, String> contextMap = MDC.getCopyOfContextMap();
            return () -> {
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }
                try {
                    runnable.run();
                } finally {
                    MDC.clear();
                }
            };
        }
    }
}

public class MDCLogThreadPoolExecutor extends ThreadPoolExecutor {
    public MDCLogThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue workQueue)
 
{
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable command) {
        super.execute(MDCLogThreadPoolExecutor.executeRunable(command, MDC.getCopyOfContextMap()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(MDCLogThreadPoolExecutor.executeRunable(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public Future submit(Callable callable) {
        return super.submit(MDCLogThreadPoolExecutor.submitCallable(callable, MDC.getCopyOfContextMap()));
    }

    public static Runnable executeRunable(Runnable runnable, Map<String, String> mdcContext) {
        return new Runnable() {
            @Override
            public void run() {
                if (mdcContext == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(mdcContext);
                }
                try {
                    runnable.run();
                } finally {
                    MDC.clear();
                }
            }
        };
    }

    private static Callable submitCallable(Callable callable, Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                MDC.clear();
            }
        };
    }
}

接下来需要对 ThreadPoolTaskExecutor 的方法进行重写:

package com.example.demo.common.threadpool;

import com.example.demo.common.constant.Constants;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

/**
 * MDC线程池
 * 实现内容传递
 * @author wangbo
 * @date 2021/5/13
 */

@Slf4jpublic
class MdcTaskExecutor extends ThreadPoolTaskExecutor {
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        log.info("mdc thread pool task executor submit");
        Map<String, String> context = MDC.getCopyOfContextMap();

        return super.submit(() -> {
            T result;
            if (context != null) {
                // 将父线程的MDC内容传给子线程
                MDC.setContextMap(context);
            } else {
                // 直接给子线程设置MDC
                MDC.put(Constants.LOG_MDC_ID, UUID.randomUUID().toString().replace("-"""));
            }
            try {
                // 执行任务
                result = task.call();
            } finally {
                try {
                    MDC.clear();
                } catch (Exception e) {
                    log.warn("MDC clear exception", e);
                }
            }
            return result;
        });
    }

    @Override
    public void execute(Runnable task) {
        log.info("mdc thread pool task executor execute");
        Map<String, String> context = MDC.getCopyOfContextMap();
        super.execute(() -> {
            if (context != null) {
                // 将父线程的MDC内容传给子线程
                MDC.setContextMap(context);
            } else {
                // 直接给子线程设置MDC
                MDC.put(Constants.LOG_MDC_ID, UUID.randomUUID().toString().replace("-"""));
            }
            try {
                // 执行任务
                task.run();
            } finally {
                try {
                    MDC.clear();
                } catch (Exception e) {
                    log.warn("MDC clear exception", e);
                }
            }
        });
    }
}

然后使用自定义的重写子类 MdcTaskExecutor 来实现线程池配置:

/**
 * 线程池配置
 * 
 * @author wangbo
 * @date 2021/5/13
 */

@Slf4j
@Configurationpublic
class ThreadPoolConfig {
    /** * 异步任务线程池 * 用于执行普通的异步请求,带有请求链路的MDC标志 */
    @Bean
    public Executor commonThreadPool() {
        log.info("start init common thread pool"); // ThreadPoolTaskExecutor
        executor = new ThreadPoolTaskExecutor();
        MdcTaskExecutor executor = new MdcTaskExecutor();
        // 配置核心线程数
        executor.setCorePoolSize(10);
        // 配置最大线程数
        executor.setMaxPoolSize(20);
        // 配置队列大小
        executor.setQueueCapacity(3000);
        // 配置空闲线程存活时间
        executor.setKeepAliveSeconds(120);
        // 配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("common-thread-pool-");
        // 当达到最大线程池的时候丢弃最老的任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        // 执行初始化
        executor.initialize();
        return executor;
    }

    /**
     * 定时任务线程池
     * 用于执行自启动的任务执行,父线程不带有MDC标志,不需要传递,直接设置新的MDC
     * 和上面的线程池没啥区别,只是名字不同
     */

    @Bean
    public Executor scheduleThreadPool() {
        log.info("start init schedule thread pool");
        MdcTaskExecutor executor = new MdcTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(3000);
        executor.setKeepAliveSeconds(120);
        executor.setThreadNamePrefix("schedule-thread-pool-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        executor.initialize();
        return executor;
    }
}

5、扩展点

5.1 JSF 接口日志追踪的应用

项目中也运用到了大量的 JSF 接口,我们其实可以按照上述的思路进行服务间的传递。

调用端:

// todo 不能在filter里面这么用
RpcContext.getContext().setAttachment("user""zhanggeng");
RpcContext.getContext().setAttachment(".passwd""11112222"); 
// "."开头的对应上面的hide=truexxxService.yyy();

// 再开始调用远程方法
// 重要:下一次调用要重新设置,之前的属性会被删除
RpcContext.getContext().setAttachment("user""zhanggeng");
RpcContext.getContext().setAttachment(".passwd""11112222"); 
// "."开头的对应上面的hide=truexxxService.zzz();
// 再开始调用远程方法

Provider 端:

1.filter 中直接获取,包括标记为 hidden 的参数。通过 Rpccontext 无法获取。

String consumerToken = (String) invocation.getAttachment(".passwd");

2.服务端业务代码中直接获取。

String user = RpcContext.getContext().getAttachment("user");

提示:调用链中的隐式传参。

注意:在调用链例如 A–>B–>C,A和B都要隐私传参的时候,由于是同一个线程,会出现数据污染。例如 A 发参数 P1 给 B,B 收到请求拿到 P1 同时要发参数 P2 给 C,那么 C 会直接拿到 P1、P2。这种情况,就要求 B 收到 P1,然后设置 P2 调用 C 之前,要求自己清空上下文数据(RpcContext.getContext().clearAttachments();

5.2 接口返回值应用

我们知道了 MDC 的好处后,其实在用户从第一时间调用请求时候,我们其实可以将有误的请求增加 traceid 一并返回。这样用户反馈时候,我们直接用 traceid 就可以全链路追踪到所有请求的情况了,做到信息的闭环。

效果图:

使用 MDC 实现日志链路跟踪,包教包会!

6、备注

各位知道了日志追踪的原理,其实很多应用场景可以继续补充,例如 MQ,JD 的其他中间件也可以应用相同原理进行追踪。其实,当了解了底层的原理后,我们其实就可以了解到 JD 监控中间件 PFinder 监控等中间件是如何做的了。

本次由于时间情况,就不进行扩展了,各位可以线下去了解 Skywalking 分布式链路追踪系统,就可以知道,万变不离其宗。

来源:blog.51cto.com/u_15714439/10281006
后端专属技术群

构建高质量的技术交流社群,欢迎从事编程开发、技术招聘HR进群,也欢迎大家分享自己公司的内推信息,相互帮助,一起进步!

文明发言,以交流技术职位内推行业探讨为主

广告人士勿入,切勿轻信私聊,防止被骗

使用 MDC 实现日志链路跟踪,包教包会!

加我好友,拉你进群

使用 MDC 实现日志链路跟踪,包教包会!

原文始发于微信公众号(Java面试题精选):使用 MDC 实现日志链路跟踪,包教包会!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/292729.html

(0)
Java知音的头像Java知音bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!