diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index 632398d1431b07d80bdac25d0354084db9bf16a2..d3463231eb5c420885c05712b80e629cbca11138 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -47,9 +47,10 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { } testcases := []struct { - desc string - primaryFails bool - nodes []node + desc string + primaryFails bool + nodes []node + expectedRequestFinalizerErrorMessage string }{ { desc: "successful vote should not create replication jobs", @@ -109,11 +110,12 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { }, }, { - desc: "secondaries should not participate when primary's generation is unknown", + desc: "write is not acknowledged if it only targets outdated nodes", nodes: []node{ - {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldParticipate: true, generation: datastore.GenerationUnknown, expectedGeneration: 0}, - {shouldParticipate: false, shouldGetRepl: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown}, + {primary: true, subtransactions: subtransactions{{vote: "foobar", shouldSucceed: true}}, shouldParticipate: true, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown}, + {shouldParticipate: false, shouldGetRepl: false, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown}, }, + expectedRequestFinalizerErrorMessage: "increment generation: write to outdated nodes", }, { // All transactional RPCs are expected to cast vote if they are successful. If they don't, something is wrong @@ -284,7 +286,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { } err = streamParams.RequestFinalizer() - require.NoError(t, err) + if tc.expectedRequestFinalizerErrorMessage != "" { + require.EqualError(t, err, tc.expectedRequestFinalizerErrorMessage) + } else { + require.NoError(t, err) + } // Nodes that successfully committed should have their generations incremented. // Nodes that did not successfully commit or did not participate should remain on their diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 9248e9b2a9fc0e8c5c8a9c77ddbbbf19702733e6..0b53c344491fd133e5ef0e2030bc18817abc8a90 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -215,14 +215,10 @@ SELECT func (rs *PostgresRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error { const q = ` WITH repository AS ( - INSERT INTO repositories ( - virtual_storage, - relative_path, - generation - ) VALUES ($1, $2, $4) - ON CONFLICT (virtual_storage, relative_path) DO - UPDATE SET generation = EXCLUDED.generation - WHERE COALESCE(repositories.generation, -1) < EXCLUDED.generation + UPDATE repositories SET generation = $4 + WHERE virtual_storage = $1 + AND relative_path = $2 + AND COALESCE(repositories.generation, -1) < $4 ) INSERT INTO storage_repositories (