Module amplitude_experiment.local.poller

Expand source code
import threading
import time


class Poller:
    """
    Poller to run a function every interval

    Each call is scheduled with a one-shot ``threading.Timer``. When a timer
    fires, the next timer is armed first and only then is ``function``
    invoked. Deadlines are kept as absolute timestamps in ``next_call`` and
    advanced by ``interval`` each round, so the time spent re-arming the
    timer does not accumulate into the schedule.
    """
    def __init__(self, interval, function, *args, **kwargs):
        """
        Store the polling configuration; nothing is scheduled until start().

        :param interval: seconds between two consecutive calls
        :param function: callable invoked on every tick
        :param args: positional arguments forwarded to ``function``
        :param kwargs: keyword arguments forwarded to ``function``
        """
        self._timer = None
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.is_running = False
        # Absolute time.time() deadline of the pending call; 0 means
        # "no schedule yet, start counting from now".
        self.next_call = 0

    def _run(self):
        """Timer callback: arm the next timer, then invoke the function."""
        # The timer that just fired is spent; clear the flag so start()
        # is allowed to arm the following one.
        self.is_running = False
        # Re-arm before calling the function, so an exception raised by the
        # function does not prevent the next tick from being scheduled.
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        """Arm the timer for the next call. Does nothing if already running."""
        if not self.is_running:
            if self.next_call == 0:
                self.next_call = time.time()
            self.next_call += self.interval
            self._timer = threading.Timer(self.next_call - time.time(), self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        """Cancel the pending timer (if any) and forget the current schedule."""
        if self._timer:
            self._timer.cancel()
        self.is_running = False
        # Drop the old deadline. Otherwise a later start() would add the
        # interval to a stale timestamp: after a long pause the computed
        # delay is negative and the function fires back-to-back until the
        # schedule catches up with the clock.
        self.next_call = 0

Classes

class Poller (interval, function, *args, **kwargs)

Poller that runs a function repeatedly, once every `interval` seconds, using a background timer.

Expand source code
class Poller:
    """
    Poller to run a function every interval

    Each call is scheduled with a one-shot ``threading.Timer``. When a timer
    fires, the next timer is armed first and only then is ``function``
    invoked. Deadlines are kept as absolute timestamps in ``next_call`` and
    advanced by ``interval`` each round, so the time spent re-arming the
    timer does not accumulate into the schedule.
    """
    def __init__(self, interval, function, *args, **kwargs):
        """
        Store the polling configuration; nothing is scheduled until start().

        :param interval: seconds between two consecutive calls
        :param function: callable invoked on every tick
        :param args: positional arguments forwarded to ``function``
        :param kwargs: keyword arguments forwarded to ``function``
        """
        self._timer = None
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.is_running = False
        # Absolute time.time() deadline of the pending call; 0 means
        # "no schedule yet, start counting from now".
        self.next_call = 0

    def _run(self):
        """Timer callback: arm the next timer, then invoke the function."""
        # The timer that just fired is spent; clear the flag so start()
        # is allowed to arm the following one.
        self.is_running = False
        # Re-arm before calling the function, so an exception raised by the
        # function does not prevent the next tick from being scheduled.
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        """Arm the timer for the next call. Does nothing if already running."""
        if not self.is_running:
            if self.next_call == 0:
                self.next_call = time.time()
            self.next_call += self.interval
            self._timer = threading.Timer(self.next_call - time.time(), self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        """Cancel the pending timer (if any) and forget the current schedule."""
        if self._timer:
            self._timer.cancel()
        self.is_running = False
        # Drop the old deadline. Otherwise a later start() would add the
        # interval to a stale timestamp: after a long pause the computed
        # delay is negative and the function fires back-to-back until the
        # schedule catches up with the clock.
        self.next_call = 0

Methods

def start(self)
Expand source code
def start(self):
    """Arm a one-shot timer for the next call; no-op when already running."""
    if self.is_running:
        return

    # A zero deadline means there is no schedule yet: count from now.
    if self.next_call == 0:
        self.next_call = time.time()
    self.next_call += self.interval

    # The deadline is absolute, so convert it to a delay relative to now.
    delay = self.next_call - time.time()
    timer = threading.Timer(delay, self._run)
    self._timer = timer
    timer.start()
    self.is_running = True
def stop(self)
Expand source code
def stop(self):
    """Cancel the pending timer (if any) and forget the current schedule."""
    if self._timer:
        self._timer.cancel()
    self.is_running = False
    # Drop the old deadline. Otherwise a later start() would add the
    # interval to a stale timestamp: after a long pause the computed delay
    # is negative and the function fires back-to-back until the schedule
    # catches up with the clock.
    self.next_call = 0