我在下面定义了两个表:supplier_balances
和supplier_balance_items
(顺便说一句,两者之间是1[supplier_balance]:N[supplier_balance_items]
的一对多关系):
/*
 * Header rows of supplier balances.
 * Rows are addressed by the surrogate key id; the natural key
 * (accounting_document, accounting_document_type, company_code, document_date_year)
 * is enforced separately by supplier_balances_unique.
 */
CREATE TABLE IF NOT EXISTS sch_brand_payment_data_lake_proxy.supplier_balances (
    -- Surrogate key: lets the items table join on one column instead of on
    -- the four columns that make a record unique.
    id                             bigserial NOT NULL,

    -- Natural key columns (all mandatory).
    accounting_document            text      NOT NULL,
    accounting_document_type       text      NOT NULL,
    company_code                   text      NOT NULL,
    document_date_year             int4      NOT NULL,

    -- Optional descriptive attributes.
    accounting_doc_created_by_user text,
    accounting_clerk               text,
    assignment_reference           text,
    document_reference_id          text,
    original_reference_document    text,
    payment_terms                  text,
    supplier                       text,
    supplier_name                  text,
    document_date                  timestamp,
    posting_date                   timestamp,
    net_due_date                   timestamp,

    -- Bookkeeping timestamps; all start out as NULL.
    created_on                     timestamp DEFAULT NULL,
    modified_on                    timestamp DEFAULT NULL,
    pushed_on                      timestamp DEFAULT NULL,

    -- Flags derived from the bookkeeping timestamps.
    -- Note: each flag is NULL (not FALSE) when its first timestamp is set
    -- but the one it is compared against is NULL.
    is_modified bool GENERATED ALWAYS AS (modified_on IS NOT NULL AND modified_on > created_on) STORED,
    is_pushed   bool GENERATED ALWAYS AS (pushed_on IS NOT NULL AND pushed_on > modified_on) STORED,

    CONSTRAINT supplier_balances_pkey PRIMARY KEY (id),

    -- accounting_document is listed first in the composite unique constraint,
    -- with the intent of speeding up queries that filter on it.
    CONSTRAINT supplier_balances_unique UNIQUE (
        accounting_document,
        accounting_document_type,
        company_code,
        document_date_year
    )
);
/*
 * Single-column indexes on the remaining natural-key columns, so that each
 * of them can also be queried on its own.
 */
CREATE INDEX IF NOT EXISTS supplier_balances_accounting_document_type_idx
    ON sch_brand_payment_data_lake_proxy.supplier_balances (accounting_document_type);

CREATE INDEX IF NOT EXISTS supplier_balances_company_code_idx
    ON sch_brand_payment_data_lake_proxy.supplier_balances (company_code);

CREATE INDEX IF NOT EXISTS supplier_balances_document_date_year_idx
    ON sch_brand_payment_data_lake_proxy.supplier_balances (document_date_year);
/*
 * Line items of a supplier balance: N items per supplier_balances row.
 */
CREATE TABLE IF NOT EXISTS sch_brand_payment_data_lake_proxy.supplier_balance_items (
    -- Id of the owning supplier_balances row.
    -- Declared as plain bigint, not bigserial: a foreign-key column must only
    -- ever hold ids taken from the parent table. bigserial would attach its own
    -- sequence and default, so an INSERT that forgot this column would silently
    -- get a generated number instead of failing the NOT NULL check.
    supplier_balance_id             bigint    NOT NULL,
    posting_view_item               text      NOT NULL,

    -- Optional item attributes.
    posting_key                     text,
    amount_in_company_code_currency numeric,
    amount_in_transaction_currency  numeric,
    cash_discount_1_percent         numeric,
    cash_discount_amount            numeric,
    clearing_accounting_document    text,
    document_item_text              text,
    gl_account                      text,
    is_cleared                      bool,
    clearing_date                   timestamp,
    due_calculation_base_date       timestamp,

    -- An item is unique by its posting_view_item within a given supplier balance.
    CONSTRAINT supplier_balance_items_pkey PRIMARY KEY (supplier_balance_id, posting_view_item),

    -- 1 (supplier balance) : N (supplier balance items).
    -- Deleting a balance, or changing its id, cascades to its items.
    CONSTRAINT supplier_balance_items_fkey FOREIGN KEY (supplier_balance_id)
        REFERENCES sch_brand_payment_data_lake_proxy.supplier_balances (id)
        ON DELETE CASCADE
        ON UPDATE CASCADE
);
注意:为了简单起见,我只是填充了不能为NULL
的列.
-- Seed six balances that differ only in document_date_year (0 through 5).
-- Only the NOT NULL columns are populated; the generated ids are returned.
INSERT INTO sch_brand_payment_data_lake_proxy.supplier_balances (
    accounting_document,
    accounting_document_type,
    company_code,
    document_date_year
)
VALUES
    ('A', 'B', 'C', 0),
    ('A', 'B', 'C', 1),
    ('A', 'B', 'C', 2),
    ('A', 'B', 'C', 3),
    ('A', 'B', 'C', 4),
    ('A', 'B', 'C', 5)
RETURNING id;
输出:
id |
---|
1 |
2 |
3 |
4 |
5 |
6 |
-- Attach items to balances 1, 2 and 3; no items are inserted for balances 4 to 6.
INSERT INTO sch_brand_payment_data_lake_proxy.supplier_balance_items (
    supplier_balance_id,
    posting_view_item
)
VALUES
    (1, 'A'),
    (1, 'B'),
    (3, 'A'),
    (3, 'B'),
    (2, 'A'),
    (1, 'C');
-- List every balance with its surrogate id and natural-key columns.
-- id is selected explicitly because the result shown for this query has an
-- id column; ORDER BY makes the listing order deterministic.
SELECT
    id,
    accounting_document,
    accounting_document_type,
    company_code,
    document_date_year
FROM sch_brand_payment_data_lake_proxy.supplier_balances
ORDER BY id;
输出:
id | accounting_document | accounting_document_type | company_code | document_date_year |
---|---|---|---|---|
1 | A | B | C | 0 |
2 | A | B | C | 1 |
3 | A | B | C | 2 |
4 | A | B | C | 3 |
5 | A | B | C | 4 |
6 | A | B | C | 5 |
-- List every item as (owning balance id, posting view item).
SELECT
    sbi.supplier_balance_id,
    sbi.posting_view_item
FROM sch_brand_payment_data_lake_proxy.supplier_balance_items AS sbi;
输出:
supplier_balance_id | posting_view_item |
---|---|
1 | A |
1 | B |
3 | A |
3 | B |
2 | A |
1 | C |
现在,如果我们想在连接中 Select 多个值,我们可以在原始SQL中执行:
-- Fetch the balances whose natural key matches one of the listed tuples,
-- together with their items. The LEFT JOIN keeps a balance that has no items
-- (posting_view_item is then NULL); a balance with several items yields one
-- row per item.
SELECT
    sb.id,
    sb.accounting_document,
    sb.accounting_document_type,
    sb.company_code,
    sb.document_date_year,
    sbi.posting_view_item
FROM sch_brand_payment_data_lake_proxy.supplier_balances AS sb
LEFT JOIN sch_brand_payment_data_lake_proxy.supplier_balance_items AS sbi
    ON sb.id = sbi.supplier_balance_id
-- Row-value comparison: all four columns must match one tuple of the list.
WHERE (sb.accounting_document, sb.accounting_document_type, sb.company_code, sb.document_date_year)
    IN (('A', 'B', 'C', 1), ('A', 'B', 'C', 2));
输出:
id | accounting_document | accounting_document_type | company_code | document_date_year | posting_view_item |
---|---|---|---|---|---|
2 | A | B | C | 1 | A |
3 | A | B | C | 2 | A |
3 | A | B | C | 2 | B |
https://github.com/npgsql/npgsql/issues/1199
现在,在C#中使用Npgsql时,重现上面的查询轻而易举:
using System.Data;
using Npgsql;
// Example program: runs the tuple-IN join query with the tuple list hard-coded
// in the SQL text, then prints the column names and the rows to the console.

// Connection settings for the PostgreSQL instance used by this example.
var connectionStringBuilder = new NpgsqlConnectionStringBuilder
{
Host = "localhost",
Port = 5432,
Username = "brand_payment_migration",
Password = "secret",
Database = "brand_payment"
};
// `using var` disposes the connection (and the objects below) when the enclosing scope ends.
using var connection = new NpgsqlConnection(connectionStringBuilder.ToString());
connection.Open();
using var command = connection.CreateCommand();
// The SQL is assembled from adjacent literals; each fragment ends with a space
// so that the concatenated text stays well-formed.
command.CommandText =
"SELECT id, accounting_document, accounting_document_type, company_code, document_date_year, posting_view_item " +
"FROM sch_brand_payment_data_lake_proxy.supplier_balances " +
"LEFT OUTER JOIN sch_brand_payment_data_lake_proxy.supplier_balance_items " +
"ON supplier_balances.id = supplier_balance_items.supplier_balance_id " +
"WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
"IN (('A', 'B', 'C', 1), ('A', 'B', 'C', 2));";
// Execute the query and load the whole result set into a DataTable.
using var reader = command.ExecuteReader();
using var dataTable = new DataTable();
dataTable.Load(reader);
var cols = dataTable.Columns.Cast<DataColumn>().ToArray();
// First a "ColN = <column name>" legend (one line per column), then a tab-separated header row.
Console.WriteLine(string.Join(Environment.NewLine, cols.Select((x, i) => $"Col{i} = {x}")));
Console.WriteLine(string.Join("\t", cols.Select((_, i) => $"Col{i}")));
// One tab-separated line per result row.
foreach (var dataRow in dataTable.Rows.Cast<DataRow>())
{
Console.WriteLine(string.Join("\t", dataRow.ItemArray));
}
正如预期的那样:
Col0 = id
Col1 = accounting_document
Col2 = accounting_document_type
Col3 = company_code
Col4 = document_date_year
Col5 = posting_view_item
Col0 Col1 Col2 Col3 Col4 Col5
2 A B C 1 A
3 A B C 2 A
3 A B C 2 B
现在,我想要实现的是:使用一个NpgsqlParameter
,让它携带一组值的集合(即每一列对应一个值),而不是把(('A', 'B', 'C', 1), ('A', 'B', 'C', 2));
作为原始字符串传进去.
所以我修改了上面的C#代码段,并添加了参数
// ...
"WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
"IN @values;";
// Attempt: supply the whole tuple list through one parameter named @values.
var parameter = command.CreateParameter();
parameter.ParameterName = "@values";
// This assignment throws ArgumentOutOfRangeException: Array on its own is rejected,
// it has to be OR-ed with an element type (see the exception text that follows).
parameter.NpgsqlDbType = NpgsqlDbType.Array;
// 2-D object array: one row per tuple, one entry per compared column
// (three strings followed by an int).
parameter.NpgsqlValue = new object[,]
{
{ "A", "B", "C", 1 },
{ "A", "B", "C", 2 }
};
// Note: the same kind of issue arises when using tuples, i.e.
// ( "A", "B", "C", 1 )
// ( "A", "B", "C", 2 )
command.Parameters.Add(parameter);
using var reader = command.ExecuteReader();
// ...
然后我得到了一个例外:
Unhandled exception. System.ArgumentOutOfRangeException: Cannot set NpgsqlDbType to just Array, Binary-Or with the element type (e.g. Array of Box is NpgsqlDbType.Array | Npg
sqlDbType.Box). (Parameter 'value')
at Npgsql.NpgsqlParameter.set_NpgsqlDbType(NpgsqlDbType value)
at Program.<Main>$(String[] args) in C:\Users\natalie-perret\Desktop\Personal\playground\csharp\CSharpPlayground\Program.cs:line 25
然后我试着用以下方法来解决这个错误:
// Second attempt: OR Array with Unknown as the element type. Per the stack trace
// that follows, the failure now occurs later, when ExecuteReader binds the parameter.
parameter.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Unknown;
但另一个例外是:
Unhandled exception. System.ArgumentException: No array type could be found in the database for element .<unknown>
at Npgsql.TypeMapping.ConnectorTypeMapper.ResolveByNpgsqlDbType(NpgsqlDbType npgsqlDbType)
at Npgsql.NpgsqlParameter.ResolveHandler(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlParameterCollection.ValidateAndBind(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior)
at Program.<Main>$(String[] args) in C:\Users\natalie-perret\Desktop\Personal\playground\csharp\CSharpPlayground\Program.cs:line 32
看来出于某种原因需要注册类型.实际上,如果我完全不指定类型,就会得到:
Unhandled exception. System.NotSupportedException: The CLR type System.Object isn't natively supported by Npgsql or your PostgreSQL. To use it with a PostgreSQL composite
you need to specify DataTypeName or to map it, please refer to the documentation.
at Npgsql.TypeMapping.ConnectorTypeMapper.ResolveByClrType(Type type)
at Npgsql.TypeMapping.ConnectorTypeMapper.ResolveByClrType(Type type)
at Npgsql.NpgsqlParameter.ResolveHandler(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlParameter.Bind(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlParameterCollection.ValidateAndBind(ConnectorTypeMapper typeMapper)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior, Boolean async, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteReader(CommandBehavior behavior)
at Program.<Main>$(String[] args) in C:\Users\natalie-perret\Desktop\Personal\playground\csharp\CSharpPlayground\Program.cs:line 31
[编辑]
我最终得到的临时解决方案是依赖jsonb支持,尤其是jsonb_to_recordset
函数(参见PostgreSQL documentation section about json functions):
using System.Data;
using System.Text.Json;
using Npgsql;
using NpgsqlTypes;
// Example program: same join query, but the tuple list is passed as a single
// jsonb parameter and expanded into rows on the server by jsonb_to_recordset.

// Connection settings for the PostgreSQL instance used by this example.
var connectionStringBuilder = new NpgsqlConnectionStringBuilder
{
Host = "localhost",
Port = 5432,
Username = "brand_payment_migration",
Password = "secret",
Database = "brand_payment"
};
// `using var` disposes the connection (and the objects below) when the enclosing scope ends.
using var connection = new NpgsqlConnection(connectionStringBuilder.ToString());
connection.Open();
using var command = connection.CreateCommand();
// The IN list is now a subquery over jsonb_to_recordset(@values); the AS params (...)
// clause declares the name and type of each column extracted from the JSON objects.
command.CommandText =
"SELECT id, accounting_document, accounting_document_type, company_code, document_date_year, posting_view_item " +
"FROM sch_brand_payment_data_lake_proxy.supplier_balances " +
"LEFT OUTER JOIN sch_brand_payment_data_lake_proxy.supplier_balance_items " +
"ON supplier_balances.id = supplier_balance_items.supplier_balance_id " +
"WHERE (accounting_document, accounting_document_type, company_code, document_date_year) " +
"IN (SELECT * FROM jsonb_to_recordset(@values) " +
"AS params (accounting_document text, accounting_document_type text, company_code text, document_date_year integer));";
var parameter = command.CreateParameter();
parameter.ParameterName = "@values";
parameter.NpgsqlDbType = NpgsqlDbType.Jsonb;
// The value sent is a JSON string: an array with one object per tuple.
// The member names of Params are the same as the column names declared in the AS clause.
parameter.NpgsqlValue = JsonSerializer.Serialize(new []
{
new Params("A", "B", "C", 1),
new Params("A", "B", "C", 2)
});
command.Parameters.Add(parameter);
// Execute the query and load the whole result set into a DataTable.
using var reader = command.ExecuteReader();
using var dataTable = new DataTable();
dataTable.Load(reader);
var cols = dataTable.Columns.Cast<DataColumn>().ToArray();
// First a "ColN = <column name>" legend (one line per column), then a tab-separated header row.
Console.WriteLine(string.Join(Environment.NewLine, cols.Select((x, i) => $"Col{i} = {x}")));
Console.WriteLine(string.Join("\t", cols.Select((_, i) => $"Col{i}")));
// One tab-separated line per result row.
foreach (var dataRow in dataTable.Rows.Cast<DataRow>())
{
Console.WriteLine(string.Join("\t", dataRow.ItemArray));
}
/// <summary>
/// One natural-key tuple of a supplier balance, serialized to JSON and sent as
/// an element of the jsonb <c>@values</c> parameter.
/// </summary>
/// <remarks>
/// Declared as a positional <c>record</c>: without that keyword the declaration
/// does not compile. The members are deliberately snake_case so that the JSON
/// property names produced by <c>JsonSerializer.Serialize</c> are identical to
/// the column names listed in the query's <c>AS params (...)</c> clause.
/// </remarks>
/// <param name="accounting_document">Accounting document identifier.</param>
/// <param name="accounting_document_type">Accounting document type.</param>
/// <param name="company_code">Company code.</param>
/// <param name="document_date_year">Year of the document date.</param>
public record Params(
string accounting_document,
string accounting_document_type,
string company_code,
int document_date_year);
输出:
Col0 = id
Col1 = accounting_document
Col2 = accounting_document_type
Col3 = company_code
Col4 = document_date_year
Col5 = posting_view_item
Col0 Col1 Col2 Col3 Col4 Col5
2 A B C 1 A
3 A B C 2 A
3 A B C 2 B
但这是以在传递参数时增加一次JSON序列化的额外步骤为代价的.除了这种做法和拼接一个非常长的字符串之外,让我有点困惑的是:似乎没有办法不经额外步骤就直接把实际值传递给参数的属性.
[编辑2]
[编辑3]
同样的jsonb"技巧"也可以用于插入数据(尽管同样存在我在上面提到过的问题):
-- Bulk-insert balances from a JSON array: jsonb_to_recordset turns each JSON
-- object into one row whose columns are declared by the AS params (...) clause.
-- The SELECT lists the columns explicitly (rather than *), so the mapping onto
-- the INSERT column list does not depend on the order of that declaration.
INSERT INTO sch_brand_payment_data_lake_proxy.supplier_balances (
    accounting_document,
    accounting_document_type,
    company_code,
    document_date_year
)
SELECT
    params.accounting_document,
    params.accounting_document_type,
    params.company_code,
    params.document_date_year
FROM jsonb_to_recordset(
    '[{"accounting_document":"E","accounting_document_type":"B","company_code":"C","document_date_year":1},
{"accounting_document":"E","accounting_document_type":"B","company_code":"C","document_date_year":2}]'::jsonb
) AS params (accounting_document text, accounting_document_type text, company_code text, document_date_year integer)
RETURNING id;