Delay before the consumer receives messages in Apache Kafka

2022-09-02 22:27:09

I am using Kafka 0.8.0 and trying to implement the scenario described below.

JCA API (acts as the producer and sends data) -----> Consumer ------> HBase

As soon as I fetch data with the JCA client, I send each message to the consumer. For example, as soon as the producer sends message no. 1, I want to fetch that same message from the consumer and `put` it into HBase. But my consumer only starts fetching messages after some random number of n messages. I want to keep the producer and consumer in sync so that both start working together.

I am using:

1 broker

1 single topic

1 single producer and a high-level consumer

Can anyone suggest what I need to do to achieve this?

EDIT:

Adding some relevant code snippets.

Consumer.java

public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    PrintWriter pw = null;
    int t = 0;
    StringDecoder kd = new StringDecoder(null);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    Map<String, List<KafkaStream<String, Signal>>> consumerMap;
    KafkaStream<String, Signal> stream;
    ConsumerIterator<String, Signal> it;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

        this.topic = topic;
        topicCountMap.put(topic, new Integer(1));
        consumerMap = consumer.createMessageStreams(topicCountMap, kd, new Serializer(
                new VerifiableProperties()));
        stream = consumerMap.get(topic).get(0);
        it = stream.iterator();

    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.size", "1024");

        return new ConsumerConfig(props);

    }

    synchronized public void run() {

        while (it.hasNext()) {
            t = (it.next().message()).getChannelid();
            System.out.println("In Consumer received msg" + t);
        }
    }
}

Producer.java

public class Producer {
    public final kafka.javaapi.producer.Producer<String, Signal> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic)
    {
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Use the random partitioner. We don't need the key type, so just use String.
        // The message value is a user-defined object (Signal).
        producer = new kafka.javaapi.producer.Producer<String, Signal>(new ProducerConfig(props));
        this.topic = topic;
    }
}

KafkaProperties.java

public interface KafkaProperties {
    final static String zkConnect = "127.0.0.1:2181";
    final static String groupId = "group1";
    final static String topic = "test00";
    final static String kafkaServerURL = "localhost";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 100000;
    final static int reconnectInterval = 10000;
    final static String clientId = "SimpleConsumerDemoClient";
}

This is how the consumer behaves for the first messages: the first batch of messages is never logged as received by the consumer, but from roughly message 12 onward it starts working normally.

     producer sending msg1

     producer sending msg2

     producer sending msg3

     producer sending msg4

     producer sending msg5

     producer sending msg6

     producer sending msg7

     producer sending msg8

     producer sending msg9

     producer sending msg10

     producer sending msg11

     producer sending msg12
     In Consumer received msg12

     producer sending msg13
     In Consumer received msg13

     producer sending msg14
     In Consumer received msg14

     producer sending msg15
     In Consumer received msg15

     producer sending msg16
     In Consumer received msg16

     producer sending msg17
     In Consumer received msg17

     producer sending msg18
     In Consumer received msg18

     producer sending msg19
     In Consumer received msg19

     producer sending msg20
     In Consumer received msg20

     producer sending msg21
     In Consumer received msg21

EDITED: adding the listener function in which the producer sends messages to the consumer. Also, I am using the default producer configuration and have not overridden it.

public synchronized void onValueChanged(final MonitorEvent event_) {
    // Get the value from the DBR
    try {
        final DBR dbr = event_.getDBR();
        final String[] val = (String[]) dbr.getValue();

        producer1.producer.send(new KeyedMessage<String, Signal>(
                KafkaProperties.topic, new Signal(messageNo)));
        System.out.println("producer sending msg" + messageNo);
        messageNo++;
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}

Answer 1
  1. Try adding `props.put("request.required.acks", "1")` to the producer config. By default the producer does not wait for acks, and message delivery is not guaranteed. So if you start the broker just before your test, the producer may begin sending messages before the broker is fully initialized, and the first few messages may be lost.

  2. Try adding `props.put("auto.offset.reset", "smallest")` to the consumer config. It is equivalent to the `--from-beginning` option of kafka-console-consumer.sh. If your consumer starts later than the producer and there is no offset data saved in Zookeeper, by default it will start consuming only new messages (see the consumer configs in the documentation).
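
Taken together, a minimal sketch of the two suggested settings (assuming the Kafka 0.8 property names and the broker/Zookeeper addresses used elsewhere in the question) might look like:

```java
import java.util.Properties;

public class ConfigSketch {
    // Producer side: wait for the leader's ack so early messages are not
    // silently dropped while the broker is still initializing.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("request.required.acks", "1"); // default is 0: fire-and-forget
        return props;
    }

    // Consumer side: when no committed offset exists in Zookeeper, start from
    // the earliest available message instead of only new ones.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "127.0.0.1:2181");
        props.put("group.id", "group1");
        props.put("auto.offset.reset", "smallest"); // default is "largest"
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("request.required.acks"));
        System.out.println(consumerProps().getProperty("auto.offset.reset"));
    }
}
```

These `Properties` would then be passed to `ProducerConfig` and `ConsumerConfig` as in the question's code.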


Answer 2

One possibility is Kafka consumer lag. The consumer may be overloaded by too many partitions, or the per-message processing cost may be very high.
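
If per-message processing (here, the HBase `put`) is expensive, one generic mitigation is to hand each message off to a worker pool so the consumer loop itself stays fast. A minimal plain-Java sketch, where the loop and counter stand in for the Kafka iterator and the real HBase work (neither is part of the Kafka API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkerPoolSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger processed = new AtomicInteger();

        // Stand-in for the consumer loop: each "message" is handed to the
        // pool instead of being processed (e.g. written to HBase) inline.
        for (int msgNo = 1; msgNo <= 100; msgNo++) {
            pool.submit(() -> {
                // expensive per-message work would go here
                processed.incrementAndGet();
            });
        }

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("processed=" + processed.get());
    }
}
```

Note that offloading work this way changes ordering and offset-commit semantics, so it only helps when out-of-order processing is acceptable.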

