This repository has been archived by the owner on Feb 25, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 30
/
metadata_test.py
105 lines (85 loc) · 4.01 KB
/
metadata_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/python
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at: http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distrib-
# uted under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the License for
# specific language governing permissions and limitations under the License.
"""Tests for metadata.py."""
__author__ = '[email protected] (Cihat Imamoglu)'
import json
import metadata
import test_utils
# Map fixture shared by the tests in this file: two layers that declare a
# source URL (KML and GEORSS) and one GOOGLE_TRAFFIC layer that declares none.
MAPROOT = {
    'title': 't1',
    'layers': [
        {
            'id': 12,
            'type': 'KML',
            'source': {'kml': {'url': 'http://x.com/a'}},
        },
        {
            'id': 15,
            'type': 'GEORSS',
            'source': {'georss': {'url': 'http://y.com/b'}},
        },
        {
            'id': 16,
            'type': 'GOOGLE_TRAFFIC',
        },
    ],
}
class MetadataTest(test_utils.BaseTest):
  """Tests for the helpers in metadata.py and the /.metadata endpoint."""

  def testGetSourceAddresses(self):
    """GetSourceAddresses yields one address per MAPROOT layer with a URL."""
    # The GOOGLE_TRAFFIC layer has no 'source' entry, so it contributes nothing.
    self.assertEqual(
        {'KML:http://x.com/a', 'GEORSS:http://y.com/b'},
        set(metadata.GetSourceAddresses(MAPROOT)))

  def testCacheSourceAddresses(self):
    """CacheSourceAddresses returns the addresses and stores them by key."""
    cache_key1, sources = metadata.CacheSourceAddresses('abc', MAPROOT)
    self.assertEqual(
        {'KML:http://x.com/a', 'GEORSS:http://y.com/b'},
        set(metadata.SOURCE_ADDRESS_CACHE.Get(cache_key1)))
    self.assertEqual(
        {'KML:http://x.com/a', 'GEORSS:http://y.com/b'},
        set(sources))

    # Same map_version_key should yield the same cache key.  Only the key is
    # checked here, so the returned addresses are deliberately discarded.
    cache_key2, _ = metadata.CacheSourceAddresses('abc', MAPROOT)
    self.assertEqual(cache_key1, cache_key2)

  def testActivateSources(self):
    """ActivateSources queues one fetch task per source, and only once."""
    sources = ['KML:http://x.com/a', 'GEORSS:http://y.com/b']
    metadata.ActivateSources(sources)

    # Both sources should now be queued for metadata fetches.  The URLs are
    # sorted so the assertions do not depend on the order tasks were queued.
    urls = sorted(task['url'] for task in self.PopTasks('metadata'))
    self.assertEqual(2, len(urls))
    self.AssertEqualsUrlWithUnorderedParams(
        '/root/.metadata_fetch?source=GEORSS:http://y.com/b', urls[0])
    self.AssertEqualsUrlWithUnorderedParams(
        '/root/.metadata_fetch?source=KML:http://x.com/a', urls[1])

    # Activating multiple times should not add redundant tasks.
    metadata.ActivateSources(sources)
    metadata.ActivateSources(sources)
    self.assertEqual(0, len(self.PopTasks('metadata')))

  def testGet(self):
    """GET /.metadata reports metadata for map sources and source params."""
    cache_key, _ = metadata.CacheSourceAddresses('abc', MAPROOT)
    metadata.METADATA_CACHE.Set('KML:http://x.com/a', {'length': 123})
    metadata.METADATA_CACHE.Set('KML:http://p.com/q', {'length': 456})

    # Map cache key, an address with metadata, and an address without metadata.
    response = self.DoGet('/.metadata?ck=' + cache_key +
                          '&source=KML:http://p.com/q' +
                          '&source=KML:http://z.com/z')
    self.assertEqual({
        'KML:http://x.com/a': {'length': 123},  # in map, has metadata
        'GEORSS:http://y.com/b': None,  # in map, no metadata
        'KML:http://p.com/q': {'length': 456},  # source param, has metadata
        'KML:http://z.com/z': None  # source param, no metadata
    }, json.loads(response.body))

  def testGetAndActivate(self):
    """GET /.metadata marks the source active and queues a single fetch."""
    self.DoGet('/.metadata?source=KML:http://u.com/v')

    # Requesting metadata should activate the source and queue a task.
    self.assertEqual(1, metadata.ACTIVE_CACHE.Get('KML:http://u.com/v'))
    urls = sorted(task['url'] for task in self.PopTasks('metadata'))
    self.assertEqual(1, len(urls))
    self.AssertEqualsUrlWithUnorderedParams(
        '/root/.metadata_fetch?source=KML:http://u.com/v', urls[0])

    # Requesting multiple times should not add redundant tasks.
    self.DoGet('/.metadata?source=KML:http://u.com/v')
    self.DoGet('/.metadata?source=KML:http://u.com/v')
    self.assertEqual(0, len(self.PopTasks('metadata')))
# Hand control to test_utils.main() only when this file is executed directly,
# so that importing the module has no side effects.
if __name__ == '__main__':
  test_utils.main()