Module amplitude_experiment.remote.client
Expand source code
import json
import logging
import threading
import time
from time import sleep
from typing import Any
from .config import RemoteEvaluationConfig
from ..connection_pool import HTTPConnectionPool
from ..user import User
from ..variant import Variant
from ..version import __version__
class RemoteEvaluationClient:
    """Main client for fetching variant data.

    Posts the user to the remote evaluation endpoint through a pooled HTTP
    connection and converts the JSON response into Variant objects. The client
    can be used as a context manager; leaving the ``with`` block closes the
    connection pool.
    """

    def __init__(self, api_key, config=None):
        """
        Creates a new Experiment Client instance.
        Parameters:
            api_key (str): The environment API Key
            config (RemoteEvaluationConfig): Config Object. A default
                RemoteEvaluationConfig is created when omitted.
        Returns:
            Experiment Client instance.
        Raises:
            ValueError: If api_key is empty.
        """
        if not api_key:
            raise ValueError("Experiment API key is empty")
        self.api_key = api_key
        self.config = config or RemoteEvaluationConfig()
        self.logger = logging.getLogger("Amplitude")
        # logging.getLogger returns the same logger object for the same name,
        # so adding a handler on every instantiation would repeat each log
        # line once per client created.
        if not self.logger.handlers:
            self.logger.addHandler(logging.StreamHandler())
        if self.config.debug:
            self.logger.setLevel(logging.DEBUG)
        self.__setup_connection_pool()

    def fetch(self, user: "User"):
        """
        Fetch all variants for a user synchronously. This method will
        automatically retry if configured.
        Parameters:
            user (User): The Experiment User
        Returns:
            Variants Dictionary. Empty when the fetch (and any retries) failed.
        """
        try:
            return self.__fetch_internal(user)
        except Exception as e:
            self.logger.error(f"[Experiment] Failed to fetch variants: {e}")
            return {}

    def fetch_async(self, user: "User", callback=None):
        """
        Fetch all variants for a user asynchronously. Triggers the callback
        after the fetch completes.
        Parameters:
            user (User): The Experiment User
            callback (callable): Callback function, takes user and variants arguments
        """
        thread = threading.Thread(target=self.__fetch_async_internal, args=(user, callback))
        thread.start()

    def __fetch_async_internal(self, user, callback):
        """
        Thread target for fetch_async: fetch the variants, then notify the callback.
        Parameters:
            user (User): The Experiment User
            callback (callable): Optional function called with (user, variants)
        Returns:
            Variants Dictionary. Empty when the fetch failed.
        """
        try:
            variants = self.__fetch_internal(user)
        except Exception as e:
            self.logger.error(f"[Experiment] Failed to fetch variants: {e}")
            variants = {}
        if callback:
            # The callback runs outside the fetch try-block so that an error
            # raised by the callback is not mistaken for a fetch failure and
            # the callback is never invoked a second time with {}.
            try:
                callback(user, variants)
            except Exception as e:
                self.logger.error(f"[Experiment] Fetch callback failed: {e}")
        return variants

    def __fetch_internal(self, user):
        """
        Perform one fetch attempt and fall back to the retry loop if it fails.
        Parameters:
            user (User): The Experiment User
        Returns:
            Variants Dictionary.
        """
        self.logger.debug(f"[Experiment] Fetching variants for user: {user}")
        try:
            return self.__do_fetch(user)
        except Exception as e:
            self.logger.error(f"[Experiment] Fetch failed: {e}")
            return self.__retry_fetch(user)

    def __retry_fetch(self, user):
        """
        Retry the fetch up to config.fetch_retries times with a growing delay.
        The delay starts at fetch_retry_backoff_min_millis, is multiplied by
        fetch_retry_backoff_scalar after each failed attempt and is capped at
        fetch_retry_backoff_max_millis.
        Parameters:
            user (User): The Experiment User
        Returns:
            Variants Dictionary. Empty when no retries are configured.
        Raises:
            Exception: The error of the last attempt when every retry failed.
        """
        # "<= 0" rather than "== 0": with a negative count the loop below
        # never runs and "raise err" would try to raise None.
        if self.config.fetch_retries <= 0:
            return {}
        self.logger.debug("[Experiment] Retrying fetch")
        err = None
        delay_millis = self.config.fetch_retry_backoff_min_millis
        for _ in range(self.config.fetch_retries):
            sleep(delay_millis / 1000.0)
            try:
                return self.__do_fetch(user)
            except Exception as e:
                self.logger.error(f"[Experiment] Retry failed: {e}")
                err = e
            delay_millis = min(delay_millis * self.config.fetch_retry_backoff_scalar,
                               self.config.fetch_retry_backoff_max_millis)
        raise err

    def __do_fetch(self, user):
        """
        Send a single POST /sdk/vardata request and parse the response.
        Parameters:
            user (User): The Experiment User
        Returns:
            Variants Dictionary.
        Raises:
            Exception: Any error raised while serializing the user, sending
                the request, or decoding and parsing the response.
        """
        start = time.time()
        user_context = self.__add_context(user)
        headers = {
            'Authorization': f"Api-Key {self.api_key}",
            'Content-Type': 'application/json;charset=utf-8'
        }
        # Serialize before acquiring a connection: if to_json() raises, no
        # connection has been taken out of the pool, so none can be leaked.
        body = user_context.to_json().encode('utf8')
        if len(body) > 8000:
            self.logger.warning(f"[Experiment] encoded user object length {len(body)} "
                                f"cannot be cached by CDN; must be < 8KB")
        self.logger.debug(f"[Experiment] Fetch variants for user: {str(user_context)}")
        conn = self._connection_pool.acquire()
        try:
            response = conn.request('POST', '/sdk/vardata', body, headers)
            elapsed = f"{(time.time() - start) * 1000:.3f}"
            self.logger.debug(f"[Experiment] Fetch complete in {elapsed} ms")
            json_response = json.loads(response.read().decode("utf8"))
            variants = self.__parse_json_variants(json_response)
            self.logger.debug(f"[Experiment] Fetched variants: {json.dumps(variants, default=str)}")
            return variants
        finally:
            # Always hand the connection back, including on request or parse errors.
            self._connection_pool.release(conn)

    def __setup_connection_pool(self):
        """
        Create the HTTP connection pool from config.server_url and
        config.fetch_timeout_millis.
        Raises:
            ValueError: If server_url does not contain a "scheme//host" part.
        """
        # "https://host/path" splits into ['https:', '', 'host', 'path']; only
        # the first three parts are needed. Slicing keeps a trailing slash or
        # path from breaking the unpacking.
        scheme, _, host = self.config.server_url.split('/', 3)[:3]
        timeout = self.config.fetch_timeout_millis / 1000
        self._connection_pool = HTTPConnectionPool(host, max_size=1, idle_timeout=30,
                                                   read_timeout=timeout, scheme=scheme)

    def close(self) -> None:
        """
        Close the resources held by the client, i.e. its connection pool.
        """
        self._connection_pool.close()

    def __enter__(self) -> 'RemoteEvaluationClient':
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *exit_info: Any) -> None:
        """Close the client when the ``with`` block is left."""
        self.close()

    def __add_context(self, user):
        """
        Fill in the library identifier on the user when it is not already set.
        Note that the passed user object is modified in place.
        Parameters:
            user (User): The Experiment User, may be None
        Returns:
            The user with its library attribute populated.
        """
        # A dict has no "library" attribute, so a missing user needs a real
        # User object. NOTE(review): assumes User() can be constructed
        # without arguments - confirm against ..user.
        user = user or User()
        user.library = user.library or f"experiment-python-server/{__version__}"
        return user

    def __parse_json_variants(self, json_response):
        """
        Convert the decoded JSON response into Variant objects.
        Parameters:
            json_response (dict): Maps each flag key to a mapping describing
                the variant.
        Returns:
            Dictionary mapping each flag key to a Variant. The variant value
            is taken from 'value' when present, otherwise from 'key',
            otherwise it is an empty string; the payload comes from 'payload'.
        """
        variants = {}
        for key, value in json_response.items():
            variant_value = ''
            if 'value' in value:
                variant_value = value['value']
            elif 'key' in value:
                variant_value = value['key']
            variants[key] = Variant(variant_value, value.get('payload'))
        return variants
Classes
class RemoteEvaluationClient (api_key, config=None)
-
Main client for fetching variant data.
Creates a new Experiment Client instance. Parameters: api_key (str): The environment API Key config (RemoteEvaluationConfig): Config Object
Returns: Experiment Client instance.
Expand source code
class RemoteEvaluationClient:
    """Main client for fetching variant data.

    Posts the user to the remote evaluation endpoint through a pooled HTTP
    connection and converts the JSON response into Variant objects. The client
    can be used as a context manager; leaving the ``with`` block closes the
    connection pool.
    """

    def __init__(self, api_key, config=None):
        """
        Creates a new Experiment Client instance.
        Parameters:
            api_key (str): The environment API Key
            config (RemoteEvaluationConfig): Config Object. A default
                RemoteEvaluationConfig is created when omitted.
        Returns:
            Experiment Client instance.
        Raises:
            ValueError: If api_key is empty.
        """
        if not api_key:
            raise ValueError("Experiment API key is empty")
        self.api_key = api_key
        self.config = config or RemoteEvaluationConfig()
        self.logger = logging.getLogger("Amplitude")
        # logging.getLogger returns the same logger object for the same name,
        # so adding a handler on every instantiation would repeat each log
        # line once per client created.
        if not self.logger.handlers:
            self.logger.addHandler(logging.StreamHandler())
        if self.config.debug:
            self.logger.setLevel(logging.DEBUG)
        self.__setup_connection_pool()

    def fetch(self, user: "User"):
        """
        Fetch all variants for a user synchronously. This method will
        automatically retry if configured.
        Parameters:
            user (User): The Experiment User
        Returns:
            Variants Dictionary. Empty when the fetch (and any retries) failed.
        """
        try:
            return self.__fetch_internal(user)
        except Exception as e:
            self.logger.error(f"[Experiment] Failed to fetch variants: {e}")
            return {}

    def fetch_async(self, user: "User", callback=None):
        """
        Fetch all variants for a user asynchronously. Triggers the callback
        after the fetch completes.
        Parameters:
            user (User): The Experiment User
            callback (callable): Callback function, takes user and variants arguments
        """
        thread = threading.Thread(target=self.__fetch_async_internal, args=(user, callback))
        thread.start()

    def __fetch_async_internal(self, user, callback):
        """
        Thread target for fetch_async: fetch the variants, then notify the callback.
        Parameters:
            user (User): The Experiment User
            callback (callable): Optional function called with (user, variants)
        Returns:
            Variants Dictionary. Empty when the fetch failed.
        """
        try:
            variants = self.__fetch_internal(user)
        except Exception as e:
            self.logger.error(f"[Experiment] Failed to fetch variants: {e}")
            variants = {}
        if callback:
            # The callback runs outside the fetch try-block so that an error
            # raised by the callback is not mistaken for a fetch failure and
            # the callback is never invoked a second time with {}.
            try:
                callback(user, variants)
            except Exception as e:
                self.logger.error(f"[Experiment] Fetch callback failed: {e}")
        return variants

    def __fetch_internal(self, user):
        """
        Perform one fetch attempt and fall back to the retry loop if it fails.
        Parameters:
            user (User): The Experiment User
        Returns:
            Variants Dictionary.
        """
        self.logger.debug(f"[Experiment] Fetching variants for user: {user}")
        try:
            return self.__do_fetch(user)
        except Exception as e:
            self.logger.error(f"[Experiment] Fetch failed: {e}")
            return self.__retry_fetch(user)

    def __retry_fetch(self, user):
        """
        Retry the fetch up to config.fetch_retries times with a growing delay.
        The delay starts at fetch_retry_backoff_min_millis, is multiplied by
        fetch_retry_backoff_scalar after each failed attempt and is capped at
        fetch_retry_backoff_max_millis.
        Parameters:
            user (User): The Experiment User
        Returns:
            Variants Dictionary. Empty when no retries are configured.
        Raises:
            Exception: The error of the last attempt when every retry failed.
        """
        # "<= 0" rather than "== 0": with a negative count the loop below
        # never runs and "raise err" would try to raise None.
        if self.config.fetch_retries <= 0:
            return {}
        self.logger.debug("[Experiment] Retrying fetch")
        err = None
        delay_millis = self.config.fetch_retry_backoff_min_millis
        for _ in range(self.config.fetch_retries):
            sleep(delay_millis / 1000.0)
            try:
                return self.__do_fetch(user)
            except Exception as e:
                self.logger.error(f"[Experiment] Retry failed: {e}")
                err = e
            delay_millis = min(delay_millis * self.config.fetch_retry_backoff_scalar,
                               self.config.fetch_retry_backoff_max_millis)
        raise err

    def __do_fetch(self, user):
        """
        Send a single POST /sdk/vardata request and parse the response.
        Parameters:
            user (User): The Experiment User
        Returns:
            Variants Dictionary.
        Raises:
            Exception: Any error raised while serializing the user, sending
                the request, or decoding and parsing the response.
        """
        start = time.time()
        user_context = self.__add_context(user)
        headers = {
            'Authorization': f"Api-Key {self.api_key}",
            'Content-Type': 'application/json;charset=utf-8'
        }
        # Serialize before acquiring a connection: if to_json() raises, no
        # connection has been taken out of the pool, so none can be leaked.
        body = user_context.to_json().encode('utf8')
        if len(body) > 8000:
            self.logger.warning(f"[Experiment] encoded user object length {len(body)} "
                                f"cannot be cached by CDN; must be < 8KB")
        self.logger.debug(f"[Experiment] Fetch variants for user: {str(user_context)}")
        conn = self._connection_pool.acquire()
        try:
            response = conn.request('POST', '/sdk/vardata', body, headers)
            elapsed = f"{(time.time() - start) * 1000:.3f}"
            self.logger.debug(f"[Experiment] Fetch complete in {elapsed} ms")
            json_response = json.loads(response.read().decode("utf8"))
            variants = self.__parse_json_variants(json_response)
            self.logger.debug(f"[Experiment] Fetched variants: {json.dumps(variants, default=str)}")
            return variants
        finally:
            # Always hand the connection back, including on request or parse errors.
            self._connection_pool.release(conn)

    def __setup_connection_pool(self):
        """
        Create the HTTP connection pool from config.server_url and
        config.fetch_timeout_millis.
        Raises:
            ValueError: If server_url does not contain a "scheme//host" part.
        """
        # "https://host/path" splits into ['https:', '', 'host', 'path']; only
        # the first three parts are needed. Slicing keeps a trailing slash or
        # path from breaking the unpacking.
        scheme, _, host = self.config.server_url.split('/', 3)[:3]
        timeout = self.config.fetch_timeout_millis / 1000
        self._connection_pool = HTTPConnectionPool(host, max_size=1, idle_timeout=30,
                                                   read_timeout=timeout, scheme=scheme)

    def close(self) -> None:
        """
        Close the resources held by the client, i.e. its connection pool.
        """
        self._connection_pool.close()

    def __enter__(self) -> 'RemoteEvaluationClient':
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *exit_info: Any) -> None:
        """Close the client when the ``with`` block is left."""
        self.close()

    def __add_context(self, user):
        """
        Fill in the library identifier on the user when it is not already set.
        Note that the passed user object is modified in place.
        Parameters:
            user (User): The Experiment User, may be None
        Returns:
            The user with its library attribute populated.
        """
        # A dict has no "library" attribute, so a missing user needs a real
        # User object. NOTE(review): assumes User() can be constructed
        # without arguments - confirm against ..user.
        user = user or User()
        user.library = user.library or f"experiment-python-server/{__version__}"
        return user

    def __parse_json_variants(self, json_response):
        """
        Convert the decoded JSON response into Variant objects.
        Parameters:
            json_response (dict): Maps each flag key to a mapping describing
                the variant.
        Returns:
            Dictionary mapping each flag key to a Variant. The variant value
            is taken from 'value' when present, otherwise from 'key',
            otherwise it is an empty string; the payload comes from 'payload'.
        """
        variants = {}
        for key, value in json_response.items():
            variant_value = ''
            if 'value' in value:
                variant_value = value['value']
            elif 'key' in value:
                variant_value = value['key']
            variants[key] = Variant(variant_value, value.get('payload'))
        return variants
Methods
def close(self) ‑> None
-
Close the resources held by the client, such as the connection pool.
Expand source code
def close(self) -> None:
    """Release the client's resources by closing its connection pool."""
    pool = self._connection_pool
    pool.close()
def fetch(self, user: User)
-
Fetch all variants for a user synchronously. This method will automatically retry if configured. Parameters: user (User): The Experiment User
Returns: Variants Dictionary.
Expand source code
def fetch(self, user: User):
    """
    Synchronously fetch every variant for the given user.

    Any exception raised by the internal fetch is logged and an empty
    dictionary is returned instead.
    Parameters:
        user (User): The Experiment User
    Returns:
        Variants Dictionary.
    """
    try:
        variants = self.__fetch_internal(user)
    except Exception as e:
        self.logger.error(f"[Experiment] Failed to fetch variants: {e}")
        variants = {}
    return variants
def fetch_async(self, user: User, callback=None)
-
Fetch all variants for a user asynchronously. Triggers the callback after the fetch completes. Parameters: user (User): The Experiment User callback (callable): Callback function, takes user and variants arguments
Expand source code
def fetch_async(self, user: User, callback=None):
    """
    Start a background thread that fetches every variant for the given user.

    The call returns as soon as the thread has been started; the fetch itself
    and the callback invocation are handled by the thread target.
    Parameters:
        user (User): The Experiment User
        callback (callable): Callback function, takes user and variants arguments
    """
    worker = threading.Thread(
        target=self.__fetch_async_internal,
        args=(user, callback),
    )
    worker.start()