# Source code for heat.tests.test_common_policy

# Copyright 2012 OpenStack, LLC
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the 'License'); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import os.path

import mox
from nose.plugins.attrib import attr
from oslo.config import cfg
import unittest

import heat.api  # pyflakes_bypass  review 23102

from heat.common import context
from heat.common import policy
from heat.common import exception


@attr(tag=['unit', 'common-policy', 'Enforcer'])
@attr(speed='fast')
class TestPolicyEnforcer(unittest.TestCase):
    """Tests for heat.common.policy.Enforcer.

    The enforcer is exercised under the 'cloudformation' and 'cloudwatch'
    scopes, using JSON policy files read from the ``policy/`` directory
    that sits next to this test module.
    """

    # Actions enforced under the 'cloudformation' scope.
    cfn_actions = ("ListStacks", "CreateStack", "DescribeStacks",
                   "DeleteStack", "UpdateStack", "DescribeStackEvents",
                   "ValidateTemplate", "GetTemplate",
                   "EstimateTemplateCost", "DescribeStackResource",
                   "DescribeStackResources")

    # Actions enforced under the 'cloudwatch' scope.
    cw_actions = ("DeleteAlarms", "DescribeAlarmHistory", "DescribeAlarms",
                  "DescribeAlarmsForMetric", "DisableAlarmActions",
                  "EnableAlarmActions", "GetMetricStatistics", "ListMetrics",
                  "PutMetricAlarm", "PutMetricData", "SetAlarmState")

    def setUp(self):
        """Set the fixture path, create the Mox instance, register options.

        Registers ``config_dir``, ``config_file`` and ``project`` on
        ``cfg.CONF``; ``config_dir`` defaults to the fixture directory.
        """
        self.path = os.path.dirname(os.path.realpath(__file__)) + "/policy/"
        self.m = mox.Mox()
        opts = [
            cfg.StrOpt('config_dir', default=self.path),
            cfg.StrOpt('config_file', default='foo'),
            cfg.StrOpt('project', default='heat'),
        ]
        cfg.CONF.register_opts(opts)
        # Single-argument call form: same output under the Python 2 print
        # statement and the Python 3 print function.
        print("setup complete")

    def tearDown(self):
        """Undo every stub installed through ``self.m``."""
        self.m.UnsetStubs()
        print("teardown complete")

    def _stub_policy_file(self, filename):
        """Make ``policy.Enforcer`` load ``filename`` from the fixture dir.

        Stubs ``policy.Enforcer._find_policy_file`` so that it returns
        ``self.path + filename`` any number of times, then switches mox to
        replay mode. Callers are expected to finish with
        ``self.m.VerifyAll()``.

        :param filename: name of a policy file inside ``self.path``
        """
        pf = self.path + filename
        self.m.StubOutWithMock(policy.Enforcer, '_find_policy_file')
        policy.Enforcer._find_policy_file().MultipleTimes().AndReturn(pf)
        self.m.ReplayAll()

    def test_policy_cfn_default(self):
        """With no stubbed policy file, every cfn action is allowed."""
        enforcer = policy.Enforcer(scope='cloudformation')

        ctx = context.RequestContext(roles=[])
        for action in self.cfn_actions:
            # Everything should be allowed
            enforcer.enforce(ctx, action, {})

    def test_policy_cfn_notallowed(self):
        """Under notallowed.json, every cfn action raises Forbidden."""
        self._stub_policy_file('notallowed.json')

        enforcer = policy.Enforcer(scope='cloudformation')

        ctx = context.RequestContext(roles=[])
        for action in self.cfn_actions:
            # Everything should raise the default exception.Forbidden
            self.assertRaises(exception.Forbidden, enforcer.enforce, ctx,
                              action, {})
        self.m.VerifyAll()

    def test_policy_cfn_deny_stack_user(self):
        """heat_stack_user may only call DescribeStackResource (cfn)."""
        self._stub_policy_file('deny_stack_user.json')

        enforcer = policy.Enforcer(scope='cloudformation')

        ctx = context.RequestContext(roles=['heat_stack_user'])
        for action in self.cfn_actions:
            # Everything apart from DescribeStackResource should be Forbidden
            if action == "DescribeStackResource":
                enforcer.enforce(ctx, action, {})
            else:
                self.assertRaises(exception.Forbidden, enforcer.enforce, ctx,
                                  action, {})
        self.m.VerifyAll()

    def test_policy_cfn_allow_non_stack_user(self):
        """A role other than heat_stack_user may call every cfn action."""
        self._stub_policy_file('deny_stack_user.json')

        enforcer = policy.Enforcer(scope='cloudformation')

        ctx = context.RequestContext(roles=['not_a_stack_user'])
        for action in self.cfn_actions:
            # Everything should be allowed
            enforcer.enforce(ctx, action, {})
        self.m.VerifyAll()

    def test_policy_cw_deny_stack_user(self):
        """heat_stack_user may only call PutMetricData (cloudwatch)."""
        self._stub_policy_file('deny_stack_user.json')

        enforcer = policy.Enforcer(scope='cloudwatch')

        ctx = context.RequestContext(roles=['heat_stack_user'])
        for action in self.cw_actions:
            # Everything apart from PutMetricData should be Forbidden
            if action == "PutMetricData":
                enforcer.enforce(ctx, action, {})
            else:
                self.assertRaises(exception.Forbidden, enforcer.enforce, ctx,
                                  action, {})
        self.m.VerifyAll()

    def test_policy_cw_allow_non_stack_user(self):
        """A role other than heat_stack_user may call every cw action."""
        self._stub_policy_file('deny_stack_user.json')

        enforcer = policy.Enforcer(scope='cloudwatch')

        ctx = context.RequestContext(roles=['not_a_stack_user'])
        for action in self.cw_actions:
            # Everything should be allowed
            enforcer.enforce(ctx, action, {})
        self.m.VerifyAll()