Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion splitio/client/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Default settings for the Split.IO SDK Python client."""
import os.path
import logging
import re

from splitio.engine.impressions import ImpressionsMode
from splitio.client.input_validator import validate_flag_sets
Expand Down
44 changes: 12 additions & 32 deletions splitio/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@
_INSTANTIATED_FACTORIES_LOCK = threading.RLock()
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED = 0.1 # 10%
_MAX_RETRY_SYNC_ALL = 3
_FLAG_SETS_LOCK = threading.RLock()
_TOTAL_FLAG_SETS = 0
_INVALID_FLAG_SETS = 0


class Status(Enum):
Expand Down Expand Up @@ -315,7 +312,8 @@ def _wrap_impression_listener(listener, metadata):


def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pylint:disable=too-many-arguments,too-many-locals
auth_api_base_url=None, streaming_api_base_url=None, telemetry_api_base_url=None):
auth_api_base_url=None, streaming_api_base_url=None, telemetry_api_base_url=None,
total_flag_sets=0, invalid_flag_sets=0):
"""Build and return a split factory tailored to the supplied config."""
if not input_validator.validate_factory_instantiation(api_key):
return None
Expand Down Expand Up @@ -419,10 +417,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
telemetry_evaluation_producer
)

telemetry_init_producer.record_config(cfg, extra_cfg)
total_flag_sets, invalid_flag_sets = _get_total_and_invalid_flag_sets()
telemetry_init_producer.record_flag_sets(total_flag_sets)
telemetry_init_producer.record_invalid_flag_sets(invalid_flag_sets)
telemetry_init_producer.record_config(cfg, extra_cfg, total_flag_sets, invalid_flag_sets)

if preforked_initialization:
synchronizer.sync_all(max_retry_attempts=_MAX_RETRY_SYNC_ALL)
Expand Down Expand Up @@ -501,7 +496,7 @@ def _build_redis_factory(api_key, cfg):
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer", daemon=True)
initialization_thread.start()

telemetry_init_producer.record_config(cfg, {})
telemetry_init_producer.record_config(cfg, {}, 0, 0)

split_factory = SplitFactory(
api_key,
Expand All @@ -514,10 +509,7 @@ def _build_redis_factory(api_key, cfg):
telemetry_init_producer=telemetry_init_producer
)
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
total_flag_sets, invalid_flag_sets = _get_total_and_invalid_flag_sets()
storages['telemetry'].record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
storages['telemetry'].record_flag_sets(total_flag_sets)
storages['telemetry'].record_invalid_flag_sets(invalid_flag_sets)
telemetry_submitter.synchronize_config()

return split_factory
Expand Down Expand Up @@ -582,7 +574,7 @@ def _build_pluggable_factory(api_key, cfg):
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer", daemon=True)
initialization_thread.start()

telemetry_init_producer.record_config(cfg, {})
telemetry_init_producer.record_config(cfg, {}, 0, 0)

split_factory = SplitFactory(
api_key,
Expand All @@ -595,10 +587,7 @@ def _build_pluggable_factory(api_key, cfg):
telemetry_init_producer=telemetry_init_producer
)
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
total_flag_sets, invalid_flag_sets = _get_total_and_invalid_flag_sets()
storages['telemetry'].record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
storages['telemetry'].record_flag_sets(total_flag_sets)
storages['telemetry'].record_invalid_flag_sets(invalid_flag_sets)
telemetry_submitter.synchronize_config()

return split_factory
Expand Down Expand Up @@ -697,13 +686,11 @@ def get_factory(api_key, **kwargs):
_INSTANTIATED_FACTORIES_LOCK.release()

config_raw = kwargs.get('config', {})
total_flag_sets = 0
invalid_flag_sets = 0
if config_raw.get('flagSetsFilter') is not None and isinstance(config_raw.get('flagSetsFilter'), list):
global _TOTAL_FLAG_SETS
global _INVALID_FLAG_SETS
_FLAG_SETS_LOCK.acquire()
_TOTAL_FLAG_SETS = len(config_raw.get('flagSetsFilter'))
_INVALID_FLAG_SETS = _TOTAL_FLAG_SETS - len(input_validator.validate_flag_sets(config_raw.get('flagSetsFilter'), 'Telemetry Init'))
_FLAG_SETS_LOCK.release()
total_flag_sets = len(config_raw.get('flagSetsFilter'))
invalid_flag_sets = total_flag_sets - len(input_validator.validate_flag_sets(config_raw.get('flagSetsFilter'), 'Telemetry Init'))

config = sanitize_config(api_key, config_raw)

Expand All @@ -721,7 +708,9 @@ def get_factory(api_key, **kwargs):
kwargs.get('events_api_base_url'),
kwargs.get('auth_api_base_url'),
kwargs.get('streaming_api_base_url'),
kwargs.get('telemetry_api_base_url'))
kwargs.get('telemetry_api_base_url'),
total_flag_sets,
invalid_flag_sets)

return split_factory

Expand All @@ -734,12 +723,3 @@ def _get_active_and_redundant_count():
active_factory_count += _INSTANTIATED_FACTORIES[item]
_INSTANTIATED_FACTORIES_LOCK.release()
return redundant_factory_count, active_factory_count

def _get_total_and_invalid_flag_sets():
total_flag_sets = 0
invalid_flag_sets = 0
_FLAG_SETS_LOCK.acquire()
total_flag_sets = _TOTAL_FLAG_SETS
invalid_flag_sets = _INVALID_FLAG_SETS
_FLAG_SETS_LOCK.release()
return total_flag_sets, invalid_flag_sets
4 changes: 2 additions & 2 deletions splitio/engine/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ def __init__(self, telemetry_storage):
"""Constructor."""
self._telemetry_storage = telemetry_storage

def record_config(self, config, extra_config):
def record_config(self, config, extra_config, total_flag_sets=0, invalid_flag_sets=0):
"""Record configurations."""
self._telemetry_storage.record_config(config, extra_config)
self._telemetry_storage.record_config(config, extra_config, total_flag_sets, invalid_flag_sets)
current_app, app_worker_id = self._get_app_worker_id()
if current_app is not None:
self.add_config_tag("initilization:" + current_app)
Expand Down
40 changes: 3 additions & 37 deletions splitio/models/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ def _reset_all(self):
self._flag_sets = 0
self._flag_sets_invalid = 0

def record_config(self, config, extra_config):
def record_config(self, config, extra_config, total_flag_sets, invalid_flag_sets):
"""
Record configurations.

Expand Down Expand Up @@ -829,32 +829,14 @@ def record_config(self, config, extra_config):
self._impressions_mode = self._get_impressions_mode(config[_ConfigParams.IMPRESSIONS_MODE.value])
self._impression_listener = True if config[_ConfigParams.IMPRESSIONS_LISTENER.value] is not None else False
self._http_proxy = self._check_if_proxy_detected()
self._flag_sets = total_flag_sets
self._flag_sets_invalid = invalid_flag_sets

def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count):
with self._lock:
self._active_factory_count = active_factory_count
self._redundant_factory_count = redundant_factory_count

def record_flag_sets(self, flag_sets):
"""
Record flag sets

:param flag_sets: flag sets count
:type flag_sets: int
"""
with self._lock:
self._flag_sets = flag_sets

def record_invalid_flag_sets(self, flag_sets):
"""
Record invalid flag sets

:param flag_sets: flag sets count
:type flag_sets: int
"""
with self._lock:
self._flag_sets_invalid = flag_sets

def record_ready_time(self, ready_time):
"""
Record ready time.
Expand All @@ -881,22 +863,6 @@ def record_not_ready_usage(self):
with self._lock:
self._not_ready += 1

def get_flag_sets(self):
"""
Get flag sets

"""
with self._lock:
return self._flag_sets

def get_invalid_flag_sets(self):
"""
Get invalid flag sets

"""
with self._lock:
return self._flag_sets_invalid

def get_bur_time_outs(self):
"""
Get block until ready timeout.
Expand Down
1 change: 0 additions & 1 deletion splitio/storage/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Base storage interfaces."""
import abc
import threading

class SplitStorage(object, metaclass=abc.ABCMeta):
"""Split storage interface implemented as an abstract class."""
Expand Down
20 changes: 2 additions & 18 deletions splitio/storage/inmemmory.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,9 +642,9 @@ def _reset_config_tags(self):
with self._lock:
self._config_tags = []

def record_config(self, config, extra_config):
def record_config(self, config, extra_config, total_flag_sets, invalid_flag_sets):
"""Record configurations."""
self._tel_config.record_config(config, extra_config)
self._tel_config.record_config(config, extra_config, total_flag_sets, invalid_flag_sets)

def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count):
"""Record active and redundant factories."""
Expand All @@ -654,14 +654,6 @@ def record_ready_time(self, ready_time):
"""Record ready time."""
self._tel_config.record_ready_time(ready_time)

def record_flag_sets(self, flag_sets):
"""Record flag sets."""
self._tel_config.record_flag_sets(flag_sets)

def record_invalid_flag_sets(self, flag_sets):
"""Record invalid flag sets."""
self._tel_config.record_invalid_flag_sets(flag_sets)

def add_tag(self, tag):
"""Record tag string."""
with self._lock:
Expand Down Expand Up @@ -730,14 +722,6 @@ def record_update_from_sse(self, event):
"""Record update from sse."""
self._counters.record_update_from_sse(event)

def get_flag_sets(self):
"""Get flag sets."""
self._tel_config.get_flag_sets()

def get_invalid_flag_sets(self):
"""Get invalid flag sets."""
self._tel_config.get_invalid_flag_sets()

def get_bur_time_outs(self):
"""Get block until ready timeout."""
return self._tel_config.get_bur_time_outs()
Expand Down
16 changes: 3 additions & 13 deletions splitio/storage/pluggable.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ def add_config_tag(self, tag):
if len(self._config_tags) < MAX_TAGS:
self._config_tags.append(tag)

def record_config(self, config, extra_config):
def record_config(self, config, extra_config, total_flag_sets, invalid_flag_sets):
"""
initilize telemetry objects

Expand All @@ -804,15 +804,7 @@ def record_config(self, config, extra_config):
:param extra_config: any extra configs
:type extra_config: Dict
"""
self._tel_config.record_config(config, extra_config)

def record_flag_sets(self, flag_sets):
"""Record flag sets."""
self._tel_config.record_flag_sets(flag_sets)

def record_invalid_flag_sets(self, flag_sets):
"""Record invalid flag sets."""
self._tel_config.record_invalid_flag_sets(flag_sets)
self._tel_config.record_config(config, extra_config, total_flag_sets, invalid_flag_sets)

def pop_config_tags(self):
"""Get and reset configs."""
Expand All @@ -833,9 +825,7 @@ def _format_config_stats(self):
'rF': config_stats['rF'],
'sT': config_stats['sT'],
'oM': config_stats['oM'],
't': self.pop_config_tags(),
'fsT': self._tel_config.get_flag_sets(),
'fsI': self._tel_config.get_invalid_flag_sets()
't': self.pop_config_tags()
})

def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count):
Expand Down
14 changes: 2 additions & 12 deletions splitio/storage/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,22 +662,14 @@ def add_config_tag(self, tag):
if len(self._config_tags) < MAX_TAGS:
self._config_tags.append(tag)

def record_config(self, config, extra_config):
def record_config(self, config, extra_config, total_flag_sets, invalid_flag_sets):
"""
initilize telemetry objects

:param congif: factory configuration parameters
:type config: splitio.client.config
"""
self._tel_config.record_config(config, extra_config)

def record_flag_sets(self, flag_sets):
"""Record flag sets."""
self._tel_config.record_flag_sets(flag_sets)

def record_invalid_flag_sets(self, flag_sets):
"""Record invalid flag sets."""
self._tel_config.record_invalid_flag_sets(flag_sets)
self._tel_config.record_config(config, extra_config, total_flag_sets, invalid_flag_sets)

def pop_config_tags(self):
"""Get and reset tags."""
Expand All @@ -700,8 +692,6 @@ def _format_config_stats(self):
'rF': config_stats['rF'],
'sT': config_stats['sT'],
'oM': config_stats['oM'],
'fsT': self._tel_config.get_flag_sets(),
'fsI': self._tel_config.get_invalid_flag_sets(),
't': self.pop_config_tags()
})

Expand Down
18 changes: 1 addition & 17 deletions splitio/util/storage_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,4 @@ def combine_valid_flag_sets(result_sets):
for result_set in result_sets:
if isinstance(result_set, set) and len(result_set) > 0:
to_return.update(result_set)
return to_return

def combine_valid_flag_sets(result_sets):
"""
Check each flag set in given array of sets, combine all flag sets in one unique set

:param result_sets: Flag sets set
:type flag_sets: list(set)

:return: flag sets set
:rtype: set
"""
to_return = set()
for result_set in result_sets:
if isinstance(result_set, set) and len(result_set) > 0:
to_return.update(result_set)
return to_return
return to_return
4 changes: 1 addition & 3 deletions tests/engine/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ def test_record_config(self, mocker):
'metricsRefreshRate': 10,
'storageType': None
}
telemetry_init_producer.record_config(config, {})
telemetry_init_producer.record_config(config, {}, 5, 2)
telemetry_init_producer.record_active_and_redundant_factories(1, 0)
telemetry_init_producer.record_flag_sets(5)
telemetry_init_producer.record_invalid_flag_sets(2)

assert(telemetry_storage._tel_config.get_stats() == {'oM': 0,
'sT': telemetry_storage._tel_config._get_storage_type(config['operationMode'], config['storageType']),
Expand Down
12 changes: 3 additions & 9 deletions tests/models/test_telemetry_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ def test_telemetry_config(self):
'storageType': None,
'flagSetsFilter': None
}
telemetry_config.record_config(config, {})
telemetry_config.record_config(config, {}, 5, 2)
assert(telemetry_config.get_stats() == {'oM': 0,
'sT': telemetry_config._get_storage_type(config['operationMode'], config['storageType']),
'sE': config['streamingEnabled'],
Expand All @@ -332,16 +332,13 @@ def test_telemetry_config(self):
'bT': 0,
'aF': 0,
'rF': 0,
'fsT': 0,
'fsI': 0}
'fsT': 5,
'fsI': 2}
)

telemetry_config.record_ready_time(10)
assert(telemetry_config._time_until_ready == 10)

telemetry_config.record_flag_sets(5)
assert(telemetry_config._flag_sets == 5)

assert(telemetry_config.get_bur_time_outs() == 0)
[telemetry_config.record_bur_time_out() for i in range(2)]
assert(telemetry_config.get_bur_time_outs() == 2)
Expand All @@ -350,9 +347,6 @@ def test_telemetry_config(self):
[telemetry_config.record_not_ready_usage() for i in range(5)]
assert(telemetry_config.get_non_ready_usage() == 5)

telemetry_config.record_invalid_flag_sets(2)
assert(telemetry_config._flag_sets_invalid == 2)

os.environ["https_proxy"] = "some_host_ip"
assert(telemetry_config._check_if_proxy_detected() == True)

Expand Down
Loading