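Because RCFile is a columnar format, the path for reading rows differs significantly from the write path. We can still read an RCFile row by row with the RCFile.Reader class (RCFileRecordReader is not needed). In addition, however, we need a ColumnarSerDe to convert the columnar data into row data.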
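To make "converting columnar data into row data" concrete, here is a minimal sketch that uses only the JDK and no Hive classes. It assumes a row group held in memory as one list per column, which is a simplification and not the RCFile on-disk format. Rebuilding row r means taking the r-th value of every column. The class name `ColumnarToRows` and its method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: a row group stored column by column
// (columns.get(c).get(r) is the value of column c in row r).
// This is NOT RCFile's real layout, only the idea of transposing columns into rows.
public class ColumnarToRows {

    static List<List<String>> toRows(List<List<String>> columns) {
        List<List<String>> rows = new ArrayList<>();
        if (columns.isEmpty()) {
            return rows;
        }
        int rowCount = columns.get(0).size();
        for (List<String> column : columns) {
            if (column.size() != rowCount) {
                throw new IllegalArgumentException("all columns must have the same row count");
            }
        }
        for (int r = 0; r < rowCount; r++) {
            List<String> row = new ArrayList<>(columns.size());
            for (List<String> column : columns) {
                row.add(column.get(r)); // pick the r-th value of every column
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<List<String>> columns = List.of(
                List.of("a1", "a2", "a3"),   // col1
                List.of("b1", "b2", "b3"));  // col2
        for (List<String> row : toRows(columns)) {
            System.out.println(row);
        }
    }
}
```

In the real reader, ColumnarSerDe does this work for one row at a time, working from the BytesRefArrayWritable filled by getCurrentRow().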
Below is the most minimal code for reading an RCFile row by row. See the inline comments for more details.
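import java.io.IOException;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.columnar.ColumnarStruct;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.io.LongWritable;

// The class name is illustrative; only readRCFileByRow comes from the original example.
public class RCFileRowReader {

  private static void readRCFileByRow(String pathStr)
      throws IOException, SerDeException {
    final Configuration conf = new Configuration();
    final Properties tbl = new Properties();
    /*
     * Set the column names (comma separated) and the column types (colon
     * separated). The actual column names do not matter, as long as the
     * number of columns is correct.
     *
     * For types, this example uses strings. byte[] values can be stored as
     * strings by encoding the bytes as ASCII (for example hex or Base64).
     *
     * The number of column names and the number of types must match exactly.
     */
    tbl.setProperty("columns", "col1,col2,col3,col4,col5");
    tbl.setProperty("columns.types", "string:string:string:string:string");
    /*
     * We need a ColumnarSerDe to deserialize the columnar data into
     * row-wise data.
     */
    final ColumnarSerDe serDe = new ColumnarSerDe();
    serDe.initialize(conf, tbl);
    final Path path = new Path(pathStr);
    final FileSystem fs = path.getFileSystem(conf);
    final RCFile.Reader reader = new RCFile.Reader(fs, path, conf);
    try {
      final LongWritable key = new LongWritable();
      final BytesRefArrayWritable cols = new BytesRefArrayWritable();
      while (reader.next(key)) {
        System.out.println("Getting next row.");
        /*
         * IMPORTANT: pass the same cols object to getCurrentRow() on every
         * iteration; do not create a new BytesRefArrayWritable each time.
         * A single call to getCurrentRow(cols) can read the values of more
         * than one column, and the SerDe below then reads them one by one.
         */
        reader.getCurrentRow(cols);
        final ColumnarStruct row = (ColumnarStruct) serDe.deserialize(cols);
        final List<Object> objects = row.getFieldsAsList();
        for (final Object object : objects) {
          if (object == null) { // the field is NULL in this row
            System.out.println("Value:null");
            continue;
          }
          // Lazy decompression happens here
          final String payload =
              ((LazyString) object).getWritableObject().toString();
          System.out.println("Value:" + payload);
        }
      }
    } finally {
      reader.close(); // release the underlying input stream
    }
  }

  public static void main(String[] args) throws Exception {
    readRCFileByRow(args[0]); // path of the RCFile to read
  }
}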
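In readRCFileByRow, getCurrentRow() still reads the data column by column, and we need the SerDe to turn it into a row. Calling getCurrentRow() also does not mean that every field in the row has been decompressed. Because decompression is lazy, a column is decompressed only when one of its fields is deserialized. That is why we call columnarStruct.getFieldsAsList() to get a list of references to lazy objects. The actual read happens when getWritableObject() is called on the LazyString reference.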
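The lazy-decompression behavior described above can be modeled with a short, self-contained sketch. This is a hypothetical illustration that uses the JDK's `java.util.zip.Deflater`/`Inflater`. It is not how RCFile or Hive implement it internally. The `LazyColumn` class, its newline-separated value encoding and its static counter are all invented for the example. Each column keeps only its compressed bytes until one of its values is first requested. The counter shows that untouched columns are never inflated.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical model of lazy column decompression (not RCFile's real code):
// a column's bytes are inflated only on the first access to one of its values.
public class LazyColumn {
    static int decompressions = 0;     // how many columns were actually inflated
    private final byte[] compressed;
    private String[] values;           // stays null until the first get()

    LazyColumn(String newlineSeparatedValues) {
        this.compressed = deflate(newlineSeparatedValues.getBytes(StandardCharsets.UTF_8));
    }

    String get(int row) {
        if (values == null) {          // first access: decompress now
            decompressions++;
            values = new String(inflate(compressed), StandardCharsets.UTF_8).split("\n", -1);
        }
        return values[row];
    }

    static byte[] deflate(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] inflate(byte[] input) {
        Inflater inflater = new Inflater();
        inflater.setInput(input);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buf);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("truncated or unsupported stream");
                }
                out.write(buf, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException(e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        LazyColumn col1 = new LazyColumn("a1\na2\na3");
        LazyColumn col2 = new LazyColumn("b1\nb2\nb3");  // never read, so never inflated
        System.out.println(decompressions);  // prints 0: nothing inflated yet
        System.out.println(col1.get(1));     // prints a2 and inflates col1 only
        System.out.println(col1.get(2));     // prints a3; col1 is already inflated
        System.out.println(decompressions);  // prints 1
    }
}
```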
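Another way to achieve the same result is to use a StructObjectInspector (obtained from serDe.getObjectInspector()) together with the ObjectInspectorUtils.copyToStandardObject API. However, I find the approach above simpler.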
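The comment in readRCFileByRow notes that byte[] values can be stored in a string column by first encoding them as ASCII, for example with Base64. Below is a minimal sketch of that round trip using the JDK's `java.util.Base64`. The helper names `encode`/`decode` are hypothetical. Raw bytes are not necessarily valid UTF-8, so converting them straight into a string (and later reading them back through `Text.toString()`) could alter them. Base64 output is plain ASCII, which avoids the problem.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

// Sketch: keep arbitrary bytes in a "string" column by Base64-encoding them.
public class BytesAsStringColumn {

    // Encode raw bytes into an ASCII-only string suitable for a string column.
    static String encode(byte[] raw) {
        return Base64.getEncoder().encodeToString(raw);
    }

    // Decode a value read back from the column (e.g. the payload string) into the original bytes.
    static byte[] decode(String stored) {
        return Base64.getDecoder().decode(stored);
    }

    public static void main(String[] args) {
        byte[] raw = {0, (byte) 0xFF, (byte) 0xC3, 10}; // not valid UTF-8 on its own
        String stored = encode(raw);
        System.out.println(stored);
        System.out.println(Arrays.equals(raw, decode(stored))); // prints true
    }
}
```

When writing, the encoded string is what goes into the column. When reading, the payload from the loop above would be passed to `decode`.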