# -*- coding: utf-8 -*-
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from automaton import machines
from automaton import runners
from taskflow import logging
from taskflow import states as st
from taskflow.types import failure
# Waiting state timeout (in seconds).
_WAITING_TIMEOUT = 60
# Meta states the state machine uses.
_UNDEFINED = 'UNDEFINED'
_GAME_OVER = 'GAME_OVER'
_META_STATES = (_GAME_OVER, _UNDEFINED)
# Event name constants the state machine uses.
_SCHEDULE = 'schedule_next'
_WAIT = 'wait_finished'
_ANALYZE = 'examine_finished'
_FINISH = 'completed'
_FAILED = 'failed'
_SUSPENDED = 'suspended'
_SUCCESS = 'success'
_REVERTED = 'reverted'
_START = 'start'
LOG = logging.getLogger(__name__)
class _MachineMemory(object):
"""State machine memory."""
def __init__(self):
self.next_nodes = set()
self.not_done = set()
self.failures = []
self.done = set()
[docs]class Runner(object):
"""State machine *builder* + *runner* that powers the engine components.
NOTE(harlowja): the machine (states and events that will trigger
transitions) that this builds is represented by the following
table::
+--------------+------------------+------------+----------+---------+
Start | Event | End | On Enter | On Exit
+--------------+------------------+------------+----------+---------+
ANALYZING | completed | GAME_OVER | |
ANALYZING | schedule_next | SCHEDULING | |
ANALYZING | wait_finished | WAITING | |
FAILURE[$] | | | |
GAME_OVER | failed | FAILURE | |
GAME_OVER | reverted | REVERTED | |
GAME_OVER | success | SUCCESS | |
GAME_OVER | suspended | SUSPENDED | |
RESUMING | schedule_next | SCHEDULING | |
REVERTED[$] | | | |
SCHEDULING | wait_finished | WAITING | |
SUCCESS[$] | | | |
SUSPENDED[$] | | | |
UNDEFINED[^] | start | RESUMING | |
WAITING | examine_finished | ANALYZING | |
+--------------+------------------+------------+----------+---------+
Between any of these yielded states (minus ``GAME_OVER`` and ``UNDEFINED``)
if the engine has been suspended or the engine has failed (due to a
non-resolveable task failure or scheduling failure) the machine will stop
executing new tasks (currently running tasks will be allowed to complete)
and this machines run loop will be broken.
NOTE(harlowja): If the runtimes scheduler component is able to schedule
tasks in parallel, this enables parallel running and/or reversion.
"""
# Informational states this action yields while running, not useful to
# have the engine record but useful to provide to end-users when doing
# execution iterations.
ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING)
def __init__(self, runtime, waiter):
self._runtime = runtime
self._analyzer = runtime.analyzer
self._completer = runtime.completer
self._scheduler = runtime.scheduler
self._storage = runtime.storage
self._waiter = waiter
[docs] def runnable(self):
"""Checks if the storage says the flow is still runnable/running."""
return self._storage.get_flow_state() == st.RUNNING
[docs] def build(self, timeout=None):
"""Builds a state-machine (that can be/is used during running)."""
memory = _MachineMemory()
if timeout is None:
timeout = _WAITING_TIMEOUT
# Cache some local functions/methods...
do_schedule = self._scheduler.schedule
do_complete = self._completer.complete
def iter_next_nodes(target_node=None):
# Yields and filters and tweaks the next nodes to execute...
maybe_nodes = self._analyzer.get_next_nodes(node=target_node)
for node, late_decider in maybe_nodes:
proceed = late_decider.check_and_affect(self._runtime)
if proceed:
yield node
def resume(old_state, new_state, event):
# This reaction function just updates the state machines memory
# to include any nodes that need to be executed (from a previous
# attempt, which may be empty if never ran before) and any nodes
# that are now ready to be ran.
memory.next_nodes.update(self._completer.resume())
memory.next_nodes.update(iter_next_nodes())
return _SCHEDULE
def game_over(old_state, new_state, event):
# This reaction function is mainly a intermediary delegation
# function that analyzes the current memory and transitions to
# the appropriate handler that will deal with the memory values,
# it is *always* called before the final state is entered.
if memory.failures:
return _FAILED
if any(1 for node in iter_next_nodes()):
return _SUSPENDED
elif self._analyzer.is_success():
return _SUCCESS
else:
return _REVERTED
def schedule(old_state, new_state, event):
# This reaction function starts to schedule the memory's next
# nodes (iff the engine is still runnable, which it may not be
# if the user of this engine has requested the engine/storage
# that holds this information to stop or suspend); handles failures
# that occur during this process safely...
if self.runnable() and memory.next_nodes:
not_done, failures = do_schedule(memory.next_nodes)
if not_done:
memory.not_done.update(not_done)
if failures:
memory.failures.extend(failures)
memory.next_nodes.clear()
return _WAIT
def wait(old_state, new_state, event):
# TODO(harlowja): maybe we should start doing 'yield from' this
# call sometime in the future, or equivalent that will work in
# py2 and py3.
if memory.not_done:
done, not_done = self._waiter(memory.not_done, timeout=timeout)
memory.done.update(done)
memory.not_done = not_done
return _ANALYZE
def analyze(old_state, new_state, event):
# This reaction function is responsible for analyzing all nodes
# that have finished executing and completing them and figuring
# out what nodes are now ready to be ran (and then triggering those
# nodes to be scheduled in the future); handles failures that
# occur during this process safely...
next_nodes = set()
while memory.done:
fut = memory.done.pop()
node = fut.atom
try:
event, result = fut.result()
retain = do_complete(node, event, result)
if isinstance(result, failure.Failure):
if retain:
memory.failures.append(result)
else:
# NOTE(harlowja): avoid making any
# intention request to storage unless we are
# sure we are in DEBUG enabled logging (otherwise
# we will call this all the time even when DEBUG
# is not enabled, which would suck...)
if LOG.isEnabledFor(logging.DEBUG):
intention = self._storage.get_atom_intention(
node.name)
LOG.debug("Discarding failure '%s' (in"
" response to event '%s') under"
" completion units request during"
" completion of node '%s' (intention"
" is to %s)", result, event,
node, intention)
except Exception:
memory.failures.append(failure.Failure())
else:
try:
more_nodes = set(iter_next_nodes(target_node=node))
except Exception:
memory.failures.append(failure.Failure())
else:
next_nodes.update(more_nodes)
if self.runnable() and next_nodes and not memory.failures:
memory.next_nodes.update(next_nodes)
return _SCHEDULE
elif memory.not_done:
return _WAIT
else:
return _FINISH
def on_exit(old_state, event):
LOG.debug("Exiting old state '%s' in response to event '%s'",
old_state, event)
def on_enter(new_state, event):
LOG.debug("Entering new state '%s' in response to event '%s'",
new_state, event)
# NOTE(harlowja): when ran in blather mode it is quite useful
# to track the various state transitions as they happen...
watchers = {}
if LOG.isEnabledFor(logging.BLATHER):
watchers['on_exit'] = on_exit
watchers['on_enter'] = on_enter
m = machines.FiniteMachine()
m.add_state(_GAME_OVER, **watchers)
m.add_state(_UNDEFINED, **watchers)
m.add_state(st.ANALYZING, **watchers)
m.add_state(st.RESUMING, **watchers)
m.add_state(st.REVERTED, terminal=True, **watchers)
m.add_state(st.SCHEDULING, **watchers)
m.add_state(st.SUCCESS, terminal=True, **watchers)
m.add_state(st.SUSPENDED, terminal=True, **watchers)
m.add_state(st.WAITING, **watchers)
m.add_state(st.FAILURE, terminal=True, **watchers)
m.default_start_state = _UNDEFINED
m.add_transition(_GAME_OVER, st.REVERTED, _REVERTED)
m.add_transition(_GAME_OVER, st.SUCCESS, _SUCCESS)
m.add_transition(_GAME_OVER, st.SUSPENDED, _SUSPENDED)
m.add_transition(_GAME_OVER, st.FAILURE, _FAILED)
m.add_transition(_UNDEFINED, st.RESUMING, _START)
m.add_transition(st.ANALYZING, _GAME_OVER, _FINISH)
m.add_transition(st.ANALYZING, st.SCHEDULING, _SCHEDULE)
m.add_transition(st.ANALYZING, st.WAITING, _WAIT)
m.add_transition(st.RESUMING, st.SCHEDULING, _SCHEDULE)
m.add_transition(st.SCHEDULING, st.WAITING, _WAIT)
m.add_transition(st.WAITING, st.ANALYZING, _ANALYZE)
m.add_reaction(_GAME_OVER, _FINISH, game_over)
m.add_reaction(st.ANALYZING, _ANALYZE, analyze)
m.add_reaction(st.RESUMING, _START, resume)
m.add_reaction(st.SCHEDULING, _SCHEDULE, schedule)
m.add_reaction(st.WAITING, _WAIT, wait)
m.freeze()
r = runners.FiniteRunner(m)
return (m, r, memory)
[docs] def run_iter(self, timeout=None):
"""Runs iteratively using a locally built state machine."""
machine, runner, memory = self.build(timeout=timeout)
for (_prior_state, new_state) in runner.run_iter(_START):
# NOTE(harlowja): skip over meta-states.
if new_state not in _META_STATES:
if new_state == st.FAILURE:
yield (new_state, memory.failures)
else:
yield (new_state, [])