Module amplitude_experiment.connection_pool

Expand source code
import threading
import time
from typing import Any

from http.client import HTTPConnection, HTTPResponse, HTTPSConnection


class WrapperHTTPConnection:

    def __init__(self, pool: 'HTTPConnectionPool', conn: HTTPConnection) -> None:
        """
        Wrapped Http Connection, used with connection pool
        :param pool: Connection pool this connection belongs to
        :param conn: Wrapped HTTPConnection
        """
        self.pool = pool
        self.conn = conn
        # Response of the most recent request; None until a request completes
        self.response = None
        self.last_time = time.time()
        # Set to False by close(); the pool replaces unavailable connections on release
        self.is_available = True

    def __enter__(self) -> 'WrapperHTTPConnection':
        return self

    def __exit__(self, *exit_info: Any) -> None:
        """
        Hand the connection back to its pool when the ``with`` block ends.
        The connection is closed first when it cannot be reused safely: no request
        completed, or a keep-alive response was left partially unread.
        """
        try:
            response = self.response
            # HTTPResponse exposes isclosed() (there is no is_closed()); it is True once
            # the body has been consumed.
            if response is None or (not response.will_close and not response.isclosed()):
                self.close()
        finally:
            # Always return the connection, otherwise the pool loses a slot
            self.pool.release(self)

    def request(self, *args: Any, **kwargs: Any) -> HTTPResponse:
        """
        Send a request on the wrapped connection and return its response.
        :param args: Positional arguments forwarded to HTTPConnection.request
        :param kwargs: Keyword arguments forwarded to HTTPConnection.request
        :return: The response, also stored on self.response
        """
        # Forget the previous response so a failed request is not mistaken for a clean one
        self.response = None
        self.conn.request(*args, **kwargs)
        self.response = self.conn.getresponse()
        return self.response

    def close(self) -> None:
        """
        Close the wrapped connection and mark this wrapper as no longer reusable.
        """
        self.conn.close()
        self.is_available = False


class HTTPConnectionPool:

    def __init__(self, host: str, port: int = None, max_size: int = None, idle_timeout: int = None,
                 read_timeout: float = None, scheme: str = 'https') -> None:
        """
        A simple connection pool to reuse the http connections
        :param host: Host passed to every connection the pool creates
        :param port: Port passed to every connection the pool creates
        :param max_size: Max connection allowed, None means no limit
        :param idle_timeout: Idle timeout to clear the connection, None disables the periodic cleanup
        :param read_timeout: Read timeout with connection
        :param scheme: 'http' or 'https' (a trailing ':' as in 'http:' is accepted);
            any value other than http selects HTTPS
        """
        self.host = host
        self.port = port
        self.max_size = max_size
        self.idle_timeout = idle_timeout
        self.read_timeout = read_timeout
        self.scheme = scheme
        self._lock = threading.Condition()
        # Idle connections, oldest first: connections are appended on release and popped from the end
        self._pool = []
        # Connections created and not yet evicted, whether idle or handed out
        self.conn_num = 0
        self.is_closed = False
        self._clearer = None
        self.start_clear_conn()

    def acquire(self, blocking: bool = True, timeout: int = None) -> 'WrapperHTTPConnection':
        """
        Take a connection out of the pool, creating one while the pool is below max_size.
        :param blocking: When the pool is at max_size and empty, wait for a release instead of failing
        :param timeout: Max seconds to wait when blocking, None waits indefinitely
        :return: A pooled connection
        :raises ConnectionPoolClosed: The pool is closed, or was closed while waiting
        :raises EmptyPoolError: No connection is available (non-blocking) or the wait timed out
        :raises ValueError: timeout is negative
        """
        with self._lock:
            if self.is_closed:
                raise ConnectionPoolClosed
            if not self.is_full():
                if self.is_pool_empty():
                    self._put_connection(self._create_connection())
            elif not blocking:
                if self.is_pool_empty():
                    raise EmptyPoolError
            elif timeout is None:
                while self.is_pool_empty():
                    self._lock.wait()
                    # close() wakes every waiter; without this check they would wait forever
                    if self.is_closed:
                        raise ConnectionPoolClosed
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                end_time = time.time() + timeout
                while self.is_pool_empty():
                    remaining = end_time - time.time()
                    if remaining <= 0:
                        raise EmptyPoolError
                    self._lock.wait(remaining)
                    if self.is_closed:
                        raise ConnectionPoolClosed
            return self._get_connection()

    def release(self, conn: 'WrapperHTTPConnection') -> None:
        """
        Return a connection to the pool and wake one waiting acquirer.
        A connection that was closed while in use is replaced by a fresh one;
        on a closed pool the connection is simply closed.
        :param conn: Connection previously obtained from acquire
        """
        with self._lock:
            if not self.is_closed:
                if not conn.is_available:
                    conn.close()
                    self.conn_num -= 1
                    conn = self._create_connection()
                self._put_connection(conn)
                self._lock.notify()
                return
        conn.close()

    def _get_connection(self) -> 'WrapperHTTPConnection':
        # Pop the most recently released connection
        try:
            return self._pool.pop()
        except IndexError:
            raise EmptyPoolError

    def _put_connection(self, conn: 'WrapperHTTPConnection') -> None:
        # Stamping on insert keeps the list ordered by last_time, which idle eviction relies on
        conn.last_time = time.time()
        self._pool.append(conn)

    def _connection_class(self) -> type:
        # Accept the documented 'http' as well as the 'http:' form obtained by splitting a URL on '/'
        scheme = (self.scheme or '').rstrip(':/').lower()
        return HTTPConnection if scheme == 'http' else HTTPSConnection

    def _create_connection(self) -> 'WrapperHTTPConnection':
        connection = self._connection_class()
        wrapped = WrapperHTTPConnection(self, connection(self.host, self.port, timeout=self.read_timeout))
        # Count the connection only once it has actually been constructed
        self.conn_num += 1
        return wrapped

    def is_pool_empty(self) -> bool:
        return len(self._pool) == 0

    def is_full(self) -> bool:
        if self.max_size is None:
            return False
        return self.conn_num >= self.max_size

    def close(self) -> None:
        """
        Close the pool: stop the periodic cleanup, close every idle connection and wake waiters.
        Calling it again is a no-op.
        """
        with self._lock:
            if self.is_closed:
                return
            self.is_closed = True
            # Keep an empty list (not None) so later is_pool_empty()/cleanup calls do not crash
            pool, self._pool = self._pool, []
            self._lock.notify_all()
        self.stop_clear_conn()
        for conn in pool:
            conn.close()

    def clear_idle_conn(self) -> None:
        """
        Evict idle connections on a background thread.
        :raises ConnectionPoolClosed: The pool is closed
        """
        if self.is_closed:
            raise ConnectionPoolClosed
        # Starting a thread to clear idle connections
        threading.Thread(target=self._clear_idle_conn).start()

    def _clear_idle_conn(self) -> None:
        # Without an idle timeout nothing can be considered idle
        if self.idle_timeout is None or self.is_closed:
            return
        # Give up for this round if the pool stays busy for a whole idle period
        if not self._lock.acquire(timeout=self.idle_timeout):
            return
        try:
            if self.is_closed:
                return
            current_time = time.time()
            # The pool is ordered by last_time, so binary search finds the first
            # connection that is still fresh; everything before it is idle.
            left, right = 0, len(self._pool)
            while left < right:
                mid = (left + right) // 2
                if current_time - self._pool[mid].last_time >= self.idle_timeout:
                    left = mid + 1
                else:
                    right = mid
            evicted, self._pool = self._pool[:left], self._pool[left:]
            self.conn_num -= left
        finally:
            # Release even on error so acquire()/release() callers are never blocked for good
            self._lock.release()
        # Close the sockets outside the lock instead of leaving them to the garbage collector
        for conn in evicted:
            conn.close()

    def start_clear_conn(self) -> None:
        """
        Run an idle cleanup now and schedule the next one after idle_timeout seconds.
        Does nothing when idle_timeout is None.
        """
        if self.idle_timeout is None:
            return
        self.clear_idle_conn()
        self._clearer = threading.Timer(self.idle_timeout, self.start_clear_conn)
        self._clearer.start()

    def stop_clear_conn(self) -> None:
        """
        Cancel the pending cleanup timer, if any.
        """
        if self._clearer is not None:
            self._clearer.cancel()

    def __enter__(self) -> 'HTTPConnectionPool':
        return self

    def __exit__(self, *exit_info: Any) -> None:
        self.close()


class EmptyPoolError(Exception):
    """Raised when no pooled connection is available to hand out."""


class ConnectionPoolClosed(Exception):
    """Raised when an operation is attempted on a connection pool that is already closed."""

Classes

class ConnectionPoolClosed (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ConnectionPoolClosed(Exception):
    """Raised when an operation is attempted on a connection pool that is already closed."""

Ancestors

  • builtins.Exception
  • builtins.BaseException
class EmptyPoolError (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class EmptyPoolError(Exception):
    """Raised when no pooled connection is available to hand out."""

Ancestors

  • builtins.Exception
  • builtins.BaseException
class HTTPConnectionPool (host: str, port: int = None, max_size: int = None, idle_timeout: int = None, read_timeout: float = None, scheme: str = 'https')

A simple connection pool that reuses HTTP connections. Parameters — host: host the connections are opened to; port: port the connections are opened to; max_size: maximum number of connections allowed; idle_timeout: idle time after which a pooled connection is cleared; read_timeout: read timeout for each connection; scheme: http or https.

Expand source code
class HTTPConnectionPool:

    def __init__(self, host: str, port: int = None, max_size: int = None, idle_timeout: int = None,
                 read_timeout: float = None, scheme: str = 'https') -> None:
        """
        A simple connection pool to reuse the http connections
        :param host: Host passed to every connection the pool creates
        :param port: Port passed to every connection the pool creates
        :param max_size: Max connection allowed, None means no limit
        :param idle_timeout: Idle timeout to clear the connection, None disables the periodic cleanup
        :param read_timeout: Read timeout with connection
        :param scheme: 'http' or 'https' (a trailing ':' as in 'http:' is accepted);
            any value other than http selects HTTPS
        """
        self.host = host
        self.port = port
        self.max_size = max_size
        self.idle_timeout = idle_timeout
        self.read_timeout = read_timeout
        self.scheme = scheme
        self._lock = threading.Condition()
        # Idle connections, oldest first: connections are appended on release and popped from the end
        self._pool = []
        # Connections created and not yet evicted, whether idle or handed out
        self.conn_num = 0
        self.is_closed = False
        self._clearer = None
        self.start_clear_conn()

    def acquire(self, blocking: bool = True, timeout: int = None) -> 'WrapperHTTPConnection':
        """
        Take a connection out of the pool, creating one while the pool is below max_size.
        :param blocking: When the pool is at max_size and empty, wait for a release instead of failing
        :param timeout: Max seconds to wait when blocking, None waits indefinitely
        :return: A pooled connection
        :raises ConnectionPoolClosed: The pool is closed, or was closed while waiting
        :raises EmptyPoolError: No connection is available (non-blocking) or the wait timed out
        :raises ValueError: timeout is negative
        """
        with self._lock:
            if self.is_closed:
                raise ConnectionPoolClosed
            if not self.is_full():
                if self.is_pool_empty():
                    self._put_connection(self._create_connection())
            elif not blocking:
                if self.is_pool_empty():
                    raise EmptyPoolError
            elif timeout is None:
                while self.is_pool_empty():
                    self._lock.wait()
                    # close() wakes every waiter; without this check they would wait forever
                    if self.is_closed:
                        raise ConnectionPoolClosed
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                end_time = time.time() + timeout
                while self.is_pool_empty():
                    remaining = end_time - time.time()
                    if remaining <= 0:
                        raise EmptyPoolError
                    self._lock.wait(remaining)
                    if self.is_closed:
                        raise ConnectionPoolClosed
            return self._get_connection()

    def release(self, conn: 'WrapperHTTPConnection') -> None:
        """
        Return a connection to the pool and wake one waiting acquirer.
        A connection that was closed while in use is replaced by a fresh one;
        on a closed pool the connection is simply closed.
        :param conn: Connection previously obtained from acquire
        """
        with self._lock:
            if not self.is_closed:
                if not conn.is_available:
                    conn.close()
                    self.conn_num -= 1
                    conn = self._create_connection()
                self._put_connection(conn)
                self._lock.notify()
                return
        conn.close()

    def _get_connection(self) -> 'WrapperHTTPConnection':
        # Pop the most recently released connection
        try:
            return self._pool.pop()
        except IndexError:
            raise EmptyPoolError

    def _put_connection(self, conn: 'WrapperHTTPConnection') -> None:
        # Stamping on insert keeps the list ordered by last_time, which idle eviction relies on
        conn.last_time = time.time()
        self._pool.append(conn)

    def _connection_class(self) -> type:
        # Accept the documented 'http' as well as the 'http:' form obtained by splitting a URL on '/'
        scheme = (self.scheme or '').rstrip(':/').lower()
        return HTTPConnection if scheme == 'http' else HTTPSConnection

    def _create_connection(self) -> 'WrapperHTTPConnection':
        connection = self._connection_class()
        wrapped = WrapperHTTPConnection(self, connection(self.host, self.port, timeout=self.read_timeout))
        # Count the connection only once it has actually been constructed
        self.conn_num += 1
        return wrapped

    def is_pool_empty(self) -> bool:
        return len(self._pool) == 0

    def is_full(self) -> bool:
        if self.max_size is None:
            return False
        return self.conn_num >= self.max_size

    def close(self) -> None:
        """
        Close the pool: stop the periodic cleanup, close every idle connection and wake waiters.
        Calling it again is a no-op.
        """
        with self._lock:
            if self.is_closed:
                return
            self.is_closed = True
            # Keep an empty list (not None) so later is_pool_empty()/cleanup calls do not crash
            pool, self._pool = self._pool, []
            self._lock.notify_all()
        self.stop_clear_conn()
        for conn in pool:
            conn.close()

    def clear_idle_conn(self) -> None:
        """
        Evict idle connections on a background thread.
        :raises ConnectionPoolClosed: The pool is closed
        """
        if self.is_closed:
            raise ConnectionPoolClosed
        # Starting a thread to clear idle connections
        threading.Thread(target=self._clear_idle_conn).start()

    def _clear_idle_conn(self) -> None:
        # Without an idle timeout nothing can be considered idle
        if self.idle_timeout is None or self.is_closed:
            return
        # Give up for this round if the pool stays busy for a whole idle period
        if not self._lock.acquire(timeout=self.idle_timeout):
            return
        try:
            if self.is_closed:
                return
            current_time = time.time()
            # The pool is ordered by last_time, so binary search finds the first
            # connection that is still fresh; everything before it is idle.
            left, right = 0, len(self._pool)
            while left < right:
                mid = (left + right) // 2
                if current_time - self._pool[mid].last_time >= self.idle_timeout:
                    left = mid + 1
                else:
                    right = mid
            evicted, self._pool = self._pool[:left], self._pool[left:]
            self.conn_num -= left
        finally:
            # Release even on error so acquire()/release() callers are never blocked for good
            self._lock.release()
        # Close the sockets outside the lock instead of leaving them to the garbage collector
        for conn in evicted:
            conn.close()

    def start_clear_conn(self) -> None:
        """
        Run an idle cleanup now and schedule the next one after idle_timeout seconds.
        Does nothing when idle_timeout is None.
        """
        if self.idle_timeout is None:
            return
        self.clear_idle_conn()
        self._clearer = threading.Timer(self.idle_timeout, self.start_clear_conn)
        self._clearer.start()

    def stop_clear_conn(self) -> None:
        """
        Cancel the pending cleanup timer, if any.
        """
        if self._clearer is not None:
            self._clearer.cancel()

    def __enter__(self) -> 'HTTPConnectionPool':
        return self

    def __exit__(self, *exit_info: Any) -> None:
        self.close()

Methods

def acquire(self, blocking: bool = True, timeout: int = None) ‑> WrapperHTTPConnection
Expand source code
def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection:
    """
    Take a connection out of the pool, creating one while the pool is below max_size.
    :param blocking: When the pool is at max_size and empty, wait for a release instead of failing
    :param timeout: Max seconds to wait when blocking, None waits indefinitely
    :return: A pooled connection
    :raises ConnectionPoolClosed: The pool is already closed
    :raises EmptyPoolError: No connection is available (non-blocking) or the wait timed out
    :raises ValueError: timeout is negative
    """
    if self.is_closed:
        raise ConnectionPoolClosed
    with self._lock:
        has_capacity = self.max_size is None or not self.is_full()
        if has_capacity:
            # Below the limit: reuse an idle connection if there is one, otherwise create it
            if self.is_pool_empty():
                fresh = self._create_connection()
                self._put_connection(fresh)
        elif not blocking:
            if self.is_pool_empty():
                raise EmptyPoolError
        elif timeout is None:
            # Wait until release() notifies that a connection came back
            while self.is_pool_empty():
                self._lock.wait()
        elif timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        else:
            deadline = time.time() + timeout
            while self.is_pool_empty():
                time_left = deadline - time.time()
                if time_left <= 0:
                    raise EmptyPoolError
                self._lock.wait(time_left)
        return self._get_connection()
def clear_idle_conn(self) ‑> None
Expand source code
def clear_idle_conn(self) -> None:
    """
    Evict idle connections on a background thread.
    :raises ConnectionPoolClosed: The pool is closed
    """
    if not self.is_closed:
        # Starting a thread to clear idle connections
        worker = threading.Thread(target=self._clear_idle_conn)
        worker.start()
        return
    raise ConnectionPoolClosed
def close(self) ‑> None
Expand source code
def close(self) -> None:
    """
    Close the pool: stop the periodic cleanup and close every idle connection.
    Calling it again is a no-op.
    """
    # Swap the idle list out under the pool lock so it cannot change mid-close
    with self._lock:
        if self.is_closed:
            return
        self.is_closed = True
        # Leave an empty list rather than None so is_pool_empty() and a cleanup
        # thread that is still running keep working after close
        pooled, self._pool = self._pool, []
    self.stop_clear_conn()
    for conn in pooled:
        conn.close()
def is_full(self) ‑> bool
Expand source code
def is_full(self) -> bool:
    """
    Tell whether the number of created connections has reached max_size.
    :return: False when max_size is None, otherwise whether conn_num >= max_size
    """
    limit = self.max_size
    if limit is not None:
        return self.conn_num >= limit
    return False
def is_pool_empty(self) ‑> bool
Expand source code
def is_pool_empty(self) -> bool:
    """
    Tell whether there is no idle connection waiting in the pool.
    :return: True when the idle list holds no connection
    """
    idle_count = len(self._pool)
    return idle_count == 0
def release(self, conn: WrapperHTTPConnection) ‑> None
Expand source code
def release(self, conn: WrapperHTTPConnection) -> None:
    """
    Return a connection to the pool and wake one waiting acquirer.
    A connection that is no longer available is replaced by a fresh one;
    on a closed pool the connection is simply closed.
    :param conn: Connection previously obtained from acquire
    """
    if self.is_closed:
        conn.close()
        return
    with self._lock:
        returned = conn
        if not returned.is_available:
            # The connection was closed while in use: swap it for a new one
            returned.close()
            self.conn_num -= 1
            returned = self._create_connection()
        self._put_connection(returned)
        self._lock.notify()
def start_clear_conn(self) ‑> None
Expand source code
def start_clear_conn(self) -> None:
    """
    Run an idle cleanup now and schedule the next one after idle_timeout seconds.
    Does nothing when idle_timeout is None.
    """
    interval = self.idle_timeout
    if interval is not None:
        self.clear_idle_conn()
        # The timer re-invokes this method, so the cleanup keeps repeating
        timer = threading.Timer(interval, self.start_clear_conn)
        self._clearer = timer
        timer.start()
def stop_clear_conn(self) ‑> None
Expand source code
def stop_clear_conn(self) -> None:
    """
    Cancel the pending cleanup timer, if one was scheduled.
    """
    timer = self._clearer
    if timer is None:
        return
    timer.cancel()
class WrapperHTTPConnection (pool: HTTPConnectionPool, conn: http.client.HTTPConnection)

Wrapped HTTP connection, used with the connection pool. Parameters — pool: connection pool this connection belongs to; conn: the wrapped HTTPConnection.

Expand source code
class WrapperHTTPConnection:

    def __init__(self, pool: 'HTTPConnectionPool', conn: HTTPConnection) -> None:
        """
        Wrapped Http Connection, used with connection pool
        :param pool: Connection pool this connection belongs to
        :param conn: Wrapped HTTPConnection
        """
        self.pool = pool
        self.conn = conn
        # Response of the most recent request; None until a request completes
        self.response = None
        self.last_time = time.time()
        # Set to False by close(); the pool replaces unavailable connections on release
        self.is_available = True

    def __enter__(self) -> 'WrapperHTTPConnection':
        return self

    def __exit__(self, *exit_info: Any) -> None:
        """
        Hand the connection back to its pool when the ``with`` block ends.
        The connection is closed first when it cannot be reused safely: no request
        completed, or a keep-alive response was left partially unread.
        """
        try:
            response = self.response
            # HTTPResponse exposes isclosed() (there is no is_closed()); it is True once
            # the body has been consumed.
            if response is None or (not response.will_close and not response.isclosed()):
                self.close()
        finally:
            # Always return the connection, otherwise the pool loses a slot
            self.pool.release(self)

    def request(self, *args: Any, **kwargs: Any) -> HTTPResponse:
        """
        Send a request on the wrapped connection and return its response.
        :param args: Positional arguments forwarded to HTTPConnection.request
        :param kwargs: Keyword arguments forwarded to HTTPConnection.request
        :return: The response, also stored on self.response
        """
        # Forget the previous response so a failed request is not mistaken for a clean one
        self.response = None
        self.conn.request(*args, **kwargs)
        self.response = self.conn.getresponse()
        return self.response

    def close(self) -> None:
        """
        Close the wrapped connection and mark this wrapper as no longer reusable.
        """
        self.conn.close()
        self.is_available = False

Methods

def close(self) ‑> None
Expand source code
def close(self) -> None:
    """
    Close the wrapped connection and mark this wrapper as no longer reusable.
    """
    raw = self.conn
    raw.close()
    # Only flagged once the underlying close has returned
    self.is_available = False
def request(self, *args: Any, **kwargs: Any) ‑> http.client.HTTPResponse
Expand source code
def request(self, *args: Any, **kwargs: Any) -> HTTPResponse:
    """
    Send a request on the wrapped connection and return its response.
    :param args: Positional arguments forwarded to the wrapped connection's request()
    :param kwargs: Keyword arguments forwarded to the wrapped connection's request()
    :return: The response, also stored on self.response
    """
    raw = self.conn
    raw.request(*args, **kwargs)
    reply = raw.getresponse()
    self.response = reply
    return reply