Modern ActivityPub compliant server, designed for simplicity and accessibility. Includes calendar and sharing economy features to empower your federated community. https://code.freedombone.net/bashrc/epicyon Docs: https://epicyon.net/#install
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

threads.py 4.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. __filename__ = "threads.py"
  2. __author__ = "Bob Mottram"
  3. __license__ = "AGPL3+"
  4. __version__ = "1.2.0"
  5. __maintainer__ = "Bob Mottram"
  6. __email__ = "bob@freedombone.net"
  7. __status__ = "Production"
  8. import threading
  9. import sys
  10. import time
  11. import datetime
  12. class threadWithTrace(threading.Thread):
  13. def __init__(self, *args, **keywords):
  14. self.startTime = datetime.datetime.utcnow()
  15. self.isStarted = False
  16. tries = 0
  17. while tries < 3:
  18. try:
  19. self._args, self._keywords = args, keywords
  20. threading.Thread.__init__(self, *self._args, **self._keywords)
  21. self.killed = False
  22. break
  23. except Exception as e:
  24. print('ERROR: threads.py/__init__ failed - ' + str(e))
  25. time.sleep(1)
  26. tries += 1
  27. def start(self):
  28. tries = 0
  29. while tries < 3:
  30. try:
  31. self.__run_backup = self.run
  32. self.run = self.__run
  33. threading.Thread.start(self)
  34. break
  35. except Exception as e:
  36. print('ERROR: threads.py/start failed - ' + str(e))
  37. time.sleep(1)
  38. tries += 1
  39. # note that this is set True even if all tries failed
  40. self.isStarted = True
  41. def __run(self):
  42. sys.settrace(self.globaltrace)
  43. self.__run_backup()
  44. self.run = self.__run_backup
  45. def globaltrace(self, frame, event, arg):
  46. if event == 'call':
  47. return self.localtrace
  48. else:
  49. return None
  50. def localtrace(self, frame, event, arg):
  51. if self.killed:
  52. if event == 'line':
  53. raise SystemExit()
  54. return self.localtrace
  55. def kill(self):
  56. self.killed = True
  57. def clone(self, fn):
  58. return threadWithTrace(target=fn,
  59. args=self._args,
  60. daemon=True)
  61. def removeDormantThreads(baseDir: str, threadsList: [], debug: bool,
  62. timeoutMins=30) -> None:
  63. """Removes threads whose execution has completed
  64. """
  65. if len(threadsList) == 0:
  66. return
  67. timeoutSecs = int(timeoutMins * 60)
  68. dormantThreads = []
  69. currTime = datetime.datetime.utcnow()
  70. changed = False
  71. # which threads are dormant?
  72. noOfActiveThreads = 0
  73. for th in threadsList:
  74. removeThread = False
  75. if th.isStarted:
  76. if not th.is_alive():
  77. if (currTime - th.startTime).total_seconds() > 10:
  78. if debug:
  79. print('DEBUG: ' +
  80. 'thread is not alive ten seconds after start')
  81. removeThread = True
  82. # timeout for started threads
  83. if (currTime - th.startTime).total_seconds() > timeoutSecs:
  84. if debug:
  85. print('DEBUG: started thread timed out')
  86. removeThread = True
  87. else:
  88. # timeout for threads which havn't been started
  89. if (currTime - th.startTime).total_seconds() > timeoutSecs:
  90. if debug:
  91. print('DEBUG: unstarted thread timed out')
  92. removeThread = True
  93. if removeThread:
  94. dormantThreads.append(th)
  95. else:
  96. noOfActiveThreads += 1
  97. if debug:
  98. print('DEBUG: ' + str(noOfActiveThreads) +
  99. ' active threads out of ' + str(len(threadsList)))
  100. # remove the dormant threads
  101. dormantCtr = 0
  102. for th in dormantThreads:
  103. if debug:
  104. print('DEBUG: Removing dormant thread ' + str(dormantCtr))
  105. dormantCtr += 1
  106. threadsList.remove(th)
  107. th.kill()
  108. changed = True
  109. # start scheduled threads
  110. if len(threadsList) < 10:
  111. ctr = 0
  112. for th in threadsList:
  113. if not th.isStarted:
  114. print('Starting new send thread ' + str(ctr))
  115. th.start()
  116. changed = True
  117. break
  118. ctr += 1
  119. if not changed:
  120. return
  121. if debug:
  122. sendLogFilename = baseDir + '/send.csv'
  123. try:
  124. with open(sendLogFilename, "a+") as logFile:
  125. logFile.write(currTime.strftime("%Y-%m-%dT%H:%M:%SZ") +
  126. ',' + str(noOfActiveThreads) +
  127. ',' + str(len(threadsList)) + '\n')
  128. except BaseException:
  129. pass