Complete rewrite.
I believe you are trying to insert only unique rows into a dataset. A previous version of this answer did that, but entirely in memory. In R there are several ways to work with larger-than-memory datasets, but not many that support repeatedly appending data.
Up front: column names of `1`, `2`, etc. are evil for both R and databases. Many DBI-related tools do not protect well against a column named `1`, so I strongly urge you to reconsider those names. For this demo I will just use `v1`, `v2`, etc.
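If your data already carries those numeric names, one quick repair (a minimal sketch; `bad` is a hypothetical example frame, not from the question) is to prefix them with a letter:

```r
## hypothetical frame with the problematic numeric column names
bad <- data.frame(`1` = 1:2, `2` = 3:4, check.names = FALSE)
## prefix with a letter so DBI/SQL tools treat them as ordinary identifiers
names(bad) <- paste0("v", names(bad))
names(bad)
# [1] "v1" "v2"
```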
So the function and initial data will be:
```r
set.seed(1)
make_dat <- function(nrow=1, ncol=5) {
  out <- cbind.data.frame(
    name = stringi::stri_rand_strings(n=nrow, length=3),
    matrix(data=sample(c(TRUE, FALSE), nrow*ncol, replace=TRUE), ncol=ncol))
  colnames(out)[-1] <- paste0("v", colnames(out)[-1])
  out
}
my_data_base <- make_dat(nrow=10, ncol=5)
head(my_data_base, 2)
#   name    v1    v2    v3    v4    v5
# 1  GNZ FALSE FALSE FALSE  TRUE FALSE
# 2  uCt FALSE FALSE  TRUE FALSE FALSE
```
I have two quick methods: (1) a "proper" database; and (2) a CSV check/append hack.

The biggest differences I can think of:

- The CSV-hack method works on only one row at a time, so your `nrow=` argument may need to be hard-coded to 1, or else you must iterate over each row of `new_data`. That makes it much slower.
- If at some point you decide "insert even if column XYZ is the same", the CSV hack does not easily allow that (it could be hacked further, but I don't recommend it).
**The CSV hack**
This is a bit of a hack, but if you aren't looking for something formal it can work well. For this, you write the initial `my_data_base` to a CSV, and each row is then appended to the file (`header=FALSE`, just the data) only if `grep -q` (command line) indicates the row is not already there.
Once you want to _use_ the data, I'd go with `arrow::read_csv_arrow(.., as_data_frame=FALSE)` and `dplyr` processing, which allows lazy filtering/querying with a simple `%>% collect()` at the end where filtered data is finally brought into memory.
Two commands: "check" and "append".

For "check", we use `write.table` to create what the row should look like and run `grep -q` against the file, which returns status code `0` if found and `1` if not.

For "append", we take the text created by `write.table` and append it to the file.
A demonstration of the concepts and components:
```r
### works on only one row at a time, though it can
### be modified/extended to work on multiple rows at
### the expense of performance/efficiency
maybe_add_row_csv <- function(x, file) {
  ### CHECK
  txt <- capture.output(write.table(x, row.names=FALSE, col.names=FALSE, sep=","))
  res <- processx::run("grep", args = c("-q", txt, file), error_on_status = FALSE)
  if (res$status == 1) {
    ### APPEND
    cat(txt, "\n", file = file, append = TRUE)
    TRUE
  } else FALSE
}
```
```r
### initial file creation
dbfn <- "mydata.csv"
write.csv(my_data_base, dbfn, row.names=FALSE)
processx::run("wc", c("-l", dbfn))$stdout # verify number of lines in the file
# [1] "11 mydata.csv\n"
new_data <- my_data_base[3,] # will be the same, should not be inserted
maybe_add_row_csv(new_data, dbfn) # should not be added
# [1] FALSE
processx::run("wc", c("-l", dbfn))$stdout
# [1] "11 mydata.csv\n"
new_data$v1 <- (!new_data$v1) # will be different, should be inserted
maybe_add_row_csv(new_data, dbfn) # should be added
# [1] TRUE
processx::run("wc", c("-l", dbfn))$stdout
# [1] "12 mydata.csv\n"
```
Now that we have verified that, we can iterate and use it. In this loop I will use a `runif(1)` to randomly select a known, already-present row, just to prove the point.
```r
print(processx::run("wc", c("-l", dbfn))$stdout)
# [1] "12 mydata.csv\n"
system.time({
  for (i in 1:100) {
    new_data <-
      if (runif(1) < 0.1) {
        message("forcing a dupe!")
        my_data_base[sample(nrow(my_data_base), 1),]
      } else make_dat(nrow=1, ncol=5)
    res <- maybe_add_row_csv(new_data, dbfn)
    if (!res) message(".. not inserted")
  }
  print(processx::run("wc", c("-l", dbfn))$stdout)
})
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# [1] "97 mydata.csv\n"
#    user  system elapsed
#   0.265  13.760  13.925
print(processx::run("wc", c("-l", dbfn))$stdout)
# [1] "97 mydata.csv\n"
```
This is not fast! That run took 14 seconds, and clearly the repeated calls to `grep` will be a problem. (Note: don't be tempted to use R's `grep(readLines(..))` here; you would be defeating the purpose of out-of-memory operation.)
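If the per-row `grep` calls are the bottleneck, one mitigation (a hedged sketch, not benchmarked; `maybe_add_rows_csv` is a hypothetical batch variant, not part of the original answer) is to check a whole batch of candidate rows with a single `grep -F -f` pass over the file, appending only the misses:

```r
## batch variant: ONE grep pass over `file` for all candidate rows in `x`
maybe_add_rows_csv <- function(x, file) {
  txt <- capture.output(write.table(x, row.names = FALSE, col.names = FALSE, sep = ","))
  patfile <- tempfile()
  on.exit(unlink(patfile), add = TRUE)
  writeLines(txt, patfile)
  ## print every existing line that matches any candidate (fixed strings)
  res <- processx::run("grep", c("-F", "-f", patfile, file), error_on_status = FALSE)
  found <- trimws(strsplit(res$stdout, "\n")[[1]])
  new <- txt[!txt %in% found]
  if (length(new)) cat(paste0(new, "\n"), sep = "", file = file, append = TRUE)
  length(new) # number of rows actually appended
}
```

It retains the substring-match semantics of the single-row hack, so the same caveats apply.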
From here, we can use `arrow` and `dplyr` to lazily read some of the data into memory.
```r
library(arrow)
library(dplyr)
arr <- arrow::read_csv_arrow(dbfn, as_data_frame = FALSE)
arr
# Table
# 96 rows x 6 columns
# $name <string>
# $v1 <bool>
# $v2 <bool>
# $v3 <bool>
# $v4 <bool>
# $v5 <string>
nrow(arr)
# [1] 96
arr %>%
  filter(grepl("a", name)) %>%
  collect()
# # A tibble: 3 × 6
#   name  v1    v2    v3    v4    v5
#   <chr> <lgl> <lgl> <lgl> <lgl> <chr>
# 1 a9L   FALSE FALSE FALSE FALSE "TRUE "
# 2 XYa   TRUE  TRUE  TRUE  FALSE "FALSE "
# 3 xha   TRUE  TRUE  TRUE  FALSE "FALSE "
```
`filter` (and `mutate`, etc.) support many R expressions, but certainly not arbitrary ones; if an expression is not supported, it will tell you clearly, but in doing so it may try to read the data into memory, so be careful. One safe technique is to run `arr %>% head() %>% filter(something_crazy)`: if it cannot do a direct translation, it pulls in only 6 rows (obviously, remove the `head()` once you have verified the expression works). For more on this, read https://arrow.apache.org/docs/2.0/r/articles/dataset.html.
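As a concrete illustration of that guard (reusing `arr` from above; `grepl` is actually supported by arrow, so here it simply runs, but the pattern is the same for an expression you are unsure of):

```r
## test-drive the expression against at most 6 rows first; if arrow cannot
## translate it, only those 6 rows would be pulled into memory
arr %>% head() %>% filter(grepl("a", name)) %>% collect()
## once verified, drop head() to run over the full table
```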
**A "proper" database**
We can do an SQL "anti-join" of the new data against the existing data, inserting only those rows not already present.
```r
library(duckdb) # if you have another, feel free to adapt to that
### you will probably want a non-temp file
tf <- tempfile(fileext = ".duckdb")
db <- DBI::dbConnect(duckdb::duckdb(), tf)
tbl <- "mydata"
tmptbl <- "mydata_temp"
dbWriteTable(db, tbl, my_data_base)
dbWriteTable(db, tmptbl, my_data_base[0,]) # create empty table
dbGetQuery(db, sprintf("select * from %s limit 3", tbl))
#   name    v1    v2    v3    v4    v5
# 1  GNZ FALSE FALSE FALSE  TRUE FALSE
# 2  uCt FALSE FALSE  TRUE FALSE FALSE
# 3  wed  TRUE  TRUE FALSE FALSE  TRUE
```
An "anti-join" is a simple concept: it is like a regular left join, but returns rows only where one side is (entirely) null. I will demonstrate a test that checks whether all fields are null, but if you know there is a field (e.g. `name`) that will never be `NA` in R or `null` in the database, you can restrict the `where` clause to that one field for SQL efficiency.
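For example, the single-field variant might look like this (a sketch; it assumes `name` alone is enough to establish identity, which trades correctness for speed):

```r
## anti-join keyed on `name` only: cheaper for SQL, but rows sharing a name
## with different v* values will be treated as duplicates and skipped
tbl <- "mydata"; tmptbl <- "mydata_temp"   # as defined above
antijoin_name_sql <- sprintf("
  select t1.*
  from %s t1 left join %s t2 on t1.name = t2.name
  where t2.name is null", tmptbl, tbl)
```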
```r
cns <- colnames(my_data_base)
antijoin_sql <-
  sprintf("
    select t1.*
    from %s t1 left join %s t2 on %s
    where %s", tmptbl, tbl,
    paste(sprintf("t1.%s=t2.%s", cns, cns), collapse = " and "),
    paste(sprintf("t2.%s is null", cns), collapse = " and "))
new_data <- my_data_base[3,]
new_data
#   name   v1   v2    v3    v4   v5
# 3  wed TRUE TRUE FALSE FALSE TRUE
dbWriteTable(db, tmptbl, new_data, append=TRUE)
### change one field in the temp table
dbExecute(db, sprintf("update %s set v1 = not v1", tmptbl))
# [1] 1
dbGetQuery(db, antijoin_sql)
#   name    v1   v2    v3    v4   v5
# 1  wed FALSE TRUE FALSE FALSE TRUE
```
We will change `antijoin_sql` so that it inserts the not-found rows into the target table. Recall that the current state of the data in `tmptbl` does not match (since we flipped `v1`), so we expect the first `maybe_add_row_sql` to succeed.
```r
maybe_add_row_sql <- function(x, con, tbl, tmptbl) {
  cns <- colnames(x)
  antijoin_sql <-
    sprintf("
      insert into %s
      select t1.*
      from %s t1 left join %s t2 on %s
      where %s", tbl, tmptbl, tbl,
      paste(sprintf("t1.%s=t2.%s", cns, cns), collapse = " and "),
      paste(sprintf("t2.%s is null", cns), collapse = " and "))
  dbWriteTable(con, tmptbl, x, append = TRUE)
  on.exit(
    try(dbExecute(con, paste("delete from", tmptbl)), silent = TRUE),
    add = TRUE)
  dbExecute(con, antijoin_sql)
}
dbGetQuery(db, paste("select count(*) as n from", tbl))
#    n
# 1 10
maybe_add_row_sql(my_data_base[1,], db, tbl, tmptbl)
# [1] 1
dbGetQuery(db, paste("select count(*) as n from", tbl))
#    n
# 1 11
maybe_add_row_sql(my_data_base[1,], db, tbl, tmptbl)
# [1] 0
dbGetQuery(db, paste("select count(*) as n from", tbl))
#    n
# 1 11
```
Okay, now we can operate at scale.
```r
print(dbGetQuery(db, paste("select count(*) as n from", tbl)))
#    n
# 1 11
system.time({
  for (i in 1:100) {
    new_data <-
      if (runif(1) < 0.1) {
        message("forcing a dupe!")
        my_data_base[sample(nrow(my_data_base), 1),]
      } else make_dat(nrow=1, ncol=5)
    res <- maybe_add_row_sql(new_data, db, tbl, tmptbl)
    if (!res) message(".. not inserted")
  }
})
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
# forcing a dupe!
# .. not inserted
#    user  system elapsed
#   0.780   0.036   0.892
print(dbGetQuery(db, paste("select count(*) as n from", tbl)))
#     n
# 1 100
```
Much faster. We can confirm there are no dupes with a quick query:
```r
dbGetQuery(db, "with cte as (select name, count(*) as n from mydata group by name, v1) select * from cte where n > 1")
# [1] name n
# <0 rows> (or 0-length row.names)
```
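The check above keys on `name` (plus `v1`); to be thorough you could group on every column instead (a sketch; `cns` hard-codes the six columns of `mydata` from earlier):

```r
## full-row dupe check: flag any identical row appearing more than once
cns <- c("name", paste0("v", 1:5))   # all columns of mydata
grp <- paste(cns, collapse = ", ")
dupe_sql <- sprintf(
  "select %s, count(*) as n from mydata group by %s having count(*) > 1",
  grp, grp)
```

Run it against the live connection with `dbGetQuery(db, dupe_sql)`.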
You can query the data you want directly,
```r
dbGetQuery(db, "select * from mydata where name like '%a%'")
#   name    v1    v2    v3    v4    v5
# 1  7aE FALSE FALSE  TRUE  TRUE FALSE
# 2  sau  TRUE FALSE  TRUE FALSE  TRUE
# 3  8ah FALSE FALSE FALSE  TRUE FALSE
# 4  oaf  TRUE FALSE FALSE FALSE  TRUE
# 5  Ya3  TRUE  TRUE FALSE FALSE FALSE
```
and you can still use `dplyr` if you prefer:
```r
tbl(db, tbl) %>%
  filter(grepl("a", name)) %>%
  collect()
# # A tibble: 5 × 6
#   name  v1    v2    v3    v4    v5
#   <chr> <lgl> <lgl> <lgl> <lgl> <lgl>
# 1 7aE   FALSE FALSE TRUE  TRUE  FALSE
# 2 sau   TRUE  FALSE TRUE  FALSE TRUE
# 3 8ah   FALSE FALSE FALSE TRUE  FALSE
# 4 oaf   TRUE  FALSE FALSE FALSE TRUE
# 5 Ya3   TRUE  TRUE  FALSE FALSE FALSE
```
again without pulling everything into memory until you are ready. For more on `dplyr` with databases, see https://solutions.posit.co/connections/db/r-packages/dplyr/.
Don't stop there: we did this row by row, but if you produce multiple rows at a time, it works without modification.
```r
new_data <- rbind(make_dat(nrow=10, ncol=5), my_data_base[1:2,])
dbGetQuery(db, paste("select count(*) as n from", tbl))
#     n
# 1 100
maybe_add_row_sql(new_data, db, tbl, tmptbl)
# [1] 10
dbGetQuery(db, paste("select count(*) as n from", tbl))
#     n
# 1 110
```
Note that `new_data` has 12 rows (with 2 known dupes), and only the 10 new rows were added.
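One last housekeeping note for the duckdb approach (a hedged aside, not part of the workflow above): since the database lives in a file, disconnect when you are done so the embedded engine shuts down cleanly and releases its lock.

```r
## duckdb writes through to the .duckdb file; shutdown = TRUE also stops
## the embedded database engine, not just this connection
DBI::dbDisconnect(db, shutdown = TRUE)
```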