#!/usr/bin/env bash

# BEGIN_COPYRIGHT
#
# Copyright 2009-2019 CRS4.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# END_COPYRIGHT

set -euo pipefail
[ -n "${DEBUG:-}" ] && set -x
this="${BASH_SOURCE-$0}"
this_dir=$(cd -P -- "$(dirname -- "${this}")" && pwd -P)
. "${this_dir}/../config.sh"

module="wordcount_minimal"
module_path="${this_dir}/../pydoop_submit/mr/${module}.py"
job_name="input_format_test_job"
jar_name="pydoop-input-formats.jar"

wd=$(mktemp -d)
javac -cp $(${HADOOP} classpath) -d "${wd}" it/crs4/pydoop/mapred*/*.java
jar cvf "${wd}/${jar_name}" -C "${wd}" it
opts=(
    "--upload-file-to-cache" "${module_path}"
    "--entry-point" "main"
    "--input-format" "it.crs4.pydoop.mapreduce.TextInputFormat"
    "--libjars" "${wd}/${jar_name}"
    "-D" "pydoop.input.issplitable=true"
    "-D" "mapreduce.job.name=${job_name}"
    "-D" "mapreduce.task.timeout=10000"
)
[ -n "${DEBUG:-}" ] && opts+=( "--log-level" "DEBUG" )

if [ "$(hadoop_fs)" != "file" ]; then
    ensure_dfs_home
    input="input"
    output="output"
    ${HDFS} dfs -rm -r -f "${input}" "${output}"
    ${HDFS} dfs -put "${this_dir}/../input" "${input}"
else
    input="${this_dir}/../input"
    output="${wd}/output"
fi

${PYDOOP} submit "${opts[@]}" ${module} "${input}" "${output}"
${PYTHON} "${this_dir}"/check_results.py "${this_dir}/../input" "${output}"

rm -rf "${wd}"
