如何将 MDC 与线程池一起使用?

2022-08-31 06:48:40

在我们的软件中,我们广泛使用MDC来跟踪诸如Web请求的会话ID和用户名之类的内容。这在原始线程中运行时工作正常。

但是,有很多事情需要在后台处理。为此,我们使用 and 类以及一些自卷轴异步执行服务。所有这些服务都管理自己的线程池。java.concurrent.ThreadPoolExecutorjava.util.Timer

以下是Logback的手册关于在这样的环境中使用MDC的说法:

映射的诊断上下文的副本不能总是由工作线程从起始线程继承。当java.util.concurrent.Executors用于线程管理时,就是这种情况。例如,newCachedThreadPool 方法创建了一个 ThreadPoolExecutor,与其他线程池代码一样,它具有复杂的线程创建逻辑。

在这种情况下,建议在将任务提交给执行器之前,在原始(主)线程上调用 MDC.getCopyOfContextMap()。当任务运行时,作为其第一个操作,它应调用 MDC.setContextMapValues() 将原始 MDC 值的存储副本与新的执行器托管线程相关联。

这很好,但是很容易忘记添加这些调用,并且在为时已晚之前没有简单的方法来识别问题。Log4j 的唯一标志是日志中缺少 MDC 信息,而使用 Logback,您将获得过时的 MDC 信息(因为线程池中的线程从在其上运行的第一个任务继承其 MDC)。两者都是生产系统中的严重问题。

我不认为我们的情况有任何特殊之处,但我在网上找不到太多关于这个问题的信息。显然,这不是很多人碰到的东西,所以一定有办法避免它。我们在这里做错了什么?


答案 1

是的,这也是我遇到的一个常见问题。有一些解决方法(如所述手动设置),但理想情况下,您需要一个解决方案

  • 一致地设置 MDC;
  • 避免MDC不正确但您不知道的隐性错误;和
  • 最大限度地减少对线程池使用方式的更改(例如,无处不在的子类化,或类似的丑陋)。CallableMyCallable

这是我使用的解决方案可以满足这三个需求。代码应该是不言自明的。

(作为旁注,如果您使用番石榴,则可以创建此执行器并将其提供给番石榴。MoreExecutors.listeningDecorator()ListanableFuture

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by jlevy.
 * Date: 6/14/13
 */
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {

    final private boolean useFixedContext;
    final private Map<String, Object> fixedContext;

    /**
     * Pool where task threads take MDC from the submitting thread.
     */
    public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Pool where task threads take fixed MDC from the thread that creates the pool.
     */
    @SuppressWarnings("unchecked")
    public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
    }

    /**
     * Pool where task threads always have a specified, fixed MDC.
     */
    public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, Object> fixedContext, int corePoolSize,
                                                        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                                        BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private MdcThreadPoolExecutor(Map<String, Object> fixedContext, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.fixedContext = fixedContext;
        useFixedContext = (fixedContext != null);
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}

答案 2

我们遇到了类似的问题。您可能希望扩展 ThreadPoolExecutor 并重写之前/之后Execute 方法,以便在启动/停止新线程之前进行所需的 MDC 调用。


推荐