Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions splitio/storage/inmemmory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
class InMemorySplitStorage(SplitStorage):
"""InMemory implementation of a split storage."""

def __init__(self):
def __init__(self, flag_sets=[]):
"""Constructor."""
self._lock = threading.RLock()
self._splits = {}
self._change_number = -1
self._traffic_types = Counter()
self._sets_feature_flag_map = {}
self.config_flag_sets_used = len(flag_sets)
for flag_set in flag_sets:
self._sets_feature_flag_map[flag_set] = set()

def get(self, split_name):
"""
Expand Down Expand Up @@ -73,13 +76,15 @@ def _put(self, split):
"""
with self._lock:
if split.name in self._splits:
self._remove_flag_sets(self._splits[split.name])
self._remove_from_flag_sets(self._splits[split.name])
self._decrease_traffic_type_count(self._splits[split.name].traffic_type_name)
self._splits[split.name] = split
self._increase_traffic_type_count(split.traffic_type_name)
if split.sets is not None:
for flag_set in split.sets:
if flag_set not in self._sets_feature_flag_map.keys():
if self.config_flag_sets_used > 0:
continue
self._sets_feature_flag_map[flag_set] = set()
self._sets_feature_flag_map[flag_set].add(split.name)

Expand All @@ -101,10 +106,10 @@ def _remove(self, split_name):

self._splits.pop(split_name)
self._decrease_traffic_type_count(split.traffic_type_name)
self._remove_flag_sets(split)
self._remove_from_flag_sets(split)
return True

def _remove_flag_sets(self, feature_flag):
def _remove_from_flag_sets(self, feature_flag):
"""
Remove flag sets associated to a split

Expand All @@ -114,7 +119,7 @@ def _remove_flag_sets(self, feature_flag):
if feature_flag.sets is not None:
Comment thread
EmilianoSanchez marked this conversation as resolved.
for flag_set in feature_flag.sets:
self._sets_feature_flag_map[flag_set].remove(feature_flag.name)
if len(self._sets_feature_flag_map[flag_set]) == 0:
if len(self._sets_feature_flag_map[flag_set]) == 0 and self.config_flag_sets_used == 0:
del self._sets_feature_flag_map[flag_set]

def get_feature_flags_by_set(self, set):
Expand Down Expand Up @@ -232,6 +237,20 @@ def _decrease_traffic_type_count(self, traffic_type_name):
self._traffic_types.subtract([traffic_type_name])
self._traffic_types += Counter()

def is_flag_set_exist(self, flag_set):
"""
Return whether a flag set exists in at least one feature flag in cache.

:param flag_set: Flag set to validate.
:type flag_set: str

:return: True if the flag_set exist. False otherwise.
:rtype: bool
"""
if flag_set in self._sets_feature_flag_map.keys():
return True
return False


class InMemorySegmentStorage(SegmentStorage):
"""In-memory implementation of a segment storage."""
Expand Down
44 changes: 33 additions & 11 deletions splitio/sync/split.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,38 @@ def _fetch_until(self, fetch_options, till=None):
_LOGGER.debug('Exception information: ', exc_info=True)
raise exc

to_add = []
to_delete = []
for feature_flag in feature_flag_changes.get('splits', []):
if feature_flag['status'] == splits.Status.ACTIVE.value:
if (self._feature_flag_storage.config_flag_sets_used == 0 and feature_flag['status'] == splits.Status.ACTIVE.value) or \
(feature_flag['status'] == splits.Status.ACTIVE.value and self._check_flag_sets(feature_flag)):
parsed = splits.from_raw(feature_flag)
self._feature_flag_storage.put(parsed)
to_add.append(parsed)
segment_list.update(set(parsed.get_segment_names()))
else:
self._feature_flag_storage.remove(feature_flag['name'])
self._feature_flag_storage.set_change_number(feature_flag_changes['till'])
if self._feature_flag_storage.get(feature_flag['name']) is not None:
to_delete.append(feature_flag['name'])

self._feature_flag_storage.update(to_add, to_delete, feature_flag_changes['till'])
if feature_flag_changes['till'] == feature_flag_changes['since']:
return feature_flag_changes['till'], segment_list

def _check_flag_sets(self, feature_flag):
"""
Check all flag sets in a feature flag, return True if any of sets exist in storage

:param feature_flag: Flag set to validate.
:type feature_flag: json

:return: True if any of its flag_set exist. False otherwise.
:rtype: bool
"""
for flag_set in feature_flag['sets']:
if self._feature_flag_storage.is_flag_set_exist(flag_set):
return True
return False


def _attempt_feature_flag_sync(self, fetch_options, till=None):
"""
Hit endpoint, update storage and return True if sync is complete.
Expand Down Expand Up @@ -347,11 +368,10 @@ def _synchronize_legacy(self):
fetched = self._read_feature_flags_from_legacy_file(self._filename)
to_delete = [name for name in self._feature_flag_storage.get_split_names()
if name not in fetched.keys()]
for feature_flag in fetched.values():
self._feature_flag_storage.put(feature_flag)
to_add = []
[to_add.append(feature_flag) for feature_flag in fetched.values()]

for feature_flag in to_delete:
self._feature_flag_storage.remove(feature_flag)
self._feature_flag_storage.update(to_add, to_delete, 0)

return []

Expand All @@ -371,16 +391,18 @@ def _synchronize_json(self):
self._current_json_sha = fecthed_sha
if self._feature_flag_storage.get_change_number() > till and till != self._DEFAULT_FEATURE_FLAG_TILL:
return []
to_add = []
to_delete = []
for feature_flag in fetched:
if feature_flag['status'] == splits.Status.ACTIVE.value:
parsed = splits.from_raw(feature_flag)
self._feature_flag_storage.put(parsed)
to_add.append(parsed)
_LOGGER.debug("feature flag %s is updated", parsed.name)
segment_list.update(set(parsed.get_segment_names()))
else:
self._feature_flag_storage.remove(feature_flag['name'])
to_delete.append(feature_flag['name'])

self._feature_flag_storage.set_change_number(till)
self._feature_flag_storage.update(to_add, to_delete, till)
return segment_list
except Exception as exc:
raise ValueError("Error reading feature flags from json.") from exc
Expand Down
8 changes: 6 additions & 2 deletions splitio/sync/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ def __init__(self, split_synchronizers, split_tasks):
self._periodic_data_recording_tasks.append(self._split_tasks.unique_keys_task)
if self._split_tasks.clear_filter_task:
self._periodic_data_recording_tasks.append(self._split_tasks.clear_filter_task)
self._break_sync_all = False

@property
def split_sync(self):
Expand Down Expand Up @@ -289,6 +290,7 @@ def synchronize_splits(self, till, sync_segments=True):
:returns: whether the synchronization was successful or not.
:rtype: bool
"""
self._break_sync_all = False
_LOGGER.debug('Starting feature flags synchronization')
try:
new_segments = []
Expand All @@ -304,7 +306,9 @@ def synchronize_splits(self, till, sync_segments=True):
else:
_LOGGER.debug('Segment sync scheduled.')
return True
except APIException:
except APIException as exc:
if exc._status_code is not None and exc._status_code == 414:
self._break_sync_all = True
_LOGGER.error('Failed syncing feature flags')
_LOGGER.debug('Error: ', exc_info=True)
return False
Expand Down Expand Up @@ -334,7 +338,7 @@ def sync_all(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
_LOGGER.debug('Error: ', exc_info=True)
if max_retry_attempts != _SYNC_ALL_NO_RETRIES:
retry_attempts += 1
if retry_attempts > max_retry_attempts:
if retry_attempts > max_retry_attempts or self._break_sync_all:
break
how_long = self._backoff.get()
time.sleep(how_long)
Expand Down
62 changes: 57 additions & 5 deletions tests/storage/test_inmemory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,36 +213,88 @@ def test_kill_locally(self):
storage.kill_locally('some_split', 'default_treatment', 3)
assert storage.get('some_split').change_number == 3

def test_flag_sets(self):
storage = InMemorySplitStorage()
assert storage._sets_feature_flag_map == {}
def test_flag_sets_with_config_sets(self):
storage = InMemorySplitStorage(['set10', 'set02', 'set05'])
assert storage.config_flag_sets_used == 3
assert storage._sets_feature_flag_map == {'set10': set(), 'set02': set(), 'set05': set()}

split1 = Split('split1', 123456789, False, 'some', 'traffic_type',
'ACTIVE', 1, sets=['set10', 'set02'])
split2 = Split('split2', 123456789, False, 'some', 'traffic_type',
'ACTIVE', 1, sets=['set05', 'set02'])
split3 = Split('split3', 123456789, False, 'some', 'traffic_type',
'ACTIVE', 1, sets=['set04', 'set05'])
storage.update([split1], [], 1)
assert storage.get_feature_flags_by_set('set10') == ['split1']
assert storage.get_feature_flags_by_set('set02') == ['split1']
assert storage.is_flag_set_exist('set10')
assert storage.is_flag_set_exist('set02')
assert not storage.is_flag_set_exist('set03')

storage.update([split2], [], 1)
assert storage.get_feature_flags_by_set('set05') == ['split2']
assert sorted(storage.get_feature_flags_by_set('set02')) == ['split1', 'split2']
assert storage.is_flag_set_exist('set05')

storage.update([], [split2.name], 1)
assert 'set5' not in storage._sets_feature_flag_map
assert storage.is_flag_set_exist('set05')
assert storage.get_feature_flags_by_set('set02') == ['split1']
assert storage.get_feature_flags_by_set('set05') == []

split1 = Split('split1', 123456789, False, 'some', 'traffic_type',
'ACTIVE', 1, sets=['set02'])
storage.update([split1], [], 1)
assert 'set10' not in storage._sets_feature_flag_map
assert storage.is_flag_set_exist('set10')
assert storage.get_feature_flags_by_set('set02') == ['split1']

storage.update([], [split1.name], 1)
assert storage.get_feature_flags_by_set('set02') == []
assert storage._sets_feature_flag_map == {'set10': set(), 'set02': set(), 'set05': set()}

storage.update([split3], [], 1)
assert storage.get_feature_flags_by_set('set05') == ['split3']
assert not storage.is_flag_set_exist('set04')

def test_flag_sets_withut_config_sets(self):
storage = InMemorySplitStorage()
assert storage._sets_feature_flag_map == {}
assert storage.config_flag_sets_used == 0

split1 = Split('split1', 123456789, False, 'some', 'traffic_type',
'ACTIVE', 1, sets=['set10', 'set02'])
split2 = Split('split2', 123456789, False, 'some', 'traffic_type',
'ACTIVE', 1, sets=['set05', 'set02'])
split3 = Split('split3', 123456789, False, 'some', 'traffic_type',
'ACTIVE', 1, sets=['set04', 'set05'])
storage.update([split1], [], 1)
assert storage.get_feature_flags_by_set('set10') == ['split1']
assert storage.get_feature_flags_by_set('set02') == ['split1']
assert storage.is_flag_set_exist('set10')
assert storage.is_flag_set_exist('set02')
assert not storage.is_flag_set_exist('set03')

storage.update([split2], [], 1)
assert storage.get_feature_flags_by_set('set05') == ['split2']
assert sorted(storage.get_feature_flags_by_set('set02')) == ['split1', 'split2']
assert storage.is_flag_set_exist('set05')

storage.update([], [split2.name], 1)
assert not storage.is_flag_set_exist('set05')
assert storage.get_feature_flags_by_set('set02') == ['split1']

split1 = Split('split1', 123456789, False, 'some', 'traffic_type',
'ACTIVE', 1, sets=['set02'])
storage.update([split1], [], 1)
assert not storage.is_flag_set_exist('set10')
assert storage.get_feature_flags_by_set('set02') == ['split1']

storage.update([], [split1.name], 1)
assert storage.get_feature_flags_by_set('set02') == []
assert storage._sets_feature_flag_map == {}

storage.update([split3], [], 1)
assert storage.get_feature_flags_by_set('set05') == ['split3']
assert storage.get_feature_flags_by_set('set04') == ['split3']


class InMemorySegmentStorageTests(object):
Expand Down
Loading