#!/usr/bin/env bash

# BEGIN_COPYRIGHT
#
# Copyright 2009-2019 CRS4.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# END_COPYRIGHT

set -euo pipefail
[ -n "${DEBUG:-}" ] && set -x
this="${BASH_SOURCE-$0}"
this_dir=$(cd -P -- "$(dirname -- "${this}")" && pwd -P)
. "${this_dir}/../config.sh"
. "${this_dir}/config.sh"

nargs=1
if [ $# -ne ${nargs} ]; then
    die "Usage: $0 k|v|kv"
fi
mode=$1
if [ "${mode}" == "k" ]; then
    MODULE=avro_key_in_out
elif [ "${mode}" == "v" ]; then
    MODULE=avro_value_in_out
elif [ "${mode}" == "kv" ]; then
    MODULE=avro_key_value_in_out
else
    die "invalid mode: ${mode}"
fi

pushd "${this_dir}"
USER_SCHEMA_FILE=schemas/user.avsc
PET_SCHEMA_FILE=schemas/pet.avsc
STATS_SCHEMA_FILE=schemas/stats.avsc
STATS_SCHEMA=$(cat "${STATS_SCHEMA_FILE}")
CSV_INPUT=$(mktemp -d)
LOCAL_INPUT=$(mktemp -d)
INPUT=$(basename ${LOCAL_INPUT})
OUTPUT=results

# --- generate avro input ---
N=20
for i in 1 2; do
    ${PYTHON} py/create_input.py ${N} "${CSV_INPUT}/users_${i}.csv"
done
if [ "${mode}" == "kv" ]; then
    for i in 1 2; do
	./write_avro_kv "${USER_SCHEMA_FILE}" "${PET_SCHEMA_FILE}" \
          "${CSV_INPUT}/users_${i}.csv" "${LOCAL_INPUT}/users_${i}.avro"
    done
else
    for i in 1 2; do
	${PYTHON} py/write_avro.py "${USER_SCHEMA_FILE}" \
          "${CSV_INPUT}/users_${i}.csv" "${LOCAL_INPUT}/users_${i}.avro"
    done
fi
${HADOOP} fs -mkdir -p /user/"${USER}"
${HADOOP} fs -rm -r "${INPUT}" || :
${HADOOP} fs -put "${LOCAL_INPUT}" "${INPUT}"

# --- run cc ---
MPY=py/"${MODULE}".py
JOBNAME="${MODULE}"-job
LOGLEVEL="DEBUG"

# put the following opts at the end of the command line
# or the empty string will be parsed as the module arg
if [ "${mode}" == "k" ]; then
    K_SCHEMA_OPT="-D pydoop.mapreduce.avro.key.output.schema=${STATS_SCHEMA}"
    V_SCHEMA_OPT=""
elif [ "${mode}" == "v" ]; then
    K_SCHEMA_OPT=""
    V_SCHEMA_OPT="-D pydoop.mapreduce.avro.value.output.schema=${STATS_SCHEMA}"
else
    K_SCHEMA_OPT="-D pydoop.mapreduce.avro.key.output.schema=${STATS_SCHEMA}"
    V_SCHEMA_OPT="-D pydoop.mapreduce.avro.value.output.schema=${STATS_SCHEMA}"
fi

${HADOOP} fs -rm -r "/user/${USER}/${OUTPUT}" || :

${PYDOOP} submit \
    --upload-file-to-cache py/avro_base.py \
    --upload-file-to-cache "${MPY}" \
    --num-reducers 1 \
    --avro-input "${mode}" \
    --avro-output "${mode}" \
    --log-level "${LOGLEVEL}" \
    --job-name "${JOBNAME}" \
    "${MODULE}" "${INPUT}" "${OUTPUT}" \
    "${K_SCHEMA_OPT}" "${V_SCHEMA_OPT}"

# --- dump & check results ---
DUMP_DIR=$(mktemp -d)
rm -rf "${OUTPUT}"
${HADOOP} fs -get "${OUTPUT}"
for f in "${OUTPUT}"/part*; do
    ${PYTHON} py/avro_container_dump_results.py \
      "${f}" "${DUMP_DIR}"/$(basename ${f}).tsv "${mode}"
done
${PYTHON} py/check_results.py "${CSV_INPUT}" "${DUMP_DIR}"

rm -rf "${CSV_INPUT}" "${LOCAL_INPUT}" "${OUTPUT}" "${DUMP_DIR}"
popd
