我尝试用java和spark读取csv。
现在我这样做:
String master = "local[2]";
String csvInput = "/home/username/Downloads/countrylist.csv";
String csvOutput = "/home/username/Downloads/countrylist";
JavaSparkContext sc = new JavaSparkContext(master, "loadwholecsv", System.getenv("SPARK_HOME"), System.getenv("JARS"));
JavaRDD<String> csvData = sc.textFile(csvInput, 1);
JavaRDD<List<String>> lines = csvData.map(new Function <String, List<String>>() {
@Override
public List<String> call(String s) {
return new ArrayList<String>(Arrays.asList(s.split("\\s*,\\s*")));
}
});
所以我将 csv 文件的所有“行”作为我的 RDD 中的一行。我还编写了这个方法来获取列:
public static JavaRDD<String> getColumn (JavaRDD<List<String>> data, final int index)
{
return data.flatMap(
new FlatMapFunction <List<String>, String>()
{
public Iterable<String> call (List<String> s)
{
return Arrays.asList(s.get(index));
}
}
);
}
但后来我想对列进行许多转换并更改列的位置等。因此,将 RDD 中的列填充为数组列表而不是行会更容易。
有谁知道如何实现这一目标?我不想多次调用“getColumn()”。
如果你能帮助我那就太好了。
解释:我的 csvData 看起来像这样:
one, two, three
four, five, six
seven, eight, nine
我的 RDD 线路如下所示:
[one, two, three]
[four, five, six]
[seven, eigth, nine]
But我要这个:
[one, four, seven]
[two, five, eight]
[three, six, nine]