调试自定义 Kafka 连接器的简单、有效的方法是什么?

我正在处理几个Kafka连接器,并且在控制台输出中没有看到它们的创建/部署中的任何错误,但是我没有得到我正在寻找的结果(没有任何结果,无论想要还是其他)。我基于 Kafka 的示例 FileStream 连接器制作了这些连接器,因此我的调试技术基于示例中使用的 SLF4J Logger 的使用。我已经搜索了我认为会在控制台输出中生成的日志消息,但无济于事。我是否在错误的位置查找这些邮件?或者也许有更好的方法来调试这些连接器?

我在实现中引用的SLF4J记录器的示例用法:

Kafka FileStreamSinkTask

Kafka FileStreamSourceTask


答案 1

我将尽量以广泛的方式回答你的问题。执行连接器开发的简单方法可能如下所示:

  • 通过查看众多公开可用的 Kafka 连接器之一来构建和构建连接器源代码(您将在此处找到一个广泛的列表:https://www.confluent.io/product/connectors/ )
  • https://www.confluent.io/download/ 下载最新的 Confluent 开源版本 (>= 3.3.0)
  • 通过以下方式之一使连接器包可用于 Kafka Connect:

    1. 将所有连接器 jar 文件(连接器 jar 加上依赖项 jar(不包括 Connect API jar))存储到文件系统中的某个位置,并通过将此位置添加到 Connect worker 属性中的属性来启用插件隔离。例如,如果连接器 jar 存储在 中,则将在工作线程的属性中设置(见下文)。plugin.path/opt/connectors/my-first-connectorplugin.path=/opt/connectors
    2. 将所有连接器 jar 文件存储在 下的文件夹中。例如:。(需要以前缀开头,以便启动脚本选取)。$CONFLUENT_HOME 是您安装 Confluent Platform 的地方。${CONFLUENT_HOME}/share/java${CONFLUENT_HOME}/share/java/kafka-connect-my-first-connectorkafka-connect-
  • (可选)通过更改“连接到 ”甚至 “的日志级别来增加日志记录。${CONFLUENT_HOME}/etc/kafka/connect-log4j.propertiesDEBUGTRACE

  • 使用 Confluent CLI 启动所有服务,包括 Kafka Connect。详情请点击此处:http://docs.confluent.io/current/connect/quickstart.html

    简要:confluent start

注: CLI 当前加载的 Connect worker 的属性文件是 。如果您选择启用类加载隔离,但如果需要更改 Connect worker 的属性,则应编辑该文件。${CONFLUENT_HOME}/etc/schema-registry/connect-avro-distributed.properties

  • 运行连接辅助角色后,通过运行以下命令启动连接器:

    confluent load <connector_name> -d <connector_config.properties>

    confluent load <connector_name> -d <connector_config.json>

    连接器配置可以是 java 属性或 JSON 格式。

  • 运行以打开 Connect 辅助角色的日志文件,或通过运行直接导航到存储日志和数据的位置confluent log connect

    cd "$( confluent current )"

注意:通过适当地设置环境变量,更改 Confluent CLI 会话期间日志和数据的存储位置。例如,给定存在并且要存储数据的位置,请运行:CONFLUENT_CURRENT/opt/confluent

export CONFLUENT_CURRENT=/opt/confluent
confluent current

  • 最后,要以交互方式调试连接器,一种可能的方法是在开始使用 Confluent CLI 连接之前应用以下内容:

    confluent stop connect
    export CONNECT_DEBUG=y; export DEBUG_SUSPEND_FLAG=y;
    confluent start connect

    ,然后连接到调试器(例如,远程连接到 Connect 工作线程(默认端口:5005)。要停止在调试模式下运行连接,只需运行:完成后。unset CONNECT_DEBUG; unset DEBUG_SUSPEND_FLAG;

我希望以上能使您的连接器开发更容易和...更有趣!


答案 2

我喜欢接受的答案。一件事 - 环境变量对我不起作用...我正在使用融合社区版 5.3.1...

这是我所做的工作...

我从这里安装了汇合的cli:https://docs.confluent.io/current/cli/installing.html#tarball-installation

我使用命令运行汇合confluent local start

我使用命令获取了连接应用程序详细信息ps -ef | grep connect

我将生成的命令复制到编辑器并添加了arg(紧跟在java之后):

-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005

然后我停止使用命令连接confluent local stop connect

然后我用arg运行连接命令

中场休息---

vs代码开发由erich gamma领导 - 成名,他也写了eclipse。vs code 正在成为一流的 java ide 参见 https://en.wikipedia.org/wiki/Erich_Gammagang of four

---场休息

接下来我启动了vs代码并打开了debezium oracle连接器文件夹(从这里克隆)https://github.com/debezium/debezium-incubator

然后我选择了Debug - Open Configurations

enter image description here

并进入突出显示的调试配置

enter image description here

然后运行调试器 - 它将命中您的断点!

enter image description here

连接命令应如下所示:

/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/bin/java -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Xms256M -Xmx2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/logs -Dlog4j.configuration=file:/Users/myuserid/confluent-5.3.1/bin/../etc/kafka/connect-log4j.properties -cp /Users/myuserid/confluent-5.3.1/share/java/kafka/*:/Users/myuserid/confluent-5.3.1/share/java/confluent-common/*:/Users/myuserid/confluent-5.3.1/share/java/kafka-serde-tools/*:/Users/myuserid/confluent-5.3.1/bin/../share/java/kafka/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/dependant-libs-2.12.8/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/libs/*:/usr/share/java/support-metrics-client/* org.apache.kafka.connect.cli.ConnectDistributed /var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/connect.properties