Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added usablility of HBaseTap to be used as a Sink + Bugfix #14

Merged
merged 2 commits into from
Jul 10, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions src/jvm/com/twitter/maple/hbase/HBaseScheme.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -145,9 +146,10 @@ public String[] getFamilyNames() {
familyNameSet.add(hbaseColumn(pos > 0 ? columnName.substring(0, pos) : columnName));
}
} else {
for (String familyName : familyNames) { familyNameSet.add(hbaseColumn(familyName)); }
for (String familyName : familyNames) {
familyNameSet.add(familyName);
}
}

return familyNameSet.toArray(new String[0]);
}

Expand Down Expand Up @@ -204,22 +206,19 @@ public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputColl
TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
OutputCollector outputCollector = sinkCall.getOutput();
Tuple key = tupleEntry.selectTuple(keyField);

byte[] keyBytes = Bytes.toBytes(key.getString(0));
Put put = new Put(keyBytes);

for (int i = 0; i < valueFields.length; i++) {
Fields fieldSelector = valueFields[i];
TupleEntry values = tupleEntry.selectEntry(fieldSelector);

for (int j = 0; j < values.getFields().size(); j++) {
Fields fields = values.getFields();
Tuple tuple = values.getTuple();

String value = tuple.getString(j);

byte[] asBytes = value == null ? null : Bytes.toBytes(value);

put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), asBytes);
}
}
Expand Down
132 changes: 87 additions & 45 deletions src/jvm/com/twitter/maple/hbase/HBaseTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeCollector;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HBaseAdmin;
Expand All @@ -31,15 +34,18 @@
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.UUID;

/**
* The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with the {@HBaseFullScheme}
* to allow for the reading and writing of data to and from a HBase cluster.
* The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with
* the {@HBaseFullScheme} to allow for the reading and writing
* of data to and from a HBase cluster.
*/
public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
/** Field LOG */
private static final Logger LOG = LoggerFactory.getLogger(HBaseTap.class);

private final String id = UUID.randomUUID().toString();

/** Field SCHEME */
Expand All @@ -55,9 +61,11 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {

/**
* Constructor HBaseTap creates a new HBaseTap instance.
*
* @param tableName of type String
* @param HBaseFullScheme of type HBaseFullScheme
*
* @param tableName
* of type String
* @param HBaseFullScheme
* of type HBaseFullScheme
*/
public HBaseTap(String tableName, HBaseScheme HBaseFullScheme) {
super(HBaseFullScheme, SinkMode.UPDATE);
Expand All @@ -66,10 +74,13 @@ public HBaseTap(String tableName, HBaseScheme HBaseFullScheme) {

/**
* Constructor HBaseTap creates a new HBaseTap instance.
*
* @param tableName of type String
* @param HBaseFullScheme of type HBaseFullScheme
* @param sinkMode of type SinkMode
*
* @param tableName
* of type String
* @param HBaseFullScheme
* of type HBaseFullScheme
* @param sinkMode
* of type SinkMode
*/
public HBaseTap(String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode) {
super(HBaseFullScheme, sinkMode);
Expand All @@ -78,9 +89,11 @@ public HBaseTap(String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode

/**
* Constructor HBaseTap creates a new HBaseTap instance.
*
* @param tableName of type String
* @param HBaseFullScheme of type HBaseFullScheme
*
* @param tableName
* of type String
* @param HBaseFullScheme
* of type HBaseFullScheme
*/
public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullScheme) {
super(HBaseFullScheme, SinkMode.UPDATE);
Expand All @@ -90,21 +103,23 @@ public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullSchem

/**
* Constructor HBaseTap creates a new HBaseTap instance.
*
* @param tableName of type String
* @param HBaseFullScheme of type HBaseFullScheme
* @param sinkMode of type SinkMode
*
* @param tableName
* of type String
* @param HBaseFullScheme
* of type HBaseFullScheme
* @param sinkMode
* of type SinkMode
*/
public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullScheme,
SinkMode sinkMode) {
public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode) {
super(HBaseFullScheme, sinkMode);
this.quorumNames = quorumNames;
this.tableName = tableName;
}

/**
* Method getTableName returns the tableName of this HBaseTap object.
*
*
* @return the tableName (type String) of this HBaseTap object.
*/
public String getTableName() {
Expand All @@ -115,10 +130,10 @@ public Path getPath() {
return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_"));
}

private HBaseAdmin getHBaseAdmin(JobConf conf)
throws MasterNotRunningException, ZooKeeperConnectionException {
private HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException {
if (hBaseAdmin == null) {
hBaseAdmin = new HBaseAdmin(HBaseConfiguration.create());
Configuration hbaseConf = HBaseConfiguration.create(conf);
hBaseAdmin = new HBaseAdmin(hbaseConf);
}

return hBaseAdmin;
Expand All @@ -127,41 +142,56 @@ private HBaseAdmin getHBaseAdmin(JobConf conf)
@Override
public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
conf.set("hbase.zookeeper.quorum", quorumNames);

LOG.debug("sinking to table: {}", tableName);

if (isReplace() && conf.get("mapred.task.partition") == null) {
try {
deleteResource(conf);

} catch (IOException e) {
throw new RuntimeException("could not delete resource: " + e);
}
}

else if (isUpdate()) {
try {
createResource(conf);
} catch (IOException e) {
throw new RuntimeException(tableName + " does not exist !");
}

}

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
super.sinkConfInit(process, conf);
}

@Override public String getIdentifier() {
@Override
public String getIdentifier() {
return id;
}

@Override public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess,
RecordReader recordReader) throws IOException {
return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
@Override
public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) throws IOException {
return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
}

@Override public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess,
OutputCollector outputCollector) throws IOException {
throw new NotImplementedException();
@Override
public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) throws IOException {
HBaseTapCollector hBaseCollector = new HBaseTapCollector( jobConfFlowProcess, this );
hBaseCollector.prepare();
return hBaseCollector;
}

@Override public boolean createResource(JobConf jobConf) throws IOException {
@Override
public boolean createResource(JobConf jobConf) throws IOException {
HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf);

if (hBaseAdmin.tableExists(tableName)) {
return true;
}

if (hBaseAdmin.tableExists(tableName)) { return true; }

LOG.debug("creating hbase table: {}", tableName);
LOG.info("creating hbase table: {}", tableName);

HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);

Expand All @@ -176,41 +206,53 @@ public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
return true;
}

@Override public boolean deleteResource(JobConf jobConf) throws IOException {
@Override
public boolean deleteResource(JobConf jobConf) throws IOException {
// eventually keep table meta-data to source table create
HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf);

if (!hBaseAdmin.tableExists(tableName)) { return true; }

LOG.debug("deleting hbase table: {}", tableName);
if (!hBaseAdmin.tableExists(tableName)) {
return true;
}

LOG.info("deleting hbase table: {}", tableName);

hBaseAdmin.disableTable(tableName);
hBaseAdmin.deleteTable(tableName);

return true;
}

@Override public boolean resourceExists(JobConf jobConf) throws IOException {
@Override
public boolean resourceExists(JobConf jobConf) throws IOException {
return getHBaseAdmin(jobConf).tableExists(tableName);
}

@Override public long getModifiedTime(JobConf jobConf) throws IOException {
return System.currentTimeMillis(); // currently unable to find last mod time on a table
@Override
public long getModifiedTime(JobConf jobConf) throws IOException {
return System.currentTimeMillis(); // currently unable to find last mod time
// on a table
}

@Override
public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
conf.set("hbase.zookeeper.quorum", quorumNames);
LOG.debug("sourcing from table: {}", tableName);

FileInputFormat.addInputPaths(conf, tableName);
super.sourceConfInit(process, conf);
}

@Override
public boolean equals(Object object) {
if (this == object) { return true; }
if (object == null || getClass() != object.getClass()) { return false; }
if (!super.equals(object)) { return false; }
if (this == object) {
return true;
}
if (object == null || getClass() != object.getClass()) {
return false;
}
if (!super.equals(object)) {
return false;
}

HBaseTap hBaseTap = (HBaseTap) object;

Expand Down
Loading