/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.agents.runtime.operator;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.EventContext;
import org.apache.flink.agents.api.InputEvent;
import org.apache.flink.agents.api.OutputEvent;
import org.apache.flink.agents.api.configuration.AgentConfigOptions;
import org.apache.flink.agents.api.context.MemoryUpdate;
import org.apache.flink.agents.api.listener.EventListener;
import org.apache.flink.agents.api.logger.EventLogger;
import org.apache.flink.agents.api.logger.EventLoggerConfig;
import org.apache.flink.agents.api.logger.EventLoggerFactory;
import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
import org.apache.flink.agents.plan.AgentPlan;
import org.apache.flink.agents.plan.JavaFunction;
import org.apache.flink.agents.plan.PythonFunction;
import org.apache.flink.agents.plan.actions.Action;
import org.apache.flink.agents.runtime.actionstate.ActionState;
import org.apache.flink.agents.runtime.actionstate.ActionStateStore;
import org.apache.flink.agents.runtime.actionstate.KafkaActionStateStore;
import org.apache.flink.agents.runtime.context.RunnerContextImpl;
import org.apache.flink.agents.runtime.env.PythonEnvironmentManager;
import org.apache.flink.agents.runtime.memory.CachedMemoryStore;
import org.apache.flink.agents.runtime.memory.MemoryObjectImpl;
import org.apache.flink.agents.runtime.metrics.BuiltInMetrics;
import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
import org.apache.flink.agents.runtime.operator.ActionTask;
import org.apache.flink.agents.runtime.operator.JavaActionTask;
import org.apache.flink.agents.runtime.operator.queue.SegmentedQueue;
import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl;
import org.apache.flink.agents.runtime.python.event.PythonEvent;
import org.apache.flink.agents.runtime.python.operator.PythonActionTask;
import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
import org.apache.flink.agents.runtime.utils.EventUtil;
import org.apache.flink.agents.runtime.utils.StateUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.python.env.PythonDependencyInfo;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.types.Row;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keyed stream operator that wraps incoming records into agent events and executes the actions
 * of an {@link AgentPlan} that those events trigger.
 *
 * <p>For every key, at most one input event is processed at a time: while a key still has queued
 * action tasks, further inputs for that key are buffered in keyed state and replayed once the
 * current input has been fully processed. Action tasks are executed one at a time through the
 * task mailbox. Watermarks are held back in a {@link SegmentedQueue} and forwarded only once the
 * queue reports them as eligible.
 *
 * <p>If an {@link ActionStateStore} is available (passed to the constructor or created from the
 * plan configuration), the output events and memory updates of finished actions are persisted to
 * it and replayed instead of re-executing the action when a matching entry is found.
 *
 * @param <IN> type of the records received by this operator
 * @param <OUT> type of the records emitted by this operator
 */
public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(ActionExecutionOperator.class);
    private static final String RECOVERY_MARKER_STATE_NAME = "recoveryMarker";
    private static final String MESSAGE_SEQUENCE_NUMBER_STATE_NAME = "messageSequenceNumber";
    private static final String PENDING_INPUT_EVENT_STATE_NAME = "pendingInputEvents";

    private final AgentPlan agentPlan;
    private final Boolean inputIsJava;

    /** Single record instance reused for every emitted element. */
    private transient StreamRecord<OUT> reusedStreamRecord;

    /** Keyed short-term memory that backs the runner contexts' memory stores. */
    private transient MapState<String, MemoryObjectImpl.MemoryItem> shortTermMemState;

    /** Only initialized when the plan contains at least one Python action. */
    private transient PythonActionExecutor pythonActionExecutor;

    private transient FlinkAgentsMetricGroupImpl metricGroup;
    private transient BuiltInMetrics builtInMetrics;

    /** Tracks in-flight keys and the watermarks that have to wait for them. */
    private transient SegmentedQueue keySegmentQueue;

    private final transient MailboxExecutor mailboxExecutor;
    private transient MailboxProcessor mailboxProcessor;

    /** Keyed state: action tasks that still have to run for the current key. */
    private transient ListState<ActionTask> actionTasksKState;

    /** Keyed state: inputs that arrived while the key was still busy. */
    private transient ListState<Event> pendingInputEventsKState;

    /** Operator (union) state: keys whose input event is currently being processed. */
    private transient ListState<Object> currentProcessingKeysOpState;

    private final transient EventLogger eventLogger;
    private final transient List<EventListener> eventListeners;
    private transient ActionStateStore actionStateStore;

    /** Keyed state: sequence number of the input event currently processed for the key. */
    private transient ValueState<Long> sequenceNumberKState;

    /** Operator (union) state holding the recovery marker of the action state store. */
    private transient ListState<Object> recoveryMarkerOpState;

    /** Per-checkpoint snapshot of key to sequence number, used to prune the action state store. */
    private transient Map<Long, Map<Object, Long>> checkpointIdToSeqNums;

    /** Runner contexts handed over from an unfinished action task to its generated follow-up. */
    private final transient Map<ActionTask, RunnerContextImpl> actionTaskRunnerContexts;

    /**
     * Creates the operator.
     *
     * @param agentPlan plan describing the actions and the events that trigger them
     * @param inputIsJava whether inputs are plain Java objects; otherwise they are expected to be
     *     two-field {@link Row}s whose second field is handed to the Python executor
     * @param processingTimeService processing time service of the containing task
     * @param mailboxExecutor executor used to schedule action tasks on the task mailbox
     * @param actionStateStore optional store for action results; may be {@code null}
     */
    public ActionExecutionOperator(
            AgentPlan agentPlan,
            Boolean inputIsJava,
            ProcessingTimeService processingTimeService,
            MailboxExecutor mailboxExecutor,
            ActionStateStore actionStateStore) {
        this.agentPlan = agentPlan;
        this.inputIsJava = inputIsJava;
        this.processingTimeService = processingTimeService;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.mailboxExecutor = mailboxExecutor;
        this.eventLogger = EventLoggerFactory.createLogger(EventLoggerConfig.builder().build());
        this.eventListeners = new ArrayList<>();
        this.actionStateStore = actionStateStore;
        this.checkpointIdToSeqNums = new HashMap<>();
        this.actionTaskRunnerContexts = new HashMap<>();
    }

    /**
     * Registers all keyed and operator state, initializes metrics, the Python executor (if
     * needed) and the event logger, and re-schedules action tasks for keys that were in flight.
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.reusedStreamRecord = new StreamRecord<>(null);

        MapStateDescriptor<String, MemoryObjectImpl.MemoryItem> shortTermMemStateDescriptor =
                new MapStateDescriptor<>(
                        "shortTermMemory",
                        TypeInformation.of(String.class),
                        TypeInformation.of(MemoryObjectImpl.MemoryItem.class));
        this.shortTermMemState = this.getRuntimeContext().getMapState(shortTermMemStateDescriptor);

        this.metricGroup = new FlinkAgentsMetricGroupImpl((MetricGroup) this.getMetricGroup());
        this.builtInMetrics = new BuiltInMetrics(this.metricGroup, this.agentPlan);
        this.keySegmentQueue = new SegmentedQueue();

        // Normally already done in initializeState(); repeated here (idempotent) so that the
        // store also exists when open() is reached without a prior initializeState() call.
        this.maybeInitActionStateStore();
        if (this.actionStateStore != null) {
            this.recoveryMarkerOpState =
                    this.getOperatorStateBackend()
                            .getUnionListState(
                                    new ListStateDescriptor<Object>(
                                            RECOVERY_MARKER_STATE_NAME,
                                            TypeInformation.of(Object.class)));
        }

        this.sequenceNumberKState =
                this.getRuntimeContext()
                        .getState(
                                new ValueStateDescriptor<Long>(
                                        MESSAGE_SEQUENCE_NUMBER_STATE_NAME, Long.class));
        this.actionTasksKState =
                this.getRuntimeContext()
                        .getListState(
                                new ListStateDescriptor<ActionTask>(
                                        "actionTasks", TypeInformation.of(ActionTask.class)));
        this.pendingInputEventsKState =
                this.getRuntimeContext()
                        .getListState(
                                new ListStateDescriptor<Event>(
                                        PENDING_INPUT_EVENT_STATE_NAME,
                                        TypeInformation.of(Event.class)));
        this.currentProcessingKeysOpState =
                this.getOperatorStateBackend()
                        .getUnionListState(
                                new ListStateDescriptor<Object>(
                                        "currentProcessingKeys", TypeInformation.of(Object.class)));

        this.initPythonActionExecutor();
        this.mailboxProcessor = this.getMailboxProcessor();
        this.initEventLogger(this.getRuntimeContext());
        this.tryResumeProcessActionTasks();
    }

    /**
     * Creates the Kafka-backed action state store when none was supplied and the plan
     * configuration selects the Kafka backend. Does nothing if a store already exists.
     */
    private void maybeInitActionStateStore() throws Exception {
        if (this.actionStateStore != null) {
            return;
        }
        String backend = this.agentPlan.getConfig().get(AgentConfigOptions.ACTION_STATE_STORE_BACKEND);
        if (ActionStateStore.BackendType.KAFKA.getType().equalsIgnoreCase(backend)) {
            LOG.info("Using Kafka as backend of action state store.");
            this.actionStateStore = new KafkaActionStateStore(this.agentPlan.getConfig());
        }
    }

    /** Opens the event logger with the given runtime context, if a logger is present. */
    private void initEventLogger(StreamingRuntimeContext runtimeContext) throws Exception {
        if (this.eventLogger == null) {
            return;
        }
        this.eventLogger.open(new EventLoggerOpenParams(runtimeContext));
    }

    /** Queues the watermark behind the in-flight keys and forwards whatever is eligible. */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        this.keySegmentQueue.addWatermark(mark);
        this.processEligibleWatermarks();
    }

    /**
     * Wraps the record into an input event and either processes it right away or, if the current
     * key still has queued action tasks, buffers it in keyed state.
     */
    @Override
    public void processElement(StreamRecord<IN> record) throws Exception {
        IN input = record.getValue();
        LOG.debug("Receive an element {}", input);
        Event inputEvent = this.wrapToInputEvent(input);
        if (record.hasTimestamp()) {
            inputEvent.setSourceTimestamp(record.getTimestamp());
        }
        this.keySegmentQueue.addKeyToLastSegment(this.getCurrentKey());
        if (this.currentKeyHasMoreActionTask()) {
            this.pendingInputEventsKState.add(inputEvent);
        } else {
            this.processEvent(this.getCurrentKey(), inputEvent);
        }
    }

    /**
     * Handles a single event for {@code key}: output events are emitted downstream, every other
     * event enqueues one action task per action it triggers. For input events the key is also
     * registered as in flight, its sequence number is advanced, and execution of its action
     * tasks is scheduled on the mailbox.
     */
    private void processEvent(Object key, Event event) throws Exception {
        this.notifyEventProcessed(event);
        boolean isInputEvent = EventUtil.isInputEvent(event);
        if (EventUtil.isOutputEvent(event)) {
            OUT outputData = this.getOutputFromOutputEvent(event);
            if (event.hasSourceTimestamp()) {
                this.output.collect(
                        this.reusedStreamRecord.replace(
                                outputData, event.getSourceTimestamp().longValue()));
            } else {
                // The record is reused, so a timestamp from an earlier emission must be cleared.
                this.reusedStreamRecord.eraseTimestamp();
                this.output.collect(this.reusedStreamRecord.replace(outputData));
            }
        } else {
            if (isInputEvent) {
                this.currentProcessingKeysOpState.add(key);
                this.initOrIncSequenceNumber();
            }
            List<Action> triggerActions = this.getActionsTriggeredBy(event);
            if (triggerActions != null && !triggerActions.isEmpty()) {
                for (Action triggerAction : triggerActions) {
                    this.actionTasksKState.add(this.createActionTask(key, triggerAction, event));
                }
            }
        }
        if (isInputEvent) {
            this.mailboxExecutor.submit(
                    () -> this.tryProcessActionTaskForKey(key), "process action task");
        }
    }

    /** Logs the event, notifies all listeners and updates the built-in event metric. */
    private void notifyEventProcessed(Event event) throws Exception {
        EventContext eventContext = new EventContext(event);
        if (this.eventLogger != null) {
            this.eventLogger.append(eventContext, event);
            this.eventLogger.flush();
        }
        if (this.eventListeners != null) {
            for (EventListener listener : this.eventListeners) {
                listener.onEventProcessed(eventContext, event);
            }
        }
        this.builtInMetrics.markEventProcessed();
    }

    /**
     * Runs one action task for {@code key}. A failure is not thrown from here; instead a mail
     * that rethrows it (wrapped in {@link ActionTaskExecutionException}) is enqueued.
     */
    private void tryProcessActionTaskForKey(Object key) {
        try {
            this.processActionTaskForKey(key);
        } catch (Exception e) {
            this.mailboxExecutor.execute(
                    () ->
                            ExceptionUtils.rethrow(
                                    new ActionTaskExecutionException(
                                            "Failed to execute action task", e)),
                    "throw exception in mailbox");
        }
    }

    /**
     * Polls and executes the next action task of {@code key}, feeds its output events back into
     * {@link #processEvent}, and then either schedules the next task, or, when the key has no
     * tasks left, finishes the current input event and starts the next buffered one.
     */
    private void processActionTaskForKey(Object key) throws Exception {
        this.setCurrentKey(key);
        ActionTask actionTask = StateUtil.pollFromListState(this.actionTasksKState);
        if (actionTask == null) {
            // Nothing was triggered for this key's input event; it is done immediately.
            this.finishProcessingKey(key);
            return;
        }

        this.createAndSetRunnerContext(actionTask);
        // Set by initOrIncSequenceNumber() when the key's input event was accepted.
        long sequenceNumber = this.sequenceNumberKState.value();

        boolean isFinished;
        List<Event> outputEvents;
        Optional<ActionTask> generatedActionTaskOpt = Optional.empty();

        // NOTE(review): maybeInitActionState() below stores a placeholder before invoke(). If a
        // generated follow-up task maps to the same store entry, this lookup would find that
        // placeholder and treat the follow-up as already finished - confirm against the
        // ActionStateStore implementation.
        ActionState actionState =
                this.maybeGetActionState(key, sequenceNumber, actionTask.action, actionTask.event);
        if (actionState != null) {
            // Replay the persisted result instead of executing the action again.
            isFinished = true;
            outputEvents = actionState.getOutputEvents();
            // The context is already attached to the task; drop any hand-over entry for it.
            this.actionTaskRunnerContexts.remove(actionTask);
            for (MemoryUpdate memoryUpdate : actionState.getMemoryUpdates()) {
                actionTask
                        .getRunnerContext()
                        .getShortTermMemory()
                        .set(memoryUpdate.getPath(), memoryUpdate.getValue());
            }
        } else {
            this.maybeInitActionState(key, sequenceNumber, actionTask.action, actionTask.event);
            ActionTask.ActionTaskResult actionTaskResult = actionTask.invoke();
            this.actionTaskRunnerContexts.remove(actionTask);
            this.maybePersistTaskResult(
                    key,
                    sequenceNumber,
                    actionTask.action,
                    actionTask.event,
                    actionTask.getRunnerContext(),
                    actionTaskResult);
            isFinished = actionTaskResult.isFinished();
            outputEvents = actionTaskResult.getOutputEvents();
            generatedActionTaskOpt = actionTaskResult.getGeneratedActionTask();
        }

        for (Event actionOutputEvent : outputEvents) {
            this.processEvent(key, actionOutputEvent);
        }

        boolean currentInputEventFinished = false;
        if (isFinished) {
            this.builtInMetrics.markActionExecuted(actionTask.action.getName());
            currentInputEventFinished = !this.currentKeyHasMoreActionTask();
            actionTask.getRunnerContext().persistMemory();
        } else {
            Preconditions.checkState(
                    generatedActionTaskOpt.isPresent(),
                    "ActionTask not finished, but the generated action task is null.");
            ActionTask generatedActionTask = generatedActionTaskOpt.get();
            // The follow-up task continues with the runner context of the unfinished task.
            this.actionTaskRunnerContexts.put(generatedActionTask, actionTask.getRunnerContext());
            this.actionTasksKState.add(generatedActionTask);
        }

        if (currentInputEventFinished) {
            this.maybePruneState(key, sequenceNumber);
            this.finishProcessingKey(key);
            Event pendingInputEvent = StateUtil.pollFromListState(this.pendingInputEventsKState);
            if (pendingInputEvent != null) {
                this.processEvent(key, pendingInputEvent);
            }
        } else if (this.currentKeyHasMoreActionTask()) {
            this.mailboxExecutor.submit(
                    () -> this.tryProcessActionTaskForKey(key), "process action task");
        }
    }

    /**
     * Marks the current input event of {@code key} as done: removes the key from the in-flight
     * operator state and from the segmented queue, then forwards watermarks that became
     * eligible.
     */
    private void finishProcessingKey(Object key) throws Exception {
        int removedCount = StateUtil.removeFromListState(this.currentProcessingKeysOpState, key);
        Preconditions.checkState(
                removedCount == 1,
                "Current processing key count for key "
                        + key
                        + " should be 1, but got "
                        + removedCount);
        Preconditions.checkState(
                this.keySegmentQueue.removeKey(key),
                "Current key" + key + " is missing from the segmentedQueue.");
        this.processEligibleWatermarks();
    }

    /** Creates and opens the Python executor if the plan contains at least one Python action. */
    private void initPythonActionExecutor() throws Exception {
        boolean containPythonAction =
                this.agentPlan.getActions().values().stream()
                        .anyMatch(action -> action.getExec() instanceof PythonFunction);
        if (containPythonAction) {
            LOG.debug("Begin initialize PythonActionExecutor.");
            PythonDependencyInfo dependencyInfo =
                    PythonDependencyInfo.create(
                            (ReadableConfig) this.getExecutionConfig().toConfiguration(),
                            (DistributedCache) this.getRuntimeContext().getDistributedCache());
            PythonEnvironmentManager pythonEnvironmentManager =
                    new PythonEnvironmentManager(
                            dependencyInfo,
                            this.getContainingTask()
                                    .getEnvironment()
                                    .getTaskManagerInfo()
                                    .getTmpDirectories(),
                            new HashMap<String, String>(System.getenv()),
                            this.getRuntimeContext().getJobInfo().getJobId());
            this.pythonActionExecutor =
                    new PythonActionExecutor(
                            pythonEnvironmentManager,
                            new ObjectMapper().writeValueAsString(this.agentPlan));
            this.pythonActionExecutor.open();
        }
    }

    /** Blocks until every in-flight input event has been fully processed. */
    @Override
    public void endInput() throws Exception {
        this.waitInFlightEventsFinished();
    }

    /** Yields to the mailbox until no key is registered as in flight any more. */
    @VisibleForTesting
    public void waitInFlightEventsFinished() throws Exception {
        while (StateUtil.listStateNotEmpty(this.currentProcessingKeysOpState)) {
            this.mailboxExecutor.yield();
        }
    }

    /**
     * Closes the Python executor, the event logger, the action state store and finally the
     * operator itself. Every resource is attempted even if an earlier one fails; the first
     * failure is rethrown with later ones attached as suppressed exceptions.
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            if (this.pythonActionExecutor != null) {
                this.pythonActionExecutor.close();
            }
        } catch (Exception e) {
            failure = firstOrSuppressed(failure, e);
        }
        try {
            if (this.eventLogger != null) {
                this.eventLogger.close();
            }
        } catch (Exception e) {
            failure = firstOrSuppressed(failure, e);
        }
        try {
            if (this.actionStateStore != null) {
                this.actionStateStore.close();
            }
        } catch (Exception e) {
            failure = firstOrSuppressed(failure, e);
        }
        try {
            super.close();
        } catch (Exception e) {
            failure = firstOrSuppressed(failure, e);
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Returns {@code first} with {@code next} suppressed, or {@code next} if there is no first. */
    private static Exception firstOrSuppressed(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Makes sure the action state store exists (it may have to be created from configuration)
     * and lets it rebuild its state from the recovery markers found in operator state.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        // This method runs before open(); the store has to exist here, otherwise a store that
        // is created from configuration would never see the recovery markers.
        this.maybeInitActionStateStore();
        if (this.actionStateStore != null) {
            List<Object> markers = new ArrayList<>();
            ListState<Object> markerState =
                    this.getOperatorStateBackend()
                            .getUnionListState(
                                    new ListStateDescriptor<Object>(
                                            RECOVERY_MARKER_STATE_NAME,
                                            TypeInformation.of(Object.class)));
            Iterable<Object> recoveryMarkers = markerState.get();
            if (recoveryMarkers != null) {
                recoveryMarkers.forEach(markers::add);
            }
            this.actionStateStore.rebuildState(markers);
        }
    }

    /**
     * Stores the action state store's recovery marker in operator state and remembers the
     * current sequence number of every key for this checkpoint, so the store can be pruned once
     * the checkpoint completes. Nothing is recorded when no store is configured.
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        if (this.actionStateStore != null) {
            Object recoveryMarker = this.actionStateStore.getRecoveryMarker();
            if (recoveryMarker != null) {
                this.recoveryMarkerOpState.update(List.of(recoveryMarker));
            }
            // The snapshot is only consumed for pruning the store, so the scan over all keys
            // is skipped entirely when there is no store.
            Map<Object, Long> keyToSeqNum = new HashMap<>();
            this.<Object>getKeyedStateBackend()
                    .applyToAllKeys(
                            VoidNamespace.INSTANCE,
                            VoidNamespaceSerializer.INSTANCE,
                            new ValueStateDescriptor<Long>(
                                    MESSAGE_SEQUENCE_NUMBER_STATE_NAME, Long.class),
                            (key, state) -> keyToSeqNum.put(key, state.value()));
            this.checkpointIdToSeqNums.put(context.getCheckpointId(), keyToSeqNum);
        }
        super.snapshotState(context);
    }

    /**
     * Prunes the action state store up to the sequence numbers recorded for the completed
     * checkpoint and discards the bookkeeping of this and all earlier checkpoints.
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (this.actionStateStore != null) {
            Map<Object, Long> keyToSeqNum = this.checkpointIdToSeqNums.get(checkpointId);
            if (keyToSeqNum != null) {
                for (Map.Entry<Object, Long> entry : keyToSeqNum.entrySet()) {
                    this.actionStateStore.pruneState(entry.getKey(), entry.getValue());
                }
            }
        }
        // Earlier checkpoints may have been aborted or their notification skipped; their
        // snapshots are no longer needed once a later checkpoint has completed.
        this.checkpointIdToSeqNums.keySet().removeIf(id -> id <= checkpointId);
        super.notifyCheckpointComplete(checkpointId);
    }

    /**
     * Wraps a raw input into an input event. Java inputs become an {@link InputEvent}; otherwise
     * the input must be a two-field {@link Row} whose second field is wrapped by the Python
     * executor.
     */
    private Event wrapToInputEvent(IN input) {
        if (this.inputIsJava.booleanValue()) {
            return new InputEvent(input);
        }
        Preconditions.checkState(
                input instanceof Row && ((Row) input).getArity() == 2,
                "Expected a Row with exactly 2 fields as non-Java input.");
        return this.pythonActionExecutor.wrapToInputEvent(((Row) input).getField(1));
    }

    /** Extracts the payload to emit from a Java {@link OutputEvent} or a Python output event. */
    @SuppressWarnings("unchecked")
    private OUT getOutputFromOutputEvent(Event event) {
        Preconditions.checkState(EventUtil.isOutputEvent(event));
        if (event instanceof OutputEvent) {
            return (OUT) ((OutputEvent) event).getOutput();
        }
        if (event instanceof PythonEvent) {
            Object outputFromOutputEvent =
                    this.pythonActionExecutor.getOutputFromOutputEvent(
                            ((PythonEvent) event).getEvent());
            return (OUT) outputFromOutputEvent;
        }
        throw new IllegalStateException("Unsupported event type: " + event.getClass().getName());
    }

    /**
     * Looks up the actions triggered by the event, using the Python event type for Python events
     * and the class name for all other events.
     */
    private List<Action> getActionsTriggeredBy(Event event) {
        if (event instanceof PythonEvent) {
            return this.agentPlan.getActionsTriggeredBy(((PythonEvent) event).getEventType());
        }
        return this.agentPlan.getActionsTriggeredBy(event.getClass().getName());
    }

    /**
     * Reads the private {@code mailboxProcessor} field of {@link MailboxExecutorImpl} via
     * reflection; used by {@link #checkMailboxThread()}.
     */
    private MailboxProcessor getMailboxProcessor() throws Exception {
        Field field = MailboxExecutorImpl.class.getDeclaredField("mailboxProcessor");
        field.setAccessible(true);
        return (MailboxProcessor) field.get(this.mailboxExecutor);
    }

    /** Fails with an {@link IllegalStateException} when not called from the mailbox thread. */
    private void checkMailboxThread() {
        Preconditions.checkState(
                this.mailboxProcessor.isMailboxThread(),
                "Expected to be running on the task mailbox thread, but was not.");
    }

    /** Creates the Java or Python action task matching the action's executable. */
    private ActionTask createActionTask(Object key, Action action, Event event) {
        if (action.getExec() instanceof JavaFunction) {
            return new JavaActionTask(key, event, action);
        }
        if (action.getExec() instanceof PythonFunction) {
            return new PythonActionTask(key, event, action, this.pythonActionExecutor);
        }
        throw new IllegalStateException("Unsupported action type: " + action.getExec().getClass());
    }

    /**
     * Attaches a runner context to the task unless it already has one. A context handed over
     * from an unfinished predecessor task is preferred; otherwise a new Java or Python context
     * is created.
     */
    private void createAndSetRunnerContext(ActionTask actionTask) {
        if (actionTask.getRunnerContext() != null) {
            return;
        }
        RunnerContextImpl runnerContext;
        if (this.actionTaskRunnerContexts.containsKey(actionTask)) {
            runnerContext = this.actionTaskRunnerContexts.get(actionTask);
        } else if (actionTask.action.getExec() instanceof JavaFunction) {
            runnerContext =
                    new RunnerContextImpl(
                            new CachedMemoryStore(this.shortTermMemState),
                            this.metricGroup,
                            this::checkMailboxThread,
                            this.agentPlan);
        } else if (actionTask.action.getExec() instanceof PythonFunction) {
            runnerContext =
                    new PythonRunnerContextImpl(
                            new CachedMemoryStore(this.shortTermMemState),
                            this.metricGroup,
                            this::checkMailboxThread,
                            this.agentPlan);
        } else {
            throw new IllegalStateException(
                    "Unsupported action type: " + actionTask.action.getExec().getClass());
        }
        runnerContext.setActionName(actionTask.action.getName());
        actionTask.setRunnerContext(runnerContext);
    }

    /** Returns whether the current key still has queued action tasks. */
    private boolean currentKeyHasMoreActionTask() throws Exception {
        return StateUtil.listStateNotEmpty(this.actionTasksKState);
    }

    /**
     * Re-registers all keys that were in flight or had buffered inputs with the segmented queue
     * and schedules action task processing for the in-flight ones.
     */
    private void tryResumeProcessActionTasks() throws Exception {
        Iterable<Object> keys = this.currentProcessingKeysOpState.get();
        if (keys != null) {
            for (Object key : keys) {
                this.keySegmentQueue.addKeyToLastSegment(key);
                this.mailboxExecutor.submit(
                        () -> this.tryProcessActionTaskForKey(key), "process action task");
            }
        }
        // One queue entry per buffered input event, mirroring what processElement() added.
        this.<Object>getKeyedStateBackend()
                .applyToAllKeys(
                        VoidNamespace.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE,
                        new ListStateDescriptor<Event>(
                                PENDING_INPUT_EVENT_STATE_NAME, TypeInformation.of(Event.class)),
                        (key, state) ->
                                state.get()
                                        .forEach(
                                                event ->
                                                        this.keySegmentQueue.addKeyToLastSegment(
                                                                key)));
    }

    /** Sets the current key's sequence number to 0 on first use, otherwise increments it. */
    private void initOrIncSequenceNumber() throws Exception {
        Long sequenceNumber = this.sequenceNumberKState.value();
        if (sequenceNumber == null) {
            this.sequenceNumberKState.update(0L);
        } else {
            this.sequenceNumberKState.update(sequenceNumber + 1L);
        }
    }

    /** Returns the stored action state, or {@code null} if there is no store or no entry. */
    private ActionState maybeGetActionState(Object key, long sequenceNum, Action action, Event event)
            throws Exception {
        // The key is passed exactly as in every other store call (put/get/pruneState).
        return this.actionStateStore == null
                ? null
                : this.actionStateStore.get(key, sequenceNum, action, event);
    }

    /** Stores an initial action state for the task if a store exists and has no entry yet. */
    private void maybeInitActionState(Object key, long sequenceNum, Action action, Event event)
            throws Exception {
        if (this.actionStateStore != null
                && this.actionStateStore.get(key, sequenceNum, action, event) == null) {
            this.actionStateStore.put(key, sequenceNum, action, event, new ActionState(event));
        }
    }

    /**
     * Adds the memory updates and output events of a finished task to its stored action state.
     * Does nothing without a store or for unfinished tasks.
     */
    private void maybePersistTaskResult(
            Object key,
            long sequenceNum,
            Action action,
            Event event,
            RunnerContextImpl context,
            ActionTask.ActionTaskResult actionTaskResult)
            throws Exception {
        if (this.actionStateStore == null || !actionTaskResult.isFinished()) {
            return;
        }
        // Expected to exist: maybeInitActionState() is called before the task is invoked.
        ActionState actionState = this.actionStateStore.get(key, sequenceNum, action, event);
        for (MemoryUpdate memoryUpdate : context.getAllMemoryUpdates()) {
            actionState.addMemoryUpdate(memoryUpdate);
        }
        for (Event outputEvent : actionTaskResult.getOutputEvents()) {
            actionState.addEvent(outputEvent);
        }
        this.actionStateStore.put(key, sequenceNum, action, event, actionState);
    }

    /** Asks the store, if any, to prune state for the key and sequence number. */
    private void maybePruneState(Object key, long sequenceNum) throws Exception {
        if (this.actionStateStore != null) {
            this.actionStateStore.pruneState(key, sequenceNum);
        }
    }

    /** Forwards every watermark the segmented queue currently releases, oldest first. */
    private void processEligibleWatermarks() throws Exception {
        Watermark mark = this.keySegmentQueue.popOldestWatermark();
        while (mark != null) {
            super.processWatermark(mark);
            mark = this.keySegmentQueue.popOldestWatermark();
        }
    }

    /** Wraps a failure that occurred while executing an action task. */
    public static class ActionTaskExecutionException extends Exception {
        public ActionTaskExecutionException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}

