This repository has been archived by the owner on Feb 25, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 30
/
mox.py
2056 lines (1530 loc) · 58.6 KB
/
mox.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/python2.4
#
# Copyright 2008 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Mox, an object-mocking framework for Python.
Mox works in the record-replay-verify paradigm. When you first create
a mock object, it is in record mode. You then programmatically set
the expected behavior of the mock object (what methods are to be
called on it, with what parameters, what they should return, and in
what order).
Once you have set up the expected mock behavior, you put it in replay
mode. Now the mock responds to method calls just as you told it to.
If an unexpected method (or an expected method with unexpected
parameters) is called, then an exception will be raised.
Once you are done interacting with the mock, you need to verify that
all the expected interactions occurred. (Maybe your code exited
prematurely without calling some cleanup method!) The verify phase
ensures that every expected method was called; otherwise, an exception
will be raised.
WARNING! Mock objects created by Mox are not thread-safe. If you
call a mock from multiple threads, it should be guarded by a mutex.
TODO(user): Add the option to make mocks thread-safe!
Suggested usage / workflow:
# Create Mox factory
my_mox = Mox()
# Create a mock data access object
mock_dao = my_mox.CreateMock(DAOClass)
# Set up expected behavior
mock_dao.RetrievePersonWithIdentifier('1').AndReturn(person)
mock_dao.DeletePerson(person)
# Put mocks in replay mode
my_mox.ReplayAll()
# Inject mock object and run test
controller.SetDao(mock_dao)
controller.DeletePersonById('1')
# Verify all methods were called as expected
my_mox.VerifyAll()
"""
from collections import deque
import difflib
import inspect
import re
import types
import unittest
import stubout
class Error(AssertionError):
  """Root of this module's exception hierarchy.

  It subclasses AssertionError and adds no behavior of its own.
  """
class ExpectedMethodCallsError(Error):
  """Signals that Verify() ran while expected calls were still outstanding."""

  def __init__(self, expected_methods):
    """Remember the calls that were expected but never made.

    Args:
      expected_methods: [MockMethod]. Non-empty sequence of the mock methods
        that should have been called.

    Raises:
      ValueError: if expected_methods is empty.
    """
    if not expected_methods:
      raise ValueError("There must be at least one expected method")

    Error.__init__(self)
    self._expected_methods = expected_methods

  def __str__(self):
    # Each outstanding call gets its own numbered line, counting from zero.
    numbered = ["%3d. %s" % pair for pair in enumerate(self._expected_methods)]
    return "Verify: Expected methods never called:\n%s" % ("\n".join(numbered),)
class UnexpectedMethodCallError(Error):
  """Signals a call that the mock was not expecting at this point.

  Typical causes are a call made with the wrong parameters or made out of the
  recorded order.
  """

  def __init__(self, unexpected_method, expected):
    """Build the message describing the mismatch.

    Args:
      unexpected_method: MockMethod that was called but was not at the head
        of the expected-method queue.
      expected: MockMethod or UnorderedGroup the call should have matched, or
        None when there is nothing to compare against.
    """
    Error.__init__(self)

    if expected is None:
      self._str = "Unexpected method call %s" % (unexpected_method,)
      return

    # Show a line-by-line diff between what was called and what was expected.
    actual_lines = str(unexpected_method).splitlines(True)
    wanted_lines = str(expected).splitlines(True)
    delta = difflib.Differ().compare(actual_lines, wanted_lines)
    rendered = "\n".join([entry.rstrip() for entry in delta])
    self._str = ("Unexpected method call. unexpected:- expected:+\n%s"
                 % (rendered,))

  def __str__(self):
    return self._str
class UnknownMethodCallError(Error):
  """Signals a request for a method the mocked object does not have."""

  def __init__(self, unknown_method_name):
    """Remember the offending method name.

    Args:
      unknown_method_name: str. Name of the requested method that is not part
        of the mocked class's public interface.
    """
    Error.__init__(self)
    self._unknown_method_name = unknown_method_name

  def __str__(self):
    return ("Method called is not a member of the object: %s"
            % self._unknown_method_name)
class PrivateAttributeError(Error):
  """Raised if a MockObject is passed a private additional attribute name."""

  def __init__(self, attr):
    """Remember the rejected attribute name.

    Args:
      attr: str. The private attribute name that was supplied.
    """
    Error.__init__(self)
    self._attr = attr

  def __str__(self):
    # The name lives on the instance; a bare `attr` is not defined in this
    # scope and would raise NameError while formatting the message.
    return ("Attribute '%s' is private and should not be available in a mock "
            "object." % (self._attr,))
class ExpectedMockCreationError(Error):
  """Raised if mocks should have been created by StubOutClassWithMocks."""

  def __init__(self, expected_mocks):
    """Remember the mocks that were recorded but never instantiated.

    Args:
      expected_mocks: non-empty sequence of MockObjects that should have been
        created.

    Raises:
      ValueError: if expected_mocks contains no mocks.
    """
    if not expected_mocks:
      raise ValueError("There must be at least one expected method")

    Error.__init__(self)
    self._expected_mocks = expected_mocks

  def __str__(self):
    # Each missing mock gets its own numbered line, counting from zero.
    numbered = ["%3d. %s" % pair for pair in enumerate(self._expected_mocks)]
    return "Verify: Expected mocks never created:\n%s" % ("\n".join(numbered),)
class UnexpectedMockCreationError(Error):
  """Raised if too many mocks were created by StubOutClassWithMocks."""

  def __init__(self, instance, *params, **named_params):
    """Remember what was instantiated and with which arguments.

    Args:
      instance: the type of object that was created.
      *params: positional parameters given during instantiation.
      **named_params: named parameters given during instantiation.
    """
    Error.__init__(self)
    self._instance = instance
    self._params = params
    self._named_params = named_params

  def __str__(self):
    # Collect every argument first and join once, so a call made with keyword
    # arguments only is not rendered with a leading ", ".  Values are wrapped
    # in a one-tuple so a tuple argument is shown as-is instead of being
    # unpacked by the % operator.
    rendered = ["%s" % (value,) for value in self._params]
    rendered.extend(["%s=%s" % (key, value)
                     for key, value in self._named_params.items()])
    return "Unexpected mock creation: %s(%s)" % (self._instance,
                                                 ", ".join(rendered))
class Mox(object):
  """Mox: a factory for creating mock objects.

  Every mock created through this factory is remembered so that ReplayAll(),
  VerifyAll() and ResetAll() can act on all of them at once.  Attribute
  replacements made by the StubOut* methods go through self.stubs and are
  undone by UnsetStubs().
  """

  # A list of types that should be stubbed out with MockObjects (as
  # opposed to MockAnythings).
  _USE_MOCK_OBJECT = [types.ClassType, types.FunctionType, types.InstanceType,
                      types.ModuleType, types.ObjectType, types.TypeType,
                      types.MethodType, types.UnboundMethodType,
                      ]

  # A list of types that may be stubbed out with a MockObjectFactory.
  _USE_MOCK_FACTORY = [types.ClassType, types.ObjectType, types.TypeType]

  def __init__(self):
    """Initialize a new Mox."""
    self._mock_objects = []
    self.stubs = stubout.StubOutForTesting()

  def CreateMock(self, class_to_mock, attrs=None):
    """Create a new mock object.

    Args:
      class_to_mock: class. The class to be mocked.
      attrs: dict of attribute names to values that will be set on the mock
        object.  Only public attributes may be set.

    Returns:
      MockObject that can be used as the class_to_mock would be.
    """
    if attrs is None:
      attrs = {}
    new_mock = MockObject(class_to_mock, attrs=attrs)
    self._mock_objects.append(new_mock)
    return new_mock

  def CreateMockAnything(self, description=None):
    """Create a mock that will accept any method calls.

    This does not enforce an interface.

    Args:
      description: str. Optionally, a descriptive name for the mock object
        being created, for debugging output purposes.

    Returns:
      The new MockAnything.
    """
    new_mock = MockAnything(description=description)
    self._mock_objects.append(new_mock)
    return new_mock

  def ReplayAll(self):
    """Set all mock objects to replay mode."""
    for mock_obj in self._mock_objects:
      mock_obj._Replay()

  def VerifyAll(self):
    """Call verify on all mock objects created."""
    for mock_obj in self._mock_objects:
      mock_obj._Verify()

  def ResetAll(self):
    """Call reset on all mock objects.  This does not unset stubs."""
    for mock_obj in self._mock_objects:
      mock_obj._Reset()

  def StubOutWithMock(self, obj, attr_name, use_mock_anything=False):
    """Replace a method, attribute, etc. with a Mock.

    An attribute whose type is listed in _USE_MOCK_OBJECT is replaced with a
    MockObject of that attribute; anything else is replaced with a
    MockAnything.  Setting use_mock_anything to True always uses a
    MockAnything.

    Args:
      obj: A Python object (class, module, instance, callable).
      attr_name: str.  The name of the attribute to replace with a mock.
      use_mock_anything: bool. True if a MockAnything should be used regardless
        of the type of attribute.

    Returns:
      The mock now installed as obj.<attr_name>, so expectations can be
      recorded on it directly.

    Raises:
      TypeError: if the attribute is already a MockAnything or MockObject.
    """
    attr_to_replace = getattr(obj, attr_name)
    attr_type = type(attr_to_replace)

    if attr_type == MockAnything or attr_type == MockObject:
      raise TypeError('Cannot mock a MockAnything! Did you remember to '
                      'call UnsetStubs in your previous test?')

    if attr_type in self._USE_MOCK_OBJECT and not use_mock_anything:
      stub = self.CreateMock(attr_to_replace)
    else:
      stub = self.CreateMockAnything(description='Stub for %s' % attr_to_replace)
      # MockObject exposes __name__ as a read-only property, so the name is
      # only assigned on the MockAnything path.
      stub.__name__ = attr_name

    self.stubs.Set(obj, attr_name, stub)
    return stub

  def StubOutClassWithMocks(self, obj, attr_name):
    """Replace a class with a "mock factory" that will create mock objects.

    This is useful if the code-under-test directly instantiates
    dependencies.  Previously some boilerplate was necessary to
    create a mock that would act as a factory.  Using
    StubOutClassWithMocks, once you've stubbed out the class you may
    use the stubbed class as you would any other mock created by mox:
    during the record phase, new mock instances will be created, and
    during replay, the recorded mocks will be returned.

    # Example using StubOutWithMock (the old, clunky way):

    mock1 = mox.CreateMock(my_import.FooClass)
    mock2 = mox.CreateMock(my_import.FooClass)
    foo_factory = mox.StubOutWithMock(my_import, 'FooClass',
                                      use_mock_anything=True)
    foo_factory(1, 2).AndReturn(mock1)
    foo_factory(9, 10).AndReturn(mock2)
    mox.ReplayAll()

    my_import.FooClass(1, 2)   # Returns mock1 again.
    my_import.FooClass(9, 10)  # Returns mock2 again.
    mox.VerifyAll()

    # Example using StubOutClassWithMocks:

    mox.StubOutClassWithMocks(my_import, 'FooClass')
    mock1 = my_import.FooClass(1, 2)   # Returns a new mock of FooClass
    mock2 = my_import.FooClass(9, 10)  # Returns another mock instance
    mox.ReplayAll()

    my_import.FooClass(1, 2)   # Returns mock1 again.
    my_import.FooClass(9, 10)  # Returns mock2 again.
    mox.VerifyAll()

    Args:
      obj: the object (for example a module) that owns the class.
      attr_name: str.  The name of the class attribute to replace.

    Raises:
      TypeError: if the attribute is already a mock, or if its type is not
        listed in _USE_MOCK_FACTORY.
    """
    attr_to_replace = getattr(obj, attr_name)
    attr_type = type(attr_to_replace)

    if attr_type == MockAnything or attr_type == MockObject:
      raise TypeError('Cannot mock a MockAnything! Did you remember to '
                      'call UnsetStubs in your previous test?')

    if attr_type not in self._USE_MOCK_FACTORY:
      raise TypeError('Given attr is not a Class.  Use StubOutWithMock.')

    factory = _MockObjectFactory(attr_to_replace, self)
    # Registering the factory lets ReplayAll/VerifyAll/ResetAll reach it too.
    self._mock_objects.append(factory)
    self.stubs.Set(obj, attr_name, factory)

  def UnsetStubs(self):
    """Restore stubs to their original state."""
    self.stubs.UnsetAll()
def Replay(*args):
  """Put mocks into replay mode.

  Args:
    *args: any number of mocks; _Replay() is called on each one in order.
  """
  for mock_obj in args:
    mock_obj._Replay()
def Verify(*args):
  """Verify mocks.

  Args:
    *args: any number of mocks; _Verify() is called on each one in order.
  """
  for mock_obj in args:
    mock_obj._Verify()
def Reset(*args):
  """Reset mocks.

  Args:
    *args: any number of mocks; _Reset() is called on each one in order.
  """
  for mock_obj in args:
    mock_obj._Reset()
class MockAnything:
  """A mock that can be used to mock anything.

  No interface is enforced: any attribute that is not found on the mock is
  treated as a method to record or replay.
  """

  def __init__(self, description=None):
    """Initialize a new MockAnything.

    Args:
      description: str. Optionally, a descriptive name for the mock object
        being created, for debugging output purposes.
    """
    self._description = description
    self._Reset()

  def __repr__(self):
    if not self._description:
      return '<MockAnything instance>'
    return '<MockAnything instance of %s>' % self._description

  def __getattr__(self, method_name):
    """Intercept method calls on this object.

    Args:
      method_name: str. The name of the method being called.

    Returns:
      A new MockMethod aware of this mock's state (record or replay); the
      call itself is recorded or replayed by the MockMethod's __call__.
    """
    # __dir__ is answered from the class rather than turned into a mock call.
    if method_name == '__dir__':
      return self.__class__.__dir__.__get__(self, self.__class__)

    return self._CreateMockMethod(method_name)

  def _CreateMockMethod(self, method_name, method_to_mock=None):
    """Create a new mock method call and return it.

    Args:
      method_name: str. The name of the method being called.
      method_to_mock: a method object. The actual method being mocked, used
        for introspection.

    Returns:
      A new MockMethod sharing this mock's call queue, replay flag and
      description.
    """
    return MockMethod(method_name,
                      self._expected_calls_queue,
                      self._replay_mode,
                      method_to_mock=method_to_mock,
                      description=self._description)

  def __nonzero__(self):
    """Return 1 for nonzero so the mock can be used as a conditional."""
    return 1

  def __eq__(self, rhs):
    """Compare by replay state and by the queue of expected calls."""
    if not isinstance(rhs, MockAnything):
      return False
    return (self._replay_mode == rhs._replay_mode and
            self._expected_calls_queue == rhs._expected_calls_queue)

  def __ne__(self, rhs):
    """Negation of __eq__."""
    return not self == rhs

  def _Replay(self):
    """Start replaying expected method calls."""
    self._replay_mode = True

  def _Verify(self):
    """Verify that all of the expected calls have been made.

    Raises:
      ExpectedMethodCallsError: if there are still more method calls in the
        expected queue.
    """
    pending = self._expected_calls_queue
    if not pending:
      return

    # The last MultipleTimesGroup is not popped from the queue, so a single
    # satisfied group left behind does not count as an outstanding call.
    only_satisfied_group_left = (
        len(pending) == 1 and
        isinstance(pending[0], MultipleTimesGroup) and
        pending[0].IsSatisfied())
    if not only_satisfied_group_left:
      raise ExpectedMethodCallsError(pending)

  def _Reset(self):
    """Reset the state of this mock to record mode with an empty queue."""
    # Back to setup (record) mode, with a fresh queue of expected calls.
    self._replay_mode = False
    self._expected_calls_queue = deque()
class MockObject(MockAnything, object):
  """A mock object that simulates the public/protected interface of a class."""

  def __init__(self, class_to_mock, attrs=None):
    """Initialize a mock object.

    This determines the methods and properties of the class and stores them.

    Args:
      class_to_mock: class. The class (or other object) to be mocked.
      attrs: dict of attribute names to values that will be set on the mock
        object.  Only public attributes may be set.

    Raises:
      PrivateAttributeError: if a supplied attribute is not public.
      ValueError: if an attribute would mask an existing method.
    """
    if attrs is None:
      attrs = {}

    # This is used to hack around the mixin/inheritance of MockAnything, which
    # is not a proper object (it can be anything. :-)
    MockAnything.__dict__['__init__'](self)

    # Get a list of all the public and special methods we should mock.
    self._known_methods = set()
    self._known_vars = set()
    self._class_to_mock = class_to_mock
    try:
      if inspect.isclass(self._class_to_mock):
        self._description = class_to_mock.__name__
      else:
        self._description = type(class_to_mock).__name__
    except Exception:
      # Best effort only: the description set by MockAnything.__init__ is
      # kept when no name can be determined.
      pass

    for method in dir(class_to_mock):
      attr = getattr(class_to_mock, method)
      if callable(attr):
        self._known_methods.add(method)
      elif not (type(attr) is property):
        # treating properties as class vars makes little sense.
        self._known_vars.add(method)

    # Set additional attributes at instantiation time; this is quicker
    # than manually setting attributes that are normally created in
    # __init__.
    for attr, value in attrs.items():
      if attr.startswith("_"):
        raise PrivateAttributeError(attr)
      elif attr in self._known_methods:
        raise ValueError("'%s' is a method of '%s' objects." % (attr,
                                                                class_to_mock))
      else:
        setattr(self, attr, value)

  def __getattr__(self, name):
    """Intercept attribute request on this object.

    If the attribute is a public class variable, it will be returned and not
    recorded as a call.

    If the attribute is not a variable, it is handled like a method
    call. The method name is checked against the set of mockable
    methods, and a new MockMethod is returned that is aware of the
    MockObject's state (record or replay).  The call will be recorded
    or replayed by the MockMethod's __call__.

    Args:
      name: str. The name of the attribute being requested.

    Returns:
      Either a class variable or a new MockMethod that is aware of the state
      of the mock (record or replay).

    Raises:
      UnknownMethodCallError if the MockObject does not mock the requested
          method.
    """
    if name in self._known_vars:
      return getattr(self._class_to_mock, name)

    if name in self._known_methods:
      return self._CreateMockMethod(
          name,
          method_to_mock=getattr(self._class_to_mock, name))

    raise UnknownMethodCallError(name)

  def __eq__(self, rhs):
    """Provide custom logic to compare objects."""
    return (isinstance(rhs, MockObject) and
            self._class_to_mock == rhs._class_to_mock and
            self._replay_mode == rhs._replay_mode and
            self._expected_calls_queue == rhs._expected_calls_queue)

  def __setitem__(self, key, value):
    """Provide custom logic for mocking classes that support item assignment.

    Args:
      key: Key to set the value for.
      value: Value to set.

    Returns:
      Expected return value in replay mode.  A MockMethod object for the
      __setitem__ method that has already been called if not in replay mode.

    Raises:
      TypeError if the underlying class does not support item assignment.
      UnexpectedMethodCallError if the object does not expect the call to
        __setitem__.
    """
    # Verify the class supports item assignment.
    if '__setitem__' not in dir(self._class_to_mock):
      raise TypeError('object does not support item assignment')

    # If we are in replay mode then simply call the mock __setitem__ method.
    if self._replay_mode:
      return MockMethod('__setitem__', self._expected_calls_queue,
                        self._replay_mode)(key, value)

    # Otherwise, create a mock method __setitem__.
    return self._CreateMockMethod('__setitem__')(key, value)

  def __getitem__(self, key):
    """Provide custom logic for mocking classes that are subscriptable.

    Args:
      key: Key to return the value for.

    Returns:
      Expected return value in replay mode.  A MockMethod object for the
      __getitem__ method that has already been called if not in replay mode.

    Raises:
      TypeError if the underlying class is not subscriptable.
      UnexpectedMethodCallError if the object does not expect the call to
        __getitem__.
    """
    # Verify the class is subscriptable.
    if '__getitem__' not in dir(self._class_to_mock):
      raise TypeError('unsubscriptable object')

    # If we are in replay mode then simply call the mock __getitem__ method.
    if self._replay_mode:
      return MockMethod('__getitem__', self._expected_calls_queue,
                        self._replay_mode)(key)

    # Otherwise, create a mock method __getitem__.
    return self._CreateMockMethod('__getitem__')(key)

  def __iter__(self):
    """Provide custom logic for mocking classes that are iterable.

    Returns:
      Expected return value in replay mode.  A MockMethod object for the
      __iter__ method that has already been called if not in replay mode.

    Raises:
      TypeError if the underlying class is not iterable.
      UnexpectedMethodCallError if the object does not expect the call to
        __iter__.
    """
    methods = dir(self._class_to_mock)

    # Verify the class supports iteration.
    if '__iter__' not in methods:
      # If it doesn't have iter method and we are in replay method, then try to
      # iterate using subscripts.
      if '__getitem__' not in methods or not self._replay_mode:
        raise TypeError('not iterable object')
      else:
        results = []
        index = 0
        try:
          while True:
            results.append(self[index])
            index += 1
        except IndexError:
          # An IndexError from the replayed __getitem__ ends the sequence.
          return iter(results)

    # If we are in replay mode then simply call the mock __iter__ method.
    if self._replay_mode:
      return MockMethod('__iter__', self._expected_calls_queue,
                        self._replay_mode)()

    # Otherwise, create a mock method __iter__.
    return self._CreateMockMethod('__iter__')()

  def __contains__(self, key):
    """Provide custom logic for mocking classes that contain items.

    Args:
      key: Key to look in container for.

    Returns:
      Expected return value in replay mode.  A MockMethod object for the
      __contains__ method that has already been called if not in replay mode.

    Raises:
      TypeError if the underlying class does not implement __contains__
      UnexpectedMethodCallError if the object does not expect the call to
        __contains__.
    """
    # Consult dir() like the other container hooks, so that an inherited
    # __contains__ (or one reached through a mocked instance) is recognised
    # instead of only one defined directly in the object's own __dict__.
    if '__contains__' not in dir(self._class_to_mock):
      raise TypeError('unsubscriptable object')

    if self._replay_mode:
      return MockMethod('__contains__', self._expected_calls_queue,
                        self._replay_mode)(key)

    return self._CreateMockMethod('__contains__')(key)

  def __call__(self, *params, **named_params):
    """Provide custom logic for mocking classes that are callable.

    Raises:
      TypeError: if the mocked object has no __call__ attribute.
    """
    # Verify the class we are mocking is callable.
    if not hasattr(self._class_to_mock, '__call__'):
      raise TypeError('Not callable')

    # Because the call is happening directly on this object instead of a method,
    # the call on the mock method is made right here

    # If we are mocking a Function, then use the function, and not the
    # __call__ method
    if type(self._class_to_mock) in (types.FunctionType, types.MethodType):
      method = self._class_to_mock
    else:
      method = getattr(self._class_to_mock, '__call__')

    mock_method = self._CreateMockMethod('__call__', method_to_mock=method)
    return mock_method(*params, **named_params)

  @property
  def __class__(self):
    """Return the class that is being mocked."""
    return self._class_to_mock

  @property
  def __name__(self):
    """Return the name that is being mocked."""
    return self._description
class _MockObjectFactory(MockObject):
  """Stand-in for a class that hands out mocks and checks __init__ params.

  In record mode, calling the factory records the __init__ call and returns
  a brand new MockObject.  In replay mode, calling it checks the __init__
  call and returns the previously recorded mocks in the order they were
  created.

  See StubOutWithMock vs StubOutClassWithMocks for more detail.
  """

  def __init__(self, class_to_mock, mox_instance):
    MockObject.__init__(self, class_to_mock)
    self._mox = mox_instance
    # New mocks enter on the left and are handed back from the right, so
    # replay returns them in creation order.
    self._instance_queue = deque()

  def __call__(self, *params, **named_params):
    """Instantiate and record that a new mock has been created."""
    init_method = self._class_to_mock.__init__
    init_call = self._CreateMockMethod('__init__', method_to_mock=init_method)

    if not self._replay_mode:
      init_call(*params, **named_params)
      created = self._mox.CreateMock(self._class_to_mock)
      self._instance_queue.appendleft(created)
      return created

    # Replaying: the empty-queue check comes before init_call() so that an
    # unexpected instantiation is reported as such.
    if not self._instance_queue:
      raise UnexpectedMockCreationError(self._class_to_mock, *params,
                                        **named_params)
    init_call(*params, **named_params)
    return self._instance_queue.pop()

  def _Verify(self):
    """Verify that all mocks have been created."""
    if self._instance_queue:
      raise ExpectedMockCreationError(self._instance_queue)
    super(_MockObjectFactory, self)._Verify()
class MethodSignatureChecker(object):
  """Ensures that methods are called correctly."""

  _NEEDED, _DEFAULT, _GIVEN = range(3)

  def __init__(self, method):
    """Creates a checker.

    Args:
      method: function. A method to check.

    Raises:
      ValueError: method could not be inspected, so checks aren't possible.
        Some methods and functions like built-ins can't be inspected.
    """
    # inspect.getargspec is missing from newer interpreters; use
    # getfullargspec when the running interpreter provides it.
    get_spec = getattr(inspect, 'getfullargspec', None)
    if get_spec is None:
      get_spec = inspect.getargspec
    try:
      spec = get_spec(method)
    except TypeError:
      raise ValueError('Could not get argument specification for %r'
                       % (method,))
    self._args, varargs, varkw, defaults = spec[:4]
    if inspect.ismethod(method):
      self._args = self._args[1:]  # Skip 'self'.
    self._method = method
    self._instance = None  # May contain the instance this is bound to.

    self._has_varargs = varargs is not None
    self._has_varkw = varkw is not None
    if defaults is None:
      self._required_args = self._args
      self._default_args = []
    else:
      self._required_args = self._args[:-len(defaults)]
      self._default_args = self._args[-len(defaults):]

    # Keyword-only parameters are only reported by getfullargspec; with
    # getargspec both lists stay empty.
    kwonly_names = []
    kwonly_defaults = {}
    if len(spec) > 5:
      kwonly_names = list(spec[4] or [])
      kwonly_defaults = spec[5] or {}
    self._required_kwonly = [name for name in kwonly_names
                             if name not in kwonly_defaults]
    self._default_kwonly = [name for name in kwonly_names
                            if name in kwonly_defaults]

  def _RecordArgumentGiven(self, arg_name, arg_status):
    """Mark an argument as being given.

    Args:
      arg_name: string. The name of the argument to mark in arg_status.
      arg_status: dict. Maps argument names to one of _NEEDED, _DEFAULT,
        _GIVEN.

    Raises:
      AttributeError: arg_name is already marked as _GIVEN.
    """
    if arg_status.get(arg_name, None) == MethodSignatureChecker._GIVEN:
      raise AttributeError('%s provided more than once' % (arg_name,))
    arg_status[arg_name] = MethodSignatureChecker._GIVEN

  def Check(self, params, named_params):
    """Ensures that the parameters used while recording a call are valid.

    Args:
      params: list. The positional parameters.
      named_params: dict. The named parameters.

    Raises:
      AttributeError: the given parameters don't work with the given method.
    """
    arg_status = dict((a, MethodSignatureChecker._NEEDED)
                      for a in self._required_args)
    for arg in self._default_args:
      arg_status[arg] = MethodSignatureChecker._DEFAULT
    for arg in self._required_kwonly:
      arg_status[arg] = MethodSignatureChecker._NEEDED
    for arg in self._default_kwonly:
      arg_status[arg] = MethodSignatureChecker._DEFAULT

    # WARNING: Suspect hack ahead.
    #
    # Check to see if this is an unbound method, where the instance
    # should be bound as the first argument.  We try to determine if
    # the first argument (param[0]) is an instance of the class, or it
    # is equivalent to the class (used to account for Comparators).
    #
    # NOTE: If a Func() comparator is used, and the signature is not
    #       correct, this will cause extra executions of the function.
    if inspect.ismethod(self._method):
      # The extra param accounts for the bound instance.
      if len(params) > len(self._required_args):
        expected = getattr(self._method, 'im_class', None)

        # Without an im_class there is nothing to compare the first
        # parameter against, and isinstance(x, None) would raise TypeError.
        if expected is not None:
          # Check if the param is an instance of the expected class,
          # or check equality (useful for checking Comparators).

          # This is a hack to work around the fact that the first
          # parameter can be a Comparator, and the comparison may raise
          # an exception during this comparison, which is OK.
          try:
            param_equality = (params[0] == expected)
          except Exception:
            # Only ordinary errors are treated as "not equal"; interpreter
            # exit and keyboard interrupts are allowed to propagate.
            param_equality = False

          if isinstance(params[0], expected) or param_equality:
            params = params[1:]
          # If the IsA() comparator is being used, we need to check the
          # inverse of the usual case - that the given instance is a subclass
          # of the expected class.  For example, the code under test does
          # late binding to a subclass.
          elif isinstance(params[0], IsA) and params[0]._IsSubClass(expected):
            params = params[1:]

    # Check that each positional param is valid: the leading ones map onto
    # the named positional arguments, any surplus needs *args.
    for arg_name in self._args[:len(params)]:
      self._RecordArgumentGiven(arg_name, arg_status)
    if len(params) > len(self._args) and not self._has_varargs:
      raise AttributeError('%s does not take %d or more positional '
                           'arguments' % (self._method.__name__,
                                          len(self._args)))

    # Check each keyword argument.
    for arg_name in named_params:
      if arg_name not in arg_status and not self._has_varkw:
        raise AttributeError('%s is not expecting keyword argument %s'
                             % (self._method.__name__, arg_name))
      self._RecordArgumentGiven(arg_name, arg_status)

    # Ensure all the required arguments have been given.
    still_needed = [k for k, v in arg_status.items()
                    if v == MethodSignatureChecker._NEEDED]
    if still_needed:
      raise AttributeError('No values given for arguments: %s'
                           % (' '.join(sorted(still_needed))))
class MockMethod(object):
"""Callable mock method.
A MockMethod should act exactly like the method it mocks, accepting parameters
and returning a value, or throwing an exception (as specified). When this
method is called, it can optionally verify whether the called method (name and
signature) matches the expected method.
"""
def __init__(self, method_name, call_queue, replay_mode,
method_to_mock=None, description=None):
"""Construct a new mock method.
Args:
# method_name: the name of the method
# call_queue: deque of calls, verify this call against the head, or add
# this call to the queue.
# replay_mode: False if we are recording, True if we are verifying calls
# against the call queue.
# method_to_mock: The actual method being mocked, used for introspection.
# description: optionally, a descriptive name for this method. Typically
# this is equal to the descriptive name of the method's class.
method_name: str
call_queue: list or deque
replay_mode: bool
method_to_mock: a method object
description: str or None
"""
self._name = method_name
self.__name__ = method_name
self._call_queue = call_queue
if not isinstance(call_queue, deque):
self._call_queue = deque(self._call_queue)
self._replay_mode = replay_mode
self._description = description
self._params = None
self._named_params = None
self._return_value = None