This is a JAR file which can be included in the command rhmr to access HBase databases.
The FileInputFormats are specific to Mozilla’s Socorro Crash Report HBase file structure and Telemetry schema.
The class RHHBaseGeneral should work with raw keys and general values.
The returned value is a list of lists. Each sublist entry has two elements
(as is usual in RHIPE) - the key and the value. The value (the second entry)
has the names attribute equal to columnfamily:columnname. The values are raw
bytes (and if the raw bytes correspond to Java strings, they can be converted
using the R function rawToChar). The keys are also raw bytes.
Read from HBase (keys and values will be raw bytes), specifying start and stop rows, column families and their qualifiers.
Download the pre-built jar files from here: [1]. Note: these jar files require hbase-0.90.6. HBase classes MUST be present in the classpath when your Hadoop job runs.
You need the code described at the end of this file.
*** To read data from an HBase table
sample.code <- rhmap({
## r is a list with entries called "data:json" and "meta:json"
## both entries are raw bytes
## k is rowkey and is also rawbytes
})
result <- rhwatch(map=sample.code
,reduce=0 # For example
,input=hbaseif(table="telemetry" # pre-created table
,colspec=c("data:json","meta:json") # column family, qualifier
,rows=list("20120601","20120602") # row names as strings, otherwise must be raw
,jars="/user/sguha/share/rhipecrashreport.jar"
,zooinfo=list(zookeeper.znode.parent=A # look in your hbase conf files for these
,hbase.zookeeper.quorum=B)
))
*** To write to a pre-made HBase table
sample.code <- rhmap({
a <- list("data:json"=A,"meta:json"=B)
rhcollect(rowkey, a)
## the values A and B will be serialized using RHIPE serialization
## same for rowkey
})
result <- rhwatch(map=sample.code
,reduce=0
,output=hbaseif(table="telemetry"
,jars="/user/sguha/share/rhipecrashreport.jar"
,zooinfo=list(zookeeper.znode.parent=A
,hbase.zookeeper.quorum=B)
))
##' Build a RHIPE format specification for reading from / writing to HBase
##'
##' Adds the HBase jars and site configuration files found under the
##' HBASE_HOME environment variable to the rJava class path, and returns a
##' function that fills in the job configuration entries for the requested
##' direction.
##'
##' @param table A string indicating the table name. If this is used as an output, the table must already exist; it will not be created. Only the first element is used.
##' @param colspec Required when this is called as input. Colspec is of the form c("family:label","family1:label1","family2:label2",...)
##' @param rows a list of two elements: the start row key and the end row key. Each can be either a string or raw bytes.
##' @param caching leave as default or see Java source for more explanation
##' @param cacheBlocks leave as default or see Java source for more explanation
##' @param autoReduceDetect if you have rhbase installed it will try and determine how many reduces to use. Needed only for output
##' @param jars location of rhipecrashreport.jar
##' @param zooinfo required and must be a list with entries of zookeeper.znode.parent and hbase.zookeeper.quorum
##' @return A function of (mapred, direction, callers). It returns
##'   \code{mapred} with the HBase specific entries set: the output
##'   configuration when \code{direction} is "output", the input configuration
##'   for any other value. \code{callers} is accepted but not used.
hbasefmt <- function(table, colspec = NULL, rows = NULL, caching = 3L, cacheBlocks = TRUE,
                     autoReduceDetect = FALSE, jars = "/user/sguha/share/rhipecrashreport.jar",
                     zooinfo) {
  ## Convert a row key (string or raw vector) to a Base64 encoded string.
  makeRaw <- function(a) {
    ## Only convert strings; a raw vector must pass through unchanged
    ## (an `if` without `else` used to turn raw input into NULL).
    if (is.character(a)) a <- charToRaw(a)
    if (!is.raw(a)) stop("rows must be raw")
    J("org.apache.commons.codec.binary.Base64")$encodeBase64String(.jbyte(a))
  }

  ## Evaluate the arguments now so that the returned closure captures their
  ## values rather than unevaluated promises.
  table <- eval(table)
  colspec <- eval(colspec)
  rows <- eval(rows)
  cacheBlocks <- eval(cacheBlocks)
  autoReduceDetect <- eval(autoReduceDetect)
  caching <- eval(caching)

  ## Make the HBase classes and *-site.xml files visible to rJava.
  hbaseHome <- Sys.getenv("HBASE_HOME")
  hbaseJars <- list.files(hbaseHome, pattern = "jar$",
                          full.names = TRUE, recursive = TRUE)
  ## pattern/full.names/recursive belong to list.files(), not to the call
  ## that builds the directory name.
  hbaseConf <- list.files(file.path(hbaseHome, "conf"), pattern = "-site.xml$",
                          full.names = TRUE, recursive = TRUE)
  .jaddClassPath(c(hbaseJars, hbaseConf))

  function(mapred, direction, callers) {
    if (direction == "output") {
      if (is.null(table)) stop("Please provide a pre-made table")
      if (autoReduceDetect) {
        ## Never use more reducers than the table has regions.
        hb <- rb.init()
        tba <- rb.table.connect(hb, table)
        mapred$mapred.reduce.tasks <- min(mapred$mapred.reduce.tasks,
                                          tba$table$getRegionsInfo()$size(),
                                          na.rm = TRUE)
      }
      mapred$hbase.mapred.outputtable <- as.character(table[1])
      mapred$zookeeper.znode.parent <- zooinfo$"zookeeper.znode.parent"
      mapred$hbase.zookeeper.quorum <- zooinfo$"hbase.zookeeper.quorum"
      mapred$rhipe_outputformat_class <- 'org.godhuli.rhipe.hbase.RHTableOutputFormat'
      mapred$rhipe_outputformat_keyclass <- 'org.godhuli.rhipe.RHBytesWritable'
      mapred$rhipe_outputformat_valueclass <- 'org.godhuli.rhipe.RHBytesWritable'
      mapred$rhipe_map_output_keyclass <- 'org.godhuli.rhipe.RHBytesWritable'
      mapred$rhipe_map_output_valueclass <- 'org.godhuli.rhipe.RHBytesWritable'
      mapred$jarfiles <- jars
      mapred
    } else {
      if (is.null(table)) stop("Please provide table type e.g. crash_reports or telemetry")
      ## Only the first element names the table; use it consistently for the
      ## configuration, the message and the comparisons below.
      tableName <- as.character(table[1])
      mapred$rhipe.hbase.tablename <- tableName
      mapred$rhipe.hbase.colspec <- paste(colspec, collapse = ",")
      if (!is.null(rows)) {
        if (length(rows) < 2L) stop("rows must have two elements: start and end row keys")
        mapred$rhipe.hbase.rowlim.start <- makeRaw(rows[[1]])
        mapred$rhipe.hbase.rowlim.end <- makeRaw(rows[[2]])
      }
      mapred$rhipe.hbase.mozilla.cacheblocks <- sprintf("%s:%s", as.integer(caching),
                                                        as.integer(cacheBlocks))
      mapred$zookeeper.znode.parent <- zooinfo$"zookeeper.znode.parent"
      mapred$hbase.zookeeper.quorum <- zooinfo$"hbase.zookeeper.quorum"
      message(sprintf("Using %s table", tableName))
      if (tableName %in% c("telemetry", "crash_reports")) {
        ## The two Mozilla tables use a dedicated input format and differ in
        ## date format and key prefix.
        mapred$rhipe.hbase.dateformat <- if (tableName == "crash_reports") "yyMMdd" else "yyyyMMdd"
        mapred$rhipe.hbase.mozilla.prefix <- if (tableName == "telemetry") "byteprefix" else "hexprefix"
        mapred$rhipe_inputformat_class <- 'org.godhuli.rhipe.hbase.RHCrashReportTableInputFormat'
      } else {
        mapred$rhipe_inputformat_class <- 'org.godhuli.rhipe.hbase.RHHBaseGeneral'
      }
      ## mapred$hbase.client.scanner.caching <- 100L
      mapred$rhipe_inputformat_keyclass <- 'org.godhuli.rhipe.RHBytesWritable'
      mapred$rhipe_inputformat_valueclass <- 'org.godhuli.rhipe.hbase.RHResult'
      mapred$jarfiles <- jars
      mapred
    }
  }
}