火花:读取输入流而不是文件
2022-09-04 04:33:43
我在Java应用程序中使用SparkSQL对使用Databricks进行解析的CSV文件进行一些处理。
我正在处理的数据来自不同的来源(远程URL,本地文件,Google Cloud Storage),我习惯于将所有内容转换为InputStream,以便我可以解析和处理数据,而无需知道数据来自何处。
我在Spark上看到的所有文档都从路径读取文件,例如
SparkConf conf = new SparkConf().setAppName("spark-sandbox").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlc = new SQLContext(sc);
DataFrame df = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load("path/to/file.csv");
DataFrame dfGrouped = df.groupBy("varA","varB")
.avg("varC","varD");
dfGrouped.show();
我想做的是从 InputStream 读取,甚至只是一个已经在内存中的字符串。如下所示:
InputStream stream = new URL(
"http://www.sample-videos.com/csv/Sample-Spreadsheet-100-rows.csv"
).openStream();
DataFrame dfRemote = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load(stream);
String someString = "imagine,some,csv,data,here";
DataFrame dfFromString = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.read(someString);
我在这里错过了什么简单的东西吗?
我已经阅读了一些关于Spark Streaming和自定义接收器的文档,但据我所知,这是为了打开一个将持续提供数据的连接。Spark Streaming似乎将数据分解成块并对其进行一些处理,期望更多的数据进入无休止的流。
我最好的猜测是,Spark作为Hadoop的后代,期望大量数据可能驻留在某个地方的文件系统中。但是,由于Spark无论如何都会在内存中进行处理,因此对我来说,SparkSQL能够解析内存中已有的数据是有意义的。
任何帮助将不胜感激。