#!/usr/bin/env bash

# BEGIN_COPYRIGHT
#
# Copyright 2009-2019 CRS4.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# END_COPYRIGHT

# This example shows how to use Hadoop's SequenceFile input and output formats
# with Pydoop. First we run a word count on the input, storing counts as
# 32-bit integers in Hadoop SequenceFiles; next we run a MapReduce application
# that filters out those words whose count falls below a specified threshold.

# Strict mode: abort on command failure, on use of an unset variable, and on
# a failure in any stage of a pipeline.
set -o errexit -o nounset -o pipefail

# Trace every command when DEBUG is set to a non-empty value.
if [ -n "${DEBUG:-}" ]; then
    set -x
fi

# Resolve the physical directory holding this script, then load the settings
# shared by the examples from its parent directory.
this="${BASH_SOURCE-$0}"
this_dir=$(cd -P -- "$(dirname -- "${this}")" && pwd -P)
source "${this_dir}/../config.sh"

# Local directory holding the example input.
local_input="${this_dir}/../input"

# Threshold handed to the filter job (as filter.occurrence.threshold) and to
# the final check script.
occurrence_threshold=10

# Options shared by every "submit" invocation in this script.
opts=(
    "--python-program" "${PYTHON}"
    "-D" "mapreduce.task.timeout=10000"
)

# Ask for debug-level logging when DEBUG is set to a non-empty value. The
# append must target the lower-case "opts" array: bash variable names are
# case-sensitive, so appending to "OPTS" would leave the shared options
# untouched.
if [ -n "${DEBUG:-}" ]; then
    opts+=( "--log-level" "DEBUG" )
fi

# Submit the word count job.
# Globals:   opts, this_dir, PYDOOP (all read, none modified)
# Arguments: $1 - input path
#            $2 - output path; the job is configured to write it with
#                 SequenceFileOutputFormat
# Returns:   exit status of the submit command
run_wc() {
    local input=$1
    local output=$2
    # Build the full option list under its own name. Initializing a local
    # "opts" from the global "opts" it shadows depends on whether the shell
    # expands the initializer before or after creating the local.
    local submit_args=(
        "${opts[@]}"
        "--job-name" "wordcount"
        "--num-reducers" "2"
        "--output-format" "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
        "--upload-file-to-cache" "${this_dir}/bin/wordcount.py"
        "-D" "mapreduce.output.fileoutputformat.compress.type=NONE"
    )
    # PYDOOP is left unquoted, as before, so a multi-word command still
    # splits into separate words. NOTE(review): confirm against config.sh.
    ${PYDOOP} submit "${submit_args[@]}" wordcount "${input}" "${output}"
}

# Submit the filter job, which runs with zero reducers and passes the
# occurrence threshold to the job as filter.occurrence.threshold.
# Globals:   opts, this_dir, occurrence_threshold, PYDOOP (all read, none
#            modified)
# Arguments: $1 - input path; the job is configured to read it with
#                 SequenceFileInputFormat
#            $2 - output path
# Returns:   exit status of the submit command
run_filter() {
    local input=$1
    local output=$2
    # Build the full option list under its own name. Initializing a local
    # "opts" from the global "opts" it shadows depends on whether the shell
    # expands the initializer before or after creating the local.
    local submit_args=(
        "${opts[@]}"
        "--job-name" "filter"
        "--num-reducers" "0"
        "--input-format" "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
        "--upload-file-to-cache" "${this_dir}/bin/filter.py"
        "-D" "filter.occurrence.threshold=${occurrence_threshold}"
    )
    # PYDOOP is left unquoted, as before, so a multi-word command still
    # splits into separate words. NOTE(review): confirm against config.sh.
    ${PYDOOP} submit "${submit_args[@]}" filter "${input}" "${output}"
}

# Scratch directory for job outputs when running on the local file system.
# The EXIT trap removes it on every exit path, including an abort triggered
# by "set -e", so a failed job does not leave the directory behind.
# NOTE(review): this replaces any EXIT trap installed earlier (e.g. by the
# sourced config.sh) -- confirm none is expected.
wd=$(mktemp -d)
trap 'rm -rf -- "${wd}"' EXIT

# hadoop_fs, ensure_dfs_home and HDFS are not defined in this script;
# presumably they come from the sourced config.sh.
if [ "$(hadoop_fs)" != "file" ]; then
    # Not the local file system: use relative paths, clear leftovers from
    # previous runs and upload the input.
    ensure_dfs_home
    input="input"
    wc_output="wc_output"
    filter_output="filter_output"
    ${HDFS} dfs -rm -r -f "${input}" "${wc_output}" "${filter_output}"
    ${HDFS} dfs -put "${local_input}" "${input}"
else
    # Local file system: read the input in place and write the outputs under
    # the scratch directory.
    input="${local_input}"
    wc_output="${wd}/wc_output"
    filter_output="${wd}/filter_output"
fi

# Word count first, then the filter on its output, then hand the local input,
# the filter output and the threshold to the check script.
run_wc "${input}" "${wc_output}"
run_filter "${wc_output}" "${filter_output}"
${PYTHON} "${this_dir}/check.py" "${local_input}" "${filter_output}" -t "${occurrence_threshold}"
