#!/usr/bin/env bash

# BEGIN_COPYRIGHT
#
# Copyright 2009-2019 CRS4.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# END_COPYRIGHT

# End-to-end run of the avro_value_in example:
#   1. generate two CSV input files with py/create_input.py;
#   2. upload them to HDFS and convert them with it.crs4.pydoop.WriteParquet;
#   3. run the avro_value_in module through "pydoop submit";
#   4. fetch the job output and verify it with py/check_results.py.
#
# JAR_PATH, CP_PATH, HADOOP, PYTHON and PYDOOP are not defined here; they are
# expected to come from the sourced config.sh files. Set DEBUG to any
# non-empty value to trace commands.

set -euo pipefail
[ -n "${DEBUG:-}" ] && set -x
this="${BASH_SOURCE-$0}"
this_dir=$(cd -P -- "$(dirname -- "${this}")" && pwd -P)
. "${this_dir}/../config.sh"
. "${this_dir}/config.sh"

pushd "${this_dir}"
[ -f "${JAR_PATH}" ] || ./build.sh
SCHEMA_FILE_LOCAL=schemas/user.avsc
SCHEMA_FILE_HDFS=user.avsc
PARQUETS_DIR=parquets
OUTPUT=results

# --- create input ---
CSV_INPUT=$(mktemp -d)

# Remove local scratch data on every exit path (success, failure under
# "set -e", or interruption). Paths are resolved from this_dir because that
# is the working directory in which they were created.
cleanup_local() {
    ( cd -- "${this_dir}" && rm -rf -- "${CSV_INPUT}" "${OUTPUT}" )
}
trap cleanup_local EXIT

INPUT=$(basename -- "${CSV_INPUT}")
N=20
# ${PYTHON}, ${HADOOP} and ${PYDOOP} are deliberately left unquoted: the
# config files may define them as a command plus arguments.
for i in 1 2; do
    ${PYTHON} py/create_input.py "${N}" "${CSV_INPUT}/users_${i}.csv"
done

# --- convert to avro-parquet ---
${HADOOP} fs -mkdir -p /user/"${USER}"
# Best-effort removal of leftovers from a previous run; a missing path is
# not an error.
${HADOOP} fs -rm -r /user/"${USER}"/"${PARQUETS_DIR}" || :
${HADOOP} fs -rm -r "${INPUT}" || :
${HADOOP} fs -put "${CSV_INPUT}" "${INPUT}"

${HADOOP} fs -put -f "${SCHEMA_FILE_LOCAL}" "${SCHEMA_FILE_HDFS}"
# Assign and export separately so that a missing/unreadable CP_PATH aborts
# the script instead of silently continuing with an empty classpath.
HADOOP_CLASSPATH=$(<"${CP_PATH}")
export HADOOP_CLASSPATH
${HADOOP} jar "${JAR_PATH}" it.crs4.pydoop.WriteParquet \
    -libjars="${HADOOP_CLASSPATH//:/,}" \
    "${INPUT}" "${PARQUETS_DIR}" "${SCHEMA_FILE_HDFS}"

# --- run color count ---
MODULE=avro_value_in
MPY=py/"${MODULE}".py
JOBNAME="${MODULE}"-job
LOGLEVEL="DEBUG"
INPUT_FORMAT=org.apache.parquet.avro.AvroParquetInputFormat

${HADOOP} fs -rm -r /user/"${USER}"/"${OUTPUT}" || :

${PYDOOP} submit --upload-file-to-cache "${MPY}" \
  --upload-file-to-cache py/avro_base.py \
  --num-reducers 1 \
  --input-format "${INPUT_FORMAT}" \
  --avro-input v \
  --libjars "${JAR_PATH},${HADOOP_CLASSPATH//:/,}" \
  --log-level "${LOGLEVEL}" \
  --job-name "${JOBNAME}" \
  "${MODULE}" "${PARQUETS_DIR}" "${OUTPUT}"

# --- check results ---
# Drop any stale local copy first: "fs -get" would otherwise collide with it.
rm -rf -- "${OUTPUT}"
${HADOOP} fs -get /user/"${USER}"/"${OUTPUT}"
${PYTHON} py/check_results.py "${CSV_INPUT}" "${OUTPUT}"

# The HDFS copy of the CSV input has a random, per-run name, so it would
# otherwise accumulate in the HDFS home directory; remove it best-effort.
${HADOOP} fs -rm -r "${INPUT}" || :
popd
# Local scratch data (CSV_INPUT, OUTPUT) is removed by the EXIT trap.
