On this page
Importing 和输出接口
Set Up
HCatInputFormat 和 HCatOutputFormat 接口不需要特定于 HCatalog 的设置。
注意 :HCatalog 不是线程安全的。
HCatInputFormat
HCatInputFormat 与 MapReduce 作业一起使用,以从 HCatalogManagement 的表中读取数据。
HCatInputFormat 公开了一个 Hadoop 0.20 MapReduce API,用于读取数据,就像已将其发布到表一样。
API
HCatInputFormat 公开的 API 如下所示。这包括:
setInput
setOutputSchema
getTableSchema
要使用 HCatInputFormat 读取数据,请首先使用正在读取的表中的必要信息实例化一个InputJobInfo
,然后使用InputJobInfo
调用setInput
。
您可以使用setOutputSchema
方法包括一个投影模式,以指定输出字段。如果未指定架构,则将返回表中的所有列。
您可以使用getTableSchema
方法来确定指定 Importing 表的表架构。
/**
* Set the input to use for the Job. This queries the metadata server with
* the specified partition predicates, gets the matching partitions, puts
* the information in the conf object. The inputInfo object is updated with
* information needed in the client context
* @param job the job object
* @param inputJobInfo the input info for table to read
* @throws IOException the exception in communicating with the metadata server
*/
public static void setInput(Job job,
InputJobInfo inputJobInfo) throws IOException;
/**
* Set the schema for the HCatRecord data returned by HCatInputFormat.
* @param job the job object
* @param hcatSchema the schema to use as the consolidated schema
*/
public static void setOutputSchema(Job job,HCatSchema hcatSchema)
throws IOException;
/**
* Get the HCatTable schema for the table specified in the HCatInputFormat.setInput
* call on the specified job context. This information is available only after
* HCatInputFormat.setInput has been called for a JobContext.
* @param context the context
* @return the table schema
* @throws IOException if HCatInputFormat.setInput has not been called
* for the current context
*/
public static HCatSchema getTableSchema(JobContext context)
throws IOException;
HCatOutputFormat
HCatOutputFormat 与 MapReduce 作业一起使用,可将数据写入 HCatalogManagement 的表。
HCatOutputFormat 公开了用于将数据写入表的 Hadoop 0.20 MapReduce API。当 MapReduce 作业使用 HCatOutputFormat 写入输出时,将使用为表配置的默认 OutputFormat,并在作业完成后将新分区发布到表中。
API
HCatOutputFormat 公开的 API 如下所示。这包括:
setOutput
setSchema
getTableSchema
HCatOutputFormat 上的第一个调用必须为setOutput
;其他任何调用都将引发异常,指出输出格式未初始化。 setSchema
方法指定要写出的数据的模式。您必须调用此方法,以提供要写入的数据模式。如果您的数据与表架构具有相同的架构,则可以使用HCatOutputFormat.getTableSchema()
获取表架构,然后将其传递给setSchema()
。
/**
* Set the information about the output to write for the job. This queries the metadata
* server to find the StorageHandler to use for the table. It throws an error if the
* partition is already published.
* @param job the job object
* @param outputJobInfo the table output information for the job
* @throws IOException the exception in communicating with the metadata server
*/
@SuppressWarnings("unchecked")
public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException;
/**
* Set the schema for the data being written out to the partition. The
* table schema is used by default for the partition if this is not called.
* @param job the job object
* @param schema the schema for the data
* @throws IOException
*/
public static void setSchema(final Job job, final HCatSchema schema) throws IOException;
/**
* Get the table schema for the table specified in the HCatOutputFormat.setOutput call
* on the specified job context.
* @param context the context
* @return the table schema
* @throws IOException if HCatOutputFormat.setOutput has not been called
* for the passed context
*/
public static HCatSchema getTableSchema(JobContext context) throws IOException;
HCatRecord
HCatRecord 是支持在 HCatalog 表中存储值的类型。
HCatalog 表架构中的类型确定为 HCatRecord 中的不同字段返回的对象的类型。下表显示了 MapReduce 程序的 Java 类与 HCatalog 数据类型之间的 Map:
HCatalog 数据类型 | MapReduce 中的 Java 类 | Values |
---|---|---|
TINYINT | java.lang.Byte | -128 至 127 |
SMALLINT | java.lang.Short | -(2 ^ 15)至(2 ^ 15)-1,即-32,768 至 32,767 |
INT | java.lang.Integer | -(2 ^ 31)至(2 ^ 31)-1,即-2,147,483,648 至 2,147,483,647 |
BIGINT | java.lang.Long | -(2 ^ 63)至(2 ^ 63)-1,即-9,223,372,036,854,775,808 至 9,223,372,036,854,775,807 |
BOOLEAN | java.lang.Boolean | 对或错 |
FLOAT | java.lang.Float | 单精度浮点值 |
DOUBLE | java.lang.Double | 双精度浮点值 |
DECIMAL | java.math.BigDecimal | 精确的浮点值,精度为 38 位 |
BINARY | byte[] | binary data |
STRING | java.lang.String | character string |
STRUCT | java.util.List | structured data |
ARRAY | java.util.List | 一种数据类型的值 |
MAP | java.util.Map | key-value pairs |
有关 Hive 数据类型的一般信息,请参见Hive 数据类型和Type System。
使用 HCatalog 运行 MapReduce
您的 MapReduce 程序需要告知 Thrift 服务器在哪里。最简单的方法是将该位置作为参数传递给 Java 程序。您还需要通过-libjars 参数将 Hive 和 HCatalog jar 传递给 MapReduce。
export HADOOP_HOME=<path_to_hadoop_install>
export HCAT_HOME=<path_to_hcat_install>
export HIVE_HOME=<path_to_hive_install>
export LIB_JARS=$HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar,
$HIVE_HOME/lib/hive-metastore-0.10.0.jar,
$HIVE_HOME/lib/libthrift-0.7.0.jar,
$HIVE_HOME/lib/hive-exec-0.10.0.jar,
$HIVE_HOME/lib/libfb303-0.7.0.jar,
$HIVE_HOME/lib/jdo2-api-2.3-ec.jar,
$HIVE_HOME/lib/slf4j-api-1.6.1.jar
export HADOOP_CLASSPATH=$HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar:
$HIVE_HOME/lib/hive-metastore-0.10.0.jar:
$HIVE_HOME/lib/libthrift-0.7.0.jar:
$HIVE_HOME/lib/hive-exec-0.10.0.jar:
$HIVE_HOME/lib/libfb303-0.7.0.jar:
$HIVE_HOME/lib/jdo2-api-2.3-ec.jar:
$HIVE_HOME/conf:$HADOOP_HOME/conf:
$HIVE_HOME/lib/slf4j-api-1.6.1.jar
$HADOOP_HOME/bin/hadoop --config $HADOOP_HOME/conf jar <path_to_jar>
<main_class> -libjars $LIB_JARS <program_arguments>
这种方法有效,但是每次运行 MapReduce 程序时 Hadoop 都会附带 libjars,将文件视为不同的缓存条目,这样效率不高,并且可能会耗尽 Hadoop 分布式缓存。
相反,您可以优化以使用 HDFS 位置运送 libjar。这样,Hadoop 将重用分布式缓存中的条目。
bin/hadoop fs -copyFromLocal $HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp
export LIB_JARS=hdfs:///tmp/hcatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar
# (Other statements remain the same.)
Authentication
如果失败导致在/tmp/
*<username> * /hive.log
中显示“ 2010-11-03 16:17:28,225 WARN hive.metastore ...-无法将元存储与 URI thrift:// ...”连接的消息,则确保已运行“ kinit
*<username> * @FOO.COM
”以获取 Kerberos 票证并能够向 HCatalog 服务器进行身份验证。
Read Example
下面的非常简单的 MapReduce 程序从一个表中读取数据,该程序假定第二列中有一个整数(“列 1”),并计算找到的每个不同值的实例数。也就是说,它等效于“从 col1 的$ table group 中选择 col1,count(*)”。
例如,如果第二列中的值为\ { 1,1,1,3,3,5
},程序将生成以下值和计数的输出:
1, 3
3, 2
5, 1
public class GroupByAge extends Configured implements Tool {
public static class Map extends
Mapper<WritableComparable, HCatRecord, IntWritable, IntWritable> {
int age;
@Override
protected void map(
WritableComparable key,
HCatRecord value,
org.apache.hadoop.mapreduce.Mapper<WritableComparable, HCatRecord,
IntWritable, IntWritable>.Context context)
throws IOException, InterruptedException {
age = (Integer) value.get(1);
context.write(new IntWritable(age), new IntWritable(1));
}
}
public static class Reduce extends Reducer<IntWritable, IntWritable,
WritableComparable, HCatRecord> {
@Override
protected void reduce(
IntWritable key,
java.lang.Iterable<IntWritable> values,
org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
WritableComparable, HCatRecord>.Context context)
throws IOException, InterruptedException {
int sum = 0;
Iterator<IntWritable> iter = values.iterator();
while (iter.hasNext()) {
sum++;
iter.next();
}
HCatRecord record = new DefaultHCatRecord(2);
record.set(0, key.get());
record.set(1, sum);
context.write(null, record);
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
args = new GenericOptionsParser(conf, args).getRemainingArgs();
String inputTableName = args[0];
String outputTableName = args[1];
String dbName = null;
Job job = new Job(conf, "GroupByAge");
HCatInputFormat.setInput(job, InputJobInfo.create(dbName,
inputTableName, null));
// initialize HCatOutputFormat
job.setInputFormatClass(HCatInputFormat.class);
job.setJarByClass(GroupByAge.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(WritableComparable.class);
job.setOutputValueClass(DefaultHCatRecord.class);
HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName,
outputTableName, null));
HCatSchema s = HCatOutputFormat.getTableSchema(job);
System.err.println("INFO: output schema explicitly set for writing:"
+ s);
HCatOutputFormat.setSchema(job, s);
job.setOutputFormatClass(HCatOutputFormat.class);
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new GroupByAge(), args);
System.exit(exitCode);
}
}
请注意有关此程序的许多重要事项:
Map 的实现将 HCatRecord 作为 Importing,而 Reduce 的实现将其作为输出。
该示例程序假定 Importing 的模式,但是它也可以通过
HCatOutputFormat.getOutputSchema()
检索该模式,并根据该调用的结果检索字段。通过调用
InputJobInfo.create
创建要读取的表的 ImportingDescriptors。它需要数据库名称,表名称和分区过滤器。在此示例中,分区过滤器为 null,因此将读取表的所有分区。要写入的表的输出 Descriptors 是通过调用
OutputJobInfo.create
创建的。它需要数据库名称,表名称以及分区键的 Map 和描述正在写入的分区的值。在此示例中,假定该表是未分区的,因此此 Map 为 null。
要仅扫描表的选定分区,可以将描述所需分区的过滤器传递给InputJobInfo.create
。要扫描单个分区,过滤器字符串应类似于:“ ds=20120401
”,其中日期戳“ ds
”是分区列名称,而“ 20120401
”是您要读取的值(年,月和日)。
Filter Operators
过滤器可以包含运算符'and','or','like','(()','=',' <>'(不等于),' <', '>',' <=' and '> ='。
For example:
ds > "20110924"
ds < "20110925"
ds <= "20110925" and ds >= "20110924"
Scan Filter
例如,假设您有一个 web_logs 表,该表由“ ds
”列进行了分区。您可以通过更改选择表的一个分区
HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));
to
HCatInputFormat.setInput(job,
InputJobInfo.create(dbName, inputTableName, "ds=\"20110924\""));
该过滤器必须仅引用分区列。其他列中的值将导致作业失败。
Write Filter
要写入单个分区,您可以更改上面的示例,使其具有一个键值对 Map,用于描述该分区的所有分区键和值。在我们的示例 web_logs 表中,只有一个分区列(ds
),因此我们的 Map 将只有一个条目。更改
HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
to
Map partitions = new HashMap<String, String>(1);
partitions.put("ds", "20110924");
HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, partitions));
要同时写入多个分区,可以将 Map 保留为空,但是所有分区列都必须存在于所写入的数据中。
Navigation Links
一般:HCatalog Manual – WebHCat Manual – Hive Wiki 主页 – Hive 项目 site