我想判断大约100个IP地址的可达性,并设置信号量并发任务的限制.但现在我不确定它到底是如何工作的,或者为什么它在代码示例中不工作.正如我所观察到的,"task_reachable"功能仍然正确执行.如果没有可访问的地址,那么在"try_ssh_connection"中,"所有"任务都是并行执行的,这使得代码速度非常慢.

class test_class():
    def __init__(self):
        self.username = "username"
        self.password = "password"

        self.ips = open("open_ip_list")

    def create_async(self):

        asyncio.run(self.create_tasks())

    async def boundary_task(self,ip):
        sem = asyncio.Semaphore(2)
        async with sem:
            return await self.task_reachable(ip)

    async def create_tasks(self):
        timer = Timer(text=f" task time: {{:.1f}}")
        timer.start()
        tasks = [
            asyncio.ensure_future(self.boundary_task(i))
            for i
            in self.ips
        ]
        await asyncio.gather(*tasks)
        timer.stop()

    async def task_reachable(self, ip):
        url = "http://" + ip.strip("\n") + "/example_website.html"
        session = aiohttp.ClientSession()
        try:
            resp = await session.get(url, ssl=False, timeout = 1)
            await resp.read()
            await session.close()

        except:
            await session.close()
            await self.try_ssh_connection(ip, url)
        await session.close()

    async def try_ssh_connection(self, host, url):
        try:
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(host, username=self.username, password=self.password)
            print("go")

        except paramiko.AuthenticationException:
            print( "Username or Password wrong!")
            await self.stop_fun()

        except OSError:
            print("Network is not reachable")
            await self.stop_fun()

    async def stop_fun(self):
        stop_event = asyncio.Event()
        try:
            stop_event.set()
        except RuntimeError:
            pass

if __name__ == "__main__":
    app = test_class()
    app.create_async()

推荐答案

你的问题是boundary_task的每个运行实例都有自己的信号量.

async def boundary_task(self, ip):
    sem = asyncio.Semaphore(2)

如果希望它们都使用相同的信号量,boundary_task的所有实例都需要共享它.

async def boundary_task(self, ip, semaphore):
    async with sem:
        return await self.task_reachable(ip)

async def create_tasks(self):
    sem = asyncio.Semaphore(2)
    tasks = [
        self.boundary_task(i, sem)
        for i
        in self.ips
    ]
    await asyncio.gather(*tasks)

因为您使用的是一个类,所以还可以在__init__中创建信号量.

def __init__(self):
    ...

    self.sem = asyncio.Semaphore(2)

async def boundary_task(self, ip):
    async with self.sem:
        return await self.task_reachable(ip)

Python-3.x相关问答推荐

如何使用Python将嵌套的XML转换为CSV

Django将任何查询显示为html表格

使用递归将int转换为字符串

检测点坐标 - opencv findContours()

GUI 仍然有效并且没有错误消息时图形意外冻结 |具有多线程的 Pyside6 和 pyqtgraph (Python 3.11.4)

try 使用 GEKKO 求解非线性方程组.系统有多种解决方案,但 GEKKO 给出了错误的解决方案.我该如何解决?

Pytest顺序测试A,然后测试B,然后再测试A

从列表的元素和python中的多个多索引数据帧执行方程

仅当从 USB 摄像头接收到新图像时才处理图像

如何对具有多个列值的 pandas 数据框进行数据透视/数据透视表

如何在python中将列表转换为其他格式

合并两个numpy数组

Pythonic,自定义警告

Python 解包运算符 (*)

为什么等效的 Python 代码要慢得多

用于 Django 应用程序的 Cython:它会工作吗?

如何使用 python 库连接到 poloniex.com websocket api

SQLAlchemy:如果不存在则创建模式

将字符串拆分为最大长度 X 的片段 - 仅在空格处拆分

Django Rest 框架 ListField 和 DictField