在Java中,我如何创建一个相当于Apache Avro容器文件而不被迫使用File作为媒介?

2022-09-02 04:53:10

这有点像在黑暗中拍摄的镜头,以防任何精通Apache Avro的Java实现的人正在阅读这篇文章。

我的高级目标是有某种方式通过网络传输一些系列的avro数据(例如,让我们说HTTP,但特定的协议对于这个目的并不那么重要)。在我的上下文中,我有一个HttpServletResponse,我需要以某种方式将这些数据写入其中。

我最初尝试将数据写入相当于avro容器文件的虚拟版本(假设“响应”的类型为HttpServletResponse):

response.setContentType("application/octet-stream");
response.setHeader("Content-transfer-encoding", "binary");
ServletOutputStream outStream = response.getOutputStream();
BufferedOutputStream bos = new BufferedOutputStream(outStream);

Schema someSchema = Schema.parse(".....some valid avro schema....");
GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("somefield", someData);
...

GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
fileWriter.create(someSchema, bos);
fileWriter.append(someRecord);
fileWriter.close();
bos.flush();

这一切都很好,很花哨,除了事实证明Avro并没有真正提供一种读取容器文件的方法,除了实际文件:DataFileReader只有两个构造函数:

public DataFileReader(File file, DatumReader<D> reader);

public DataFileReader(SeekableInput sin, DatumReader<D> reader);

其中,SeekableInput是一些特定于avro的自定义表单,其创建也最终从文件中读取。现在考虑到这一点,除非有某种方法以某种方式将InputStream强制转换为文件(http://stackoverflow.com/questions/578305/create-a-java-file-object-or-equivalent-using-a-byte-array-in-memory-without-a 表明没有,并且我也尝试过查看Java文档),否则如果TransputStream另一端的读取器收到该avro容器文件(我不确定为什么他们允许将avro二进制容器文件输出到任意 OutputStream没有提供一种从另一端的相应输入流读取它们的方法,但这不是重点)。容器文件读取器的实现似乎需要具体文件提供的“可搜索”功能。

好吧,所以看起来这种方法不会做我想要的。如何创建一个模仿 avro 容器文件的 JSON 响应?

public static Schema WRAPPER_SCHEMA = Schema.parse(
  "{\"type\": \"record\", " +
   "\"name\": \"AvroContainer\", " +
   "\"doc\": \"a JSON avro container file\", " +
   "\"namespace\": \"org.bar.foo\", " +
   "\"fields\": [" +
     "{\"name\": \"schema\", \"type\": \"string\", \"doc\": \"schema representing the included data\"}, " +
     "{\"name\": \"data\", \"type\": \"bytes\", \"doc\": \"packet of data represented by the schema\"}]}"
  );

鉴于上述约束,我不确定这是否是实现这一目标的最佳方法,但看起来这可能可以解决问题。我将架构(例如,上面的“Schema someSchema”)作为字符串放在“schema”字段中,然后放入适合该架构的记录的avro-binary序列化形式(即。“GenericRecord someRecord”),在“data”字段中。

我实际上想知道下面描述的具体细节,但我认为也值得给出一个更大的背景,这样如果有更好的高级方法我可以采取(这种方法有效,但感觉不是最佳的),请让我知道。

我的问题是,假设我采用这种基于JSON的方法,如何将我的记录的avro二进制表示形式写入AvroContainer架构的“数据”字段?例如,我到达了这里:

ByteArrayOutputStream baos = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
Encoder e = new BinaryEncoder(baos);
datumWriter.write(resultsRecord, e);
e.flush();

GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("schema", someSchema.toString());
someRecord.put("data", ByteBuffer.wrap(baos.toByteArray()));
datumWriter = new GenericDatumWriter<GenericRecord>(WRAPPER_SCHEMA);
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(baos, JsonEncoding.UTF8);
e = new JsonEncoder(WRAPPER_SCHEMA, jsonGenerator);
datumWriter.write(someRecord, e);
e.flush();

PrintWriter printWriter = response.getWriter(); // recall that response is the HttpServletResponse
response.setContentType("text/plain");
response.setCharacterEncoding("UTF-8");
printWriter.print(baos.toString("UTF-8"));

我最初尝试省略ByteBuffer.wrap子句,但后来行

datumWriter.write(someRecord, e);

抛出了一个异常,我无法将字节数组转换为ByteBuffer。公平地说,看起来当调用 Encoder 类(JsonEncoder 是其中的子类)来编写 avro Bytes 对象时,它需要一个 ByteBuffer 作为参数给出。因此,我尝试用java.nio.ByteBuffer.wrap封装byte[],但是当数据被打印出来时,它被打印成一系列直的字节,而没有通过avro十六进制表示:

"data": {"bytes": ".....some gibberish other than the expected format...}

这似乎不对。根据avro文档,他们给出的示例bytes对象说我需要放入一个json对象,其中一个示例看起来像“\u00FF”,而我放入其中的内容显然不是该格式。我现在想知道的是以下内容:

  • 什么是avro字节格式的示例?它看起来像“\uDEADBEEFDEADBEEF...”吗?
  • 如何将我的二进制 avro 数据(由 BinaryEncoder 输出到 byte[] 数组中)强制转换为可以粘贴到 GenericRecord 对象中并使其以 JSON 格式正确打印的格式?例如,我想要一个对象数据,我可以调用一些GenericRecord“someRecord.put(”data“,DATA);”,里面有我的avro序列化数据?
  • 然后,当数据被赋予文本JSON表示形式并希望重新创建由AvroContainer格式JSON表示的GenericRecord时,我如何将该数据读回另一端(消费者)端的字节数组中?
  • (重申之前的问题)有没有更好的方法可以做到这一切?

答案 1

正如Knut所说,如果你想使用文件以外的其他东西,你可以:

  • 正如Knut所说,使用SeekableByteArrayInput,对于任何你可以硬塞进字节数组的东西。
  • 以你自己的方式实现SeekablInput - 例如,如果你从一些奇怪的数据库结构中获取它。
  • 或者只是使用一个文件。为什么不呢?

这些是你的答案。


答案 2

我解决这个问题的方法是将架构与数据分开发布。我设置了一个连接握手,从服务器向下传输架构,然后来回发送编码数据。您必须创建一个外部包装器对象,如下所示:

{'name':'Wrapper','type':'record','fields':[
  {'name':'schemaName','type':'string'},
  {'name':'records','type':{'type':'array','items':'bytes'}}
]}

首先将记录数组逐个编码为编码字节数组的位置。一个数组中的所有内容都应具有相同的架构。然后,使用上述架构对包装器对象进行编码 - 将“schemaName”设置为用于对数组进行编码的架构的名称。

在服务器上,您将首先解码包装器对象。解码包装器对象后,您就知道了 schemaName,并且您有一个知道如何解码的对象数组 - 随心所欲地使用!

请注意,如果您使用类似 Socket.IO(对于 Node.js)之类的协议和引擎,则可以不使用包装器对象,Socket.io 在浏览器和服务器之间提供了基于信道的通信层。在这种情况下,只需为每个通道使用特定的架构,在发送之前对每条消息进行编码。在连接启动时,您仍然需要共享架构 - 但是如果您正在使用,则很容易实现。完成后,客户端和服务器之间有任意数量的强类型双向流。WebSocketsWebSockets