hive_stream {hive}    R Documentation
Description

High-level R function for using Hadoop Streaming.
Usage

hive_stream(mapper, reducer, input, output, henv = hive(),
            mapper_args = NULL, reducer_args = NULL,
            cmdenv_arg = NULL, streaming_args = NULL)
Arguments

mapper: a function which is executed on each worker node. The so-called mapper typically maps input key/value pairs to a set of intermediate key/value pairs.

reducer: a function which is executed on each worker node. The so-called reducer reduces a set of intermediate values sharing a key to a smaller set of values. If no reducer is needed, this argument can be left empty.

input: specifies the directory holding the data in the DFS.

output: specifies the output directory in the DFS containing the results after the streaming job has finished.

henv: Hadoop local environment.

mapper_args: additional arguments to the mapper.

reducer_args: additional arguments to the reducer.

cmdenv_arg: additional arguments passed as environment variables to distributed tasks.

streaming_args: additional arguments passed to the Hadoop Streaming utility. By default, only the number of reducers will be set.
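To illustrate how the optional arguments fit together, the following sketch (not part of the package examples) reuses the word-count mapper and reducer defined in the Examples section below; the cmdenv_arg and streaming_args values are hypothetical placeholders, assuming both are passed as character strings appended to the generated Hadoop Streaming command line.

## Hypothetical call: forward an environment variable to the distributed
## tasks and request verbose output from the Hadoop Streaming utility.
## "mapper" and "reducer" are the word-count functions from the Examples
## section below; paths and option values are placeholders.
hive_stream( mapper         = mapper,
             reducer        = reducer,
             input          = "/tmp/input",
             output         = "/tmp/output",
             cmdenv_arg     = "LC_ALL=C",
             streaming_args = "-verbose" )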
Details

The function hive_stream() starts a MapReduce job via Hadoop Streaming on the given data located on the HDFS.
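A minimal end-to-end workflow could look like the following sketch. It assumes a configured local Hadoop environment and that the framework can be brought up with hive_start() and checked with hive_is_available() from this package; the mapper and reducer are the ones defined in the Examples section, and /tmp/input is assumed to already hold the input data.

## Sketch of a typical workflow around hive_stream(); see the Examples
## section below for the definition of mapper and reducer.
library("hive")

henv <- hive()                        # local Hadoop environment
hive_start(henv)                      # start the Hadoop framework if necessary
stopifnot( hive_is_available(henv) )  # make sure the cluster is up

hive_stream( mapper = mapper, reducer = reducer,
             input = "/tmp/input", output = "/tmp/output",
             henv = henv )
DFS_list("/tmp/output")               # inspect the result files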
Author(s)

Stefan Theussl
References

Apache Hadoop Streaming (https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html).
Examples

## A simple word count example

## Put some xml files on the HDFS:
## Not run:
DFS_put( system.file("defaults/core/", package = "hive"),
         "/tmp/input" )
## End(Not run)
## Not run:
DFS_put( system.file("defaults/hdfs/hdfs-default.xml", package = "hive"),
         "/tmp/input" )
## End(Not run)
## Not run:
DFS_put( system.file("defaults/mapred/mapred-default.xml", package = "hive"),
         "/tmp/input" )
## End(Not run)

## Define the mapper and reducer functions to be applied:
## Note that a Hadoop map or reduce job retrieves data line by line from stdin.
## Not run:
## The mapper tokenizes each input line and emits one "term<TAB>1"
## key/value pair per term.
mapper <- function(x){
    con <- file( "stdin", open = "r" )
    while (length(line <- readLines(con, n = 1L, warn = FALSE)) > 0) {
        terms <- unlist(strsplit(line, " "))
        terms <- terms[nchar(terms) > 1]
        if( length(terms) )
            cat( paste(terms, 1, sep = "\t"), sep = "\n" )
    }
}
## The reducer sums the counts per term, using an environment as a hash table.
reducer <- function(x){
    env <- new.env( hash = TRUE )
    con <- file( "stdin", open = "r" )
    while (length(line <- readLines(con, n = 1L, warn = FALSE)) > 0) {
        keyvalue <- unlist( strsplit(line, "\t") )
        if( exists(keyvalue[1], envir = env, inherits = FALSE) ){
            assign( keyvalue[1],
                    get(keyvalue[1], envir = env) + as.integer(keyvalue[2]),
                    envir = env )
        } else {
            assign( keyvalue[1], as.integer(keyvalue[2]), envir = env )
        }
    }
    env <- as.list(env)
    for( term in names(env) )
        writeLines( paste(c(term, env[[term]]), collapse = "\t") )
}
hive_set_nreducer(1)
hive_stream( mapper = mapper, reducer = reducer,
             input = "/tmp/input", output = "/tmp/output" )
DFS_list("/tmp/output")
head( DFS_read_lines("/tmp/output/part-00000") )
## End(Not run)

## Don't forget to clean up the file system:
## Not run: DFS_dir_remove("/tmp/input")
## Not run: DFS_dir_remove("/tmp/output")