与 Java 中的 Django/Celery 互操作

2022-09-04 03:36:53

我们公司有一个基于Python的网站和一些基于Python的工作节点,它们通过Django /Celery和RabbitMQ进行通信。我有一个基于Java的应用程序,它需要将任务提交给基于Celery的工人。我可以从Java向RabbitMQ发送工作,但是基于Celery的工人永远不会接手工作。从这两种类型的作业提交的数据包捕获来看,存在差异,但我无法理解如何解释它们,因为其中很多都是二进制的,我找不到有关解码的文档。在座的各位有没有关于Java/RabbitMQ和Celery一起工作的参考或经验?


答案 1

我找到了解决方案。RabbitMQ 的 Java 库指的是交换/队列/路由键。在 Celery 中,队列名称实际上映射到 Java 库中引用的交换。默认情况下,芹菜的队列只是“芹菜”。如果您的 Django 设置使用以下语法定义一个名为“myqueue”的队列:

CELERY_ROUTES = {
    'mypackage.myclass.runworker'      : {'queue':'myqueue'},
}

然后,基于 Java 的代码需要执行以下操作:

        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = null ;
        try {
            connection = factory.newConnection(mqHost, mqPort);
        } catch (IOException ioe) {
            log.error("Unable to create new MQ connection from factory.", ioe) ;
        }

        Channel channel = null ;
        try {
            channel = connection.createChannel();
        } catch (IOException ioe) {
            log.error("Unable to create new channel for MQ connection.", ioe) ;
        }

        try {
            channel.queueDeclare("celery", false, false, false, true, null);
        } catch (IOException ioe) {
            log.error("Unable to declare queue for MQ channel.", ioe) ;
        }

        try {
            channel.exchangeDeclare("myqueue", "direct") ;
        } catch (IOException ioe) {
            log.error("Unable to declare exchange for MQ channel.", ioe) ;
        }

        try {
            channel.queueBind("celery", "myqueue", "myqueue") ;
        } catch (IOException ioe) {
            log.error("Unable to bind queue for channel.", ioe) ;
        }

            // Generate the message body as a string here.

        try {
            channel.basicPublish(mqExchange, mqRouteKey, 
                new AMQP.BasicProperties("application/json", "ASCII", null, null, null, null, null, null, null, null, null, "guest", null, null),
                messageBody.getBytes("ASCII"));
        } catch (IOException ioe) {
            log.error("IOException encountered while trying to publish task via MQ.", ioe) ;
        }

事实证明,这只是术语的差异。


答案 2