#!/usr/bin/env bash

# BEGIN_COPYRIGHT
#
# Copyright 2009-2019 CRS4.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# END_COPYRIGHT

set -euo pipefail
[ -n "${DEBUG:-}" ] && set -x
this="${BASH_SOURCE-$0}"
this_dir=$(cd -P -- "$(dirname -- "${this}")" && pwd -P)
. "${this_dir}/../config.sh"

pushd "${this_dir}"
USER_SCHEMA_FILE=schemas/user.avsc
STATS_SCHEMA_FILE=schemas/stats.avsc
CSV_INPUT=$(mktemp -d)
LOCAL_INPUT=$(mktemp -d)
INPUT=$(basename ${LOCAL_INPUT})
OUTPUT=results

# --- generate avro input ---
N=20
for i in 1 2; do
    ${PYTHON} py/create_input.py ${N} "${CSV_INPUT}/users_${i}.csv"
    ${PYTHON} py/write_avro.py "${USER_SCHEMA_FILE}" \
      "${CSV_INPUT}/users_${i}.csv" "${LOCAL_INPUT}/users_${i}.avro"
done

# --- run cc ---
MODULE=avro_pyrw
MPY=py/"${MODULE}".py
JOBNAME="${MODULE}"-job
LOGLEVEL="DEBUG"

${HADOOP} fs -mkdir -p /user/"${USER}"
${HADOOP} fs -rm "${INPUT}" || :
${HADOOP} fs -put "${LOCAL_INPUT}" "${INPUT}"
${HADOOP} fs -rm -r "/user/${USER}/${OUTPUT}" || :
${PYDOOP} submit \
    --upload-file-to-cache "${MPY}" \
    --upload-file-to-cache "${USER_SCHEMA_FILE}" \
    --upload-file-to-cache "${STATS_SCHEMA_FILE}" \
    --num-reducers 1 \
    --do-not-use-java-record-reader \
    --do-not-use-java-record-writer \
    --log-level "${LOGLEVEL}" \
    --job-name "${JOBNAME}" \
    "${MODULE}" "${INPUT}" "${OUTPUT}"

# --- dump & check results ---
DUMP_DIR=$(mktemp -d)
rm -rf "${OUTPUT}"
${HADOOP} fs -get "${OUTPUT}"
for f in "${OUTPUT}"/part*; do
    ${PYTHON} py/avro_container_dump_results.py \
      "${f}" "${DUMP_DIR}"/$(basename ${f}).tsv
done
${PYTHON} py/check_results.py "${CSV_INPUT}" "${DUMP_DIR}"

rm -rf "${CSV_INPUT}" "${LOCAL_INPUT}" "${OUTPUT}" "${DUMP_DIR}"
popd
