Importing 和输出接口

Set Up

HCatInputFormat 和 HCatOutputFormat 接口不需要特定于 HCatalog 的设置。

注意 :HCatalog 不是线程安全的。

HCatInputFormat

HCatInputFormat 与 MapReduce 作业一起使用,以从 HCatalogManagement 的表中读取数据。

HCatInputFormat 公开了一个 Hadoop 0.20 MapReduce API,用于读取数据,就像已将其发布到表一样。

API

HCatInputFormat 公开的 API 如下所示。这包括:

  • setInput

  • setOutputSchema

  • getTableSchema

要使用 HCatInputFormat 读取数据,请首先使用正在读取的表中的必要信息实例化一个InputJobInfo,然后使用InputJobInfo调用setInput

您可以使用setOutputSchema方法包括一个投影模式,以指定输出字段。如果未指定架构,则将返回表中的所有列。

您可以使用getTableSchema方法来确定指定 Importing 表的表架构。

/**
   * Set the input to use for the Job. This queries the metadata server with
   * the specified partition predicates, gets the matching partitions, puts
   * the information in the conf object. The inputInfo object is updated with
   * information needed in the client context
   * @param job the job object
   * @param inputJobInfo the input info for table to read
   * @throws IOException the exception in communicating with the metadata server
   */
  public static void setInput(Job job,
      InputJobInfo inputJobInfo) throws IOException;

  /**
   * Set the schema for the HCatRecord data returned by HCatInputFormat.
   * @param job the job object
   * @param hcatSchema the schema to use as the consolidated schema
   */
  public static void setOutputSchema(Job job,HCatSchema hcatSchema)
    throws IOException;

  /**
   * Get the HCatTable schema for the table specified in the HCatInputFormat.setInput
   * call on the specified job context. This information is available only after
   * HCatInputFormat.setInput has been called for a JobContext.
   * @param context the context
   * @return the table schema
   * @throws IOException if HCatInputFormat.setInput has not been called
   *                     for the current context
   */
  public static HCatSchema getTableSchema(JobContext context)
    throws IOException;

HCatOutputFormat

HCatOutputFormat 与 MapReduce 作业一起使用,可将数据写入 HCatalogManagement 的表。

HCatOutputFormat 公开了用于将数据写入表的 Hadoop 0.20 MapReduce API。当 MapReduce 作业使用 HCatOutputFormat 写入输出时,将使用为表配置的默认 OutputFormat,并在作业完成后将新分区发布到表中。

API

HCatOutputFormat 公开的 API 如下所示。这包括:

  • setOutput

  • setSchema

  • getTableSchema

HCatOutputFormat 上的第一个调用必须为setOutput;其他任何调用都将引发异常,指出输出格式未初始化。 setSchema方法指定要写出的数据的模式。您必须调用此方法,以提供要写入的数据模式。如果您的数据与表架构具有相同的架构,则可以使用HCatOutputFormat.getTableSchema()获取表架构,然后将其传递给setSchema()

/**
   * Set the information about the output to write for the job. This queries the metadata
   * server to find the StorageHandler to use for the table. It throws an error if the
   * partition is already published.
   * @param job the job object
   * @param outputJobInfo the table output information for the job
   * @throws IOException the exception in communicating with the metadata server
   */
  @SuppressWarnings("unchecked")
  public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException;

  /**
   * Set the schema for the data being written out to the partition. The
   * table schema is used by default for the partition if this is not called.
   * @param job the job object
   * @param schema the schema for the data
   * @throws IOException
   */
  public static void setSchema(final Job job, final HCatSchema schema) throws IOException;

  /**
   * Get the table schema for the table specified in the HCatOutputFormat.setOutput call
   * on the specified job context.
   * @param context the context
   * @return the table schema
   * @throws IOException if HCatOutputFormat.setOutput has not been called
   *                     for the passed context
   */
  public static HCatSchema getTableSchema(JobContext context) throws IOException;

HCatRecord

HCatRecord 是支持在 HCatalog 表中存储值的类型。

HCatalog 表架构中的类型确定为 HCatRecord 中的不同字段返回的对象的类型。下表显示了 MapReduce 程序的 Java 类与 HCatalog 数据类型之间的 Map:

HCatalog 数据类型MapReduce 中的 Java 类Values
TINYINTjava.lang.Byte-128 至 127
SMALLINTjava.lang.Short-(2 ^ 15)至(2 ^ 15)-1,即-32,768 至 32,767
INTjava.lang.Integer-(2 ^ 31)至(2 ^ 31)-1,即-2,147,483,648 至 2,147,483,647
BIGINTjava.lang.Long-(2 ^ 63)至(2 ^ 63)-1,即-9,223,372,036,854,775,808 至 9,223,372,036,854,775,807
BOOLEANjava.lang.Boolean对或错
FLOATjava.lang.Float单精度浮点值
DOUBLEjava.lang.Double双精度浮点值
DECIMALjava.math.BigDecimal精确的浮点值,精度为 38 位
BINARYbyte[]binary data
STRINGjava.lang.Stringcharacter string
STRUCTjava.util.Liststructured data
ARRAYjava.util.List一种数据类型的值
MAPjava.util.Mapkey-value pairs

有关 Hive 数据类型的一般信息,请参见Hive 数据类型Type System

使用 HCatalog 运行 MapReduce

您的 MapReduce 程序需要告知 Thrift 服务器在哪里。最简单的方法是将该位置作为参数传递给 Java 程序。您还需要通过-libjars 参数将 Hive 和 HCatalog jar 传递给 MapReduce。

export HADOOP_HOME=<path_to_hadoop_install>
export HCAT_HOME=<path_to_hcat_install>
export HIVE_HOME=<path_to_hive_install>
export LIB_JARS=$HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar,
$HIVE_HOME/lib/hive-metastore-0.10.0.jar,
$HIVE_HOME/lib/libthrift-0.7.0.jar,
$HIVE_HOME/lib/hive-exec-0.10.0.jar,
$HIVE_HOME/lib/libfb303-0.7.0.jar,
$HIVE_HOME/lib/jdo2-api-2.3-ec.jar,
$HIVE_HOME/lib/slf4j-api-1.6.1.jar

export HADOOP_CLASSPATH=$HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar:
$HIVE_HOME/lib/hive-metastore-0.10.0.jar:
$HIVE_HOME/lib/libthrift-0.7.0.jar:
$HIVE_HOME/lib/hive-exec-0.10.0.jar:
$HIVE_HOME/lib/libfb303-0.7.0.jar:
$HIVE_HOME/lib/jdo2-api-2.3-ec.jar:
$HIVE_HOME/conf:$HADOOP_HOME/conf:
$HIVE_HOME/lib/slf4j-api-1.6.1.jar

$HADOOP_HOME/bin/hadoop --config $HADOOP_HOME/conf jar <path_to_jar>
<main_class> -libjars $LIB_JARS <program_arguments>

这种方法有效,但是每次运行 MapReduce 程序时 Hadoop 都会附带 libjars,将文件视为不同的缓存条目,这样效率不高,并且可能会耗尽 Hadoop 分布式缓存。

相反,您可以优化以使用 HDFS 位置运送 libjar。这样,Hadoop 将重用分布式缓存中的条目。

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/hcatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar

# (Other statements remain the same.)

Authentication

如果失败导致在/tmp/ *<username> * /hive.log中显示“ 2010-11-03 16:17:28,225 WARN hive.metastore ...-无法将元存储与 URI thrift:// ...”连接的消息,则确保已运行“ kinit *<username> * @FOO.COM”以获取 Kerberos 票证并能够向 HCatalog 服务器进行身份验证。

Read Example

下面的非常简单的 MapReduce 程序从一个表中读取数据,该程序假定第二列中有一个整数(“列 1”),并计算找到的每个不同值的实例数。也就是说,它等效于“从 col1 的$ table group 中选择 col1,count(*)”。

例如,如果第二列中的值为\ { 1,1,1,3,3,5 },程序将生成以下值和计数的输出:

1, 3
3, 2
5, 1

public class GroupByAge extends Configured implements Tool {

    public static class Map extends
            Mapper<WritableComparable, HCatRecord, IntWritable, IntWritable> {

        int age;

        @Override
        protected void map(
                WritableComparable key,
                HCatRecord value,
                org.apache.hadoop.mapreduce.Mapper<WritableComparable, HCatRecord,
                        IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            age = (Integer) value.get(1);
            context.write(new IntWritable(age), new IntWritable(1));
        }
    }

    public static class Reduce extends Reducer<IntWritable, IntWritable,
    WritableComparable, HCatRecord> {

      @Override
      protected void reduce(
              IntWritable key,
              java.lang.Iterable<IntWritable> values,
              org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
                      WritableComparable, HCatRecord>.Context context)
              throws IOException, InterruptedException {
          int sum = 0;
          Iterator<IntWritable> iter = values.iterator();
          while (iter.hasNext()) {
              sum++;
              iter.next();
          }
          HCatRecord record = new DefaultHCatRecord(2);
          record.set(0, key.get());
          record.set(1, sum);

          context.write(null, record);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();

        String inputTableName = args[0];
        String outputTableName = args[1];
        String dbName = null;

        Job job = new Job(conf, "GroupByAge");
        HCatInputFormat.setInput(job, InputJobInfo.create(dbName,
                inputTableName, null));
        // initialize HCatOutputFormat

        job.setInputFormatClass(HCatInputFormat.class);
        job.setJarByClass(GroupByAge.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(WritableComparable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
        HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName,
                outputTableName, null));
        HCatSchema s = HCatOutputFormat.getTableSchema(job);
        System.err.println("INFO: output schema explicitly set for writing:"
                + s);
        HCatOutputFormat.setSchema(job, s);
        job.setOutputFormatClass(HCatOutputFormat.class);
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new GroupByAge(), args);
        System.exit(exitCode);
    }
}

请注意有关此程序的许多重要事项:

  • Map 的实现将 HCatRecord 作为 Importing,而 Reduce 的实现将其作为输出。

  • 该示例程序假定 Importing 的模式,但是它也可以通过HCatOutputFormat.getOutputSchema()检索该模式,并根据该调用的结果检索字段。

  • 通过调用InputJobInfo.create创建要读取的表的 ImportingDescriptors。它需要数据库名称,表名称和分区过滤器。在此示例中,分区过滤器为 null,因此将读取表的所有分区。

  • 要写入的表的输出 Descriptors 是通过调用OutputJobInfo.create创建的。它需要数据库名称,表名称以及分区键的 Map 和描述正在写入的分区的值。在此示例中,假定该表是未分区的,因此此 Map 为 null。

要仅扫描表的选定分区,可以将描述所需分区的过滤器传递给InputJobInfo.create。要扫描单个分区,过滤器字符串应类似于:“ ds=20120401”,其中日期戳“ ds”是分区列名称,而“ 20120401”是您要读取的值(年,月和日)。

Filter Operators

过滤器可以包含运算符'and','or','like','(()','=',' <>'(不等于),' <', '>',' <=' and '> ='。

For example:

  • ds > "20110924"

  • ds < "20110925"

  • ds <= "20110925" and ds >= "20110924"

Scan Filter

例如,假设您有一个 web_logs 表,该表由“ ds”列进行了分区。您可以通过更改选择表的一个分区

HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

to

HCatInputFormat.setInput(job,
                         InputJobInfo.create(dbName, inputTableName, "ds=\"20110924\""));

该过滤器必须仅引用分区列。其他列中的值将导致作业失败。

Write Filter

要写入单个分区,您可以更改上面的示例,使其具有一个键值对 Map,用于描述该分区的所有分区键和值。在我们的示例 web_logs 表中,只有一个分区列(ds),因此我们的 Map 将只有一个条目。更改

HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));

to

Map partitions = new HashMap<String, String>(1);
partitions.put("ds", "20110924");
HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, partitions));

要同时写入多个分区,可以将 Map 保留为空,但是所有分区列都必须存在于所写入的数据中。

Navigation Links

Previous: 加载和存储接口
Next: 读写器接口

一般:HCatalog ManualWebHCat ManualHive Wiki 主页Hive 项目 site