Skip to content

Commit

Permalink
fix: upsert update action
Browse files Browse the repository at this point in the history
  • Loading branch information
Basim108 committed May 13, 2021
1 parent a11dbd9 commit 28b2415
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public IList<SqlCommandBuilderResult> Generate<TEntity>(ICollection<TEntity> ele
continue;
if (!propInfo.IsPartOfUniqueConstraint) {
var upsertSetDelimiter = firstUpdateSetColumn ? " do update set " : ", ";
upsertClause += $"{upsertSetDelimiter}\"{propInfo.DbColumnName}\" = {entityProfile.TableName}.\"{propInfo.DbColumnName}\"";
upsertClause += $"{upsertSetDelimiter}\"{propInfo.DbColumnName}\" = excluded.\"{propInfo.DbColumnName}\"";
firstUpdateSetColumn = false;
}
var delimiter = firstColumn ? "" : ", ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,21 @@ public async Task Should_insert_nullable()
await using var connection = new NpgsqlConnection(_configuration.ConnectionString);
await _testService.UpsertAsync(connection, elements, CancellationToken.None);

Assert.AreEqual(1, elements[0].Id);
Assert.AreEqual("rec-01", elements[0].RecordId);
Assert.AreEqual("sens-01", elements[0].SensorId);
Assert.AreEqual(127, elements[0].Value);
Assert.IsNull(elements[0].NullableValue);
var query = "select * from unit_tests.entity_with_unique_columns;";
using var command = new NpgsqlCommand(query, connection);
using (var reader = command.ExecuteReader()) {
var count = 0;
while (reader.Read()) {
count++;
Assert.AreEqual(1, count);
Assert.AreEqual(1, elements[0].Id);
Assert.AreEqual("rec-01", elements[0].RecordId);
Assert.AreEqual("sens-01", elements[0].SensorId);
Assert.AreEqual(127, elements[0].Value);
Assert.IsNull(elements[0].NullableValue);
}
await reader.CloseAsync();
}
}

[Test]
Expand All @@ -80,12 +90,22 @@ public async Task Should_update_nullable()
elements.First().NullableValue = null; // this item should be updated
// This item should be inserted
await _testService.UpsertAsync(connection, elements, CancellationToken.None);

Assert.AreEqual(1, elements[0].Id);
Assert.AreEqual("rec-01", elements[0].RecordId);
Assert.AreEqual("sens-01", elements[0].SensorId);
Assert.AreEqual(127, elements[0].Value);
Assert.IsNull(elements[0].NullableValue);

var query = "select * from unit_tests.entity_with_unique_columns;";
using var command = new NpgsqlCommand(query, connection);
using (var reader = command.ExecuteReader()) {
var count = 0;
while (reader.Read()) {
count++;
Assert.AreEqual(1, count);
Assert.AreEqual(1, elements[0].Id);
Assert.AreEqual("rec-01", elements[0].RecordId);
Assert.AreEqual("sens-01", elements[0].SensorId);
Assert.AreEqual(127, elements[0].Value);
Assert.IsNull(elements[0].NullableValue);
}
await reader.CloseAsync();
}
}

[Test]
Expand All @@ -102,17 +122,30 @@ public async Task Should_insert_and_update()
elements.Add(new TestEntity {RecordId = "rec-02", SensorId = "sens-01", Value = 1});
await _testService.UpsertAsync(connection, elements, CancellationToken.None);

Assert.AreEqual(1, elements[0].Id);
Assert.AreEqual("rec-01", elements[0].RecordId);
Assert.AreEqual("sens-01", elements[0].SensorId);
Assert.AreEqual(128, elements[0].Value);

// id = 3 because upsert calls a sequence then has a constraint violation and start doing update.
// Then it processes the second element and again calls sequence so gets 3;
Assert.AreEqual(3, elements[1].Id);
Assert.AreEqual("rec-02", elements[1].RecordId);
Assert.AreEqual("sens-01", elements[1].SensorId);
Assert.AreEqual(1, elements[1].Value);
var query = "select * from unit_tests.entity_with_unique_columns order by id;";
using var command = new NpgsqlCommand(query, connection);
using (var reader = command.ExecuteReader()) {
var count = 0;
while (reader.Read()) {
count++;
if (count == 1) {
Assert.AreEqual(1, elements[0].Id);
Assert.AreEqual("rec-01", elements[0].RecordId);
Assert.AreEqual("sens-01", elements[0].SensorId);
Assert.AreEqual(128, elements[0].Value);
}
else if (count == 2) {
// id = 3 because upsert calls a sequence then has a constraint violation and start doing update.
// Then it processes the second element and again calls sequence so gets 3;
Assert.AreEqual(3, elements[1].Id);
Assert.AreEqual("rec-02", elements[1].RecordId);
Assert.AreEqual("sens-01", elements[1].SensorId);
Assert.AreEqual(1, elements[1].Value);
}
}
await reader.CloseAsync();
Assert.AreEqual(2, count);
}
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Hrimsoft.SqlBulk.PostgreSql.IntegrationTests.TestModels;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using Npgsql;
using NUnit.Framework;

namespace Hrimsoft.SqlBulk.PostgreSql.IntegrationTests.BulkUpsert
{
public class CompositePkTests
{
private readonly TestConfiguration _configuration;

private IPostgreSqlBulkService _testService;
private BulkServiceOptions _bulkServiceOptions;

public CompositePkTests()
{
// As upsert command was implemented only in postgre version of 9.5+
_configuration = new TestConfiguration("Postgres_higher_than_9_4");
}

[SetUp]
public async Task SetUp()
{
var truncateTableCmd = "truncate \"unit_tests\".\"entity_with_composite_pk\";";

await using var connection = new NpgsqlConnection(_configuration.ConnectionString);
await using var command = new NpgsqlCommand(truncateTableCmd, connection);
await connection.OpenAsync();
await command.ExecuteNonQueryAsync();

_bulkServiceOptions = new BulkServiceOptions();
_bulkServiceOptions.AddEntityProfile<TestEntityWithCompositePk>(new EntityWithCompositePkProfile());

var insertCommandBuilder = new InsertSqlCommandBuilder(NullLoggerFactory.Instance);
var deleteCommandBuilder = new Mock<IDeleteSqlCommandBuilder>().Object;
var updateCommandBuilder = new Mock<IUpdateSqlCommandBuilder>().Object;
var upsertCommandBuilder = new UpsertSqlCommandBuilder(NullLoggerFactory.Instance);

_testService = new NpgsqlCommandsBulkService(
_bulkServiceOptions,
NullLoggerFactory.Instance,
insertCommandBuilder,
updateCommandBuilder,
deleteCommandBuilder,
upsertCommandBuilder);
}

[Test]
public async Task Should_insert()
{
var elements = new List<TestEntityWithCompositePk>
{
new TestEntityWithCompositePk {UserId = 1, Column2 = 2, Column3 = 3},
};
await using var connection = new NpgsqlConnection(_configuration.ConnectionString);
await _testService.UpsertAsync(connection, elements, CancellationToken.None);
var query = "select * from unit_tests.entity_with_composite_pk;";
using var command = new NpgsqlCommand(query, connection);
using (var reader = command.ExecuteReader()) {
var count = 0;
while (reader.Read()) {
count++;
Assert.AreEqual(1, count);
Assert.AreEqual(1, (int)reader["user_id"]);
Assert.AreEqual(2, (int)reader["column2"]);
Assert.AreEqual(3, (int)reader["column3"]);
}
await reader.CloseAsync();
}
}

[Test]
public async Task Should_update()
{
var elements = new List<TestEntityWithCompositePk>
{
new TestEntityWithCompositePk {UserId = 1, Column2 = 2, Column3 = 5},
};
await using var connection = new NpgsqlConnection(_configuration.ConnectionString);
await _testService.InsertAsync(connection, elements, CancellationToken.None);
elements[0].Column3 = 10; // this item should be updated
// This item should be inserted
await _testService.UpsertAsync(connection, elements, CancellationToken.None);

var query = "select * from unit_tests.entity_with_composite_pk;";
using var command = new NpgsqlCommand(query, connection);
using (var reader = command.ExecuteReader()) {
var count = 0;
while (reader.Read()) {
count++;
Assert.AreEqual(1, count);
Assert.AreEqual(1, (int)reader["user_id"]);
Assert.AreEqual(2, (int)reader["column2"]);
Assert.AreEqual(10, (int)reader["column3"]);
}
await reader.CloseAsync();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace Hrimsoft.SqlBulk.PostgreSql.IntegrationTests.TestModels
{
public class EntityWithCompositePkProfile : EntityProfile
{
public EntityWithCompositePkProfile(int maximumSentElements = 0)
: base(typeof(TestEntityWithCompositePk))
{
this.MaximumSentElements = maximumSentElements;

this.ToTable("entity_with_composite_pk", "unit_tests");
this.HasUniqueConstraint("PK_entity_with_composite_pk");
this.HasPropertyAsPartOfUniqueConstraint<TestEntityWithCompositePk, int>(entity => entity.UserId)
.ThatIsPrivateKey();
this.HasPropertyAsPartOfUniqueConstraint<TestEntityWithCompositePk, int>(entity => entity.Column2)
.ThatIsPrivateKey();
this.HasProperty<TestEntityWithCompositePk, int>(entity => entity.Column3);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Hrimsoft.SqlBulk.PostgreSql.IntegrationTests.TestModels
{
public class TestEntityWithCompositePk
{
public int UserId { get; set; }
public int Column2 { get; set; }
public int Column3 { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,13 @@ create table "unit_tests"."bulk_test_entity"
sensor_id text,
value integer not null,
nullable_int integer
);

drop table if exists "unit_tests"."entity_with_composite_pk";
create table "unit_tests"."entity_with_composite_pk"
(
user_id integer not null,
column2 integer not null,
column3 integer not null,
CONSTRAINT "PK_entity_with_composite_pk" PRIMARY KEY (user_id, column2)
);
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ namespace Hrimsoft.SqlBulk.PostgreSql.Tests.UpsertSqlCommandBuilderService
public class UpsertConsts
{
public const string UPSERT_PATTERN =
"insert\\s+into\\s+(\"\\w+\".)?\"\\w+\"\\s*\\(\\s*\"\\w+\"(,\\s*\"\\w+\")*\\s*\\)\\s*values\\s*\\(\\s*(\\s*\\d+\\s*,\\s*)*\\s*@param_\\w+_\\d+(,\\s*@param_\\w+_\\d+)*\\s*(,\\s*\\d+)*\\s*\\)\\s*(,\\s*\\(\\s*@param_\\w+_\\d+(,\\s*@param_\\w+_\\d+)*\\s*(,\\s*\\d+)*\\s*\\))*\\s*on\\s+conflict\\s+(\\(\\s*\"\\w+\"\\s*\\)|on\\s+constraint\\s+\"\\w+\")\\s+do\\s+update\\s+set\\s+\"\\w+\"\\s*=\\s*(\"\\w+\".)?\"\\w+\".\"\\w+\"\\s*(,\\s*\"\\w+\"\\s*=\\s*(\"\\w+\".)?\"\\w+\".\"\\w+\")*\\s*(returning\\s+\"\\w+\"\\s*(,\\s*\"\\w+\")*)?;";
"insert\\s+into\\s+(\"\\w+\".)?\"\\w+\"\\s*\\(\\s*\"\\w+\"(,\\s*\"\\w+\")*\\s*\\)\\s*values\\s*\\(\\s*(\\s*\\d+\\s*,\\s*)*\\s*@param_\\w+_\\d+(,\\s*@param_\\w+_\\d+)*\\s*(,\\s*\\d+)*\\s*\\)\\s*(,\\s*\\(\\s*@param_\\w+_\\d+(,\\s*@param_\\w+_\\d+)*\\s*(,\\s*\\d+)*\\s*\\))*\\s*on\\s+conflict\\s+(\\(\\s*\"\\w+\"\\s*\\)|on\\s+constraint\\s+\"\\w+\")\\s+do\\s+update\\s+set\\s+\"\\w+\"\\s*=\\s*excluded.\"\\w+\"\\s*(,\\s*\"\\w+\"\\s*=\\s*excluded.\"\\w+\")*\\s*(returning\\s+\"\\w+\"\\s*(,\\s*\"\\w+\")*)?;";
}
}

0 comments on commit 28b2415

Please sign in to comment.