Source code for ironic.common.fsm

# -*- coding: utf-8 -*-

#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
"""State machine modelling, copied from TaskFlow project.

This work will be turned into a library.
See https://github.com/harlowja/automaton

This is being used in the implementation of:
http://specs.openstack.org/openstack/ironic-specs/specs/kilo/new-ironic-state-machine.html
"""

from collections import OrderedDict  # noqa

import six

from ironic.common import exception as excp
from ironic.common.i18n import _


class _Jump(object):
    """A FSM transition tracks this data while jumping."""
    def __init__(self, name, on_enter, on_exit):
        self.name = name
        self.on_enter = on_enter
        self.on_exit = on_exit


[docs]class FSM(object): """A finite state machine. This class models a state machine, and expects an outside caller to manually trigger the state changes one at a time by invoking process_event """ def __init__(self, start_state=None): self._transitions = {} self._states = OrderedDict() self._start_state = start_state self._target_state = None # Note that _current is a _Jump instance self._current = None @property def start_state(self): return self._start_state @property def current_state(self): if self._current is not None: return self._current.name return None @property def target_state(self): return self._target_state @property def terminated(self): """Returns whether the state machine is in a terminal state.""" if self._current is None: return False return self._states[self._current.name]['terminal']
[docs] def add_state(self, state, on_enter=None, on_exit=None, target=None, terminal=None, stable=False): """Adds a given state to the state machine. The on_enter and on_exit callbacks, if provided will be expected to take two positional parameters, these being the state being exited (for on_exit) or the state being entered (for on_enter) and a second parameter which is the event that is being processed that caused the state transition. :param stable: Use this to specify that this state is a stable/passive state. A state must have been previously defined as 'stable' before it can be used as a 'target' :param target: The target state for 'state' to go to. Before a state can be used as a target it must have been previously added and specified as 'stable' """ if state in self._states: raise excp.Duplicate(_("State '%s' already defined") % state) if on_enter is not None: if not six.callable(on_enter): raise ValueError(_("On enter callback must be callable")) if on_exit is not None: if not six.callable(on_exit): raise ValueError(_("On exit callback must be callable")) if target is not None and target not in self._states: raise excp.InvalidState(_("Target state '%s' does not exist") % target) if target is not None and not self._states[target]['stable']: raise excp.InvalidState( _("Target state '%s' is not a 'stable' state") % target) self._states[state] = { 'terminal': bool(terminal), 'reactions': {}, 'on_enter': on_enter, 'on_exit': on_exit, 'target': target, 'stable': stable, } self._transitions[state] = OrderedDict()
[docs] def add_transition(self, start, end, event): """Adds an allowed transition from start -> end for the given event.""" if start not in self._states: raise excp.NotFound( _("Can not add a transition on event '%(event)s' that " "starts in a undefined state '%(state)s'") % {'event': event, 'state': start}) if end not in self._states: raise excp.NotFound( _("Can not add a transition on event '%(event)s' that " "ends in a undefined state '%(state)s'") % {'event': event, 'state': end}) self._transitions[start][event] = _Jump(end, self._states[end]['on_enter'], self._states[start]['on_exit'])
[docs] def process_event(self, event): """Trigger a state change in response to the provided event.""" current = self._current if current is None: raise excp.InvalidState(_("Can only process events after" " being initialized (not before)")) if self._states[current.name]['terminal']: raise excp.InvalidState( _("Can not transition from terminal " "state '%(state)s' on event '%(event)s'") % {'state': current.name, 'event': event}) if event not in self._transitions[current.name]: raise excp.InvalidState( _("Can not transition from state '%(state)s' on " "event '%(event)s' (no defined transition)") % {'state': current.name, 'event': event}) replacement = self._transitions[current.name][event] if current.on_exit is not None: current.on_exit(current.name, event) if replacement.on_enter is not None: replacement.on_enter(replacement.name, event) self._current = replacement # clear _target if we've reached it if (self._target_state is not None and self._target_state == replacement.name): self._target_state = None # if new state has a different target, update the target if self._states[replacement.name]['target'] is not None: self._target_state = self._states[replacement.name]['target']
[docs] def is_valid_event(self, event): """Check whether the event is actionable in the current state.""" current = self._current if current is None: return False if self._states[current.name]['terminal']: return False if event not in self._transitions[current.name]: return False return True
[docs] def initialize(self, state=None): """Sets up the state machine. sets the current state to the specified state, or start_state if no state was specified.. """ if state is None: state = self._start_state if state not in self._states: raise excp.NotFound(_("Can not start from an undefined" " state '%s'") % (state)) if self._states[state]['terminal']: raise excp.InvalidState(_("Can not start from a terminal" " state '%s'") % (state)) self._current = _Jump(state, None, None) self._target_state = self._states[state]['target']
[docs] def copy(self, shallow=False): """Copies the current state machine (shallow or deep). NOTE(harlowja): the copy will be left in an *uninitialized* state. NOTE(harlowja): when a shallow copy is requested the copy will share the same transition table and state table as the source; this can be advantageous if you have a machine and transitions + states that is defined somewhere and want to use copies to run with (the copies have the current state that is different between machines). """ c = FSM(self.start_state) if not shallow: for state, data in six.iteritems(self._states): copied_data = data.copy() copied_data['reactions'] = copied_data['reactions'].copy() c._states[state] = copied_data for state, data in six.iteritems(self._transitions): c._transitions[state] = data.copy() else: c._transitions = self._transitions c._states = self._states return c
def __contains__(self, state): """Returns if this state exists in the machines known states.""" return state in self._states @property def states(self): """Returns a list of the state names.""" return list(six.iterkeys(self._states)) def __iter__(self): """Iterates over (start, event, end) transition tuples.""" for state in six.iterkeys(self._states): for event, target in six.iteritems(self._transitions[state]): yield (state, event, target.name) @property def events(self): """Returns how many events exist.""" c = 0 for state in six.iterkeys(self._states): c += len(self._transitions[state]) return c