Module amplitude_experiment.local.client
Expand source code
import json
import logging
from threading import Lock
from typing import Any, List, Dict
from .config import LocalEvaluationConfig
from ..user import User
from ..connection_pool import HTTPConnectionPool
from .poller import Poller
from .evaluation.evaluation import evaluate
from ..variant import Variant
class LocalEvaluationClient:
"""Experiment client for evaluating variants for a user locally."""
def __init__(self, api_key: str, config : LocalEvaluationConfig = None):
"""
Creates a new Experiment LocalEvaluationClient instance.
Parameters:
api_key (str): The environment API Key
config (LocalEvaluationConfig): Config Object
Returns:
Experiment Client instance.
"""
if not api_key:
raise ValueError("Experiment API key is empty")
self.api_key = api_key
self.config = config or LocalEvaluationConfig()
self.logger = logging.getLogger("Amplitude")
self.logger.addHandler(logging.StreamHandler())
if self.config.debug:
self.logger.setLevel(logging.DEBUG)
self.__setup_connection_pool()
self.rules = {}
self.poller = Poller(self.config.flag_config_polling_interval_millis / 1000, self.__do_rules)
self.lock = Lock()
def start(self):
"""
Fetch initial flag configurations and start polling for updates. You must call this function to begin
polling for flag config updates.
"""
self.__do_rules()
self.poller.start()
def evaluate(self, user: User, flag_keys: List[str] = None) -> Dict[str, Variant]:
"""
Locally evaluates flag variants for a user.
Parameters:
user (User): The user to evaluate
flag_keys (List[str]): The flags to evaluate with the user. If empty, all flags from the flag cache are evaluated.
Returns:
The evaluated variants.
"""
no_flag_keys = flag_keys is None or len(flag_keys) == 0
rules = []
for key, value in self.rules.items():
if no_flag_keys or key in flag_keys:
rules.append(value)
rules_json = json.dumps(rules)
user_json = str(user)
self.logger.debug(f"[Experiment] Evaluate: User: {user_json} - Rules: {rules_json}")
result_json = evaluate(rules_json, user_json)
self.logger.debug(f"[Experiment] Evaluate Result: {result_json}")
evaluation_result = json.loads(result_json)
variants = {}
for key, value in evaluation_result.items():
if value.get('isDefaultVariant'):
continue
variants[key] = Variant(value['variant'].get('key'), value['variant'].get('payload'))
return variants
def __do_rules(self):
conn = self._connection_pool.acquire()
headers = {
'Authorization': f"Api-Key {self.api_key}",
'Content-Type': 'application/json;charset=utf-8'
}
body = None
self.logger.debug('[Experiment] Get flag configs')
try:
response = conn.request('GET', '/sdk/rules?eval_mode=local', body, headers)
response_body = response.read().decode("utf8")
if response.status != 200:
raise Exception(f"[Experiment] Get flagConfigs - received error response: ${response.status}: ${response_body}")
self.logger.debug(f"[Experiment] Got flag configs: {response_body}")
parsed_rules = self.__parse(json.loads(response_body))
self.lock.acquire()
self.rules = parsed_rules
self.lock.release()
finally:
self._connection_pool.release(conn)
def __parse(self, flag_configs_array):
flag_configs_record = {}
for value in flag_configs_array:
flag_configs_record[value.get('flagKey')] = value
return flag_configs_record
def __setup_connection_pool(self):
scheme, _, host = self.config.server_url.split('/', 3)
timeout = self.config.flag_config_poller_request_timeout_millis / 1000
self._connection_pool = HTTPConnectionPool(host, max_size=1, idle_timeout=30,
read_timeout=timeout, scheme=scheme)
def stop(self) -> None:
"""
Stop polling for flag configurations. Close resource like connection pool with client
"""
self.poller.stop()
self._connection_pool.close()
def __enter__(self) -> 'LocalEvaluationClient':
return self
def __exit__(self, *exit_info: Any) -> None:
self.stop()
Classes
class LocalEvaluationClient (api_key: str, config: LocalEvaluationConfig = None)
-
Experiment client for evaluating variants for a user locally.
Creates a new Experiment LocalEvaluationClient instance. Parameters: api_key (str): The environment API Key config (LocalEvaluationConfig): Config Object
Returns: Experiment Client instance.
Expand source code
class LocalEvaluationClient: """Experiment client for evaluating variants for a user locally.""" def __init__(self, api_key: str, config : LocalEvaluationConfig = None): """ Creates a new Experiment LocalEvaluationClient instance. Parameters: api_key (str): The environment API Key config (LocalEvaluationConfig): Config Object Returns: Experiment Client instance. """ if not api_key: raise ValueError("Experiment API key is empty") self.api_key = api_key self.config = config or LocalEvaluationConfig() self.logger = logging.getLogger("Amplitude") self.logger.addHandler(logging.StreamHandler()) if self.config.debug: self.logger.setLevel(logging.DEBUG) self.__setup_connection_pool() self.rules = {} self.poller = Poller(self.config.flag_config_polling_interval_millis / 1000, self.__do_rules) self.lock = Lock() def start(self): """ Fetch initial flag configurations and start polling for updates. You must call this function to begin polling for flag config updates. """ self.__do_rules() self.poller.start() def evaluate(self, user: User, flag_keys: List[str] = None) -> Dict[str, Variant]: """ Locally evaluates flag variants for a user. Parameters: user (User): The user to evaluate flag_keys (List[str]): The flags to evaluate with the user. If empty, all flags from the flag cache are evaluated. Returns: The evaluated variants. """ no_flag_keys = flag_keys is None or len(flag_keys) == 0 rules = [] for key, value in self.rules.items(): if no_flag_keys or key in flag_keys: rules.append(value) rules_json = json.dumps(rules) user_json = str(user) self.logger.debug(f"[Experiment] Evaluate: User: {user_json} - Rules: {rules_json}") result_json = evaluate(rules_json, user_json) self.logger.debug(f"[Experiment] Evaluate Result: {result_json}") evaluation_result = json.loads(result_json) variants = {} for key, value in evaluation_result.items(): if value.get('isDefaultVariant'): continue variants[key] = Variant(value['variant'].get('key'), value['variant'].get('payload')) return variants def __do_rules(self): conn = self._connection_pool.acquire() headers = { 'Authorization': f"Api-Key {self.api_key}", 'Content-Type': 'application/json;charset=utf-8' } body = None self.logger.debug('[Experiment] Get flag configs') try: response = conn.request('GET', '/sdk/rules?eval_mode=local', body, headers) response_body = response.read().decode("utf8") if response.status != 200: raise Exception(f"[Experiment] Get flagConfigs - received error response: ${response.status}: ${response_body}") self.logger.debug(f"[Experiment] Got flag configs: {response_body}") parsed_rules = self.__parse(json.loads(response_body)) self.lock.acquire() self.rules = parsed_rules self.lock.release() finally: self._connection_pool.release(conn) def __parse(self, flag_configs_array): flag_configs_record = {} for value in flag_configs_array: flag_configs_record[value.get('flagKey')] = value return flag_configs_record def __setup_connection_pool(self): scheme, _, host = self.config.server_url.split('/', 3) timeout = self.config.flag_config_poller_request_timeout_millis / 1000 self._connection_pool = HTTPConnectionPool(host, max_size=1, idle_timeout=30, read_timeout=timeout, scheme=scheme) def stop(self) -> None: """ Stop polling for flag configurations. Close resource like connection pool with client """ self.poller.stop() self._connection_pool.close() def __enter__(self) -> 'LocalEvaluationClient': return self def __exit__(self, *exit_info: Any) -> None: self.stop()
Methods
def evaluate(self, user: User, flag_keys: List[str] = None) ‑> Dict[str, Variant]
-
Locally evaluates flag variants for a user.
Parameters
user (User): The user to evaluate flag_keys (List[str]): The flags to evaluate with the user. If empty, all flags from the flag cache are evaluated.
Returns: The evaluated variants.
Expand source code
def evaluate(self, user: User, flag_keys: List[str] = None) -> Dict[str, Variant]: """ Locally evaluates flag variants for a user. Parameters: user (User): The user to evaluate flag_keys (List[str]): The flags to evaluate with the user. If empty, all flags from the flag cache are evaluated. Returns: The evaluated variants. """ no_flag_keys = flag_keys is None or len(flag_keys) == 0 rules = [] for key, value in self.rules.items(): if no_flag_keys or key in flag_keys: rules.append(value) rules_json = json.dumps(rules) user_json = str(user) self.logger.debug(f"[Experiment] Evaluate: User: {user_json} - Rules: {rules_json}") result_json = evaluate(rules_json, user_json) self.logger.debug(f"[Experiment] Evaluate Result: {result_json}") evaluation_result = json.loads(result_json) variants = {} for key, value in evaluation_result.items(): if value.get('isDefaultVariant'): continue variants[key] = Variant(value['variant'].get('key'), value['variant'].get('payload')) return variants
def start(self)
-
Fetch initial flag configurations and start polling for updates. You must call this function to begin polling for flag config updates.
Expand source code
def start(self): """ Fetch initial flag configurations and start polling for updates. You must call this function to begin polling for flag config updates. """ self.__do_rules() self.poller.start()
def stop(self) ‑> None
-
Stop polling for flag configurations. Close resource like connection pool with client
Expand source code
def stop(self) -> None: """ Stop polling for flag configurations. Close resource like connection pool with client """ self.poller.stop() self._connection_pool.close()