I am trying to write a parser/API for France's public drug database (https://base-donnees-publique.medicaments.gouv.fr/). It consists of eight CSV files (really TSV, since they use tabs), each ranging from a few kB to 4 MB; the largest has about 20,000 lines (each line describing one drug, with its name, codes, prices, etc.).

Since new versions of these files are likely to be published regularly, I would rather parse them directly than build a cleaner database (which I would probably have to recreate regularly anyway).

Importing these files takes some time (about a second), so I tried to speed it up and benchmarked a few approaches, and I was surprised to find that the most basic one also seems to be the fastest.

Here is my test code (sorry it's so long). Each file is associated with a dedicated class that parses its lines. Basically, these classes are namedtuples with a custom classmethod that parses dates, numbers, and so on.

import pathlib
import enum
import datetime
from decimal import Decimal
from collections import namedtuple
import csv


def parse_date(date: str) -> datetime.date:
    return datetime.datetime.strptime(date, "%d/%m/%Y").date()


def parse_date_bis(date: str) -> datetime.date:
    return datetime.datetime.strptime(date, "%Y%m%d").date()


def parse_text(text):
    if not text:
        return ""
    return text.replace("<br>", "\n").strip()


def parse_list(raw):
    return raw.split(";")


def parse_price(price: str) -> Decimal:
    if not price:
        return None
    # Handles cases like "4,417,08".
    price = '.'.join(price.rsplit(",", 1)).replace(',', '')
    return Decimal(price)


def parse_percentage(raw: str) -> int:
    if not raw:
        return None
    return int(raw.replace("%", "").strip())


class StatutAdministratifPresentation(enum.Enum):
    ACTIVE = "Présentation active"
    ABROGEE = "Présentation abrogée"


class EtatCommercialisation(enum.Enum):
    DC = "Déclaration de commercialisation"
    S = "Déclaration de suspension de commercialisation"
    DAC = "Déclaration d'arrêt de commercialisation"
    AC = "Arrêt de commercialisation (le médicament n'a plus d'autorisation)"


class MotifAvisSMR(enum.Enum):
    INSCRIPTION = "Inscription (CT)"
    RENOUVELLEMENT = "Renouvellement d'inscription (CT)"
    EXT = "Extension d'indication"
    EXTNS = "Extension d'indication non sollicitée"
    REEV_SMR = "Réévaluation SMR"
    REEV_ASMR = "Réévaluation ASMR"
    REEV_SMR_ASMR = "Réévaluation SMR et ASMR"
    REEV_ETUDE = "Réévaluation suite à résultats étude post-inscript"
    REEV_SAISINE = "Réévaluation suite saisine Ministères (CT)"
    NOUV_EXAM = "Nouvel examen suite au dépôt de nouvelles données"
    MODIF_COND = "Modification des conditions d'inscription (CT)"
    AUTRE = "Autre demande"


class ImportanceSMR(enum.Enum):
    IMPORTANT = "Important"
    MODERE = "Modéré"
    FAIBLE = "Faible"
    INSUFFISANT = "Insuffisant"
    COMMENTAIRES = "Commentaires"
    NP = "Non précisé"


class ImportanceASMR(enum.Enum):
    COM = "Commentaires sans chiffrage de l'ASMR"
    I = "I"
    II = "II"
    III = "III"
    IV = "IV"
    V = "V"
    NP = "Non précisée"
    SO = "Sans objet"


class Specialite(namedtuple("Specialite", ("cis", "denomation", "forme", "voies_administration", "statut_amm", "type_amm", "commercialisation", "date_amm", "statut_bdm", "numero_autorisation_europeenne", "titulaire", "surveillance_renforcee"))):
    @classmethod
    def from_line(cls, line):
        line[2] = line[2].replace("  ", " ").strip()
        line[3] = parse_list(line[3])
        line[7] = parse_date(line[7])
        line[10] = line[10].strip()  # There are often leading spaces here (like ' OPELLA HEALTHCARE FRANCE').
        return cls(*line)


class Presentation(namedtuple("Presentation", ("cis", "cip7", "libelle", "statut", "commercialisation", "date_commercialisation", "cip13", "agrement_collectivites", "taux_remboursement", "prix", "prix_hors_honoraires", "montant_honoraires", "indications_remboursement"))):
    @classmethod
    def from_line(cls, line):
        if line[3] == "Présentation active":
            line[3] = StatutAdministratifPresentation.ACTIVE
        else:
            line[3] = StatutAdministratifPresentation.ABROGEE
        line[4] = {
            "Déclaration de commercialisation": EtatCommercialisation.DC,
            "Déclaration de suspension de commercialisation": EtatCommercialisation.S,
            "Déclaration d'arrêt de commercialisation": EtatCommercialisation.DAC,
            "Arrêt de commercialisation (le médicament n'a plus d'autorisation)": EtatCommercialisation.AC
        }.get(line[4])
        line[5] = parse_date(line[5])
        line[7] = True if line[7] == "oui" else False
        line[8] = parse_percentage(line[8])
        line[9] = parse_price(line[9])
        line[10] = parse_price(line[10])
        line[11] = parse_price(line[11])
        line[12] = parse_text(line[12])
        return cls(*line)


class Composition(namedtuple("Composition", ("cis", "element", "code", "substance", "dosage", "ref_dosage", "nature_composant", "cle"))):
    @classmethod
    def from_line(cls, line):
        line.pop(-1)
        return cls(*line)


class AvisSMR(namedtuple("AvisSMR", ("cis", "dossier_has", "motif", "date", "valeur", "libelle"))):
    @classmethod
    def from_line(cls, line):
        line[2] = MotifAvisSMR(line[2])
        line[3] = parse_date_bis(line[3])
        line[4] = ImportanceSMR(line[4])
        line[5] = parse_text(line[5])
        return cls(*line)


class AvisASMR(namedtuple("AvisASMR", ("cis", "dossier_has", "motif", "date", "valeur", "libelle"))):
    @classmethod
    def from_line(cls, line):
        line[2] = MotifAvisSMR(line[2])
        line[3] = parse_date_bis(line[3])
        line[4] = ImportanceASMR(line[4])
        line[5] = parse_text(line[5])
        return cls(*line)


class AvisCT(namedtuple("AvisCT", ("dossier_has", "lien"))):
    @classmethod
    def from_line(cls, line):
        return cls(*line)


FILE_MATCHES = {
    "CIS_bdpm.txt": Specialite,
    "CIS_CIP_bdpm.txt": Presentation,
    "CIS_COMPO_bdpm.txt": Composition,
    "CIS_HAS_ASMR_bdpm.txt": AvisASMR,
    "CIS_HAS_SMR_bdpm.txt": AvisSMR,
    "HAS_LiensPageCT_bdpm.txt": AvisCT
}


def sequential_import_file_data(filename, cls):
    result = {cls: []}
    with (pathlib.Path("data") / filename).open("r", encoding="latin1") as f:
        rows = csv.reader(f, delimiter="\t")
        for line in rows:
            data = cls.from_line(line)
            result[cls].append(data)
    return result


def import_data_sequential():
    results = []
    for filename, cls in FILE_MATCHES.items():
        results.append(sequential_import_file_data(filename, cls))


from multiprocessing.pool import ThreadPool

def import_data_mp_tp(n=2):
    pool = ThreadPool(n)
    results = []
    for filename, cls in FILE_MATCHES.items():
        results.append(pool.apply_async(
            sequential_import_file_data,
            (filename, cls)
        ))
    results = [r.get() for r in results]


from multiprocessing.pool import Pool

def import_data_mp_p(n=2):
    pool = Pool(n)
    results = []
    for filename, cls in FILE_MATCHES.items():
        results.append(pool.apply_async(
            sequential_import_file_data,
            (filename, cls)
        ))
    results = [r.get() for r in results]


import asyncio
import aiofiles
from aiocsv import AsyncReader

async def async_import_file_data(filename, cls):
    results = {cls: []}
    async with aiofiles.open(
        (pathlib.Path("data") / filename),
        mode="r",
        encoding="latin1"
    ) as afp:
        async for line in AsyncReader(afp, delimiter="\t"):
            data = cls.from_line(line)
            results[cls].append(data)
    return results


def import_data_async():
    results = []
    for filename, cls in FILE_MATCHES.items():
        results.append(asyncio.run(async_import_file_data(filename, cls)))


def main():
    import timeit
    print(
        "Sequential:",
        timeit.timeit(lambda: import_data_sequential(), number=10)
    )
    print(
        "Multi ThreadPool:",
        timeit.timeit(lambda: import_data_mp_tp(), number=10)
    )
    print(
        "Multi Pool:",
        timeit.timeit(lambda: import_data_mp_p(), number=10)
    )
    print(
        "Async:",
        timeit.timeit(lambda: import_data_async(), number=10)
    )


if __name__ == "__main__":
    main()

So when I run this, I get the following results.

Sequential: 9.821639589001279
Multi ThreadPool: 10.137484730999859
Multi Pool: 12.531487682997977
Async: 30.953154197999538

The most basic solution, which just loops over every file and every line, seems to be the fastest.

So, am I doing something wrong that makes the other imports slower? Or is a time difference like this normal/expected?

Answer

As usual: run a profiler on your code to see where it spends its time. (This one is PyCharm's, which wraps the stdlib cProfile.)
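If you don't have PyCharm at hand, the stdlib cProfile gives roughly the same data on its own; here is a minimal sketch, assuming it replaces the timeit calls at the bottom of the benchmark script:

import cProfile
import pstats

# Profile one sequential import and print the 20 most expensive entries by cumulative time.
cProfile.run("import_data_sequential()", "import.prof")
pstats.Stats("import.prof").sort_stats("cumulative").print_stats(20)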

Sequential: 7.865187874995172

[profiler screenshot]

Hmm, okay. strptime, I can see those are the datetime.datetime.strptime calls. But, curiously, getlocale... why do we need locales there? Clicking through the call graph shows that strptime actually looks up the current locale, and there's a bunch of locking and so on going on. What if we replaced those parse_date functions with implementations of our own?

def parse_date(date: str) -> datetime.date:
    # The source format is DD/MM/YYYY with a four-digit year.
    d, m, y = (int(x) for x in date.split("/", 2))
    return datetime.date(y, m, d)


def parse_date_bis(date: str) -> datetime.date:
    # The source format is YYYYMMDD.
    y = int(date[:4])
    m = int(date[4:6])
    d = int(date[6:8])
    return datetime.date(y, m, d)

Sequential: 3.8978060420195106

Alright, now we're cooking! That's a 52% improvement right there!

[profiler screenshot]

(It doesn't show up in this screenshot because I was a silly goose and cropped it out, but the re machinery that strptime uses under the hood dropped off as well.)

Now, let's assume there will be many identical dates, slap @lru_cache(maxsize=None) (we're flexible on memory, so an unbounded cache) onto the hot parse_date_* functions, run the code, and print the cache info:
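A minimal sketch of that change, reusing the hand-rolled parsers from above; the cache_info() calls at the end are what produce the CacheInfo lines below:

from functools import lru_cache


@lru_cache(maxsize=None)
def parse_date(date: str) -> datetime.date:
    d, m, y = (int(x) for x in date.split("/", 2))
    return datetime.date(y, m, d)


@lru_cache(maxsize=None)
def parse_date_bis(date: str) -> datetime.date:
    return datetime.date(int(date[:4]), int(date[4:6]), int(date[6:8]))


# After the timing runs, dump the hit/miss statistics:
print(parse_date.cache_info())
print(parse_date_bis.cache_info())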

Sequential: 3.2240814580000006
CacheInfo(hits=358989, misses=6991, maxsize=None, currsize=6991)
CacheInfo(hits=221607, misses=513, maxsize=None, currsize=513)

Looks pretty good to me; that shaves another 15% off the last number.

Looking at the profile again, though, parse_price could clearly use a cache too:

[profiler screenshot]

Sequential: 2.928746833000332
CacheInfo(hits=358989, misses=6991, maxsize=None, currsize=6991)
CacheInfo(hits=221607, misses=513, maxsize=None, currsize=513)
CacheInfo(hits=622064, misses=4096, maxsize=None, currsize=4096)

Hey, who knew: there are only 4096 distinct price strings in the data.

If you have memory to spare, the rest of the parse functions could take a cache as well, but with just a bit of profiling and parsing work it's now 2.7x faster [when everything is run 10 times, which means those caches will be warm; the speedup for a single run is not quite as dramatic], with no parallel processing required. Magic!
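One caveat if you do cache the remaining helpers (my addition, not something shown in the profiles above): parse_list returns a mutable list, so a cached version should hand out a tuple instead, otherwise every row sharing a raw value would also share, and could mutate, the same list object:

@lru_cache(maxsize=None)
def parse_list(raw: str) -> tuple[str, ...]:
    # An immutable tuple is safe to share between all rows that hit the cache.
    return tuple(raw.split(";"))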

To be a bit fairer, here's a hyperfine benchmark where each import starts a fresh Python interpreter (and each run does the import only once):

$ hyperfine 'python3 so76781391-orig.py' 'python3 so76781391-opt.py' --warmup 5 --min-benchmarking-time 10
Benchmark 1: python3 so76781391-orig.py
  Time (mean ± σ):     363.0 ms ±   2.7 ms    [User: 340.8 ms, System: 20.7 ms]
  Range (min … max):   358.9 ms … 367.9 ms    27 runs

Benchmark 2: python3 so76781391-opt.py
  Time (mean ± σ):     234.1 ms ±   2.5 ms    [User: 215.6 ms, System: 17.0 ms]
  Range (min … max):   228.2 ms … 238.5 ms    42 runs

Summary
  'python3 so76781391-opt.py' ran
    1.55 ± 0.02 times faster than 'python3 so76781391-orig.py'

So, with a quick look at a profiler (plus a couple of extra optimizations, such as not creating the mapping dicts inside the from_line functions), a solid 55% speedup.
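The mapping-dict tweak amounts to building the lookup table once at module level instead of once per row inside Presentation.from_line; a sketch of how that could look, reusing the EtatCommercialisation enum from the question (the answer doesn't show its exact version):

# Built once at import time; the enum values are exactly the strings found in the file,
# and .get() keeps the original behavior of returning None for unknown values.
ETAT_COMMERCIALISATION = {e.value: e for e in EtatCommercialisation}

# ... and inside Presentation.from_line:
#     line[4] = ETAT_COMMERCIALISATION.get(line[4])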
