From 146904aecc4502c3692e35971bf9a506128b691b Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Mon, 9 Jul 2012 11:13:39 +0300 Subject: [PATCH 1/2] Fixes related to JobConf inclusion. Support for using HBase as a sink --- .../com/twitter/maple/hbase/HBaseScheme.java | 11 +- src/jvm/com/twitter/maple/hbase/HBaseTap.java | 132 ++++++++++++------ 2 files changed, 92 insertions(+), 51 deletions(-) diff --git a/src/jvm/com/twitter/maple/hbase/HBaseScheme.java b/src/jvm/com/twitter/maple/hbase/HBaseScheme.java index 4b8be88..e6db79b 100644 --- a/src/jvm/com/twitter/maple/hbase/HBaseScheme.java +++ b/src/jvm/com/twitter/maple/hbase/HBaseScheme.java @@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; +import org.mortbay.log.Log; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,9 +146,10 @@ public String[] getFamilyNames() { familyNameSet.add(hbaseColumn(pos > 0 ? columnName.substring(0, pos) : columnName)); } } else { - for (String familyName : familyNames) { familyNameSet.add(hbaseColumn(familyName)); } + for (String familyName : familyNames) { + familyNameSet.add(familyName); + } } - return familyNameSet.toArray(new String[0]); } @@ -204,22 +206,19 @@ public void sink(FlowProcess flowProcess, SinkCall { /** Field LOG */ private static final Logger LOG = LoggerFactory.getLogger(HBaseTap.class); + private final String id = UUID.randomUUID().toString(); /** Field SCHEME */ @@ -55,9 +61,11 @@ public class HBaseTap extends Tap { /** * Constructor HBaseTap creates a new HBaseTap instance. 
- * - * @param tableName of type String - * @param HBaseFullScheme of type HBaseFullScheme + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme */ public HBaseTap(String tableName, HBaseScheme HBaseFullScheme) { super(HBaseFullScheme, SinkMode.UPDATE); @@ -66,10 +74,13 @@ public HBaseTap(String tableName, HBaseScheme HBaseFullScheme) { /** * Constructor HBaseTap creates a new HBaseTap instance. - * - * @param tableName of type String - * @param HBaseFullScheme of type HBaseFullScheme - * @param sinkMode of type SinkMode + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + * @param sinkMode + * of type SinkMode */ public HBaseTap(String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode) { super(HBaseFullScheme, sinkMode); @@ -78,9 +89,11 @@ public HBaseTap(String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode /** * Constructor HBaseTap creates a new HBaseTap instance. - * - * @param tableName of type String - * @param HBaseFullScheme of type HBaseFullScheme + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme */ public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullScheme) { super(HBaseFullScheme, SinkMode.UPDATE); @@ -90,13 +103,15 @@ public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullSchem /** * Constructor HBaseTap creates a new HBaseTap instance. 
- * - * @param tableName of type String - * @param HBaseFullScheme of type HBaseFullScheme - * @param sinkMode of type SinkMode + * + * @param tableName + * of type String + * @param HBaseFullScheme + * of type HBaseFullScheme + * @param sinkMode + * of type SinkMode */ - public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullScheme, - SinkMode sinkMode) { + public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode) { super(HBaseFullScheme, sinkMode); this.quorumNames = quorumNames; this.tableName = tableName; @@ -104,7 +119,7 @@ public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullSchem /** * Method getTableName returns the tableName of this HBaseTap object. - * + * * @return the tableName (type String) of this HBaseTap object. */ public String getTableName() { @@ -115,10 +130,10 @@ public Path getPath() { return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); } - private HBaseAdmin getHBaseAdmin(JobConf conf) - throws MasterNotRunningException, ZooKeeperConnectionException { + private HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { if (hBaseAdmin == null) { - hBaseAdmin = new HBaseAdmin(HBaseConfiguration.create()); + Configuration hbaseConf = HBaseConfiguration.create(conf); + hBaseAdmin = new HBaseAdmin(hbaseConf); } return hBaseAdmin; @@ -127,41 +142,56 @@ private HBaseAdmin getHBaseAdmin(JobConf conf) @Override public void sinkConfInit(FlowProcess process, JobConf conf) { conf.set("hbase.zookeeper.quorum", quorumNames); - LOG.debug("sinking to table: {}", tableName); if (isReplace() && conf.get("mapred.task.partition") == null) { try { deleteResource(conf); + } catch (IOException e) { throw new RuntimeException("could not delete resource: " + e); } } + + else if (isUpdate()) { + try { + createResource(conf); + } catch (IOException e) { + throw new RuntimeException(tableName + " does not exist !"); + } + + } 
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); super.sinkConfInit(process, conf); } - @Override public String getIdentifier() { + @Override + public String getIdentifier() { return id; } - @Override public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, - RecordReader recordReader) throws IOException { - return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); + @Override + public TupleEntryIterator openForRead(FlowProcess jobConfFlowProcess, RecordReader recordReader) throws IOException { + return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); } - @Override public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, - OutputCollector outputCollector) throws IOException { - throw new NotImplementedException(); + @Override + public TupleEntryCollector openForWrite(FlowProcess jobConfFlowProcess, OutputCollector outputCollector) throws IOException { + HBaseTapCollector hBaseCollector = new HBaseTapCollector( jobConfFlowProcess, this ); + hBaseCollector.prepare(); + return hBaseCollector; } - @Override public boolean createResource(JobConf jobConf) throws IOException { + @Override + public boolean createResource(JobConf jobConf) throws IOException { HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); + + if (hBaseAdmin.tableExists(tableName)) { + return true; + } - if (hBaseAdmin.tableExists(tableName)) { return true; } - - LOG.debug("creating hbase table: {}", tableName); + LOG.info("creating hbase table: {}", tableName); HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); @@ -176,13 +206,16 @@ public void sinkConfInit(FlowProcess process, JobConf conf) { return true; } - @Override public boolean deleteResource(JobConf jobConf) throws IOException { + @Override + public boolean deleteResource(JobConf jobConf) throws IOException { // eventually keep table meta-data to source table create HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); - if 
(!hBaseAdmin.tableExists(tableName)) { return true; } - - LOG.debug("deleting hbase table: {}", tableName); + if (!hBaseAdmin.tableExists(tableName)) { + return true; + } + + LOG.info("deleting hbase table: {}", tableName); hBaseAdmin.disableTable(tableName); hBaseAdmin.deleteTable(tableName); @@ -190,27 +223,36 @@ public void sinkConfInit(FlowProcess process, JobConf conf) { return true; } - @Override public boolean resourceExists(JobConf jobConf) throws IOException { + @Override + public boolean resourceExists(JobConf jobConf) throws IOException { return getHBaseAdmin(jobConf).tableExists(tableName); } - @Override public long getModifiedTime(JobConf jobConf) throws IOException { - return System.currentTimeMillis(); // currently unable to find last mod time on a table + @Override + public long getModifiedTime(JobConf jobConf) throws IOException { + return System.currentTimeMillis(); // currently unable to find last mod time + // on a table } @Override public void sourceConfInit(FlowProcess process, JobConf conf) { + conf.set("hbase.zookeeper.quorum", quorumNames); LOG.debug("sourcing from table: {}", tableName); - FileInputFormat.addInputPaths(conf, tableName); super.sourceConfInit(process, conf); } @Override public boolean equals(Object object) { - if (this == object) { return true; } - if (object == null || getClass() != object.getClass()) { return false; } - if (!super.equals(object)) { return false; } + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + if (!super.equals(object)) { + return false; + } HBaseTap hBaseTap = (HBaseTap) object; From eae03fb28178b6857af9ca804bbff27908bb14ba Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Mon, 9 Jul 2012 11:36:24 +0300 Subject: [PATCH 2/2] Added HBaseTapCollector which is an exact (grrrrrrrrrrr ...) 
copy of the JDBC tap collector; The class facilitates (oddly) the use of HBaseTap as a sink --- .../maple/hbase/HBaseTapCollector.java | 117 ++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 src/jvm/com/twitter/maple/hbase/HBaseTapCollector.java diff --git a/src/jvm/com/twitter/maple/hbase/HBaseTapCollector.java b/src/jvm/com/twitter/maple/hbase/HBaseTapCollector.java new file mode 100644 index 0000000..65f6ab9 --- /dev/null +++ b/src/jvm/com/twitter/maple/hbase/HBaseTapCollector.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.maple.hbase; + +import cascading.flow.FlowProcess; +import cascading.flow.hadoop.HadoopFlowProcess; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tuple.TupleEntrySchemeCollector; +import org.apache.hadoop.mapred.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Class HBaseTapCollector is a kind of + * {@link cascading.tuple.TupleEntrySchemeCollector} that writes tuples to the + * resource managed by a particular {@link HBaseTap} instance. 
+ */ +public class HBaseTapCollector extends TupleEntrySchemeCollector implements OutputCollector { + /** Logger for this class. */ + private static final Logger LOG = LoggerFactory.getLogger(HBaseTapCollector.class); + /** Job configuration built from a copy of the flow process configuration; the sink settings of the tap are applied to it in initialize(). */ + private final JobConf conf; + /** Record writer obtained from the configured OutputFormat in initialize(); not assigned before prepare() runs. */ + private RecordWriter writer; + /** Flow process passed to the constructor; used for sink configuration and, in collect(), for progress reporting. */ + private final FlowProcess hadoopFlowProcess; + /** Tap that supplies the scheme, the sink configuration and the identifier passed to the record writer. */ + private final Tap tap; + /** Reporter handed to the writer on close; always Reporter.NULL. */ + private final Reporter reporter = Reporter.NULL; + + /** + * Constructor HBaseTapCollector creates a new HBaseTapCollector instance. + * + * @param flowProcess of type FlowProcess; its configuration is copied into a new JobConf + * @param tap + * of type Tap + * @throws IOException + * when it fails to initialize + */ + public HBaseTapCollector(FlowProcess flowProcess, Tap tap) throws IOException { + super(flowProcess, tap.getScheme()); + this.hadoopFlowProcess = flowProcess; + this.tap = tap; + this.conf = new JobConf(flowProcess.getConfigCopy()); + this.setOutput(this); + } + + @Override + public void prepare() { /* sets up the record writer first, then runs the superclass prepare; an IOException is rethrown as RuntimeException */ + try { + initialize(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + super.prepare(); + } + + private void initialize() throws IOException { /* applies the sink configuration of the tap to conf, then opens a record writer from the resulting OutputFormat */ + tap.sinkConfInit(hadoopFlowProcess, conf); + OutputFormat outputFormat = conf.getOutputFormat(); + LOG.info("Output format class is: " + outputFormat.getClass().toString()); + writer = outputFormat.getRecordWriter(null, conf, tap.getIdentifier(), Reporter.NULL); /* first argument is passed as null; the tap identifier is passed as the name argument */ + sinkCall.setOutput(this); /* NOTE(review): the constructor already calls setOutput(this) - confirm whether both calls are needed */ + } + + @Override + public void close() { /* closes the record writer, then always runs the superclass close; an IOException is rethrown as TapException */ + try { + LOG.info("closing tap collector for: {}", tap); + writer.close(reporter); /* NOTE(review): writer is only assigned in initialize(); if close() runs without prepare() this dereferences null */ + } catch (IOException exception) { + LOG.warn("exception closing: {}", exception); /* NOTE(review): the exception is the only argument for the {} placeholder; whether SLF4J prints it as text or as a stack trace depends on the SLF4J version - confirm */ + throw new TapException("exception closing HBaseTapCollector", exception); + } finally { + super.close(); + } + } + + /** + * Method collect writes the given values to the {@link Tap} this instance + * encapsulates. 
+ * + * @param writableComparable + * of type Object, handed to the record writer as the key + * @param writable + * of type Object, handed to the record writer as the value + * @throws IOException + * when the record writer fails to write the pair + */ + public void collect(Object writableComparable, Object writable) throws IOException { + if (hadoopFlowProcess instanceof HadoopFlowProcess) /* report progress only when running inside a HadoopFlowProcess */ + ((HadoopFlowProcess) hadoopFlowProcess).getReporter().progress(); + + writer.write(writableComparable, writable); + } +}