我在一个数据帧中有my_data_base个初始数据,其中第一列是name,后续列是日志(log).为简单起见,我用5个逻辑列做了一个小例子,在实际数据中大约有my_data_base0列

set.seed(1)
make_dat <- function(nrow=1, ncol=5) {
  cbind.data.frame(name=stringi::stri_rand_strings(n=nrow, length=3), 
                   matrix(data=sample(c(T, F), nrow*ncol, T), ncol=ncol))
}
my_data_base <- make_dat(nrow=10, ncol=5)

..

my_data_base

   name     1     2     3     4     5
1   GNZ FALSE FALSE FALSE  TRUE FALSE
2   uCt FALSE FALSE  TRUE FALSE FALSE
3   wed  TRUE  TRUE FALSE FALSE  TRUE
4   3CA FALSE FALSE FALSE FALSE  TRUE
5   gNl  TRUE FALSE  TRUE FALSE  TRUE
6   Uiz  TRUE FALSE  TRUE FALSE FALSE
7   Nmv FALSE FALSE FALSE FALSE FALSE
8   De7  TRUE FALSE FALSE  TRUE  TRUE
9   GN0 FALSE  TRUE FALSE FALSE  TRUE
10  NrL FALSE  TRUE  TRUE FALSE FALSE

此外,我还有作为新行传入的新数据

new_data <- make_dat(nrow=1, ncol=5)

..

 new_data
  name    1    2     3    4     5
1  QiO TRUE TRUE FALSE TRUE FALSE

我的算法的本质非常简单. 当new_data到达时,我判断new_data相对于my_data_base中的行是否是唯一的.如果是,则添加新的行,如果不是,则一切保持原样.

for (i in 1:100) {
  new_data <- make_dat(nrow=1, ncol=5)
  is_unique_new_data <- all(apply(my_data_base[, -1], 1, \(row) 
                                  all(new_data[, -1] == row)) == FALSE)
  if (is_unique_new_data) my_data_base <- rbind.data.frame(my_data_base, new_data)
}

问题是,my_data_base的大小增长非常快,我希望找到一种方法来保持和增加my_data_base的内存,因为我需要判断新行的唯一性.

如何才能做到这一点呢? 或许可以使用arrowpolarsdatabaseR的标准功能或其他功能来实现这一点?

推荐答案

Complete rewrite.

我相信你正在try 将唯一的行插入到数据集中.我的回答的前一个版本做到了这一点,但都是在内存中.在R中,当谈到内存不足的数据集时,有几种方法可以工作,但没有多少方法可以重复添加数据.

前面:对于R和数据库,列名为12等都是evil.许多与DBI相关的工具不能很好地防止列名为1,因此我strongly敦促您重新考虑这些名称.在这个演示中,我将只使用v1v2等.

因此,函数和初始数据将为:

set.seed(1)
make_dat <- function(nrow=1, ncol=5) {
  out <- cbind.data.frame(name=stringi::stri_rand_strings(n=nrow, length=3), 
                          matrix(data=sample(c(T, F), nrow*ncol, T), ncol=ncol))
  colnames(out)[-1] <- paste0("v", colnames(out)[-1])
  out
}
my_data_base <- make_dat(nrow=10, ncol=5)
head(my_data_base, 2)
#   name    v1    v2    v3    v4    v5
# 1  GNZ FALSE FALSE FALSE  TRUE FALSE
# 2  uCt FALSE FALSE  TRUE FALSE FALSE

我有两个快速的方法:(1)一个"合适的"数据库;(2)CSV判断/附加黑客.

我能想到的最大区别是:

  • CSV黑客攻击方法一次只能在一行上工作,因此您的nrow=参数可能需要硬编码为1,否则您将迭代new_data中的每一行.这将使它变得更慢.
  • 如果您在某个时候说"即使列XYZ相同,也要插入",那么CSV黑客不容易允许这样做(尽管它可能会进一步被黑客攻击,但我不建议这样做).

CSV黑客攻击

这是一个小技巧,但如果你不是在寻找正式的东西,这可能会很好用.为此,您可以将最初的my_data_base写入CSV,然后只有当grep -q(命令行)指示该行不存在时,每行才可以是appended(header=FALSE,只是数据)到文件.

Once you want to _use_ the data, I'd go with `arrow::read_csv_arrow(.., as_data_frame=FALSE)` and `dplyr` processing, which allows lazy filtering/querying with a simple `%>% collect()` at the end where filtered data is finally brought into memory.

两个命令:"check"和"append".

对于"check",我们将使用write.table来创建该行应该是什么样子,并使用grep -q来针对文件创建它,如果找到它,则返回状态代码0,如果没有找到,则返回1.

对于"append",我们将采用write.table创建的文本并将其附加到文件中.

概念和组件的演示:

### works on only one row at a time, though it can
### be modified/extended to work on multiple rows at
### the expense of performance/efficiency
maybe_add_row <- function(x, file) {
  ### CHECK
  txt <- capture.output(write.table(x, row.names=FALSE, col.names=FALSE, sep=","))
  res <- processx::run("grep", args = c("-q", txt, file), error_on_status = FALSE)
  if (res$status == 1) {
    ### APPEND
    cat(txt, "\n", file = file, append = TRUE)
    TRUE
  } else FALSE
}

### initial file creation
dbfn <- "mydata.csv"
write.csv(my_data_base, dbfn, row.names=FALSE)
processx::run("wc", c("-l", dbfn))$stdout   # verify number of lines in the file
# [1] "11 mydata.csv\n"

new_data <- my_data_base[3,]       # will be the same, should not be inserted
maybe_add_row_csv(new_data, dbfn)  # should not be added
# [1] FALSE
processx::run("wc", c("-l", dbfn))$stdout
# [1] "11 mydata.csv\n"

new_data$v1 <- (!new_data$v1)      # will be different, should be inserted
maybe_add_row_csv(new_data, dbfn)  # should be added
# [1] TRUE
processx::run("wc", c("-l", dbfn))$stdout
# [1] "12 mydata.csv\n"

现在我们已经验证了这一点,我们可以迭代并使用它.在这个循环中,我将添加一个runif(1)来随机 Select 放入一个已知的已有文件,只是为了证明这一点.

print(processx::run("wc", c("-l", dbfn))$stdout)
# [1] "12 mydata.csv\n"
system.time({
  for (i in 1:100) {
    new_data <-
      if (runif(1) < 0.1) {
        message("forcing a dupe!")
        my_data_base[sample(nrow(my_data_base), 1),]
      } else make_dat(nrow=1, ncol=5)
    res <- maybe_add_row_csv(new_data, dbfn)
    if (!res) message(".. not inserted")
  }
  print(processx::run("wc", c("-l", dbfn))$stdout)
})
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# [1] "97 mydata.csv\n"
#    user  system elapsed 
#   0.265  13.760  13.925 
print(processx::run("wc", c("-l", dbfn))$stdout)
# [1] "97 mydata.csv\n"

This is not fast!那次操作花了14秒,显然反复拨打grep将是一个问题.(请注意,不要try 使用R grep(readLines(..)),在这方面,您正在违背内存不足操作的目的.)

从这里,我们可以使用arrowdplyrsome个数据缓慢地读入内存.

library(arrow)
library(dplyr)
arr <- arrow::read_csv_arrow(dbfn, as_data_frame = FALSE)
arr
# Table
# 96 rows x 6 columns
# $name <string>
# $v1 <bool>
# $v2 <bool>
# $v3 <bool>
# $v4 <bool>
# $v5 <string>
nrow(arr)
# [1] 96
arr %>%
  filter(grepl("a", name)) %>%
  collect()
# # A tibble: 3 × 6
#   name  v1    v2    v3    v4    v5      
#   <chr> <lgl> <lgl> <lgl> <lgl> <chr>   
# 1 a9L   FALSE FALSE FALSE FALSE "TRUE " 
# 2 XYa   TRUE  TRUE  TRUE  FALSE "FALSE "
# 3 xha   TRUE  TRUE  TRUE  FALSE "FALSE "

filter(和mutate等)支持许多R表达式,但绝对不支持多少任意表达式;如果不支持,它会清楚地告诉您,但在这样做时,它可能会try 将其读入内存,因此请注意.一种安全的技术是执行arr %>% head() %>% filter(something_crazy),如果它不能进行直接转换,它将只拉入6行(显然,一旦您验证了表达式可以工作,就删除head().有关这方面的更多信息,请阅读https://arrow.apache.org/docs/2.0/r/articles/dataset.html.

"适当的"数据库

我们可以针对现有数据对新数据执行SQL"反联接",只插入那些尚未出现的行.

library(duckdb) # if you have another, feel free to adapt to that
### you will probably want a non-temp file
tf <- tempfile(fileext = ".duckdb")
db <- DBI::dbConnect(duckdb::duckdb(), tf)

tbl <- "mydata"
tmptbl <- "mydata_temp"
dbWriteTable(db, tbl, my_data_base)
dbWriteTable(db, tmptbl, my_data_base[0,]) # create empty table
dbGetQuery(db, sprintf("select * from %s limit 3", tbl))
#   name    v1    v2    v3    v4    v5
# 1  GNZ FALSE FALSE FALSE  TRUE FALSE
# 2  uCt FALSE FALSE  TRUE FALSE FALSE
# 3  wed  TRUE  TRUE FALSE FALSE  TRUE

"反联接"是一个简单的概念,类似于常规的左联接,但仅当一方(全部)为空时才返回数据.我将演示一个测试,判断是否所有字段都为空,但是如果您知道有一个字段(例如name)在R中永远不会是NA,或者在数据库中永远不会是null,那么您可以将where子句限制为只有一个字段,以提高SQL效率.

cns <- colnames(my_data_base)
antijoin_sql <- 
  sprintf("
    select t1.* 
    from %s t1 left join %s t2 on %s
    where %s", tmptbl, tbl,
    paste(sprintf("t1.%s=t2.%s", cns, cns), collapse = " and "),
    paste(sprintf("t2.%s is null", cns), collapse = " and "))

### clearly this is all duplicated
cns <- colnames(my_data_base)
new_data <- my_data_base[3,]
new_data
#   name   v1   v2    v3    v4   v5
# 3  wed TRUE TRUE FALSE FALSE TRUE
dbWriteTable(db, tmptbl, new_data, append=T)

### change one field in the temp table
dbExecute(db, sprintf("update %s set v1=not v1", tmptbl))
# [1] 1
dbGetQuery(db, antijoin_sql)
#   name    v1   v2    v3    v4   v5
# 1  wed FALSE TRUE FALSE FALSE TRUE

我们将更改antijoin_sql,以便它将未找到的行插入到目标表中.回想一下,tmptbl中数据的当前状态是does not match(因为我们更改了name),所以我们预计第一个maybe_add_row_sql会成功.

maybe_add_row_sql <- function(x, con, tbl, tmptbl) {
  cns <- colnames(x)
  antijoin_sql <- 
    sprintf("
      insert into %s
      select t1.* 
      from %s t1 left join %s t2 on %s
      where %s", tbl, tmptbl, tbl,
    paste(sprintf("t1.%s=t2.%s", cns, cns), collapse = " and "),
    paste(sprintf("t2.%s is null", cns), collapse = " and "))
  dbWriteTable(db, tmptbl, x, append = TRUE)
  on.exit(
    try(dbExecute(db, paste("delete from", tmptbl)), silent = TRUE),
    add = TRUE)
  dbExecute(db, antijoin_sql)
}
dbGetQuery(db, paste("select count(*) as n from", tbl))
#    n
# 1 10
maybe_add_row_sql(my_data_base[1,], db, tbl, tmptbl)
# [1] 1
dbGetQuery(db, paste("select count(*) as n from", tbl))
#    n
# 1 11
maybe_add_row_sql(my_data_base[1,], db, tbl, tmptbl)
# [1] 0
dbGetQuery(db, paste("select count(*) as n from", tbl))
#    n
# 1 11

好的,我们现在可以规模化运作了.

print(dbGetQuery(db, paste("select count(*) as n from", tbl)))
#    n
# 1 11
system.time({
  for (i in 1:100) {
    new_data <-
      if (runif(1) < 0.1) {
        message("forcing a dupe!")
        my_data_base[sample(nrow(my_data_base), 1),]
      } else make_dat(nrow=1, ncol=5)
    res <- maybe_add_row_sql(new_data, db, tbl, tmptbl)
    if (!res) message(".. not inserted")
  }
})
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
#    user  system elapsed 
#   0.780   0.036   0.892 
print(dbGetQuery(db, paste("select count(*) as n from", tbl)))
#     n
# 1 100

快多了.我们可以通过一个快速简单的查询来确认没有欺骗:

dbGetQuery(db, "with cte as (select name, count(*) as n from mydata group by name, v1) select * from cte where n > 1")
# [1] name n   
# <0 rows> (or 0-length row.names)

您可以直接查询您想要的数据,

dbGetQuery(db, "select * from mydata where name like '%a%'")
#   name    v1    v2    v3    v4    v5
# 1  7aE FALSE FALSE  TRUE  TRUE FALSE
# 2  sau  TRUE FALSE  TRUE FALSE  TRUE
# 3  8ah FALSE FALSE FALSE  TRUE FALSE
# 4  oaf  TRUE FALSE FALSE FALSE  TRUE
# 5  Ya3  TRUE  TRUE FALSE FALSE FALSE

如果您愿意,您仍然可以使用dplyr:

tbl(db, tbl) %>%
  filter(grepl("a", name)) %>%
  collect()
# # A tibble: 5 × 6
#   name  v1    v2    v3    v4    v5   
#   <chr> <lgl> <lgl> <lgl> <lgl> <lgl>
# 1 7aE   FALSE FALSE TRUE  TRUE  FALSE
# 2 sau   TRUE  FALSE TRUE  FALSE TRUE 
# 3 8ah   FALSE FALSE FALSE TRUE  FALSE
# 4 oaf   TRUE  FALSE FALSE FALSE TRUE 
# 5 Ya3   TRUE  TRUE  FALSE FALSE FALSE

还没有完全准备好进入记忆.有关dplyr和数据库的更多信息,请参见https://solutions.posit.co/connections/db/r-packages/dplyr/.

不要止步于此.我们逐行执行此操作,但如果您一次创建多个行,则无需更改即可使用.

new_data <- rbind(make_dat(nrow=10, ncol=5), my_data_base[1:2,])
dbGetQuery(db, paste("select count(*) as n from", tbl))
#     n
# 1 100
maybe_add_row_sql(new_data, db, tbl, tmptbl)
# [1] 10
dbGetQuery(db, paste("select count(*) as n from", tbl))
#     n
# 1 110

请注意,new_data有12行(具有2个已知的副本),并且只添加了10个新行.

R相关问答推荐

给定R中另一行中的值,如何插补缺失值

使用lapply的重新定位功能

更改Heatmap Annotation对象的名称

保存包含循环和ifelse的函数的输出

如何得到R中唯一的组合群?

在R中无法读入具有Readxl和lApply的数据集

对于变量的每个值,仅 Select 包含列表中所有值的值.R

如果可能,将数字列转换为整数,否则保留为数字

矩阵的堆叠条形图,条形图上有数字作为标签

在R gggplot2中是否有一种方法将绘图轴转换成连续的 colored颜色 尺度?

如何对2个列表元素的所有组合进行操作?

使用带有OR条件的grepl过滤字符串

按多列统计频次

将项粘贴到向量中,并将它们分组为x的倍数,用空格分隔

如何在R中改变fviz_pca_biplot中圆的边界线的 colored颜色 ?

远离理论值的伽马密度曲线下面积的近似

用多边形替换地块点

将美学添加到ggploy中的文本标签

如何将字符类对象中的数据转换为R中的字符串

如何从矩阵绘制环弦图