Module src.main.python.scheduler
Expand source code
import uasyncio as asyncio
class Scheduler:
def __init__(self):
""" constructor.
"""
def __str__(self):
"""prints the object."""
return "Scheduler"
def __set_global_exception(self):
def handle_exception(loop, context):
import sys
print("\t Global Error!!")
sys.print_exception(context["exception"])
# sys.exit()
loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_exception)
def schedule_once(self, task):
asyncio.create_task(task)
async def __start(self, task):
self.__set_global_exception()
await asyncio.gather(*task, return_exceptions=False) # returns async task, has to be false to rise exception instead of returning it
def start(self, task):
"""passed an array of corutines to be executed"""
asyncio.run(self.__start(task)) # run needs async task
Classes
class Scheduler-
constructor.
Expand source code
class Scheduler: def __init__(self): """ constructor. """ def __str__(self): """prints the object.""" return "Scheduler" def __set_global_exception(self): def handle_exception(loop, context): import sys print("\t Global Error!!") sys.print_exception(context["exception"]) # sys.exit() loop = asyncio.get_event_loop() loop.set_exception_handler(handle_exception) def schedule_once(self, task): asyncio.create_task(task) async def __start(self, task): self.__set_global_exception() await asyncio.gather(*task, return_exceptions=False) # returns async task, has to be false to rise exception instead of returning it def start(self, task): """passed an array of corutines to be executed""" asyncio.run(self.__start(task)) # run needs async taskMethods
def schedule_once(self, task)-
Expand source code
def schedule_once(self, task): asyncio.create_task(task) def start(self, task)-
passed an array of corutines to be executed
Expand source code
def start(self, task): """passed an array of corutines to be executed""" asyncio.run(self.__start(task)) # run needs async task