diff --git a/splitio/storage/inmemmory.py b/splitio/storage/inmemmory.py index 746fdb61..b8a621a6 100644 --- a/splitio/storage/inmemmory.py +++ b/splitio/storage/inmemmory.py @@ -17,13 +17,16 @@ class InMemorySplitStorage(SplitStorage): """InMemory implementation of a split storage.""" - def __init__(self): + def __init__(self, flag_sets=[]): """Constructor.""" self._lock = threading.RLock() self._splits = {} self._change_number = -1 self._traffic_types = Counter() self._sets_feature_flag_map = {} + self.config_flag_sets_used = len(flag_sets) + for flag_set in flag_sets: + self._sets_feature_flag_map[flag_set] = set() def get(self, split_name): """ @@ -73,13 +76,15 @@ def _put(self, split): """ with self._lock: if split.name in self._splits: - self._remove_flag_sets(self._splits[split.name]) + self._remove_from_flag_sets(self._splits[split.name]) self._decrease_traffic_type_count(self._splits[split.name].traffic_type_name) self._splits[split.name] = split self._increase_traffic_type_count(split.traffic_type_name) if split.sets is not None: for flag_set in split.sets: if flag_set not in self._sets_feature_flag_map.keys(): + if self.config_flag_sets_used > 0: + continue self._sets_feature_flag_map[flag_set] = set() self._sets_feature_flag_map[flag_set].add(split.name) @@ -101,10 +106,10 @@ def _remove(self, split_name): self._splits.pop(split_name) self._decrease_traffic_type_count(split.traffic_type_name) - self._remove_flag_sets(split) + self._remove_from_flag_sets(split) return True - def _remove_flag_sets(self, feature_flag): + def _remove_from_flag_sets(self, feature_flag): """ Remove flag sets associated to a split @@ -114,7 +119,7 @@ def _remove_flag_sets(self, feature_flag): if feature_flag.sets is not None: for flag_set in feature_flag.sets: self._sets_feature_flag_map[flag_set].remove(feature_flag.name) - if len(self._sets_feature_flag_map[flag_set]) == 0: + if len(self._sets_feature_flag_map[flag_set]) == 0 and self.config_flag_sets_used == 0: del self._sets_feature_flag_map[flag_set] def get_feature_flags_by_set(self, set): @@ -232,6 +237,20 @@ def _decrease_traffic_type_count(self, traffic_type_name): self._traffic_types.subtract([traffic_type_name]) self._traffic_types += Counter() + def is_flag_set_exist(self, flag_set): + """ + Return whether a flag set exists in at least one feature flag in cache. + + :param flag_set: Flag set to validate. + :type flag_set: str + + :return: True if the flag_set exist. False otherwise. + :rtype: bool + """ + if flag_set in self._sets_feature_flag_map.keys(): + return True + return False + class InMemorySegmentStorage(SegmentStorage): """In-memory implementation of a segment storage.""" diff --git a/splitio/sync/split.py b/splitio/sync/split.py index a39f42d1..c904d9d1 100644 --- a/splitio/sync/split.py +++ b/splitio/sync/split.py @@ -80,17 +80,38 @@ def _fetch_until(self, fetch_options, till=None): _LOGGER.debug('Exception information: ', exc_info=True) raise exc + to_add = [] + to_delete = [] for feature_flag in feature_flag_changes.get('splits', []): - if feature_flag['status'] == splits.Status.ACTIVE.value: + if (self._feature_flag_storage.config_flag_sets_used == 0 and feature_flag['status'] == splits.Status.ACTIVE.value) or \ + (feature_flag['status'] == splits.Status.ACTIVE.value and self._check_flag_sets(feature_flag)): parsed = splits.from_raw(feature_flag) - self._feature_flag_storage.put(parsed) + to_add.append(parsed) segment_list.update(set(parsed.get_segment_names())) else: - self._feature_flag_storage.remove(feature_flag['name']) - self._feature_flag_storage.set_change_number(feature_flag_changes['till']) + if self._feature_flag_storage.get(feature_flag['name']) is not None: + to_delete.append(feature_flag['name']) + + self._feature_flag_storage.update(to_add, to_delete, feature_flag_changes['till']) if feature_flag_changes['till'] == feature_flag_changes['since']: return feature_flag_changes['till'], segment_list + def _check_flag_sets(self, feature_flag): + """ + Check all flag sets in a feature flag, return True if any of sets exist in storage + + :param feature_flag: Flag set to validate. + :type feature_flag: json + + :return: True if any of its flag_set exist. False otherwise. + :rtype: bool + """ + for flag_set in feature_flag['sets']: + if self._feature_flag_storage.is_flag_set_exist(flag_set): + return True + return False + + def _attempt_feature_flag_sync(self, fetch_options, till=None): """ Hit endpoint, update storage and return True if sync is complete. @@ -347,11 +368,10 @@ def _synchronize_legacy(self): fetched = self._read_feature_flags_from_legacy_file(self._filename) to_delete = [name for name in self._feature_flag_storage.get_split_names() if name not in fetched.keys()] - for feature_flag in fetched.values(): - self._feature_flag_storage.put(feature_flag) + to_add = [] + [to_add.append(feature_flag) for feature_flag in fetched.values()] - for feature_flag in to_delete: - self._feature_flag_storage.remove(feature_flag) + self._feature_flag_storage.update(to_add, to_delete, 0) return [] @@ -371,16 +391,18 @@ def _synchronize_json(self): self._current_json_sha = fecthed_sha if self._feature_flag_storage.get_change_number() > till and till != self._DEFAULT_FEATURE_FLAG_TILL: return [] + to_add = [] + to_delete = [] for feature_flag in fetched: if feature_flag['status'] == splits.Status.ACTIVE.value: parsed = splits.from_raw(feature_flag) - self._feature_flag_storage.put(parsed) + to_add.append(parsed) _LOGGER.debug("feature flag %s is updated", parsed.name) segment_list.update(set(parsed.get_segment_names())) else: - self._feature_flag_storage.remove(feature_flag['name']) + to_delete.append(feature_flag['name']) - self._feature_flag_storage.set_change_number(till) + self._feature_flag_storage.update(to_add, to_delete, till) return segment_list except Exception as exc: raise ValueError("Error reading feature flags from json.") from exc diff --git a/splitio/sync/synchronizer.py b/splitio/sync/synchronizer.py index 3b5a4251..59c57f01 100644 --- a/splitio/sync/synchronizer.py +++ b/splitio/sync/synchronizer.py @@ -251,6 +251,7 @@ def __init__(self, split_synchronizers, split_tasks): self._periodic_data_recording_tasks.append(self._split_tasks.unique_keys_task) if self._split_tasks.clear_filter_task: self._periodic_data_recording_tasks.append(self._split_tasks.clear_filter_task) + self._break_sync_all = False @property def split_sync(self): @@ -289,6 +290,7 @@ def synchronize_splits(self, till, sync_segments=True): :returns: whether the synchronization was successful or not. :rtype: bool """ + self._break_sync_all = False _LOGGER.debug('Starting feature flags synchronization') try: new_segments = [] @@ -304,7 +306,9 @@ def synchronize_splits(self, till, sync_segments=True): else: _LOGGER.debug('Segment sync scheduled.') return True - except APIException: + except APIException as exc: + if exc._status_code is not None and exc._status_code == 414: + self._break_sync_all = True _LOGGER.error('Failed syncing feature flags') _LOGGER.debug('Error: ', exc_info=True) return False @@ -334,7 +338,7 @@ def sync_all(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES): _LOGGER.debug('Error: ', exc_info=True) if max_retry_attempts != _SYNC_ALL_NO_RETRIES: retry_attempts += 1 - if retry_attempts > max_retry_attempts: + if retry_attempts > max_retry_attempts or self._break_sync_all: break how_long = self._backoff.get() time.sleep(how_long) diff --git a/tests/storage/test_inmemory_storage.py b/tests/storage/test_inmemory_storage.py index 061159d4..a0e7fff3 100644 --- a/tests/storage/test_inmemory_storage.py +++ b/tests/storage/test_inmemory_storage.py @@ -213,36 +213,88 @@ def test_kill_locally(self): storage.kill_locally('some_split', 'default_treatment', 3) assert storage.get('some_split').change_number == 3 - def test_flag_sets(self): - storage = InMemorySplitStorage() - assert storage._sets_feature_flag_map == {} + def test_flag_sets_with_config_sets(self): + storage = InMemorySplitStorage(['set10', 'set02', 'set05']) + assert storage.config_flag_sets_used == 3 + assert storage._sets_feature_flag_map == {'set10': set(), 'set02': set(), 'set05': set()} split1 = Split('split1', 123456789, False, 'some', 'traffic_type', 'ACTIVE', 1, sets=['set10', 'set02']) split2 = Split('split2', 123456789, False, 'some', 'traffic_type', 'ACTIVE', 1, sets=['set05', 'set02']) + split3 = Split('split3', 123456789, False, 'some', 'traffic_type', + 'ACTIVE', 1, sets=['set04', 'set05']) storage.update([split1], [], 1) assert storage.get_feature_flags_by_set('set10') == ['split1'] assert storage.get_feature_flags_by_set('set02') == ['split1'] + assert storage.is_flag_set_exist('set10') + assert storage.is_flag_set_exist('set02') + assert not storage.is_flag_set_exist('set03') storage.update([split2], [], 1) assert storage.get_feature_flags_by_set('set05') == ['split2'] assert sorted(storage.get_feature_flags_by_set('set02')) == ['split1', 'split2'] + assert storage.is_flag_set_exist('set05') storage.update([], [split2.name], 1) - assert 'set5' not in storage._sets_feature_flag_map + assert storage.is_flag_set_exist('set05') assert storage.get_feature_flags_by_set('set02') == ['split1'] assert storage.get_feature_flags_by_set('set05') == [] split1 = Split('split1', 123456789, False, 'some', 'traffic_type', 'ACTIVE', 1, sets=['set02']) storage.update([split1], [], 1) - assert 'set10' not in storage._sets_feature_flag_map + assert storage.is_flag_set_exist('set10') assert storage.get_feature_flags_by_set('set02') == ['split1'] storage.update([], [split1.name], 1) + assert storage.get_feature_flags_by_set('set02') == [] + assert storage._sets_feature_flag_map == {'set10': set(), 'set02': set(), 'set05': set()} + + storage.update([split3], [], 1) + assert storage.get_feature_flags_by_set('set05') == ['split3'] + assert not storage.is_flag_set_exist('set04') + + def test_flag_sets_withut_config_sets(self): + storage = InMemorySplitStorage() assert storage._sets_feature_flag_map == {} + assert storage.config_flag_sets_used == 0 + + split1 = Split('split1', 123456789, False, 'some', 'traffic_type', + 'ACTIVE', 1, sets=['set10', 'set02']) + split2 = Split('split2', 123456789, False, 'some', 'traffic_type', + 'ACTIVE', 1, sets=['set05', 'set02']) + split3 = Split('split3', 123456789, False, 'some', 'traffic_type', + 'ACTIVE', 1, sets=['set04', 'set05']) + storage.update([split1], [], 1) + assert storage.get_feature_flags_by_set('set10') == ['split1'] + assert storage.get_feature_flags_by_set('set02') == ['split1'] + assert storage.is_flag_set_exist('set10') + assert storage.is_flag_set_exist('set02') + assert not storage.is_flag_set_exist('set03') + + storage.update([split2], [], 1) + assert storage.get_feature_flags_by_set('set05') == ['split2'] + assert sorted(storage.get_feature_flags_by_set('set02')) == ['split1', 'split2'] + assert storage.is_flag_set_exist('set05') + + storage.update([], [split2.name], 1) + assert not storage.is_flag_set_exist('set05') + assert storage.get_feature_flags_by_set('set02') == ['split1'] + + split1 = Split('split1', 123456789, False, 'some', 'traffic_type', + 'ACTIVE', 1, sets=['set02']) + storage.update([split1], [], 1) + assert not storage.is_flag_set_exist('set10') + assert storage.get_feature_flags_by_set('set02') == ['split1'] + + storage.update([], [split1.name], 1) assert storage.get_feature_flags_by_set('set02') == [] + assert storage._sets_feature_flag_map == {} + + storage.update([split3], [], 1) + assert storage.get_feature_flags_by_set('set05') == ['split3'] + assert storage.get_feature_flags_by_set('set04') == ['split3'] class InMemorySegmentStorageTests(object): diff --git a/tests/sync/test_splits_synchronizer.py b/tests/sync/test_splits_synchronizer.py index 9799ba4d..69df2bec 100644 --- a/tests/sync/test_splits_synchronizer.py +++ b/tests/sync/test_splits_synchronizer.py @@ -13,12 +13,49 @@ from splitio.sync.split import SplitSynchronizer, LocalSplitSynchronizer, LocalhostMode from tests.integration import splits_json +splits = [{ + 'changeNumber': 123, + 'trafficTypeName': 'user', + 'name': 'some_name', + 'trafficAllocation': 100, + 'trafficAllocationSeed': 123456, + 'seed': 321654, + 'status': 'ACTIVE', + 'killed': False, + 'defaultTreatment': 'off', + 'algo': 2, + 'conditions': [ + { + 'partitions': [ + {'treatment': 'on', 'size': 50}, + {'treatment': 'off', 'size': 50} + ], + 'contitionType': 'WHITELIST', + 'label': 'some_label', + 'matcherGroup': { + 'matchers': [ + { + 'matcherType': 'WHITELIST', + 'whitelistMatcherData': { + 'whitelist': ['k1', 'k2', 'k3'] + }, + 'negate': False, + } + ], + 'combiner': 'AND' + } + } + ], + 'sets': ['set1', 'set2'] +}] + + class SplitsSynchronizerTests(object): """Split synchronizer test cases.""" def test_synchronize_splits_error(self, mocker): """Test that if fetching splits fails at some_point, the task will continue running.""" - storage = mocker.Mock(spec=SplitStorage) + storage = mocker.Mock(spec=InMemorySplitStorage) api = mocker.Mock() def run(x, c): @@ -34,7 +71,7 @@ def run(x, c): def test_synchronize_splits(self, mocker): """Test split sync.""" - storage = mocker.Mock(spec=SplitStorage) + storage = mocker.Mock(spec=InMemorySplitStorage) def change_number_mock(): change_number_mock._calls += 1 @@ -43,43 +80,9 @@ def change_number_mock(): return 123 change_number_mock._calls = 0 storage.get_change_number.side_effect = change_number_mock + storage.config_flag_sets_used = 0 api = mocker.Mock() - splits = [{ - 'changeNumber': 123, - 'trafficTypeName': 'user', - 'name': 'some_name', - 'trafficAllocation': 100, - 'trafficAllocationSeed': 123456, - 'seed': 321654, - 'status': 'ACTIVE', - 'killed': False, - 'defaultTreatment': 'off', - 'algo': 2, - 'conditions': [ - { - 'partitions': [ - {'treatment': 'on', 'size': 50}, - {'treatment': 'off', 'size': 50} - ], - 'contitionType': 'WHITELIST', - 'label': 'some_label', - 'matcherGroup': { - 'matchers': [ - { - 'matcherType': 'WHITELIST', - 'whitelistMatcherData': { - 'whitelist': ['k1', 'k2', 'k3'] - }, - 'negate': False, - } - ], - 'combiner': 'AND' - } - } - ] - }] - def get_changes(*args, **kwargs): get_changes.called += 1 @@ -104,13 +107,13 @@ def get_changes(*args, **kwargs): assert mocker.call(-1, FetchOptions(True)) in api.fetch_splits.mock_calls assert mocker.call(123, FetchOptions(True)) in api.fetch_splits.mock_calls - inserted_split = storage.put.mock_calls[0][1][0] + inserted_split = storage.update.mock_calls[0][1][0][0] assert isinstance(inserted_split, Split) assert inserted_split.name == 'some_name' def test_not_called_on_till(self, mocker): """Test that sync is not called when till is less than previous changenumber""" - storage = mocker.Mock(spec=SplitStorage) + storage = mocker.Mock(spec=InMemorySplitStorage) def change_number_mock(): return 2 @@ -134,7 +137,7 @@ def test_synchronize_splits_cdn(self, mocker): """Test split sync with bypassing cdn.""" mocker.patch('splitio.sync.split._ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES', new=3) - storage = mocker.Mock(spec=SplitStorage) + storage = mocker.Mock(spec=InMemorySplitStorage) def change_number_mock(): change_number_mock._calls += 1 @@ -149,41 +152,6 @@ def change_number_mock(): storage.get_change_number.side_effect = change_number_mock api = mocker.Mock() - splits = [{ - 'changeNumber': 123, - 'trafficTypeName': 'user', - 'name': 'some_name', - 'trafficAllocation': 100, - 'trafficAllocationSeed': 123456, - 'seed': 321654, - 'status': 'ACTIVE', - 'killed': False, - 'defaultTreatment': 'off', - 'algo': 2, - 'conditions': [ - { - 'partitions': [ - {'treatment': 'on', 'size': 50}, - {'treatment': 'off', 'size': 50} - ], - 'contitionType': 'WHITELIST', - 'label': 'some_label', - 'matcherGroup': { - 'matchers': [ - { - 'matcherType': 'WHITELIST', - 'whitelistMatcherData': { - 'whitelist': ['k1', 'k2', 'k3'] - }, - 'negate': False, - } - ], - 'combiner': 'AND' - } - } - ] - }] - def get_changes(*args, **kwargs): get_changes.called += 1 if get_changes.called == 1: @@ -199,6 +167,7 @@ def get_changes(*args, **kwargs): return { 'splits': [], 'since': 12345, 'till': 12345 } get_changes.called = 0 api.fetch_splits.side_effect = get_changes + storage.config_flag_sets_used = 0 split_synchronizer = SplitSynchronizer(api, storage) split_synchronizer._backoff = Backoff(1, 1) @@ -212,10 +181,92 @@ def get_changes(*args, **kwargs): assert mocker.call(12345, FetchOptions(True, 1234)) in api.fetch_splits.mock_calls assert len(api.fetch_splits.mock_calls) == 8 # 2 ok + BACKOFF(2 since==till + 2 re-attempts) + CDN(2 since==till) - inserted_split = storage.put.mock_calls[0][1][0] + inserted_split = storage.update.mock_calls[0][1][0][0] assert isinstance(inserted_split, Split) assert inserted_split.name == 'some_name' + def test_sync_flag_sets_with_config_sets(self, mocker): + """Test split sync with flag sets.""" + storage = InMemorySplitStorage(['set1', 'set2']) + + split = splits[0].copy() + split['name'] = 'second' + splits1 = [splits[0].copy(), split] + splits2 = splits.copy() + splits3 = splits.copy() + splits4 = splits.copy() + api = mocker.Mock() + def get_changes(*args, **kwargs): + get_changes.called += 1 + if get_changes.called == 1: + return { 'splits': splits1, 'since': 123, 'till': 123 } + elif get_changes.called == 2: + splits2[0]['sets'] = ['set3'] + return { 'splits': splits2, 'since': 124, 'till': 124 } + elif get_changes.called == 3: + splits3[0]['sets'] = ['set1'] + return { 'splits': splits3, 'since': 12434, 'till': 12434 } + splits4[0]['sets'] = ['set6'] + splits4[0]['name'] = 'new_split' + return { 'splits': splits4, 'since': 12438, 'till': 12438 } + get_changes.called = 0 + api.fetch_splits.side_effect = get_changes + + split_synchronizer = SplitSynchronizer(api, storage) + split_synchronizer._backoff = Backoff(1, 1) + split_synchronizer.synchronize_splits() + assert isinstance(storage.get('some_name'), Split) + + split_synchronizer.synchronize_splits(124) + assert storage.get('some_name') == None + + split_synchronizer.synchronize_splits(12434) + assert isinstance(storage.get('some_name'), Split) + + split_synchronizer.synchronize_splits(12438) + assert storage.get('new_name') == None + + def test_sync_flag_sets_without_config_sets(self, mocker): + """Test split sync with flag sets.""" + storage = InMemorySplitStorage() + + split = splits[0].copy() + split['name'] = 'second' + splits1 = [splits[0].copy(), split] + splits2 = splits.copy() + splits3 = splits.copy() + splits4 = splits.copy() + api = mocker.Mock() + def get_changes(*args, **kwargs): + get_changes.called += 1 + if get_changes.called == 1: + return { 'splits': splits1, 'since': 123, 'till': 123 } + elif get_changes.called == 2: + splits2[0]['sets'] = ['set3'] + return { 'splits': splits2, 'since': 124, 'till': 124 } + elif get_changes.called == 3: + splits3[0]['sets'] = ['set1'] + return { 'splits': splits3, 'since': 12434, 'till': 12434 } + splits4[0]['sets'] = ['set6'] + splits4[0]['name'] = 'third_split' + return { 'splits': splits4, 'since': 12438, 'till': 12438 } + get_changes.called = 0 + api.fetch_splits.side_effect = get_changes + + split_synchronizer = SplitSynchronizer(api, storage) + split_synchronizer._backoff = Backoff(1, 1) + split_synchronizer.synchronize_splits() + assert isinstance(storage.get('new_split'), Split) + + split_synchronizer.synchronize_splits(124) + assert isinstance(storage.get('new_split'), Split) + + split_synchronizer.synchronize_splits(12434) + assert isinstance(storage.get('new_split'), Split) + + split_synchronizer.synchronize_splits(12438) + assert isinstance(storage.get('third_split'), Split) + class LocalSplitsSynchronizerTests(object): """Split synchronizer test cases.""" diff --git a/tests/sync/test_synchronizer.py b/tests/sync/test_synchronizer.py index c57c9453..70c61ff2 100644 --- a/tests/sync/test_synchronizer.py +++ b/tests/sync/test_synchronizer.py @@ -2,7 +2,7 @@ from turtle import clear import unittest.mock as mock - +import pytest from splitio.sync.synchronizer import Synchronizer, SplitTasks, SplitSynchronizers, LocalhostSynchronizer from splitio.tasks.split_sync import SplitSynchronizationTask from splitio.tasks.unique_keys_sync import UniqueKeysSyncTask, ClearFilterSyncTask @@ -38,6 +38,26 @@ def run(x, c): # test forcing to have only one retry attempt and then exit sychronizer.sync_all(1) # sync_all should not throw! + def test_sync_all_failed_splits_with_flagsets(self, mocker): + api = mocker.Mock() + storage = mocker.Mock() + + def run(x, c): + raise APIException("something broke", 414) + api.fetch_splits.side_effect = run + + split_sync = SplitSynchronizer(api, storage) + split_synchronizers = SplitSynchronizers(split_sync, mocker.Mock(), mocker.Mock(), + mocker.Mock(), mocker.Mock()) + synchronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks)) + + synchronizer.synchronize_splits(None) # APIExceptions are handled locally and should not be propagated! + + # test forcing to have only one retry attempt and then exit + synchronizer.sync_all(3) # sync_all should not throw! + assert synchronizer._break_sync_all + assert synchronizer._backoff._attempt == 0 + def test_sync_all_failed_segments(self, mocker): api = mocker.Mock() storage = mocker.Mock() @@ -141,6 +161,7 @@ def test_sync_all(self, mocker): split_storage = mocker.Mock(spec=SplitStorage) split_storage.get_change_number.return_value = 123 split_storage.get_segment_names.return_value = ['segmentA'] + split_storage.config_flag_sets_used = 0 split_api = mocker.Mock() split_api.fetch_splits.return_value = {'splits': self.splits, 'since': 123, 'till': 123} @@ -159,7 +180,7 @@ def test_sync_all(self, mocker): synchronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks)) synchronizer.sync_all() - inserted_split = split_storage.put.mock_calls[0][1][0] + inserted_split = split_storage.update.mock_calls[0][1][0][0] assert isinstance(inserted_split, Split) assert inserted_split.name == 'some_name'