// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package plogotlp

import (
	"context"
	"errors"
	"net"
	"sync"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/test/bufconn"

	"go.opentelemetry.io/collector/pdata/plog"
)

func TestGrpc(t *testing.T) {
	lis := bufconn.Listen(1024 * 1024)
	s := grpc.NewServer()
	RegisterGRPCServer(s, &fakeLogsServer{t: t})
	wg := sync.WaitGroup{}
	wg.Go(func() {
		assert.NoError(t, s.Serve(lis))
	})
	t.Cleanup(func() {
		s.Stop()
		wg.Wait()
	})

	resolver.SetDefaultScheme("passthrough")
	cc, err := grpc.NewClient("bufnet",
		grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
			return lis.Dial()
		}),
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	require.NoError(t, err)
	t.Cleanup(func() {
		assert.NoError(t, cc.Close())
	})

	logClient := NewGRPCClient(cc)

	resp, err := logClient.Export(context.Background(), generateLogsRequest())
	require.NoError(t, err)
	assert.Equal(t, NewExportResponse(), resp)
}

func TestGrpcError(t *testing.T) {
	lis := bufconn.Listen(1024 * 1024)
	s := grpc.NewServer()
	RegisterGRPCServer(s, &fakeLogsServer{t: t, err: errors.New("my error")})
	wg := sync.WaitGroup{}
	wg.Go(func() {
		assert.NoError(t, s.Serve(lis))
	})
	t.Cleanup(func() {
		s.Stop()
		wg.Wait()
	})

	cc, err := grpc.NewClient("bufnet",
		grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
			return lis.Dial()
		}),
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	require.NoError(t, err)
	t.Cleanup(func() {
		assert.NoError(t, cc.Close())
	})

	logClient := NewGRPCClient(cc)
	resp, err := logClient.Export(context.Background(), generateLogsRequest())
	require.Error(t, err)
	st, okSt := status.FromError(err)
	require.True(t, okSt)
	assert.Equal(t, "my error", st.Message())
	assert.Equal(t, codes.Unknown, st.Code())
	assert.Equal(t, ExportResponse{}, resp)
}

type fakeLogsServer struct {
	UnimplementedGRPCServer
	t   *testing.T
	err error
}

func (f fakeLogsServer) Export(_ context.Context, request ExportRequest) (ExportResponse, error) {
	assert.Equal(f.t, generateLogsRequest(), request)
	return NewExportResponse(), f.err
}

func generateLogsRequest() ExportRequest {
	ld := plog.NewLogs()
	ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("test_log_record")
	return NewExportRequestFromLogs(ld)
}
