Notification

从 0.2 版开始,HCatalog 为系统中发生的某些事件提供通知。这样,诸如 Oozie 之类的应用程序可以 await 这些事件并安排依赖于这些事件的工作。当前版本的 HCatalog 支持两种事件:

  • 添加新分区时的通知

  • 添加一组分区时的通知

添加新分区后,无需执行其他工作即可发送通知:现有的addPartition呼叫将发送通知消息。

通知新分区

要接收有关已添加新分区的通知,您需要执行以下三个步骤。

  • 要开始接收消息,请创建到消息总线的连接,如下所示:
ConnectionFactory connFac = new ActiveMQConnectionFactory(amqurl);
Connection conn = connFac.createConnection();
conn.start();
  • 订阅您感兴趣的主题。订阅消息总线时,您需要订阅特定主题,以接收在该主题上传递的消息。

  • 与特定表相对应的主题名称存储在表属性中,可以使用以下代码进行检索:

HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
String topicName = msc.getTable("mydb",
                   "myTbl").getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
  • 使用主题名称来订阅主题,如下所示:
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
Destination hcatTopic = session.createTopic(topicName);
MessageConsumer consumer = session.createConsumer(hcatTopic);
consumer.setMessageListener(this);
  • 要开始接收消息,您需要实现 JMS 接口MessageListener,进而使您实现方法onMessage(Message msg)。每当新消息到达消息总线时,将调用此方法。该消息包含一个代表相应分区的分区对象,可以按如下所示进行检索:
@Override
public void onMessage(Message msg) {
  // We are interested in only add_partition events on this table.
  // So, check message type first.
  if(msg.getStringProperty(HCatConstants.HCAT_EVENT).equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)){
       Object obj = (((ObjectMessage)msg).getObject());
  }
}

您需要在 Classpath 中有一个 JMS jar 才能完成此工作。另外,您需要在 Classpath 中有一个 JMS 提供程序的 jar。 HCatalog 已通过 ActiveMQ 作为 JMS 提供程序进行了测试,尽管可以使用任何 JMS 提供程序。可以从http://activemq.apache.org/activemq-550-release.html获得 ActiveMQ。

一组分区的通知

有时您需要等到分区集合完成后才能 continue 执行其他操作。例如,您可能要在一天的所有分区都完成后开始处理。但是,HCatalog 没有分区集合或层次结构的概念。为了支持这一点,HCatalog 允许数据编写者在完成编写分区集合时发出 signal。数据读取器可以在开始读取之前 await 此 signal。

下面的示例代码说明了添加一组分区后如何发送通知。

为了表示 signal,数据写入器执行以下操作:

HiveMetaStoreClient msc = new HiveMetaStoreClient(conf);

// Create a map, specifying partition key names and values
Map<String,String> partMap = new HashMap<String, String>();
partMap.put("date","20110711");
partMap.put("country","*");

// Mark the partition as "done"
msc.markPartitionForEvent("mydb", "mytbl", partMap, PartitionEventType.LOAD_DONE);

要接收此通知,Consumer 需要执行以下操作:

  • above重复步骤 1 和 2,以构建与通知系统的连接并订阅主题。

  • 收到此示例中所示的通知:

HiveMetaStoreClient msc = new HiveMetaStoreClient(conf);

// Create a map, specifying partition key names and values
Map<String,String> partMap = new HashMap<String, String>();
partMap.put("date","20110711");
partMap.put("country","*");

// Mark the partition as "done"
msc.markPartitionForEvent("mydb", "mytbl", partMap, PartitionEventType.LOAD_DONE);

如果使用者已经在消息总线上注册并且当前处于活动状态,则一旦生产者将分区标记为“完成”,它将从消息总线获得回调。替代地,Consumer 可以从元存储库明确请求特定分区。以下代码从使用者的角度说明了用法:

// Enquire to metastore whether a particular partition has been marked or not.
boolean marked = msc.isPartitionMarkedForEvent("mydb", "mytbl", partMap, PartitionEventType.LOAD_DONE);

// Or register to a message bus and get asynchronous callback.
ConnectionFactory connFac = new ActiveMQConnectionFactory(amqurl);
Connection conn = connFac.createConnection();
conn.start();
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
Destination hcatTopic = session.createTopic(topic);
MessageConsumer consumer = session.createConsumer(hcatTopic);
consumer.setMessageListener(this);

public void onMessage(Message msg) {

                                
  MapMessage mapMsg = (MapMessage)msg;
  Enumeration<String> keys = mapMsg.getMapNames();
  
  // Enumerate over all keys. This will print key-value pairs specifying the  
  // particular partition 44which was marked done. In this case, it will print:
  // date : 20110711
  // country: *

  while(keys.hasMoreElements()){
    String key = keys.nextElement();
    System.out.println(key + " : " + mapMsg.getString(key));
  }
  System.out.println("Message: "+msg);

Server Configuration

要启用通知,您需要配置服务器(请参阅下文)。

要禁用通知,您需要将hive.metastore.event.listeners留空或将其从hive-site.xml中删除。

启用 JMS 通知

您需要对 HCatalog 服务器的hive-site.xml文件进行(添加/修改)以下更改以打开通知。

<property>
<name>hive.metastore.event.expiry.duration</name>
<value>300L</value>
<description>Duration after which events expire from events table (in seconds)</description>
</property>

<property>
<name>hive.metastore.event.clean.freq</name>
<value>360L</value>
<description>Frequency at which timer task runs to purge expired events in metastore (in seconds).</description>
</property>

<property>
<name>msgbus.brokerurl</name>
<value>tcp://localhost:61616</value>
<description></description>
</property>

<property>
<name>msgbus.username</name>
<value></value>
<description></description>
</property>

<property>
<name>msgbus.password</name>
<value></value>
<description></description>
</property>

为了使服务器开始支持通知,必须在 Classpath 中包含以下内容:

(a)activemq个罐子

(b)jndi.properties文件,其属性已适当配置为用于通知

然后,请遵循以下准则来设置您的环境:

  • HCatalog 服务器启动脚本为* $ YOUR_HCAT_SERVER * /share/hcatalog/scripts/hcat_server_start.sh

  • 该脚本期望 Classpath 由 AUX_CLASSPATH 环境变量设置。

  • 因此,将 AUX_CLASSPATH 设置为满足上面的(a)和(b)。

  • jndi.properties文件位于* $ YOUR_HCAT_SERVER * /etc/hcatalog/jndi.properties

  • 您需要取消 Comments 并在jndi.properties文件中设置以下属性:

  • java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory

  • java.naming.provider.url = tcp://localhost:61616(这是您设置中的 ActiveMQ URL.)

Topic Names

如果在为服务器配置通知时创建表,则默认主题名称将自动设置为表属性。要将通知与先前创建的表一起使用(在其他 HCatalog 安装中或在当前安装中启用通知之前),则必须手动设置主题名称。例如:

  • $ YOUR_HCAT_CLIENT_HOME * /bin/hcat -e "ALTER TABLE access SET hcat.msgbus.topic.name=$TOPIC_NAME"

然后,您需要配置 ActiveMQ 使用者以侦听有关您在$ TOPIC_NAME 中给出的主题的消息。一个好的默认策略是TOPIC_NAME = "$database.$table"(即 Literals 点)。

Navigation Links

Previous: Dynamic Partitioning
Next: 基于存储的授权

一般:HCatalog ManualWebHCat ManualHive Wiki 主页Hive 项目 site