#!/usr/bin/env bash

# BEGIN_COPYRIGHT
#
# Copyright 2009-2019 CRS4.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# END_COPYRIGHT

# Strict mode: stop on any failing command, on use of an unset variable, and
# on a failure in any stage of a pipeline.
set -o errexit -o nounset -o pipefail

# A non-empty DEBUG in the environment turns on command tracing.
if [ -n "${DEBUG:-}" ]; then
    set -x
fi

# Resolve the physical (symlink-free) directory that holds this script, so
# that everything below can be located relative to it.
this="${BASH_SOURCE-$0}"
this_parent=$(dirname -- "${this}")
this_dir=$(cd -P -- "${this_parent}" && pwd -P)

# Load the settings shared by the parent directory first, then the ones local
# to this directory. Names used later but never assigned in this script
# (JAR_PATH, CP_PATH, HADOOP, PYDOOP, PYTHON) are expected to come from these
# files or from the environment.
source "${this_dir}/../config.sh"
source "${this_dir}/config.sh"

# Run from the script's own directory: every relative path below (build.sh,
# py/, schemas/, data/) is resolved against it.
pushd "${this_dir}"

# Build the jar on demand when it is not there yet.
[ -f "${JAR_PATH}" ] || ./build.sh

# Job parameters.
MODULE=kmer_count
MPY=py/"${MODULE}".py
JOBNAME="${MODULE}"-job
LOGLEVEL="DEBUG"
INPUT_FORMAT=org.apache.parquet.avro.AvroParquetInputFormat
# Content of the Avro schema file, later passed as parquet.avro.projection.
PROJECTION=$(<schemas/alignment_record_proj.avsc)

# Scratch directory holding the local copy of the job input. The EXIT trap
# removes it on every exit path, including an abort triggered by `set -e`,
# so a failed run does not leave it behind.
# NOTE(review): this replaces any EXIT trap a sourced config file may have
# installed; none is visible from this script.
LOCAL_INPUT=$(mktemp -d)
trap 'rm -rf -- "${LOCAL_INPUT}"' EXIT

# The input path handed to Hadoop is the scratch directory's base name.
INPUT=$(basename -- "${LOCAL_INPUT}")
OUTPUT=kmer_count

# Provide two input files by duplicating the sample data set.
for i in 1 2; do
    cp data/mini_aligned_seqs.gz.parquet "${LOCAL_INPUT}/seqs_${i}.gz.parquet"
done

# Stage the job input on HDFS and make sure no stale output is in the way.
# The two removals are best-effort: their targets may not exist, and that
# must not abort the script under `set -e`.
# HADOOP is left unquoted on purpose — presumably it may hold a multi-word
# command; confirm against the config file that defines it.
hdfs_home=/user/"${USER}"
${HADOOP} fs -mkdir -p "${hdfs_home}"
${HADOOP} fs -rm -r "${INPUT}" || true
${HADOOP} fs -put "${LOCAL_INPUT}" "${INPUT}"
${HADOOP} fs -rm -r "${hdfs_home}/${OUTPUT}" || true

# The file at CP_PATH holds a colon-separated classpath; --libjars takes a
# comma-separated list, so the separators are converted and the project jar
# is placed in front.
HADOOP_CLASSPATH=$(<"${CP_PATH}")
libjars="${JAR_PATH},${HADOOP_CLASSPATH//:/,}"

# Arguments for `pydoop submit`, gathered in an array so that each value
# stays a single word whatever it contains (the projection is a whole schema
# file).
submit_args=(
    -D "parquet.avro.projection=${PROJECTION}"
    --upload-file-to-cache "${MPY}"
    --num-reducers 1
    --input-format "${INPUT_FORMAT}"
    --avro-input v
    --libjars "${libjars}"
    --log-level "${LOGLEVEL}"
    --job-name "${JOBNAME}"
    "${MODULE}" "${INPUT}" "${OUTPUT}"
)

# PYDOOP is left unquoted on purpose — presumably it may hold a multi-word
# command; confirm against the config file that defines it.
${PYDOOP} submit "${submit_args[@]}"

# Drop any local copy left by a previous run, then download the job output
# from HDFS into the current directory and display the result file.
rm -rf -- "${OUTPUT}"
${HADOOP} fs -get /user/"${USER}"/"${OUTPUT}"
${PYTHON} py/show_kmer_count.py "${OUTPUT}"/part-r-00000

# Clean up: the downloaded output and the local scratch input directory.
rm -rf -- "${OUTPUT}" "${LOCAL_INPUT}"
# The HDFS input directory has a randomly generated name, so without this
# every run would leave a new directory behind on HDFS. Best-effort: a
# failed removal must not turn a successful run into a failure.
${HADOOP} fs -rm -r "${INPUT}" || :
popd
