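I am using the pystata package, which lets me run Stata code from Python and pass data back and forth between Python and Stata.

As I understand it, there is a Stata instance running in the background. I want to bootstrap some code that wraps Stata code, and I want to run it in parallel.

Essentially, I would like to have something like this: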

from joblib import Parallel, delayed
import numpy as np
import pandas as pd

def single_instance(seed):
    # initialize Stata inside the worker
    from pystata import config
    config.init('be')
    # pystata.stata checks for initialization at import time,
    # so import it only after config.init()
    from pystata import stata
    # run some Stata code (load a data set and collapse, for example)
    stata.run('some code')
    # load the Stata data into Python
    df = stata.pdataframe_from_data()
    # do_something_with_data is a placeholder for the bootstrap step
    out = do_something_with_data(df, seed)
    return out


if __name__ == '__main__':

    seeds = np.arange(1, 100)
    Parallel(backend='loky', n_jobs=-1)(
        delayed(single_instance)(seed) for seed in seeds)

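where some code runs in parallel and each thread initializes its own Stata instance. However, I am worried that all of these parallel threads would be accessing the same Stata instance. Can this work the way I expect? How should I set this up?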

joblib.externals.loky.process_executor._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/x/miniconda3/envs/stata/lib/python3.12/site-packages/joblib/externals/loky/process_executor.py", line 391, in _process_worker
    call_item = call_queue.get(block=True, timeout=timeout)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/x/miniconda3/envs/stata/lib/python3.12/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/x/miniconda3/envs/stata/lib/python3.12/site-packages/joblib/externals/cloudpickle/cloudpickle.py", line 649, in subimport
    __import__(name)
  File "/usr/local/stata/utilities/pystata/stata.py", line 8, in <module>
    config.check_initialized()
  File "/usr/local/stata/utilities/pystata/config.py", line 281, in check_initialized
    _RaiseSystemException('''
  File "/usr/local/stata/utilities/pystata/config.py", line 86, in _RaiseSystemException
    raise SystemError(msg)
SystemError: 
    Note: Stata environment has not been initialized yet. 
    To proceed, you must call init() function in the config module as follows:

        from pystata import config
        config.init()
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "test.py", line 299, in <module>
    bootstrap(aggregation='occ')
  File "test.py", line 277, in bootstrap
    z = Parallel(backend='loky', n_jobs=-1)(
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/x/miniconda3/envs/stata/lib/python3.12/site-packages/joblib/parallel.py", line 1098, in __call__
    self.retrieve()
  File "/home/x/miniconda3/envs/stata/lib/python3.12/site-packages/joblib/parallel.py", line 975, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/x/miniconda3/envs/stata/lib/python3.12/site-packages/joblib/_parallel_backends.py", line 567, in wrap_future_result
    return future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/x/miniconda3/envs/stata/lib/python3.12/concurrent/futures/_base.py", line 456, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/home/x/miniconda3/envs/stata/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
joblib.externals.loky.process_executor.BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.

Recommended answer

Passing backend="multiprocessing" to joblib.Parallel starts the Stata instances in separate processes.
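A minimal sketch of that suggestion, under a few assumptions. The traceback in the question shows pystata.stata being imported in a worker before config.init() had run, so the worker below initializes Stata first and imports pystata.stata afterwards. The names stata_worker and run_in_processes are hypothetical, the 'be' edition is taken from the question, "sysuse auto, clear" stands in for the real Stata code, and the resampling line is only a placeholder for the actual bootstrap step. The guard with config.is_stata_initialized() assumes that function is available in your pystata version.

```python
import os

from joblib import Parallel, delayed


def stata_worker(seed):
    """Hypothetical worker: set up Stata in this process, then use it."""
    from pystata import config

    # A worker process may handle several seeds, so initialize only once.
    # Assumption: config.is_stata_initialized() exists in your pystata version.
    if not config.is_stata_initialized():
        config.init("be")  # edition taken from the question

    # Import only after initialization; pystata.stata checks it at import time.
    from pystata import stata

    stata.run("sysuse auto, clear")  # placeholder Stata code
    df = stata.pdataframe_from_data()

    # Placeholder bootstrap step: resample rows with replacement for this seed.
    sample = df.sample(frac=1.0, replace=True, random_state=int(seed))
    return os.getpid(), len(sample)


def run_in_processes(worker, seeds, n_jobs=-1):
    """Call worker(seed) for every seed with joblib's multiprocessing backend."""
    return Parallel(backend="multiprocessing", n_jobs=n_jobs)(
        delayed(worker)(seed) for seed in seeds
    )
```

Calling run_in_processes(stata_worker, range(1, 100)) from under an if __name__ == '__main__': guard returns one (process id, sample size) pair per seed. Seeing several distinct process ids in the result is a quick way to check that the work was spread over separate processes rather than a single shared one.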
