I have a Pandas DataFrame with a DatetimeIndex at one-second frequency and columns "Open", "High", "Low", "Close" representing the price of a financial instrument.

I want to resample this DataFrame to 15 minutes (or any other frequency) without peeking into the future, keeping the original DataFrame at one-second frequency but adding four new columns for each candle. The goal is to show in real time how a candle forms.

For example, for 15-minute candles the original DataFrame would gain four new columns named 'Open_15m', 'High_15m', 'Low_15m' and 'Close_15m', holding a rolling OHLC that updates every second.

A 15-minute candle can only start at HH:00:00, HH:15:00, HH:30:00 or HH:45:00. This means that if, for example, the DataFrame starts at 09:00:00, we have a rolling OHLC from 09:00:00 to 09:15:00, then we reset and start over, because a new 15-minute candle begins forming at 09:15:00.
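Concretely, flooring a timestamp to "15min" gives the start of the candle it belongs to, and the bucket id changes exactly when a new candle begins:

```python
import pandas as pd

# Every second between 09:00:00 and 09:14:59 floors to the same bucket start.
ts = pd.Timestamp("2023-01-01 09:07:23")
print(ts.floor("15min"))                                   # 2023-01-01 09:00:00
print(pd.Timestamp("2023-01-01 09:15:00").floor("15min"))  # 2023-01-01 09:15:00
```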

I came up with the code below, which I believe is correct, but it is too slow for DataFrames with millions of rows. Assuming the code is correct, it needs to be sped up somehow, e.g. with NumPy and Numba.

# Function to find the nearest 15-minute floor
def nearest_quarter_hour(timestamp):
    return timestamp.floor('15min')  # 'T' is a deprecated alias for minutes

# Find the nearest 15-minute floor for each timestamp
df['15_min_floor'] = df.index.map(nearest_quarter_hour)

# Group by the nearest 15-minute floor and calculate rolling OHLC
rolling_df = df.groupby('15_min_floor').rolling(window='15min').agg({
    'Open': lambda x: x.iloc[0],  # First value in the window
    'High': 'max',
    'Low': 'min',
    'Close': lambda x: x.iloc[-1]  # Last value in the window
}).reset_index(level=0, drop=True)

# add _15 to each column rolling df
rolling_df.columns = [f'{col}_15' for col in rolling_df.columns]

# Merge with original DataFrame
result_df = pd.concat([df, rolling_df], axis=1)
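As a quick sanity check of the code above on a tiny synthetic frame (a sketch; the data and sizes are made up): at the final second of every 15-minute bucket, the rolling columns should match the candle that a plain resample produces.

```python
import numpy as np
import pandas as pd

# Two full 15-minute buckets of toy one-second data.
rng = np.random.default_rng(1)
n = 1800
idx = pd.date_range("2023-01-01 09:00:00", periods=n, freq="s")
df = pd.DataFrame({
    "Open": rng.random(n), "High": rng.random(n),
    "Low": rng.random(n), "Close": rng.random(n),
}, index=idx)

# Same rolling-OHLC construction as in the question.
df["15_min_floor"] = df.index.floor("15min")
rolling_df = df.groupby("15_min_floor").rolling(window="15min").agg({
    "Open": lambda x: x.iloc[0], "High": "max",
    "Low": "min", "Close": lambda x: x.iloc[-1],
}).reset_index(level=0, drop=True)
rolling_df.columns = [f"{col}_15" for col in rolling_df.columns]
result_df = pd.concat([df, rolling_df], axis=1)

# Reference candles vs. the final rolling row of each bucket.
candles = df.resample("15min").agg(
    {"Open": "first", "High": "max", "Low": "min", "Close": "last"})
finals = result_df.groupby("15_min_floor")[
    ["Open_15", "High_15", "Low_15", "Close_15"]].last()
print((finals.values == candles.values).all())
```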

Accepted answer

Here is a version that computes the OHLC your way and is significantly faster:

import numpy as np
from numba import njit

@njit
def compute_ohlc(floor_15_min, O, H, L, C, O_out, H_out, L_out, C_out):
    first, curr_max, curr_min, last = O[0], H[0], L[0], C[0]

    last_v = floor_15_min[0]
    for i, v in enumerate(floor_15_min):
        if v != last_v:  # a new 15-minute bucket begins: reset the running OHLC
            first, curr_max, curr_min, last = O[i], H[i], L[i], C[i]
            last_v = v
        else:
            curr_max = max(curr_max, H[i])
            curr_min = min(curr_min, L[i])
            last = C[i]

        O_out[i] = first
        H_out[i] = curr_max
        L_out[i] = curr_min
        C_out[i] = last


def compute_numba(df):
    df["15_min_floor_2"] = df.index.floor("15min")

    # Preallocate plain NumPy outputs: writing through `df[...].values` is not
    # guaranteed to modify the DataFrame in place (it silently fails under
    # pandas Copy-on-Write), so assign the results back as columns instead.
    n = len(df)
    O_out = np.empty(n)
    H_out = np.empty(n)
    L_out = np.empty(n)
    C_out = np.empty(n)

    compute_ohlc(
        df["15_min_floor_2"].values,
        df["Open"].values,
        df["High"].values,
        df["Low"].values,
        df["Close"].values,
        O_out,
        H_out,
        L_out,
        C_out,
    )

    df["Open_15_2"] = O_out
    df["High_15_2"] = H_out
    df["Low_15_2"] = L_out
    df["Close_15_2"] = C_out

compute_numba(df)
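To see the reset logic in isolation, here is the same loop body on toy arrays (plain Python, no Numba; the integer bucket ids and prices are made-up stand-ins for the timestamps and real data):

```python
import numpy as np

# Same loop body as compute_ohlc above, on two buckets of three ticks each.
def ohlc_loop(bucket, O, H, L, C):
    n = len(bucket)
    O_out, H_out = np.empty(n), np.empty(n)
    L_out, C_out = np.empty(n), np.empty(n)
    first, curr_max, curr_min, last = O[0], H[0], L[0], C[0]
    last_v = bucket[0]
    for i, v in enumerate(bucket):
        if v != last_v:  # new bucket: reset the running OHLC
            first, curr_max, curr_min, last = O[i], H[i], L[i], C[i]
            last_v = v
        else:
            curr_max = max(curr_max, H[i])
            curr_min = min(curr_min, L[i])
            last = C[i]
        O_out[i], H_out[i], L_out[i], C_out[i] = first, curr_max, curr_min, last
    return O_out, H_out, L_out, C_out

bucket = np.array([0, 0, 0, 1, 1, 1])
O = np.array([1.0, 2.0, 3.0, 10.0, 11.0, 12.0])
H = np.array([2.0, 4.0, 3.5, 11.0, 13.0, 12.5])
L = np.array([0.5, 1.5, 1.0, 9.0, 10.0, 9.5])
C = np.array([1.5, 3.0, 2.0, 10.5, 12.0, 11.0])
O_out, H_out, L_out, C_out = ohlc_loop(bucket, O, H, L, C)
print(H_out)  # running high resets at index 3: [ 2.  4.  4. 11. 13. 13.]
```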

Benchmark on a random DataFrame with 432,001 rows:

from timeit import timeit

import numpy as np
import pandas as pd
from numba import njit


# generate some random data:

np.random.seed(42)

idx = pd.date_range("1-1-2023", "1-6-2023", freq="1000ms")
df = pd.DataFrame(
    {
        "Open": 50 + np.random.random(len(idx)) * 100,
        "High": 50 + np.random.random(len(idx)) * 100,
        "Low": 50 + np.random.random(len(idx)) * 100,
        "Close": 50 + np.random.random(len(idx)) * 100,
    },
    index=idx,
)


def get_result_df(df):
    def nearest_quarter_hour(timestamp):
        return timestamp.floor("15min")

    # Find the nearest 15-minute floor for each timestamp
    df["15_min_floor"] = df.index.map(nearest_quarter_hour)

    # Group by the nearest 15-minute floor and calculate rolling OHLC
    rolling_df = (
        df.groupby("15_min_floor")
        .rolling(window="15min")
        .agg(
            {
                "Open": lambda x: x.iloc[0],  # First value in the window
                "High": "max",
                "Low": "min",
                "Close": lambda x: x.iloc[-1],  # Last value in the window
            }
        )
        .reset_index(level=0, drop=True)
    )

    # add _15 to each column rolling df
    rolling_df.columns = [f"{col}_15" for col in rolling_df.columns]

    # Merge with original DataFrame
    result_df = pd.concat([df, rolling_df], axis=1)

    return result_df


@njit
def compute_ohlc(floor_15_min, O, H, L, C, O_out, H_out, L_out, C_out):
    first, curr_max, curr_min, last = O[0], H[0], L[0], C[0]

    last_v = floor_15_min[0]
    for i, v in enumerate(floor_15_min):
        if v != last_v:  # a new 15-minute bucket begins: reset the running OHLC
            first, curr_max, curr_min, last = O[i], H[i], L[i], C[i]
            last_v = v
        else:
            curr_max = max(curr_max, H[i])
            curr_min = min(curr_min, L[i])
            last = C[i]

        O_out[i] = first
        H_out[i] = curr_max
        L_out[i] = curr_min
        C_out[i] = last


def compute_numba(df):
    df["15_min_floor_2"] = df.index.floor("15min")

    # Preallocate plain NumPy outputs: writing through `df[...].values` is not
    # guaranteed to modify the DataFrame in place (it silently fails under
    # pandas Copy-on-Write), so assign the results back as columns instead.
    n = len(df)
    O_out = np.empty(n)
    H_out = np.empty(n)
    L_out = np.empty(n)
    C_out = np.empty(n)

    compute_ohlc(
        df["15_min_floor_2"].values,
        df["Open"].values,
        df["High"].values,
        df["Low"].values,
        df["Close"].values,
        O_out,
        H_out,
        L_out,
        C_out,
    )

    df["Open_15_2"] = O_out
    df["High_15_2"] = H_out
    df["Low_15_2"] = L_out
    df["Close_15_2"] = C_out


t1 = timeit("get_result_df(df)", number=1, globals=globals())
t2 = timeit("compute_numba(df)", number=1, globals=globals())

print(f"Time normal = {t1}")
print(f"Time numba =  {t2}")

On my machine (AMD 5700x), this prints (432,001 rows):

Time normal = 29.57983471499756
Time numba =  0.2751060768496245

For a DataFrame built with pd.date_range("1-1-2004", "1-1-2024", freq="1000ms") (about 631 million rows), the result is:

Time numba =  11.551695882808417
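If you prefer to stay in pure pandas, the same rolling OHLC can also be built from vectorized groupby operations (a sketch on made-up synthetic data; slower than the Numba kernel but with no compilation step): the running high and low are group-wise cummax/cummin, which reset exactly at each 15-minute boundary, and the running close is simply the latest close.

```python
import numpy as np
import pandas as pd

# Two full 15-minute buckets of toy one-second data.
rng = np.random.default_rng(42)
n = 1800
idx = pd.date_range("2023-01-01 09:00:00", periods=n, freq="s")
df = pd.DataFrame({
    "Open": 50 + rng.random(n) * 100,
    "High": 50 + rng.random(n) * 100,
    "Low": 50 + rng.random(n) * 100,
    "Close": 50 + rng.random(n) * 100,
}, index=idx)

bucket = df.index.floor("15min")
g = df.groupby(bucket)
open_15 = g["Open"].transform("first")  # bucket open so far
high_15 = g["High"].cummax()            # running high within the bucket
low_15 = g["Low"].cummin()              # running low within the bucket

df["Open_15"], df["High_15"] = open_15, high_15
df["Low_15"], df["Close_15"] = low_15, df["Close"]

# At each bucket's final second the running values equal the finished candle:
candles = df.resample("15min").agg(
    {"Open": "first", "High": "max", "Low": "min", "Close": "last"})
finals = df.groupby(bucket)[["Open_15", "High_15", "Low_15", "Close_15"]].last()
print((finals.values == candles.values).all())
```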
