Module amplitude_experiment.local.poller
Expand source code
import threading
import time
class Poller:
    """
    Poller to run a function every interval.

    Each run is driven by a one-shot ``threading.Timer``. Deadlines are
    derived from the previous deadline (``next_call``) instead of from the
    moment the callback returns, so the time spent inside ``function`` does
    not accumulate as drift.

    Attributes:
        interval: Seconds between consecutive deadlines.
        function: Callable invoked on every tick.
        args: Positional arguments forwarded to ``function``.
        kwargs: Keyword arguments forwarded to ``function``.
        is_running: True while a timer is armed.
        next_call: ``time.time()`` based deadline of the next run, or 0 when
            no schedule baseline has been established.
    """

    def __init__(self, interval, function, *args, **kwargs):
        """
        Store the schedule parameters; the poller is created stopped.

        Args:
            interval: Seconds between consecutive runs.
            function: Callable to invoke on every tick.
            *args: Positional arguments forwarded to ``function``.
            **kwargs: Keyword arguments forwarded to ``function``.
        """
        self._timer = None
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.is_running = False
        self.next_call = 0

    def _run(self):
        """Timer callback: arm the next tick, then invoke ``function``."""
        # Clear the flag so start() passes its guard. The next timer is armed
        # before the function is called, so a slow or raising function does
        # not prevent the following tick from being scheduled.
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        """
        Arm the timer for the next run; does nothing if already running.

        The first run happens one ``interval`` after this call.
        """
        if self.is_running:
            return
        if self.next_call == 0:
            # No baseline yet (fresh poller or after stop()): count from now.
            self.next_call = time.time()
        self.next_call += self.interval
        # A deadline that is already in the past should fire right away;
        # clamp so the timer is never given a negative wait.
        delay = max(0.0, self.next_call - time.time())
        self._timer = threading.Timer(delay, self._run)
        self._timer.start()
        self.is_running = True

    def stop(self):
        """
        Cancel the pending timer and forget the current schedule.

        ``next_call`` is reset so that a later ``start()`` schedules relative
        to the restart time. Keeping the old deadline would make a restart
        after a long pause compute deadlines in the past and fire a burst of
        back-to-back catch-up runs.

        NOTE: ``Timer.cancel()`` only affects a timer that is still waiting;
        a callback that has already begun executing is not interrupted.
        """
        if self._timer:
            self._timer.cancel()
        self.is_running = False
        self.next_call = 0
Classes
class Poller (interval, function, *args, **kwargs)
-
Poller to run a function every interval
Expand source code
class Poller:
    """
    Poller to run a function every interval.

    Each run is driven by a one-shot ``threading.Timer``. Deadlines are
    derived from the previous deadline (``next_call``) instead of from the
    moment the callback returns, so the time spent inside ``function`` does
    not accumulate as drift.

    Attributes:
        interval: Seconds between consecutive deadlines.
        function: Callable invoked on every tick.
        args: Positional arguments forwarded to ``function``.
        kwargs: Keyword arguments forwarded to ``function``.
        is_running: True while a timer is armed.
        next_call: ``time.time()`` based deadline of the next run, or 0 when
            no schedule baseline has been established.
    """

    def __init__(self, interval, function, *args, **kwargs):
        """
        Store the schedule parameters; the poller is created stopped.

        Args:
            interval: Seconds between consecutive runs.
            function: Callable to invoke on every tick.
            *args: Positional arguments forwarded to ``function``.
            **kwargs: Keyword arguments forwarded to ``function``.
        """
        self._timer = None
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.is_running = False
        self.next_call = 0

    def _run(self):
        """Timer callback: arm the next tick, then invoke ``function``."""
        # Clear the flag so start() passes its guard. The next timer is armed
        # before the function is called, so a slow or raising function does
        # not prevent the following tick from being scheduled.
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        """
        Arm the timer for the next run; does nothing if already running.

        The first run happens one ``interval`` after this call.
        """
        if self.is_running:
            return
        if self.next_call == 0:
            # No baseline yet (fresh poller or after stop()): count from now.
            self.next_call = time.time()
        self.next_call += self.interval
        # A deadline that is already in the past should fire right away;
        # clamp so the timer is never given a negative wait.
        delay = max(0.0, self.next_call - time.time())
        self._timer = threading.Timer(delay, self._run)
        self._timer.start()
        self.is_running = True

    def stop(self):
        """
        Cancel the pending timer and forget the current schedule.

        ``next_call`` is reset so that a later ``start()`` schedules relative
        to the restart time. Keeping the old deadline would make a restart
        after a long pause compute deadlines in the past and fire a burst of
        back-to-back catch-up runs.

        NOTE: ``Timer.cancel()`` only affects a timer that is still waiting;
        a callback that has already begun executing is not interrupted.
        """
        if self._timer:
            self._timer.cancel()
        self.is_running = False
        self.next_call = 0
Methods
def start(self)
-
Expand source code
def start(self): if not self.is_running: if self.next_call == 0: self.next_call = time.time() self.next_call += self.interval self._timer = threading.Timer(self.next_call - time.time(), self._run) self._timer.start() self.is_running = True
def stop(self)
-
Expand source code
def stop(self): if self._timer: self._timer.cancel() self.is_running = False