我目前面临一个问题,我给一个线程一个集合的引用,我希望能够用模拟的数据库调用替换该集合.到目前为止,我已经完成了
import logging
import threading
import time
from typing import Callable
from loguru import logger
class MonitorProduct:
def __init__(self, term: str, is_alive: Callable[[str], bool]) -> None:
self.is_alive = is_alive
self.term = term
def do_request(self) -> None:
time.sleep(.1)
while True:
logger.info(f'Checking {self.term}')
if not self.is_alive(self.term):
logger.info(f'Deleting term from monitoring: "{self.term}"')
return
time.sleep(5)
# mocked database
def database_terms() -> set[str]:
return {
'hello world',
'python 3',
'world',
'wth',
}
def database_terms_2() -> set[str]:
return {
'what am I doing wrong',
}
def main() -> None:
terms: set[str] = set()
while True:
db_terms = database_terms()
diff = db_terms - terms
terms.symmetric_difference_update(db_terms)
for url in diff:
logger.info(f'Starting URL: {url}')
threading.Thread(
target=MonitorProduct(url, terms.__contains__).do_request
).start()
time.sleep(2)
# ----------------------------------------------- #
db_terms = database_terms_2()
diff = db_terms - terms
terms.symmetric_difference_update(db_terms) # <--- terms should only now contain `what am I doing wrong`
# Start the new URLS
for url in diff:
logger.info(f'Starting URL 2: {url}')
threading.Thread(
target=MonitorProduct(url, terms.__contains__).do_request
).start()
time.sleep(10)
if __name__ == '__main__':
main()
我现在遇到的问题是,当我们执行第一个db调用时,它应该 for each 术语启动线程:
{
'hello world',
'python 3',
'world',
'wth',
}
正如你们所见,我们还 for each 线程发送了terms.__contains__
.
当我们第二次调用db时,该集合应该替换terms
到
{
'what am I doing wrong',
}
由于以下原因,最终应退出四个运行线程:
def do_request(self) -> None:
time.sleep(.1)
while True:
logger.info(f'Checking {self.term}')
if not self.is_alive(self.term):
logger.info(f'Deleting term from monitoring: "{self.term}"')
return
time.sleep(5)
然而,问题是,我们不能以行动取代条款
术语=…因为我们正在创建一个新的集合,然后将该集合绑定到变量术语,而线程仍然有一个对旧集合的引用.
我的问题是,如何在不绑定新集合的情况下更新到最新集合来替换旧术语?