From 95a34c165a56da4b52df0670d726e211f197cbb1 Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:09:34 -0700 Subject: [PATCH 01/11] refactor: Clarify database/schema/table results --- results.go | 18 +++++++++++++----- verify.go | 16 ++++++++-------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/results.go b/results.go index a232cd5c..597def8c 100644 --- a/results.go +++ b/results.go @@ -38,16 +38,24 @@ func NewResults(targetNames []string, testModes []string) *Results { } } -// SingleResult represents the verification result from a single target, with the schema: -// SingleResult[schema][table][mode] = test output. -type SingleResult map[string]map[string]map[string]string +// DatabaseResult represents the verification result from a single target database: +// DatabaseResult[schema][table][mode] = test output. +type DatabaseResult map[string]SchemaResult + +// SchemaResult represents the verification result from a single schema: +// SchemaResult[table][mode] = test output. +type SchemaResult map[string]TableResult + +// TableResult represents the verification result from a single table: +// TableResult[mode] = test output. +type TableResult map[string]string // AddResult adds a SingleResult from a test on a specific target to the Results object. -func (r *Results) AddResult(targetName string, schemaTableHashes SingleResult) { +func (r *Results) AddResult(targetName string, databaseHashes DatabaseResult) { r.mutex.Lock() defer r.mutex.Unlock() - for schema, tables := range schemaTableHashes { + for schema, tables := range databaseHashes { if _, ok := r.content[schema]; !ok { r.content[schema] = make(map[string]map[string]map[string][]string) } diff --git a/verify.go b/verify.go index 84d32eb1..1c3db92f 100644 --- a/verify.go +++ b/verify.go @@ -104,8 +104,8 @@ func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *p close(done) } -func (c Config) fetchTargetTableNames(ctx context.Context, conn *pgx.Conn) (SingleResult, error) { - schemaTableHashes := make(SingleResult) +func (c Config) fetchTargetTableNames(ctx context.Context, conn *pgx.Conn) (DatabaseResult, error) { + schemaTableHashes := make(DatabaseResult) rows, err := conn.Query(ctx, buildGetTablesQuery(c.IncludeSchemas, c.ExcludeSchemas, c.IncludeTables, c.ExcludeTables)) if err != nil { @@ -119,10 +119,10 @@ func (c Config) fetchTargetTableNames(ctx context.Context, conn *pgx.Conn) (Sing } if _, ok := schemaTableHashes[schema.String]; !ok { - schemaTableHashes[schema.String] = make(map[string]map[string]string) + schemaTableHashes[schema.String] = make(SchemaResult) } - schemaTableHashes[schema.String][table.String] = make(map[string]string) + schemaTableHashes[schema.String][table.String] = make(TableResult) for _, testMode := range c.TestModes { schemaTableHashes[schema.String][table.String][testMode] = defaultErrorOutput @@ -152,8 +152,8 @@ func (c Config) validColumnTarget(columnName string) bool { return false } -func (c Config) runTestQueriesOnTarget(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaTableHashes SingleResult) SingleResult { - for schemaName, tables := range schemaTableHashes { +func (c Config) runTestQueriesOnTarget(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, databaseHashes DatabaseResult) DatabaseResult { + for schemaName, tables := range databaseHashes { for tableName := range tables { tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) tableLogger.Info("Computing hash") @@ -236,13 +236,13 @@ func (c Config) runTestQueriesOnTarget(ctx context.Context, logger *logrus.Entry continue } - schemaTableHashes[schemaName][tableName][testMode] = testOutput + databaseHashes[schemaName][tableName][testMode] = testOutput testLogger.Infof("Hash computed: %s", testOutput) } } } - return schemaTableHashes + return databaseHashes } func runTestOnTable(ctx context.Context, conn *pgx.Conn, query string) (string, error) { From 89efdb738103eaac3c8315318e106109cb47ec1a Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:15:48 -0700 Subject: [PATCH 02/11] refactor: Separate database, schema verification --- verify.go | 141 +++++++++++++++++++++++++++++------------------------- 1 file changed, 75 insertions(+), 66 deletions(-) diff --git a/verify.go b/verify.go index 1c3db92f..b42935ef 100644 --- a/verify.go +++ b/verify.go @@ -97,7 +97,7 @@ func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *p return } - schemaTableHashes = c.runTestQueriesOnTarget(ctx, logger, conn, schemaTableHashes) + schemaTableHashes = c.runTestQueriesOnDatabase(ctx, logger, conn, schemaTableHashes) finalResults.AddResult(targetName, schemaTableHashes) logger.Info("Table hashes computed") @@ -152,97 +152,106 @@ func (c Config) validColumnTarget(columnName string) bool { return false } -func (c Config) runTestQueriesOnTarget(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, databaseHashes DatabaseResult) DatabaseResult { - for schemaName, tables := range databaseHashes { - for tableName := range tables { - tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) - tableLogger.Info("Computing hash") +func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, databaseHashes DatabaseResult) DatabaseResult { + databaseResults := make(DatabaseResult) - rows, err := conn.Query(ctx, buildGetColumsQuery(schemaName, tableName)) - if err != nil { - tableLogger.WithError(err).Error("Failed to query column names, data types") - - continue - } + for schemaName, schemaHashes := range databaseHashes { + schemaResult := c.runTestQueriesOnSchema(ctx, logger, conn, schemaName, schemaHashes) + databaseResults[schemaName] = schemaResult + } - allTableColumns := make(map[string]column) + return databaseResults +} - for rows.Next() { - var columnName, dataType, constraintName, constraintType pgtype.Text +func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaName string, schemaHashes SchemaResult) SchemaResult { + for tableName := range schemaHashes { + tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) + tableLogger.Info("Computing hash") - err := rows.Scan(&columnName, &dataType, &constraintName, &constraintType) - if err != nil { - tableLogger.WithError(err).Error("Failed to parse column names, data types from query response") + rows, err := conn.Query(ctx, buildGetColumsQuery(schemaName, tableName)) + if err != nil { + tableLogger.WithError(err).Error("Failed to query column names, data types") - continue - } + continue + } - existing, ok := allTableColumns[columnName.String] - if ok { - existing.constraints = append(existing.constraints, constraintType.String) - allTableColumns[columnName.String] = existing - } else { - allTableColumns[columnName.String] = column{columnName.String, dataType.String, []string{constraintType.String}} - } - } + allTableColumns := make(map[string]column) - var tableColumns []column + for rows.Next() { + var columnName, dataType, constraintName, constraintType pgtype.Text - var primaryKeyColumnNames []string + err := rows.Scan(&columnName, &dataType, &constraintName, &constraintType) + if err != nil { + tableLogger.WithError(err).Error("Failed to parse column names, data types from query response") - for _, col := range allTableColumns { - if col.IsPrimaryKey() { - primaryKeyColumnNames = append(primaryKeyColumnNames, col.name) - } + continue + } - if c.validColumnTarget(col.name) { - tableColumns = append(tableColumns, col) - } + existing, ok := allTableColumns[columnName.String] + if ok { + existing.constraints = append(existing.constraints, constraintType.String) + allTableColumns[columnName.String] = existing + } else { + allTableColumns[columnName.String] = column{columnName.String, dataType.String, []string{constraintType.String}} } + } - if len(primaryKeyColumnNames) == 0 { - tableLogger.Error("No primary keys found") + var tableColumns []column - continue + var primaryKeyColumnNames []string + + for _, col := range allTableColumns { + if col.IsPrimaryKey() { + primaryKeyColumnNames = append(primaryKeyColumnNames, col.name) } - tableLogger.WithFields(logrus.Fields{ - "primary_keys": primaryKeyColumnNames, - "columns": tableColumns, - }).Info("Determined columns to hash") + if c.validColumnTarget(col.name) { + tableColumns = append(tableColumns, col) + } + } - for _, testMode := range c.TestModes { - testLogger := tableLogger.WithField("test", testMode) + if len(primaryKeyColumnNames) == 0 { + tableLogger.Error("No primary keys found") - var query string + continue + } - switch testMode { - case TestModeFull: - query = buildFullHashQuery(c, schemaName, tableName, tableColumns) - case TestModeBookend: - query = buildBookendHashQuery(c, schemaName, tableName, tableColumns, c.BookendLimit) - case TestModeSparse: - query = buildSparseHashQuery(c, schemaName, tableName, tableColumns, c.SparseMod) - case TestModeRowCount: - query = buildRowCountQuery(schemaName, tableName) - } + tableLogger.WithFields(logrus.Fields{ + "primary_keys": primaryKeyColumnNames, + "columns": tableColumns, + }).Info("Determined columns to hash") - testLogger.Debugf("Generated query: %s", query) + for _, testMode := range c.TestModes { + testLogger := tableLogger.WithField("test", testMode) + + var query string + + switch testMode { + case TestModeFull: + query = buildFullHashQuery(c, schemaName, tableName, tableColumns) + case TestModeBookend: + query = buildBookendHashQuery(c, schemaName, tableName, tableColumns, c.BookendLimit) + case TestModeSparse: + query = buildSparseHashQuery(c, schemaName, tableName, tableColumns, c.SparseMod) + case TestModeRowCount: + query = buildRowCountQuery(schemaName, tableName) + } - testOutput, err := runTestOnTable(ctx, conn, query) - if err != nil { - testLogger.WithError(err).Error("Failed to compute hash") + testLogger.Debugf("Generated query: %s", query) - continue - } + testOutput, err := runTestOnTable(ctx, conn, query) + if err != nil { + testLogger.WithError(err).Error("Failed to compute hash") - databaseHashes[schemaName][tableName][testMode] = testOutput - testLogger.Infof("Hash computed: %s", testOutput) + continue } + + schemaHashes[tableName][testMode] = testOutput + testLogger.Infof("Hash computed: %s", testOutput) } } - return databaseHashes + return schemaHashes } func runTestOnTable(ctx context.Context, conn *pgx.Conn, query string) (string, error) { From 81258f908f2c9b6be95c9879412a51052a6b19ad Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:21:23 -0700 Subject: [PATCH 03/11] refactor: Separate schema, table verification --- verify.go | 139 +++++++++++++++++++++++++++++------------------------- 1 file changed, 76 insertions(+), 63 deletions(-) diff --git a/verify.go b/verify.go index b42935ef..187652d2 100644 --- a/verify.go +++ b/verify.go @@ -164,94 +164,107 @@ func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Ent } func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaName string, schemaHashes SchemaResult) SchemaResult { + schemaResults := make(SchemaResult) + for tableName := range schemaHashes { - tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) - tableLogger.Info("Computing hash") + tableResult := c.runTestQueriesOnTable(ctx, logger, conn, schemaName, tableName) + if tableResult != nil { + schemaResults[tableName] = tableResult + } + } - rows, err := conn.Query(ctx, buildGetColumsQuery(schemaName, tableName)) - if err != nil { - tableLogger.WithError(err).Error("Failed to query column names, data types") + return schemaResults +} - continue - } +func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaName, tableName string) TableResult { + tableResults := make(TableResult) + + tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) + tableLogger.Info("Computing hash") - allTableColumns := make(map[string]column) + rows, err := conn.Query(ctx, buildGetColumsQuery(schemaName, tableName)) + if err != nil { + tableLogger.WithError(err).Error("Failed to query column names, data types") - for rows.Next() { - var columnName, dataType, constraintName, constraintType pgtype.Text + return nil + } - err := rows.Scan(&columnName, &dataType, &constraintName, &constraintType) - if err != nil { - tableLogger.WithError(err).Error("Failed to parse column names, data types from query response") + allTableColumns := make(map[string]column) - continue - } + for rows.Next() { + var columnName, dataType, constraintName, constraintType pgtype.Text - existing, ok := allTableColumns[columnName.String] - if ok { - existing.constraints = append(existing.constraints, constraintType.String) - allTableColumns[columnName.String] = existing - } else { - allTableColumns[columnName.String] = column{columnName.String, dataType.String, []string{constraintType.String}} - } + err := rows.Scan(&columnName, &dataType, &constraintName, &constraintType) + if err != nil { + tableLogger.WithError(err).Error("Failed to parse column names, data types from query response") + + continue } - var tableColumns []column + existing, ok := allTableColumns[columnName.String] + if ok { + existing.constraints = append(existing.constraints, constraintType.String) + allTableColumns[columnName.String] = existing + } else { + allTableColumns[columnName.String] = column{columnName.String, dataType.String, []string{constraintType.String}} + } + } - var primaryKeyColumnNames []string + var tableColumns []column - for _, col := range allTableColumns { - if col.IsPrimaryKey() { - primaryKeyColumnNames = append(primaryKeyColumnNames, col.name) - } + var primaryKeyColumnNames []string - if c.validColumnTarget(col.name) { - tableColumns = append(tableColumns, col) - } + for _, col := range allTableColumns { + if col.IsPrimaryKey() { + primaryKeyColumnNames = append(primaryKeyColumnNames, col.name) } - if len(primaryKeyColumnNames) == 0 { - tableLogger.Error("No primary keys found") - - continue + if c.validColumnTarget(col.name) { + tableColumns = append(tableColumns, col) } + } - tableLogger.WithFields(logrus.Fields{ - "primary_keys": primaryKeyColumnNames, - "columns": tableColumns, - }).Info("Determined columns to hash") + if len(primaryKeyColumnNames) == 0 { + tableLogger.Error("No primary keys found") - for _, testMode := range c.TestModes { - testLogger := tableLogger.WithField("test", testMode) - - var query string - - switch testMode { - case TestModeFull: - query = buildFullHashQuery(c, schemaName, tableName, tableColumns) - case TestModeBookend: - query = buildBookendHashQuery(c, schemaName, tableName, tableColumns, c.BookendLimit) - case TestModeSparse: - query = buildSparseHashQuery(c, schemaName, tableName, tableColumns, c.SparseMod) - case TestModeRowCount: - query = buildRowCountQuery(schemaName, tableName) - } + return nil + } - testLogger.Debugf("Generated query: %s", query) + tableLogger.WithFields(logrus.Fields{ + "primary_keys": primaryKeyColumnNames, + "columns": tableColumns, + }).Info("Determined columns to hash") + + for _, testMode := range c.TestModes { + testLogger := tableLogger.WithField("test", testMode) + + var query string + + switch testMode { + case TestModeFull: + query = buildFullHashQuery(c, schemaName, tableName, tableColumns) + case TestModeBookend: + query = buildBookendHashQuery(c, schemaName, tableName, tableColumns, c.BookendLimit) + case TestModeSparse: + query = buildSparseHashQuery(c, schemaName, tableName, tableColumns, c.SparseMod) + case TestModeRowCount: + query = buildRowCountQuery(schemaName, tableName) + } - testOutput, err := runTestOnTable(ctx, conn, query) - if err != nil { - testLogger.WithError(err).Error("Failed to compute hash") + testLogger.Debugf("Generated query: %s", query) - continue - } + testOutput, err := runTestOnTable(ctx, conn, query) + if err != nil { + testLogger.WithError(err).Error("Failed to compute hash") - schemaHashes[tableName][testMode] = testOutput - testLogger.Infof("Hash computed: %s", testOutput) + continue } + + tableResults[testMode] = testOutput + testLogger.Infof("Hash computed: %s", testOutput) } - return schemaHashes + return tableResults } func runTestOnTable(ctx context.Context, conn *pgx.Conn, query string) (string, error) { From 5cb57bbc26dfb4a649f257c81b99b255bcbda378 Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:30:28 -0700 Subject: [PATCH 04/11] refactor: Add results per-database --- verify.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/verify.go b/verify.go index 187652d2..7843f985 100644 --- a/verify.go +++ b/verify.go @@ -97,9 +97,7 @@ func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *p return } - schemaTableHashes = c.runTestQueriesOnDatabase(ctx, logger, conn, schemaTableHashes) - - finalResults.AddResult(targetName, schemaTableHashes) + c.runTestQueriesOnDatabase(ctx, logger, conn, targetName, schemaTableHashes, finalResults) logger.Info("Table hashes computed") close(done) } @@ -152,7 +150,7 @@ func (c Config) validColumnTarget(columnName string) bool { return false } -func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, databaseHashes DatabaseResult) DatabaseResult { +func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName string, databaseHashes DatabaseResult, finalResults *Results) { databaseResults := make(DatabaseResult) for schemaName, schemaHashes := range databaseHashes { @@ -160,7 +158,7 @@ func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Ent databaseResults[schemaName] = schemaResult } - return databaseResults + finalResults.AddResult(targetName, databaseResults) } func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaName string, schemaHashes SchemaResult) SchemaResult { From 5627e6bb2538b18f56fb19c82a20c90c5c4ea807 Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:32:41 -0700 Subject: [PATCH 05/11] refactor: Add results per-schema --- verify.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/verify.go b/verify.go index 7843f985..711ce069 100644 --- a/verify.go +++ b/verify.go @@ -151,17 +151,12 @@ func (c Config) validColumnTarget(columnName string) bool { } func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName string, databaseHashes DatabaseResult, finalResults *Results) { - databaseResults := make(DatabaseResult) - for schemaName, schemaHashes := range databaseHashes { - schemaResult := c.runTestQueriesOnSchema(ctx, logger, conn, schemaName, schemaHashes) - databaseResults[schemaName] = schemaResult + c.runTestQueriesOnSchema(ctx, logger, conn, schemaName, targetName, schemaHashes, finalResults) } - - finalResults.AddResult(targetName, databaseResults) } -func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaName string, schemaHashes SchemaResult) SchemaResult { +func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName string, schemaHashes SchemaResult, finalResults *Results) { schemaResults := make(SchemaResult) for tableName := range schemaHashes { @@ -171,7 +166,9 @@ func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry } } - return schemaResults + databaseResults := make(DatabaseResult) + databaseResults[schemaName] = schemaResults + finalResults.AddResult(targetName, databaseResults) } func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaName, tableName string) TableResult { From b72d24e5293a370abcfd9491ea6d18658675e7ea Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:35:29 -0700 Subject: [PATCH 06/11] refactor: Add results per-table --- verify.go | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/verify.go b/verify.go index 711ce069..ba094606 100644 --- a/verify.go +++ b/verify.go @@ -157,21 +157,12 @@ func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Ent } func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName string, schemaHashes SchemaResult, finalResults *Results) { - schemaResults := make(SchemaResult) - for tableName := range schemaHashes { - tableResult := c.runTestQueriesOnTable(ctx, logger, conn, schemaName, tableName) - if tableResult != nil { - schemaResults[tableName] = tableResult - } + c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults) } - - databaseResults := make(DatabaseResult) - databaseResults[schemaName] = schemaResults - finalResults.AddResult(targetName, databaseResults) } -func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, schemaName, tableName string) TableResult { +func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName string, finalResults *Results) { tableResults := make(TableResult) tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) @@ -181,7 +172,7 @@ func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, if err != nil { tableLogger.WithError(err).Error("Failed to query column names, data types") - return nil + return } allTableColumns := make(map[string]column) @@ -222,7 +213,7 @@ func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, if len(primaryKeyColumnNames) == 0 { tableLogger.Error("No primary keys found") - return nil + return } tableLogger.WithFields(logrus.Fields{ @@ -259,7 +250,10 @@ func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, testLogger.Infof("Hash computed: %s", testOutput) } - return tableResults + databaseResults := make(DatabaseResult) + databaseResults[schemaName] = make(SchemaResult) + databaseResults[schemaName][tableName] = tableResults + finalResults.AddResult(targetName, databaseResults) } func runTestOnTable(ctx context.Context, conn *pgx.Conn, query string) (string, error) { From 49a971d9bab0e147f4ede1a47f57574f1d133feb Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:42:42 -0700 Subject: [PATCH 07/11] refactor: Add results per-test --- verify.go | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/verify.go b/verify.go index ba094606..29596caa 100644 --- a/verify.go +++ b/verify.go @@ -163,8 +163,6 @@ func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry } func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName string, finalResults *Results) { - tableResults := make(TableResult) - tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) tableLogger.Info("Computing hash") @@ -239,35 +237,34 @@ func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, testLogger.Debugf("Generated query: %s", query) - testOutput, err := runTestOnTable(ctx, conn, query) - if err != nil { - testLogger.WithError(err).Error("Failed to compute hash") - - continue - } - - tableResults[testMode] = testOutput - testLogger.Infof("Hash computed: %s", testOutput) + runTestOnTable(ctx, testLogger, conn, targetName, schemaName, tableName, testMode, query, finalResults) } - - databaseResults := make(DatabaseResult) - databaseResults[schemaName] = make(SchemaResult) - databaseResults[schemaName][tableName] = tableResults - finalResults.AddResult(targetName, databaseResults) } -func runTestOnTable(ctx context.Context, conn *pgx.Conn, query string) (string, error) { +func runTestOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName, testMode, query string, finalResults *Results) { row := conn.QueryRow(ctx, query) + var testOutputString string + var testOutput pgtype.Text if err := row.Scan(&testOutput); err != nil { switch err { case pgx.ErrNoRows: - return "no rows", nil + testOutputString = "no rows" default: - return "", errors.Wrap(err, "failed to scan test output") + logger.WithError(err).Error("failed to scan test output") + + return } + } else { + testOutputString = testOutput.String } - return testOutput.String, nil + logger.Infof("Hash computed: %s", testOutputString) + + databaseResults := make(DatabaseResult) + databaseResults[schemaName] = make(SchemaResult) + databaseResults[schemaName][tableName] = make(TableResult) + databaseResults[schemaName][tableName][testMode] = testOutputString + finalResults.AddResult(targetName, databaseResults) } From e88922255ff27447b6eb9ca23f1db66ab5d7ecb4 Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:44:24 -0700 Subject: [PATCH 08/11] refactor: Condense per database/schema/table logic --- verify.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/verify.go b/verify.go index 29596caa..1e4e155c 100644 --- a/verify.go +++ b/verify.go @@ -97,7 +97,12 @@ func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *p return } - c.runTestQueriesOnDatabase(ctx, logger, conn, targetName, schemaTableHashes, finalResults) + for schemaName, schemaHashes := range schemaTableHashes { + for tableName := range schemaHashes { + c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults) + } + } + logger.Info("Table hashes computed") close(done) } @@ -150,18 +155,6 @@ func (c Config) validColumnTarget(columnName string) bool { return false } -func (c Config) runTestQueriesOnDatabase(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName string, databaseHashes DatabaseResult, finalResults *Results) { - for schemaName, schemaHashes := range databaseHashes { - c.runTestQueriesOnSchema(ctx, logger, conn, schemaName, targetName, schemaHashes, finalResults) - } -} - -func (c Config) runTestQueriesOnSchema(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName string, schemaHashes SchemaResult, finalResults *Results) { - for tableName := range schemaHashes { - c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults) - } -} - func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName string, finalResults *Results) { tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) tableLogger.Info("Computing hash") From 3a4062925cad249bf7069a70088dd480bde0b559 Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:49:04 -0700 Subject: [PATCH 09/11] fix: Cleaner parallel processing with sync.WaitGroup --- verify.go | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/verify.go b/verify.go index 1e4e155c..ec7af6b5 100644 --- a/verify.go +++ b/verify.go @@ -2,6 +2,7 @@ package pgverify import ( "context" + "sync" "github.com/jackc/pgx/pgtype" "github.com/jackc/pgx/v4" @@ -63,18 +64,17 @@ func (c Config) Verify(ctx context.Context, targets []*pgx.ConnConfig) (*Results finalResults = NewResults(targetNames, c.TestModes) // Then query each target database in parallel to generate table hashes. - var doneChannels []chan struct{} + wg := &sync.WaitGroup{} for i, conn := range conns { - done := make(chan struct{}) - go c.runTestsOnTarget(ctx, targetNames[i], conn, finalResults, done) - doneChannels = append(doneChannels, done) - } + wg.Add(1) - for _, done := range doneChannels { - <-done + go c.runTestsOnTarget(ctx, targetNames[i], conn, finalResults, wg) } + // Wait for queries to complete + wg.Wait() + // Compare final results reportErrors := finalResults.CheckForErrors() if len(reportErrors) > 0 { @@ -86,25 +86,27 @@ func (c Config) Verify(ctx context.Context, targets []*pgx.ConnConfig) (*Results return finalResults, nil } -func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *pgx.Conn, finalResults *Results, done chan struct{}) { +func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *pgx.Conn, finalResults *Results, wg *sync.WaitGroup) { + defer wg.Done() + logger := c.Logger.WithField("target", targetName) schemaTableHashes, err := c.fetchTargetTableNames(ctx, conn) if err != nil { logger.WithError(err).Error("failed to fetch target tables") - close(done) return } for schemaName, schemaHashes := range schemaTableHashes { for tableName := range schemaHashes { - c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults) + wg.Add(1) + + c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults, wg) } } logger.Info("Table hashes computed") - close(done) } func (c Config) fetchTargetTableNames(ctx context.Context, conn *pgx.Conn) (DatabaseResult, error) { @@ -155,7 +157,9 @@ func (c Config) validColumnTarget(columnName string) bool { return false } -func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName string, finalResults *Results) { +func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName string, finalResults *Results, wg *sync.WaitGroup) { + defer wg.Done() + tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) tableLogger.Info("Computing hash") @@ -230,11 +234,14 @@ func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, testLogger.Debugf("Generated query: %s", query) - runTestOnTable(ctx, testLogger, conn, targetName, schemaName, tableName, testMode, query, finalResults) + wg.Add(1) + runTestOnTable(ctx, testLogger, conn, targetName, schemaName, tableName, testMode, query, finalResults, wg) } } -func runTestOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName, testMode, query string, finalResults *Results) { +func runTestOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName, testMode, query string, finalResults *Results, wg *sync.WaitGroup) { + defer wg.Done() + row := conn.QueryRow(ctx, query) var testOutputString string From bbe231899153d476c43cdaa7d3b1379ac00dd2b5 Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 14:50:17 -0700 Subject: [PATCH 10/11] feat: Execute all queries in parallel --- verify.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/verify.go b/verify.go index ec7af6b5..fd87107b 100644 --- a/verify.go +++ b/verify.go @@ -102,7 +102,7 @@ func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *p for tableName := range schemaHashes { wg.Add(1) - c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults, wg) + go c.runTestQueriesOnTable(ctx, logger, conn, targetName, schemaName, tableName, finalResults, wg) } } @@ -235,7 +235,8 @@ func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, testLogger.Debugf("Generated query: %s", query) wg.Add(1) - runTestOnTable(ctx, testLogger, conn, targetName, schemaName, tableName, testMode, query, finalResults, wg) + + go runTestOnTable(ctx, testLogger, conn, targetName, schemaName, tableName, testMode, query, finalResults, wg) } } From d464143f745ed978747b6d0c99fe52d844719c83 Mon Sep 17 00:00:00 2001 From: Connor Finnell Date: Tue, 5 Dec 2023 15:03:31 -0700 Subject: [PATCH 11/11] fix: Use DB connecttion pool to allow concurrent queries --- cmd/pgverify/cmd.go | 6 +- go.mod | 1 + go.sum | 1 + integration_test.go | 16 +- .../jackc/pgx/v4/pgxpool/batch_results.go | 60 ++ .../github.com/jackc/pgx/v4/pgxpool/conn.go | 109 ++++ vendor/github.com/jackc/pgx/v4/pgxpool/doc.go | 25 + .../github.com/jackc/pgx/v4/pgxpool/pool.go | 607 ++++++++++++++++++ .../github.com/jackc/pgx/v4/pgxpool/rows.go | 105 +++ .../github.com/jackc/pgx/v4/pgxpool/stat.go | 64 ++ vendor/github.com/jackc/pgx/v4/pgxpool/tx.go | 90 +++ vendor/github.com/jackc/puddle/.travis.yml | 16 + vendor/github.com/jackc/puddle/CHANGELOG.md | 36 ++ vendor/github.com/jackc/puddle/LICENSE | 22 + vendor/github.com/jackc/puddle/README.md | 54 ++ vendor/github.com/jackc/puddle/doc.go | 11 + .../github.com/jackc/puddle/nanotime_time.go | 13 + .../jackc/puddle/nanotime_unsafe.go | 12 + vendor/github.com/jackc/puddle/pool.go | 532 +++++++++++++++ vendor/modules.txt | 4 + verify.go | 33 +- 21 files changed, 1790 insertions(+), 27 deletions(-) create mode 100644 vendor/github.com/jackc/pgx/v4/pgxpool/batch_results.go create mode 100644 vendor/github.com/jackc/pgx/v4/pgxpool/conn.go create mode 100644 vendor/github.com/jackc/pgx/v4/pgxpool/doc.go create mode 100644 vendor/github.com/jackc/pgx/v4/pgxpool/pool.go create mode 100644 vendor/github.com/jackc/pgx/v4/pgxpool/rows.go create mode 100644 vendor/github.com/jackc/pgx/v4/pgxpool/stat.go create mode 100644 vendor/github.com/jackc/pgx/v4/pgxpool/tx.go create mode 100644 vendor/github.com/jackc/puddle/.travis.yml create mode 100644 vendor/github.com/jackc/puddle/CHANGELOG.md create mode 100644 vendor/github.com/jackc/puddle/LICENSE create mode 100644 vendor/github.com/jackc/puddle/README.md create mode 100644 vendor/github.com/jackc/puddle/doc.go create mode 100644 vendor/github.com/jackc/puddle/nanotime_time.go create mode 100644 vendor/github.com/jackc/puddle/nanotime_unsafe.go create mode 100644 vendor/github.com/jackc/puddle/pool.go diff --git a/cmd/pgverify/cmd.go b/cmd/pgverify/cmd.go index fa57d269..2d47a478 100644 --- a/cmd/pgverify/cmd.go +++ b/cmd/pgverify/cmd.go @@ -4,7 +4,7 @@ import ( "fmt" "strings" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -49,9 +49,9 @@ var rootCmd = &cobra.Command{ Long: `Verify data consistency between PostgreSQL syntax compatible databases.`, Args: cobra.MinimumNArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - var targets []*pgx.ConnConfig + var targets []*pgxpool.Config for _, target := range args { - connConfig, err := pgx.ParseConfig(target) + connConfig, err := pgxpool.ParseConfig(target) if err != nil { return fmt.Errorf("invalid target URI %s: %w", target, err) } diff --git a/go.mod b/go.mod index 6d2a94f6..a0cc3fe4 100644 --- a/go.mod +++ b/go.mod @@ -107,6 +107,7 @@ require ( github.com/jackc/pgproto3/v2 v2.2.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgtype v1.10.0 // indirect + github.com/jackc/puddle v1.2.1 // indirect github.com/jgautheron/goconst v1.6.0 // indirect github.com/jingyugao/rowserrcheck v1.1.1 // indirect github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect diff --git a/go.sum b/go.sum index 92462365..172b91e0 100644 --- a/go.sum +++ b/go.sum @@ -643,6 +643,7 @@ github.com/jackc/pgx/v4 v4.15.0/go.mod h1:D/zyOyXiaM1TmVWnOM18p0xdDtdakRBa0RsVGI github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v1.2.1 h1:gI8os0wpRXFd4FiAY2dWiqRK037tjj3t7rKFeO4X5iw= github.com/jackc/puddle v1.2.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jgautheron/goconst v1.6.0 h1:gbMLWKRMkzAc6kYsQL6/TxaoBUg3Jm9LSF/Ih1ADWGA= github.com/jgautheron/goconst v1.6.0/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4= diff --git a/integration_test.go b/integration_test.go index 445fb723..c68b53e9 100644 --- a/integration_test.go +++ b/integration_test.go @@ -15,7 +15,7 @@ import ( "github.com/cjfinnell/pgverify" "github.com/google/uuid" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -27,17 +27,17 @@ var ( dbName = "test" ) -func waitForDBReady(t *testing.T, ctx context.Context, config *pgx.ConnConfig) bool { +func waitForDBReady(t *testing.T, ctx context.Context, config *pgxpool.Config) bool { t.Helper() connected := false for count := 0; count < 30; count++ { - conn, err := pgx.ConnectConfig(ctx, config) + conn, err := pgxpool.ConnectConfig(ctx, config) if err == nil { connected = true - conn.Close(ctx) + conn.Close() break } @@ -257,7 +257,7 @@ func TestVerifyData(t *testing.T) { } // Act - var targets []*pgx.ConnConfig + var targets []*pgxpool.Config var aliases []string @@ -266,13 +266,13 @@ func TestVerifyData(t *testing.T) { // Create db and connect _, port, err := createContainer(t, ctx, db.image, db.port, db.env, db.cmd) require.NoError(t, err) - config, err := pgx.ParseConfig(fmt.Sprintf("postgresql://%s@127.0.0.1:%d%s", db.userPassword, port, db.db)) + config, err := pgxpool.ParseConfig(fmt.Sprintf("postgresql://%s@127.0.0.1:%d%s", db.userPassword, port, db.db)) require.NoError(t, err) assert.True(t, waitForDBReady(t, ctx, config)) - conn, err := pgx.ConnectConfig(ctx, config) + conn, err := pgxpool.ConnectConfig(ctx, config) require.NoError(t, err) - defer conn.Close(ctx) + defer conn.Close() // Create and populate tables for _, tableName := range tableNames { diff --git a/vendor/github.com/jackc/pgx/v4/pgxpool/batch_results.go b/vendor/github.com/jackc/pgx/v4/pgxpool/batch_results.go new file mode 100644 index 00000000..c625a474 --- /dev/null +++ b/vendor/github.com/jackc/pgx/v4/pgxpool/batch_results.go @@ -0,0 +1,60 @@ +package pgxpool + +import ( + "github.com/jackc/pgconn" + "github.com/jackc/pgx/v4" +) + +type errBatchResults struct { + err error +} + +func (br errBatchResults) Exec() (pgconn.CommandTag, error) { + return nil, br.err +} + +func (br errBatchResults) Query() (pgx.Rows, error) { + return errRows{err: br.err}, br.err +} + +func (br errBatchResults) QueryFunc(scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) { + return nil, br.err +} + +func (br errBatchResults) QueryRow() pgx.Row { + return errRow{err: br.err} +} + +func (br errBatchResults) Close() error { + return br.err +} + +type poolBatchResults struct { + br pgx.BatchResults + c *Conn +} + +func (br *poolBatchResults) Exec() (pgconn.CommandTag, error) { + return br.br.Exec() +} + +func (br *poolBatchResults) Query() (pgx.Rows, error) { + return br.br.Query() +} + +func (br *poolBatchResults) QueryFunc(scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) { + return br.br.QueryFunc(scans, f) +} + +func (br *poolBatchResults) QueryRow() pgx.Row { + return br.br.QueryRow() +} + +func (br *poolBatchResults) Close() error { + err := br.br.Close() + if br.c != nil { + br.c.Release() + br.c = nil + } + return err +} diff --git a/vendor/github.com/jackc/pgx/v4/pgxpool/conn.go b/vendor/github.com/jackc/pgx/v4/pgxpool/conn.go new file mode 100644 index 00000000..0b59d741 --- /dev/null +++ b/vendor/github.com/jackc/pgx/v4/pgxpool/conn.go @@ -0,0 +1,109 @@ +package pgxpool + +import ( + "context" + "time" + + "github.com/jackc/pgconn" + "github.com/jackc/pgx/v4" + "github.com/jackc/puddle" +) + +// Conn is an acquired *pgx.Conn from a Pool. +type Conn struct { + res *puddle.Resource + p *Pool +} + +// Release returns c to the pool it was acquired from. Once Release has been called, other methods must not be called. +// However, it is safe to call Release multiple times. Subsequent calls after the first will be ignored. +func (c *Conn) Release() { + if c.res == nil { + return + } + + conn := c.Conn() + res := c.res + c.res = nil + + now := time.Now() + if conn.IsClosed() || conn.PgConn().IsBusy() || conn.PgConn().TxStatus() != 'I' || (now.Sub(res.CreationTime()) > c.p.maxConnLifetime) { + res.Destroy() + return + } + + if c.p.afterRelease == nil { + res.Release() + return + } + + go func() { + if c.p.afterRelease(conn) { + res.Release() + } else { + res.Destroy() + } + }() +} + +func (c *Conn) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) { + return c.Conn().Exec(ctx, sql, arguments...) +} + +func (c *Conn) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) { + return c.Conn().Query(ctx, sql, args...) +} + +func (c *Conn) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row { + return c.Conn().QueryRow(ctx, sql, args...) +} + +func (c *Conn) QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) { + return c.Conn().QueryFunc(ctx, sql, args, scans, f) +} + +func (c *Conn) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults { + return c.Conn().SendBatch(ctx, b) +} + +func (c *Conn) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) { + return c.Conn().CopyFrom(ctx, tableName, columnNames, rowSrc) +} + +// Begin starts a transaction block from the *Conn without explicitly setting a transaction mode (see BeginTx with TxOptions if transaction mode is required). +func (c *Conn) Begin(ctx context.Context) (pgx.Tx, error) { + return c.Conn().Begin(ctx) +} + +// BeginTx starts a transaction block from the *Conn with txOptions determining the transaction mode. +func (c *Conn) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) { + return c.Conn().BeginTx(ctx, txOptions) +} + +func (c *Conn) BeginFunc(ctx context.Context, f func(pgx.Tx) error) error { + return c.Conn().BeginFunc(ctx, f) +} + +func (c *Conn) BeginTxFunc(ctx context.Context, txOptions pgx.TxOptions, f func(pgx.Tx) error) error { + return c.Conn().BeginTxFunc(ctx, txOptions, f) +} + +func (c *Conn) Ping(ctx context.Context) error { + return c.Conn().Ping(ctx) +} + +func (c *Conn) Conn() *pgx.Conn { + return c.connResource().conn +} + +func (c *Conn) connResource() *connResource { + return c.res.Value().(*connResource) +} + +func (c *Conn) getPoolRow(r pgx.Row) *poolRow { + return c.connResource().getPoolRow(c, r) +} + +func (c *Conn) getPoolRows(r pgx.Rows) *poolRows { + return c.connResource().getPoolRows(c, r) +} diff --git a/vendor/github.com/jackc/pgx/v4/pgxpool/doc.go b/vendor/github.com/jackc/pgx/v4/pgxpool/doc.go new file mode 100644 index 00000000..e8239a6f --- /dev/null +++ b/vendor/github.com/jackc/pgx/v4/pgxpool/doc.go @@ -0,0 +1,25 @@ +// Package pgxpool is a concurrency-safe connection pool for pgx. +/* +pgxpool implements a nearly identical interface to pgx connections. + +Establishing a Connection + +The primary way of establishing a connection is with `pgxpool.Connect`. + + pool, err := pgxpool.Connect(context.Background(), os.Getenv("DATABASE_URL")) + +The database connection string can be in URL or DSN format. PostgreSQL settings, pgx settings, and pool settings can be +specified here. In addition, a config struct can be created by `ParseConfig` and modified before establishing the +connection with `ConnectConfig`. + + config, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL")) + if err != nil { + // ... + } + config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { + // do something with every new connection + } + + pool, err := pgxpool.ConnectConfig(context.Background(), config) +*/ +package pgxpool diff --git a/vendor/github.com/jackc/pgx/v4/pgxpool/pool.go b/vendor/github.com/jackc/pgx/v4/pgxpool/pool.go new file mode 100644 index 00000000..f287ad88 --- /dev/null +++ b/vendor/github.com/jackc/pgx/v4/pgxpool/pool.go @@ -0,0 +1,607 @@ +package pgxpool + +import ( + "context" + "fmt" + "runtime" + "strconv" + "sync" + "time" + + "github.com/jackc/pgconn" + "github.com/jackc/pgx/v4" + "github.com/jackc/puddle" +) + +var defaultMaxConns = int32(4) +var defaultMinConns = int32(0) +var defaultMaxConnLifetime = time.Hour +var defaultMaxConnIdleTime = time.Minute * 30 +var defaultHealthCheckPeriod = time.Minute + +type connResource struct { + conn *pgx.Conn + conns []Conn + poolRows []poolRow + poolRowss []poolRows +} + +func (cr *connResource) getConn(p *Pool, res *puddle.Resource) *Conn { + if len(cr.conns) == 0 { + cr.conns = make([]Conn, 128) + } + + c := &cr.conns[len(cr.conns)-1] + cr.conns = cr.conns[0 : len(cr.conns)-1] + + c.res = res + c.p = p + + return c +} + +func (cr *connResource) getPoolRow(c *Conn, r pgx.Row) *poolRow { + if len(cr.poolRows) == 0 { + cr.poolRows = make([]poolRow, 128) + } + + pr := &cr.poolRows[len(cr.poolRows)-1] + cr.poolRows = cr.poolRows[0 : len(cr.poolRows)-1] + + pr.c = c + pr.r = r + + return pr +} + +func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows { + if len(cr.poolRowss) == 0 { + cr.poolRowss = make([]poolRows, 128) + } + + pr := &cr.poolRowss[len(cr.poolRowss)-1] + cr.poolRowss = cr.poolRowss[0 : len(cr.poolRowss)-1] + + pr.c = c + pr.r = r + + return pr +} + +// Pool allows for connection reuse. +type Pool struct { + p *puddle.Pool + config *Config + beforeConnect func(context.Context, *pgx.ConnConfig) error + afterConnect func(context.Context, *pgx.Conn) error + beforeAcquire func(context.Context, *pgx.Conn) bool + afterRelease func(*pgx.Conn) bool + minConns int32 + maxConnLifetime time.Duration + maxConnIdleTime time.Duration + healthCheckPeriod time.Duration + + closeOnce sync.Once + closeChan chan struct{} +} + +// Config is the configuration struct for creating a pool. It must be created by ParseConfig and then it can be +// modified. A manually initialized ConnConfig will cause ConnectConfig to panic. +type Config struct { + ConnConfig *pgx.ConnConfig + + // BeforeConnect is called before a new connection is made. It is passed a copy of the underlying pgx.ConnConfig and + // will not impact any existing open connections. + BeforeConnect func(context.Context, *pgx.ConnConfig) error + + // AfterConnect is called after a connection is established, but before it is added to the pool. + AfterConnect func(context.Context, *pgx.Conn) error + + // BeforeAcquire is called before a connection is acquired from the pool. It must return true to allow the + // acquision or false to indicate that the connection should be destroyed and a different connection should be + // acquired. + BeforeAcquire func(context.Context, *pgx.Conn) bool + + // AfterRelease is called after a connection is released, but before it is returned to the pool. It must return true to + // return the connection to the pool or false to destroy the connection. + AfterRelease func(*pgx.Conn) bool + + // MaxConnLifetime is the duration since creation after which a connection will be automatically closed. + MaxConnLifetime time.Duration + + // MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check. + MaxConnIdleTime time.Duration + + // MaxConns is the maximum size of the pool. + MaxConns int32 + + // MinConns is the minimum size of the pool. The health check will increase the number of connections to this + // amount if it had dropped below. + MinConns int32 + + // HealthCheckPeriod is the duration between checks of the health of idle connections. + HealthCheckPeriod time.Duration + + // If set to true, pool doesn't do any I/O operation on initialization. + // And connects to the server only when the pool starts to be used. + // The default is false. + LazyConnect bool + + createdByParseConfig bool // Used to enforce created by ParseConfig rule. +} + +// Copy returns a deep copy of the config that is safe to use and modify. +// The only exception is the tls.Config: +// according to the tls.Config docs it must not be modified after creation. +func (c *Config) Copy() *Config { + newConfig := new(Config) + *newConfig = *c + newConfig.ConnConfig = c.ConnConfig.Copy() + return newConfig +} + +// ConnString returns the connection string as parsed by pgxpool.ParseConfig into pgxpool.Config. +func (c *Config) ConnString() string { return c.ConnConfig.ConnString() } + +// Connect creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial +// connection. See ParseConfig for information on connString format. +func Connect(ctx context.Context, connString string) (*Pool, error) { + config, err := ParseConfig(connString) + if err != nil { + return nil, err + } + + return ConnectConfig(ctx, config) +} + +// ConnectConfig creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial +// connection. config must have been created by ParseConfig. +func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) { + // Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from + // zero values. + if !config.createdByParseConfig { + panic("config must be created by ParseConfig") + } + + p := &Pool{ + config: config, + beforeConnect: config.BeforeConnect, + afterConnect: config.AfterConnect, + beforeAcquire: config.BeforeAcquire, + afterRelease: config.AfterRelease, + minConns: config.MinConns, + maxConnLifetime: config.MaxConnLifetime, + maxConnIdleTime: config.MaxConnIdleTime, + healthCheckPeriod: config.HealthCheckPeriod, + closeChan: make(chan struct{}), + } + + p.p = puddle.NewPool( + func(ctx context.Context) (interface{}, error) { + connConfig := p.config.ConnConfig + + if p.beforeConnect != nil { + connConfig = p.config.ConnConfig.Copy() + if err := p.beforeConnect(ctx, connConfig); err != nil { + return nil, err + } + } + + conn, err := pgx.ConnectConfig(ctx, connConfig) + if err != nil { + return nil, err + } + + if p.afterConnect != nil { + err = p.afterConnect(ctx, conn) + if err != nil { + conn.Close(ctx) + return nil, err + } + } + + cr := &connResource{ + conn: conn, + conns: make([]Conn, 64), + poolRows: make([]poolRow, 64), + poolRowss: make([]poolRows, 64), + } + + return cr, nil + }, + func(value interface{}) { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + conn := value.(*connResource).conn + conn.Close(ctx) + select { + case <-conn.PgConn().CleanupDone(): + case <-ctx.Done(): + } + cancel() + }, + config.MaxConns, + ) + + if !config.LazyConnect { + if err := p.createIdleResources(ctx, int(p.minConns)); err != nil { + // Couldn't create resources for minpool size. Close unhealthy pool. + p.Close() + return nil, err + } + + // Initially establish one connection + res, err := p.p.Acquire(ctx) + if err != nil { + p.Close() + return nil, err + } + res.Release() + } + + go p.backgroundHealthCheck() + + return p, nil +} + +// ParseConfig builds a Config from connString. It parses connString with the same behavior as pgx.ParseConfig with the +// addition of the following variables: +// +// pool_max_conns: integer greater than 0 +// pool_min_conns: integer 0 or greater +// pool_max_conn_lifetime: duration string +// pool_max_conn_idle_time: duration string +// pool_health_check_period: duration string +// +// See Config for definitions of these arguments. +// +// # Example DSN +// user=jack password=secret host=pg.example.com port=5432 dbname=mydb sslmode=verify-ca pool_max_conns=10 +// +// # Example URL +// postgres://jack:secret@pg.example.com:5432/mydb?sslmode=verify-ca&pool_max_conns=10 +func ParseConfig(connString string) (*Config, error) { + connConfig, err := pgx.ParseConfig(connString) + if err != nil { + return nil, err + } + + config := &Config{ + ConnConfig: connConfig, + createdByParseConfig: true, + } + + if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conns"]; ok { + delete(connConfig.Config.RuntimeParams, "pool_max_conns") + n, err := strconv.ParseInt(s, 10, 32) + if err != nil { + return nil, fmt.Errorf("cannot parse pool_max_conns: %w", err) + } + if n < 1 { + return nil, fmt.Errorf("pool_max_conns too small: %d", n) + } + config.MaxConns = int32(n) + } else { + config.MaxConns = defaultMaxConns + if numCPU := int32(runtime.NumCPU()); numCPU > config.MaxConns { + config.MaxConns = numCPU + } + } + + if s, ok := config.ConnConfig.Config.RuntimeParams["pool_min_conns"]; ok { + delete(connConfig.Config.RuntimeParams, "pool_min_conns") + n, err := strconv.ParseInt(s, 10, 32) + if err != nil { + return nil, fmt.Errorf("cannot parse pool_min_conns: %w", err) + } + config.MinConns = int32(n) + } else { + config.MinConns = defaultMinConns + } + + if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime"]; ok { + delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime") + d, err := time.ParseDuration(s) + if err != nil { + return nil, fmt.Errorf("invalid pool_max_conn_lifetime: %w", err) + } + config.MaxConnLifetime = d + } else { + config.MaxConnLifetime = defaultMaxConnLifetime + } + + if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_idle_time"]; ok { + delete(connConfig.Config.RuntimeParams, "pool_max_conn_idle_time") + d, err := time.ParseDuration(s) + if err != nil { + return nil, fmt.Errorf("invalid pool_max_conn_idle_time: %w", err) + } + config.MaxConnIdleTime = d + } else { + config.MaxConnIdleTime = defaultMaxConnIdleTime + } + + if s, ok := config.ConnConfig.Config.RuntimeParams["pool_health_check_period"]; ok { + delete(connConfig.Config.RuntimeParams, "pool_health_check_period") + d, err := time.ParseDuration(s) + if err != nil { + return nil, fmt.Errorf("invalid pool_health_check_period: %w", err) + } + config.HealthCheckPeriod = d + } else { + config.HealthCheckPeriod = defaultHealthCheckPeriod + } + + return config, nil +} + +// Close closes all connections in the pool and rejects future Acquire calls. Blocks until all connections are returned +// to pool and closed. +func (p *Pool) Close() { + p.closeOnce.Do(func() { + close(p.closeChan) + p.p.Close() + }) +} + +func (p *Pool) backgroundHealthCheck() { + ticker := time.NewTicker(p.healthCheckPeriod) + + for { + select { + case <-p.closeChan: + ticker.Stop() + return + case <-ticker.C: + p.checkIdleConnsHealth() + p.checkMinConns() + } + } +} + +func (p *Pool) checkIdleConnsHealth() { + resources := p.p.AcquireAllIdle() + + now := time.Now() + for _, res := range resources { + if now.Sub(res.CreationTime()) > p.maxConnLifetime { + res.Destroy() + } else if res.IdleDuration() > p.maxConnIdleTime { + res.Destroy() + } else { + res.ReleaseUnused() + } + } +} + +func (p *Pool) checkMinConns() { + for i := p.minConns - p.Stat().TotalConns(); i > 0; i-- { + go func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + p.p.CreateResource(ctx) + }() + } +} + +func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error { + ctx, cancel := context.WithCancel(parentCtx) + defer cancel() + + errs := make(chan error, targetResources) + + for i := 0; i < targetResources; i++ { + go func() { + err := p.p.CreateResource(ctx) + errs <- err + }() + } + + var firstError error + for i := 0; i < targetResources; i++ { + err := <-errs + if err != nil && firstError == nil { + cancel() + firstError = err + } + } + + return firstError +} + +// Acquire returns a connection (*Conn) from the Pool +func (p *Pool) Acquire(ctx context.Context) (*Conn, error) { + for { + res, err := p.p.Acquire(ctx) + if err != nil { + return nil, err + } + + cr := res.Value().(*connResource) + if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) { + return cr.getConn(p, res), nil + } + + res.Destroy() + } +} + +// AcquireFunc acquires a *Conn and calls f with that *Conn. ctx will only affect the Acquire. It has no effect on the +// call of f. The return value is either an error acquiring the *Conn or the return value of f. The *Conn is +// automatically released after the call of f. +func (p *Pool) AcquireFunc(ctx context.Context, f func(*Conn) error) error { + conn, err := p.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + + return f(conn) +} + +// AcquireAllIdle atomically acquires all currently idle connections. Its intended use is for health check and +// keep-alive functionality. It does not update pool statistics. +func (p *Pool) AcquireAllIdle(ctx context.Context) []*Conn { + resources := p.p.AcquireAllIdle() + conns := make([]*Conn, 0, len(resources)) + for _, res := range resources { + cr := res.Value().(*connResource) + if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) { + conns = append(conns, cr.getConn(p, res)) + } else { + res.Destroy() + } + } + + return conns +} + +// Config returns a copy of config that was used to initialize this pool. +func (p *Pool) Config() *Config { return p.config.Copy() } + +// Stat returns a pgxpool.Stat struct with a snapshot of Pool statistics. +func (p *Pool) Stat() *Stat { + return &Stat{s: p.p.Stat()} +} + +// Exec acquires a connection from the Pool and executes the given SQL. +// SQL can be either a prepared statement name or an SQL string. +// Arguments should be referenced positionally from the SQL string as $1, $2, etc. +// The acquired connection is returned to the pool when the Exec function returns. +func (p *Pool) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) { + c, err := p.Acquire(ctx) + if err != nil { + return nil, err + } + defer c.Release() + + return c.Exec(ctx, sql, arguments...) +} + +// Query acquires a connection and executes a query that returns pgx.Rows. +// Arguments should be referenced positionally from the SQL string as $1, $2, etc. +// See pgx.Rows documentation to close the returned Rows and return the acquired connection to the Pool. +// +// If there is an error, the returned pgx.Rows will be returned in an error state. +// If preferred, ignore the error returned from Query and handle errors using the returned pgx.Rows. +// +// For extra control over how the query is executed, the types QuerySimpleProtocol, QueryResultFormats, and +// QueryResultFormatsByOID may be used as the first args to control exactly how the query is executed. This is rarely +// needed. See the documentation for those types for details. +func (p *Pool) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) { + c, err := p.Acquire(ctx) + if err != nil { + return errRows{err: err}, err + } + + rows, err := c.Query(ctx, sql, args...) + if err != nil { + c.Release() + return errRows{err: err}, err + } + + return c.getPoolRows(rows), nil +} + +// QueryRow acquires a connection and executes a query that is expected +// to return at most one row (pgx.Row). Errors are deferred until pgx.Row's +// Scan method is called. If the query selects no rows, pgx.Row's Scan will +// return ErrNoRows. Otherwise, pgx.Row's Scan scans the first selected row +// and discards the rest. The acquired connection is returned to the Pool when +// pgx.Row's Scan method is called. +// +// Arguments should be referenced positionally from the SQL string as $1, $2, etc. +// +// For extra control over how the query is executed, the types QuerySimpleProtocol, QueryResultFormats, and +// QueryResultFormatsByOID may be used as the first args to control exactly how the query is executed. This is rarely +// needed. See the documentation for those types for details. +func (p *Pool) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row { + c, err := p.Acquire(ctx) + if err != nil { + return errRow{err: err} + } + + row := c.QueryRow(ctx, sql, args...) + return c.getPoolRow(row) +} + +func (p *Pool) QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) { + c, err := p.Acquire(ctx) + if err != nil { + return nil, err + } + defer c.Release() + + return c.QueryFunc(ctx, sql, args, scans, f) +} + +func (p *Pool) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults { + c, err := p.Acquire(ctx) + if err != nil { + return errBatchResults{err: err} + } + + br := c.SendBatch(ctx, b) + return &poolBatchResults{br: br, c: c} +} + +// Begin acquires a connection from the Pool and starts a transaction. Unlike database/sql, the context only affects the begin command. i.e. there is no +// auto-rollback on context cancellation. Begin initiates a transaction block without explicitly setting a transaction mode for the block (see BeginTx with TxOptions if transaction mode is required). +// *pgxpool.Tx is returned, which implements the pgx.Tx interface. +// Commit or Rollback must be called on the returned transaction to finalize the transaction block. +func (p *Pool) Begin(ctx context.Context) (pgx.Tx, error) { + return p.BeginTx(ctx, pgx.TxOptions{}) +} + +// BeginTx acquires a connection from the Pool and starts a transaction with pgx.TxOptions determining the transaction mode. +// Unlike database/sql, the context only affects the begin command. i.e. there is no auto-rollback on context cancellation. +// *pgxpool.Tx is returned, which implements the pgx.Tx interface. +// Commit or Rollback must be called on the returned transaction to finalize the transaction block. +func (p *Pool) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) { + c, err := p.Acquire(ctx) + if err != nil { + return nil, err + } + + t, err := c.BeginTx(ctx, txOptions) + if err != nil { + c.Release() + return nil, err + } + + return &Tx{t: t, c: c}, err +} + +func (p *Pool) BeginFunc(ctx context.Context, f func(pgx.Tx) error) error { + return p.BeginTxFunc(ctx, pgx.TxOptions{}, f) +} + +func (p *Pool) BeginTxFunc(ctx context.Context, txOptions pgx.TxOptions, f func(pgx.Tx) error) error { + c, err := p.Acquire(ctx) + if err != nil { + return err + } + defer c.Release() + + return c.BeginTxFunc(ctx, txOptions, f) +} + +func (p *Pool) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) { + c, err := p.Acquire(ctx) + if err != nil { + return 0, err + } + defer c.Release() + + return c.Conn().CopyFrom(ctx, tableName, columnNames, rowSrc) +} + +// Ping acquires a connection from the Pool and executes an empty sql statement against it. +// If the sql returns without error, the database Ping is considered successful, otherwise, the error is returned. +func (p *Pool) Ping(ctx context.Context) error { + c, err := p.Acquire(ctx) + if err != nil { + return err + } + defer c.Release() + return c.Ping(ctx) +} diff --git a/vendor/github.com/jackc/pgx/v4/pgxpool/rows.go b/vendor/github.com/jackc/pgx/v4/pgxpool/rows.go new file mode 100644 index 00000000..6dc0cc34 --- /dev/null +++ b/vendor/github.com/jackc/pgx/v4/pgxpool/rows.go @@ -0,0 +1,105 @@ +package pgxpool + +import ( + "github.com/jackc/pgconn" + "github.com/jackc/pgproto3/v2" + "github.com/jackc/pgx/v4" +) + +type errRows struct { + err error +} + +func (errRows) Close() {} +func (e errRows) Err() error { return e.err } +func (errRows) CommandTag() pgconn.CommandTag { return nil } +func (errRows) FieldDescriptions() []pgproto3.FieldDescription { return nil } +func (errRows) Next() bool { return false } +func (e errRows) Scan(dest ...interface{}) error { return e.err } +func (e errRows) Values() ([]interface{}, error) { return nil, e.err } +func (e errRows) RawValues() [][]byte { return nil } + +type errRow struct { + err error +} + +func (e errRow) Scan(dest ...interface{}) error { return e.err } + +type poolRows struct { + r pgx.Rows + c *Conn + err error +} + +func (rows *poolRows) Close() { + rows.r.Close() + if rows.c != nil { + rows.c.Release() + rows.c = nil + } +} + +func (rows *poolRows) Err() error { + if rows.err != nil { + return rows.err + } + return rows.r.Err() +} + +func (rows *poolRows) CommandTag() pgconn.CommandTag { + return rows.r.CommandTag() +} + +func (rows *poolRows) FieldDescriptions() []pgproto3.FieldDescription { + return rows.r.FieldDescriptions() +} + +func (rows *poolRows) Next() bool { + if rows.err != nil { + return false + } + + n := rows.r.Next() + if !n { + rows.Close() + } + return n +} + +func (rows *poolRows) Scan(dest ...interface{}) error { + err := rows.r.Scan(dest...) + if err != nil { + rows.Close() + } + return err +} + +func (rows *poolRows) Values() ([]interface{}, error) { + values, err := rows.r.Values() + if err != nil { + rows.Close() + } + return values, err +} + +func (rows *poolRows) RawValues() [][]byte { + return rows.r.RawValues() +} + +type poolRow struct { + r pgx.Row + c *Conn + err error +} + +func (row *poolRow) Scan(dest ...interface{}) error { + if row.err != nil { + return row.err + } + + err := row.r.Scan(dest...) + if row.c != nil { + row.c.Release() + } + return err +} diff --git a/vendor/github.com/jackc/pgx/v4/pgxpool/stat.go b/vendor/github.com/jackc/pgx/v4/pgxpool/stat.go new file mode 100644 index 00000000..336be42d --- /dev/null +++ b/vendor/github.com/jackc/pgx/v4/pgxpool/stat.go @@ -0,0 +1,64 @@ +package pgxpool + +import ( + "time" + + "github.com/jackc/puddle" +) + +// Stat is a snapshot of Pool statistics. +type Stat struct { + s *puddle.Stat +} + +// AcquireCount returns the cumulative count of successful acquires from the pool. +func (s *Stat) AcquireCount() int64 { + return s.s.AcquireCount() +} + +// AcquireDuration returns the total duration of all successful acquires from +// the pool. +func (s *Stat) AcquireDuration() time.Duration { + return s.s.AcquireDuration() +} + +// AcquiredConns returns the number of currently acquired connections in the pool. +func (s *Stat) AcquiredConns() int32 { + return s.s.AcquiredResources() +} + +// CanceledAcquireCount returns the cumulative count of acquires from the pool +// that were canceled by a context. +func (s *Stat) CanceledAcquireCount() int64 { + return s.s.CanceledAcquireCount() +} + +// ConstructingConns returns the number of conns with construction in progress in +// the pool. +func (s *Stat) ConstructingConns() int32 { + return s.s.ConstructingResources() +} + +// EmptyAcquireCount returns the cumulative count of successful acquires from the pool +// that waited for a resource to be released or constructed because the pool was +// empty. +func (s *Stat) EmptyAcquireCount() int64 { + return s.s.EmptyAcquireCount() +} + +// IdleConns returns the number of currently idle conns in the pool. +func (s *Stat) IdleConns() int32 { + return s.s.IdleResources() +} + +// MaxConns returns the maximum size of the pool. +func (s *Stat) MaxConns() int32 { + return s.s.MaxResources() +} + +// TotalConns returns the total number of resources currently in the pool. +// The value is the sum of ConstructingConns, AcquiredConns, and +// IdleConns. +func (s *Stat) TotalConns() int32 { + return s.s.TotalResources() +} diff --git a/vendor/github.com/jackc/pgx/v4/pgxpool/tx.go b/vendor/github.com/jackc/pgx/v4/pgxpool/tx.go new file mode 100644 index 00000000..6f566e41 --- /dev/null +++ b/vendor/github.com/jackc/pgx/v4/pgxpool/tx.go @@ -0,0 +1,90 @@ +package pgxpool + +import ( + "context" + + "github.com/jackc/pgconn" + "github.com/jackc/pgx/v4" +) + +// Tx represents a database transaction acquired from a Pool. +type Tx struct { + t pgx.Tx + c *Conn +} + +// Begin starts a pseudo nested transaction implemented with a savepoint. +func (tx *Tx) Begin(ctx context.Context) (pgx.Tx, error) { + return tx.t.Begin(ctx) +} + +func (tx *Tx) BeginFunc(ctx context.Context, f func(pgx.Tx) error) error { + return tx.t.BeginFunc(ctx, f) +} + +// Commit commits the transaction and returns the associated connection back to the Pool. Commit will return ErrTxClosed +// if the Tx is already closed, but is otherwise safe to call multiple times. If the commit fails with a rollback status +// (e.g. the transaction was already in a broken state) then ErrTxCommitRollback will be returned. +func (tx *Tx) Commit(ctx context.Context) error { + err := tx.t.Commit(ctx) + if tx.c != nil { + tx.c.Release() + tx.c = nil + } + return err +} + +// Rollback rolls back the transaction and returns the associated connection back to the Pool. Rollback will return ErrTxClosed +// if the Tx is already closed, but is otherwise safe to call multiple times. Hence, defer tx.Rollback() is safe even if +// tx.Commit() will be called first in a non-error condition. +func (tx *Tx) Rollback(ctx context.Context) error { + err := tx.t.Rollback(ctx) + if tx.c != nil { + tx.c.Release() + tx.c = nil + } + return err +} + +func (tx *Tx) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) { + return tx.t.CopyFrom(ctx, tableName, columnNames, rowSrc) +} + +func (tx *Tx) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults { + return tx.t.SendBatch(ctx, b) +} + +func (tx *Tx) LargeObjects() pgx.LargeObjects { + return tx.t.LargeObjects() +} + +// Prepare creates a prepared statement with name and sql. If the name is empty, +// an anonymous prepared statement will be used. sql can contain placeholders +// for bound parameters. These placeholders are referenced positionally as $1, $2, etc. +// +// Prepare is idempotent; i.e. it is safe to call Prepare multiple times with the same +// name and sql arguments. This allows a code path to Prepare and Query/Exec without +// needing to first check whether the statement has already been prepared. +func (tx *Tx) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error) { + return tx.t.Prepare(ctx, name, sql) +} + +func (tx *Tx) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) { + return tx.t.Exec(ctx, sql, arguments...) +} + +func (tx *Tx) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) { + return tx.t.Query(ctx, sql, args...) +} + +func (tx *Tx) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row { + return tx.t.QueryRow(ctx, sql, args...) +} + +func (tx *Tx) QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error) { + return tx.t.QueryFunc(ctx, sql, args, scans, f) +} + +func (tx *Tx) Conn() *pgx.Conn { + return tx.t.Conn() +} diff --git a/vendor/github.com/jackc/puddle/.travis.yml b/vendor/github.com/jackc/puddle/.travis.yml new file mode 100644 index 00000000..6335c4ab --- /dev/null +++ b/vendor/github.com/jackc/puddle/.travis.yml @@ -0,0 +1,16 @@ +language: go + +go: + - 1.x + - tip + +env: + global: + - STRESS_TEST_DURATION=15s + +script: + - go test -v -race + +matrix: + allow_failures: + - go: tip diff --git a/vendor/github.com/jackc/puddle/CHANGELOG.md b/vendor/github.com/jackc/puddle/CHANGELOG.md new file mode 100644 index 00000000..246f3dcc --- /dev/null +++ b/vendor/github.com/jackc/puddle/CHANGELOG.md @@ -0,0 +1,36 @@ +# 1.2.1 (December 2, 2021) + +* TryAcquire now does not block when background constructing resource + +# 1.2.0 (November 20, 2021) + +* Add TryAcquire (A. Jensen) +* Fix: remove memory leak / unintentionally pinned memory when shrinking slices (Alexander Staubo) +* Fix: Do not leave pool locked after panic from nil context + +# 1.1.4 (September 11, 2021) + +* Fix: Deadlock in CreateResource if pool was closed during resource acquisition (Dmitriy Matrenichev) + +# 1.1.3 (December 3, 2020) + +* Fix: Failed resource creation could cause concurrent Acquire to hang. (Evgeny Vanslov) + +# 1.1.2 (September 26, 2020) + +* Fix: Resource.Destroy no longer removes itself from the pool before its destructor has completed. +* Fix: Prevent crash when pool is closed while resource is being created. + +# 1.1.1 (April 2, 2020) + +* Pool.Close can be safely called multiple times +* AcquireAllIDle immediately returns nil if pool is closed +* CreateResource checks if pool is closed before taking any action +* Fix potential race condition when CreateResource and Close are called concurrently. CreateResource now checks if pool is closed before adding newly created resource to pool. + +# 1.1.0 (February 5, 2020) + +* Use runtime.nanotime for faster tracking of acquire time and last usage time. +* Track resource idle time to enable client health check logic. (Patrick Ellul) +* Add CreateResource to construct a new resource without acquiring it. (Patrick Ellul) +* Fix deadlock race when acquire is cancelled. (Michael Tharp) diff --git a/vendor/github.com/jackc/puddle/LICENSE b/vendor/github.com/jackc/puddle/LICENSE new file mode 100644 index 00000000..bcc286c5 --- /dev/null +++ b/vendor/github.com/jackc/puddle/LICENSE @@ -0,0 +1,22 @@ +Copyright (c) 2018 Jack Christensen + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/jackc/puddle/README.md b/vendor/github.com/jackc/puddle/README.md new file mode 100644 index 00000000..633faf19 --- /dev/null +++ b/vendor/github.com/jackc/puddle/README.md @@ -0,0 +1,54 @@ +[![](https://godoc.org/github.com/jackc/puddle?status.svg)](https://godoc.org/github.com/jackc/puddle) +[![Build Status](https://travis-ci.org/jackc/puddle.svg)](https://travis-ci.org/jackc/puddle) + +# Puddle + +Puddle is a tiny generic resource pool library for Go that uses the standard +context library to signal cancellation of acquires. It is designed to contain +the minimum functionality required for a resource pool. It can be used directly +or it can be used as the base for a domain specific resource pool. For example, +a database connection pool may use puddle internally and implement health checks +and keep-alive behavior without needing to implement any concurrent code of its +own. + +## Features + +* Acquire cancellation via context standard library +* Statistics API for monitoring pool pressure +* No dependencies outside of standard library +* High performance +* 100% test coverage + +## Example Usage + +```go +constructor := func(context.Context) (interface{}, error) { + return net.Dial("tcp", "127.0.0.1:8080") +} +destructor := func(value interface{}) { + value.(net.Conn).Close() +} +maxPoolSize := 10 + +pool := puddle.NewPool(constructor, destructor, maxPoolSize) + +// Acquire resource from the pool. +res, err := pool.Acquire(context.Background()) +if err != nil { + // ... +} + +// Use resource. +_, err = res.Value().(net.Conn).Write([]byte{1}) +if err != nil { + // ... +} + +// Release when done. +res.Release() + +``` + +## License + +MIT diff --git a/vendor/github.com/jackc/puddle/doc.go b/vendor/github.com/jackc/puddle/doc.go new file mode 100644 index 00000000..e27e3f6f --- /dev/null +++ b/vendor/github.com/jackc/puddle/doc.go @@ -0,0 +1,11 @@ +// Package puddle is a generic resource pool. +/* + +Puddle is a tiny generic resource pool library for Go that uses the standard +context library to signal cancellation of acquires. It is designed to contain +the minimum functionality a resource pool needs that cannot be implemented +without concurrency concerns. For example, a database connection pool may use +puddle internally and implement health checks and keep-alive behavior without +needing to implement any concurrent code of its own. +*/ +package puddle diff --git a/vendor/github.com/jackc/puddle/nanotime_time.go b/vendor/github.com/jackc/puddle/nanotime_time.go new file mode 100644 index 00000000..2bca251f --- /dev/null +++ b/vendor/github.com/jackc/puddle/nanotime_time.go @@ -0,0 +1,13 @@ +// +build purego appengine js + +// This file contains the safe implementation of nanotime using time.Now(). + +package puddle + +import ( + "time" +) + +func nanotime() int64 { + return time.Now().UnixNano() +} diff --git a/vendor/github.com/jackc/puddle/nanotime_unsafe.go b/vendor/github.com/jackc/puddle/nanotime_unsafe.go new file mode 100644 index 00000000..99d80ee4 --- /dev/null +++ b/vendor/github.com/jackc/puddle/nanotime_unsafe.go @@ -0,0 +1,12 @@ +// +build !purego,!appengine,!js + +// This file contains the implementation of nanotime using runtime.nanotime. + +package puddle + +import "unsafe" + +var _ = unsafe.Sizeof(0) + +//go:linkname nanotime runtime.nanotime +func nanotime() int64 diff --git a/vendor/github.com/jackc/puddle/pool.go b/vendor/github.com/jackc/puddle/pool.go new file mode 100644 index 00000000..1baf88ce --- /dev/null +++ b/vendor/github.com/jackc/puddle/pool.go @@ -0,0 +1,532 @@ +package puddle + +import ( + "context" + "errors" + "sync" + "time" +) + +const ( + resourceStatusConstructing = 0 + resourceStatusIdle = iota + resourceStatusAcquired = iota + resourceStatusHijacked = iota +) + +// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool +// or a pool that is closed while the acquire is waiting. +var ErrClosedPool = errors.New("closed pool") + +// ErrNotAvailable occurs on an attempt to acquire a resource from a pool +// that is at maximum capacity and has no available resources. +var ErrNotAvailable = errors.New("resource not available") + +// Constructor is a function called by the pool to construct a resource. +type Constructor func(ctx context.Context) (res interface{}, err error) + +// Destructor is a function called by the pool to destroy a resource. +type Destructor func(res interface{}) + +// Resource is the resource handle returned by acquiring from the pool. +type Resource struct { + value interface{} + pool *Pool + creationTime time.Time + lastUsedNano int64 + status byte +} + +// Value returns the resource value. +func (res *Resource) Value() interface{} { + if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { + panic("tried to access resource that is not acquired or hijacked") + } + return res.value +} + +// Release returns the resource to the pool. res must not be subsequently used. +func (res *Resource) Release() { + if res.status != resourceStatusAcquired { + panic("tried to release resource that is not acquired") + } + res.pool.releaseAcquiredResource(res, nanotime()) +} + +// ReleaseUnused returns the resource to the pool without updating when it was last used used. i.e. LastUsedNanotime +// will not change. res must not be subsequently used. +func (res *Resource) ReleaseUnused() { + if res.status != resourceStatusAcquired { + panic("tried to release resource that is not acquired") + } + res.pool.releaseAcquiredResource(res, res.lastUsedNano) +} + +// Destroy returns the resource to the pool for destruction. res must not be +// subsequently used. +func (res *Resource) Destroy() { + if res.status != resourceStatusAcquired { + panic("tried to destroy resource that is not acquired") + } + go res.pool.destroyAcquiredResource(res) +} + +// Hijack assumes ownership of the resource from the pool. Caller is responsible +// for cleanup of resource value. +func (res *Resource) Hijack() { + if res.status != resourceStatusAcquired { + panic("tried to hijack resource that is not acquired") + } + res.pool.hijackAcquiredResource(res) +} + +// CreationTime returns when the resource was created by the pool. +func (res *Resource) CreationTime() time.Time { + if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { + panic("tried to access resource that is not acquired or hijacked") + } + return res.creationTime +} + +// LastUsedNanotime returns when Release was last called on the resource measured in nanoseconds from an arbitrary time +// (a monotonic time). Returns creation time if Release has never been called. This is only useful to compare with +// other calls to LastUsedNanotime. In almost all cases, IdleDuration should be used instead. +func (res *Resource) LastUsedNanotime() int64 { + if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { + panic("tried to access resource that is not acquired or hijacked") + } + + return res.lastUsedNano +} + +// IdleDuration returns the duration since Release was last called on the resource. This is equivalent to subtracting +// LastUsedNanotime to the current nanotime. +func (res *Resource) IdleDuration() time.Duration { + if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { + panic("tried to access resource that is not acquired or hijacked") + } + + return time.Duration(nanotime() - res.lastUsedNano) +} + +// Pool is a concurrency-safe resource pool. +type Pool struct { + cond *sync.Cond + destructWG *sync.WaitGroup + + allResources []*Resource + idleResources []*Resource + + constructor Constructor + destructor Destructor + maxSize int32 + + acquireCount int64 + acquireDuration time.Duration + emptyAcquireCount int64 + canceledAcquireCount int64 + + closed bool +} + +// NewPool creates a new pool. Panics if maxSize is less than 1. +func NewPool(constructor Constructor, destructor Destructor, maxSize int32) *Pool { + if maxSize < 1 { + panic("maxSize is less than 1") + } + + return &Pool{ + cond: sync.NewCond(new(sync.Mutex)), + destructWG: &sync.WaitGroup{}, + maxSize: maxSize, + constructor: constructor, + destructor: destructor, + } +} + +// Close destroys all resources in the pool and rejects future Acquire calls. +// Blocks until all resources are returned to pool and destroyed. +func (p *Pool) Close() { + p.cond.L.Lock() + if p.closed { + p.cond.L.Unlock() + return + } + p.closed = true + + for _, res := range p.idleResources { + p.allResources = removeResource(p.allResources, res) + go p.destructResourceValue(res.value) + } + p.idleResources = nil + p.cond.L.Unlock() + + // Wake up all go routines waiting for a resource to be returned so they can terminate. + p.cond.Broadcast() + + p.destructWG.Wait() +} + +// Stat is a snapshot of Pool statistics. +type Stat struct { + constructingResources int32 + acquiredResources int32 + idleResources int32 + maxResources int32 + acquireCount int64 + acquireDuration time.Duration + emptyAcquireCount int64 + canceledAcquireCount int64 +} + +// TotalResource returns the total number of resources currently in the pool. +// The value is the sum of ConstructingResources, AcquiredResources, and +// IdleResources. +func (s *Stat) TotalResources() int32 { + return s.constructingResources + s.acquiredResources + s.idleResources +} + +// ConstructingResources returns the number of resources with construction in progress in +// the pool. +func (s *Stat) ConstructingResources() int32 { + return s.constructingResources +} + +// AcquiredResources returns the number of currently acquired resources in the pool. +func (s *Stat) AcquiredResources() int32 { + return s.acquiredResources +} + +// IdleResources returns the number of currently idle resources in the pool. +func (s *Stat) IdleResources() int32 { + return s.idleResources +} + +// MaxResources returns the maximum size of the pool. +func (s *Stat) MaxResources() int32 { + return s.maxResources +} + +// AcquireCount returns the cumulative count of successful acquires from the pool. +func (s *Stat) AcquireCount() int64 { + return s.acquireCount +} + +// AcquireDuration returns the total duration of all successful acquires from +// the pool. +func (s *Stat) AcquireDuration() time.Duration { + return s.acquireDuration +} + +// EmptyAcquireCount returns the cumulative count of successful acquires from the pool +// that waited for a resource to be released or constructed because the pool was +// empty. +func (s *Stat) EmptyAcquireCount() int64 { + return s.emptyAcquireCount +} + +// CanceledAcquireCount returns the cumulative count of acquires from the pool +// that were canceled by a context. +func (s *Stat) CanceledAcquireCount() int64 { + return s.canceledAcquireCount +} + +// Stat returns the current pool statistics. +func (p *Pool) Stat() *Stat { + p.cond.L.Lock() + s := &Stat{ + maxResources: p.maxSize, + acquireCount: p.acquireCount, + emptyAcquireCount: p.emptyAcquireCount, + canceledAcquireCount: p.canceledAcquireCount, + acquireDuration: p.acquireDuration, + } + + for _, res := range p.allResources { + switch res.status { + case resourceStatusConstructing: + s.constructingResources += 1 + case resourceStatusIdle: + s.idleResources += 1 + case resourceStatusAcquired: + s.acquiredResources += 1 + } + } + + p.cond.L.Unlock() + return s +} + +// Acquire gets a resource from the pool. If no resources are available and the pool +// is not at maximum capacity it will create a new resource. If the pool is at +// maximum capacity it will block until a resource is available. ctx can be used +// to cancel the Acquire. +func (p *Pool) Acquire(ctx context.Context) (*Resource, error) { + startNano := nanotime() + if doneChan := ctx.Done(); doneChan != nil { + select { + case <-ctx.Done(): + p.cond.L.Lock() + p.canceledAcquireCount += 1 + p.cond.L.Unlock() + return nil, ctx.Err() + default: + } + } + + p.cond.L.Lock() + + emptyAcquire := false + + for { + if p.closed { + p.cond.L.Unlock() + return nil, ErrClosedPool + } + + // If a resource is available now + if len(p.idleResources) > 0 { + res := p.idleResources[len(p.idleResources)-1] + p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak + p.idleResources = p.idleResources[:len(p.idleResources)-1] + res.status = resourceStatusAcquired + if emptyAcquire { + p.emptyAcquireCount += 1 + } + p.acquireCount += 1 + p.acquireDuration += time.Duration(nanotime() - startNano) + p.cond.L.Unlock() + return res, nil + } + + emptyAcquire = true + + // If there is room to create a resource do so + if len(p.allResources) < int(p.maxSize) { + res := &Resource{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing} + p.allResources = append(p.allResources, res) + p.destructWG.Add(1) + p.cond.L.Unlock() + + value, err := p.constructResourceValue(ctx) + p.cond.L.Lock() + if err != nil { + p.allResources = removeResource(p.allResources, res) + p.destructWG.Done() + + select { + case <-ctx.Done(): + if err == ctx.Err() { + p.canceledAcquireCount += 1 + } + default: + } + + p.cond.L.Unlock() + p.cond.Signal() + return nil, err + } + + res.value = value + res.status = resourceStatusAcquired + p.emptyAcquireCount += 1 + p.acquireCount += 1 + p.acquireDuration += time.Duration(nanotime() - startNano) + p.cond.L.Unlock() + return res, nil + } + + if ctx.Done() == nil { + p.cond.Wait() + } else { + // Convert p.cond.Wait into a channel + waitChan := make(chan struct{}, 1) + go func() { + p.cond.Wait() + waitChan <- struct{}{} + }() + + select { + case <-ctx.Done(): + // Allow goroutine waiting for signal to exit. Re-signal since we couldn't + // do anything with it. Another goroutine might be waiting. + go func() { + <-waitChan + p.cond.Signal() + p.cond.L.Unlock() + }() + + p.cond.L.Lock() + p.canceledAcquireCount += 1 + p.cond.L.Unlock() + return nil, ctx.Err() + case <-waitChan: + } + } + } +} + +// TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no +// resources are available but the pool has room to grow, a resource will be created in the background. ctx is only +// used to cancel the background creation. +func (p *Pool) TryAcquire(ctx context.Context) (*Resource, error) { + p.cond.L.Lock() + defer p.cond.L.Unlock() + + if p.closed { + return nil, ErrClosedPool + } + + // If a resource is available now + if len(p.idleResources) > 0 { + res := p.idleResources[len(p.idleResources)-1] + p.idleResources[len(p.idleResources)-1] = nil // Avoid memory leak + p.idleResources = p.idleResources[:len(p.idleResources)-1] + p.acquireCount += 1 + res.status = resourceStatusAcquired + return res, nil + } + + if len(p.allResources) < int(p.maxSize) { + res := &Resource{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing} + p.allResources = append(p.allResources, res) + p.destructWG.Add(1) + + go func() { + value, err := p.constructResourceValue(ctx) + defer p.cond.Signal() + p.cond.L.Lock() + defer p.cond.L.Unlock() + + if err != nil { + p.allResources = removeResource(p.allResources, res) + p.destructWG.Done() + return + } + + res.value = value + res.status = resourceStatusIdle + p.idleResources = append(p.idleResources, res) + }() + } + + return nil, ErrNotAvailable +} + +// AcquireAllIdle atomically acquires all currently idle resources. Its intended +// use is for health check and keep-alive functionality. It does not update pool +// statistics. +func (p *Pool) AcquireAllIdle() []*Resource { + p.cond.L.Lock() + if p.closed { + p.cond.L.Unlock() + return nil + } + + for _, res := range p.idleResources { + res.status = resourceStatusAcquired + } + resources := p.idleResources // Swap out current slice + p.idleResources = nil + + p.cond.L.Unlock() + return resources +} + +// CreateResource constructs a new resource without acquiring it. +// It goes straight in the IdlePool. It does not check against maxSize. +// It can be useful to maintain warm resources under little load. +func (p *Pool) CreateResource(ctx context.Context) error { + p.cond.L.Lock() + if p.closed { + p.cond.L.Unlock() + return ErrClosedPool + } + p.cond.L.Unlock() + + value, err := p.constructResourceValue(ctx) + if err != nil { + return err + } + + res := &Resource{ + pool: p, + creationTime: time.Now(), + status: resourceStatusIdle, + value: value, + lastUsedNano: nanotime(), + } + p.destructWG.Add(1) + + p.cond.L.Lock() + // If closed while constructing resource then destroy it and return an error + if p.closed { + go p.destructResourceValue(res.value) + p.cond.L.Unlock() + return ErrClosedPool + } + p.allResources = append(p.allResources, res) + p.idleResources = append(p.idleResources, res) + p.cond.L.Unlock() + + return nil +} + +// releaseAcquiredResource returns res to the the pool. +func (p *Pool) releaseAcquiredResource(res *Resource, lastUsedNano int64) { + p.cond.L.Lock() + + if !p.closed { + res.lastUsedNano = lastUsedNano + res.status = resourceStatusIdle + p.idleResources = append(p.idleResources, res) + } else { + p.allResources = removeResource(p.allResources, res) + go p.destructResourceValue(res.value) + } + + p.cond.L.Unlock() + p.cond.Signal() +} + +// Remove removes res from the pool and closes it. If res is not part of the +// pool Remove will panic. +func (p *Pool) destroyAcquiredResource(res *Resource) { + p.destructResourceValue(res.value) + p.cond.L.Lock() + p.allResources = removeResource(p.allResources, res) + p.cond.L.Unlock() + p.cond.Signal() +} + +func (p *Pool) hijackAcquiredResource(res *Resource) { + p.cond.L.Lock() + + p.allResources = removeResource(p.allResources, res) + res.status = resourceStatusHijacked + p.destructWG.Done() // not responsible for destructing hijacked resources + + p.cond.L.Unlock() + p.cond.Signal() +} + +func removeResource(slice []*Resource, res *Resource) []*Resource { + for i := range slice { + if slice[i] == res { + slice[i] = slice[len(slice)-1] + slice[len(slice)-1] = nil // Avoid memory leak + return slice[:len(slice)-1] + } + } + + panic("BUG: removeResource could not find res in slice") +} + +func (p *Pool) constructResourceValue(ctx context.Context) (interface{}, error) { + return p.constructor(ctx) +} + +func (p *Pool) destructResourceValue(value interface{}) { + p.destructor(value) + p.destructWG.Done() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 864b9b6c..e0b67629 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -401,6 +401,10 @@ github.com/jackc/pgx/pgtype ## explicit; go 1.13 github.com/jackc/pgx/v4 github.com/jackc/pgx/v4/internal/sanitize +github.com/jackc/pgx/v4/pgxpool +# github.com/jackc/puddle v1.2.1 +## explicit; go 1.12 +github.com/jackc/puddle # github.com/jgautheron/goconst v1.6.0 ## explicit; go 1.13 github.com/jgautheron/goconst diff --git a/verify.go b/verify.go index fd87107b..8be83ff2 100644 --- a/verify.go +++ b/verify.go @@ -6,6 +6,7 @@ import ( "github.com/jackc/pgx/pgtype" "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" "github.com/pkg/errors" "github.com/sirupsen/logrus" "go.uber.org/multierr" @@ -13,14 +14,14 @@ import ( // Verify runs all verification tests for the given table, configured by // the supplied Options. -func Verify(ctx context.Context, targets []*pgx.ConnConfig, opts ...Option) (*Results, error) { +func Verify(ctx context.Context, targets []*pgxpool.Config, opts ...Option) (*Results, error) { c := NewConfig(opts...) return c.Verify(ctx, targets) } // Verify runs all verification tests for the given table. -func (c Config) Verify(ctx context.Context, targets []*pgx.ConnConfig) (*Results, error) { +func (c Config) Verify(ctx context.Context, targets []*pgxpool.Config) (*Results, error) { var finalResults *Results if err := c.Validate(); err != nil { @@ -31,33 +32,33 @@ func (c Config) Verify(ctx context.Context, targets []*pgx.ConnConfig) (*Results // First check that we can connect to every specified target database. targetNames := make([]string, len(targets)) - conns := make(map[int]*pgx.Conn) + conns := make(map[int]*pgxpool.Pool) for i, target := range targets { pgxLoggerFields := logrus.Fields{ "component": "pgx", - "host": targets[i].Host, - "port": targets[i].Port, - "database": targets[i].Database, - "user": targets[i].User, + "host": targets[i].ConnConfig.Host, + "port": targets[i].ConnConfig.Port, + "database": targets[i].ConnConfig.Database, + "user": targets[i].ConnConfig.User, } if len(c.Aliases) == len(targets) { targetNames[i] = c.Aliases[i] pgxLoggerFields["alias"] = c.Aliases[i] } else { - targetNames[i] = targets[i].Host + targetNames[i] = targets[i].ConnConfig.Host } - target.Logger = &pgxLogger{c.Logger.WithFields(pgxLoggerFields)} + target.ConnConfig.Logger = &pgxLogger{c.Logger.WithFields(pgxLoggerFields)} - target.LogLevel = pgx.LogLevelError + target.ConnConfig.LogLevel = pgx.LogLevelError - conn, err := pgx.ConnectConfig(ctx, target) + conn, err := pgxpool.ConnectConfig(ctx, target) if err != nil { return finalResults, err } - defer conn.Close(ctx) + defer conn.Close() conns[i] = conn } @@ -86,7 +87,7 @@ func (c Config) Verify(ctx context.Context, targets []*pgx.ConnConfig) (*Results return finalResults, nil } -func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *pgx.Conn, finalResults *Results, wg *sync.WaitGroup) { +func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *pgxpool.Pool, finalResults *Results, wg *sync.WaitGroup) { defer wg.Done() logger := c.Logger.WithField("target", targetName) @@ -109,7 +110,7 @@ func (c Config) runTestsOnTarget(ctx context.Context, targetName string, conn *p logger.Info("Table hashes computed") } -func (c Config) fetchTargetTableNames(ctx context.Context, conn *pgx.Conn) (DatabaseResult, error) { +func (c Config) fetchTargetTableNames(ctx context.Context, conn *pgxpool.Pool) (DatabaseResult, error) { schemaTableHashes := make(DatabaseResult) rows, err := conn.Query(ctx, buildGetTablesQuery(c.IncludeSchemas, c.ExcludeSchemas, c.IncludeTables, c.ExcludeTables)) @@ -157,7 +158,7 @@ func (c Config) validColumnTarget(columnName string) bool { return false } -func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName string, finalResults *Results, wg *sync.WaitGroup) { +func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, conn *pgxpool.Pool, targetName, schemaName, tableName string, finalResults *Results, wg *sync.WaitGroup) { defer wg.Done() tableLogger := logger.WithField("table", tableName).WithField("schema", schemaName) @@ -240,7 +241,7 @@ func (c Config) runTestQueriesOnTable(ctx context.Context, logger *logrus.Entry, } } -func runTestOnTable(ctx context.Context, logger *logrus.Entry, conn *pgx.Conn, targetName, schemaName, tableName, testMode, query string, finalResults *Results, wg *sync.WaitGroup) { +func runTestOnTable(ctx context.Context, logger *logrus.Entry, conn *pgxpool.Pool, targetName, schemaName, tableName, testMode, query string, finalResults *Results, wg *sync.WaitGroup) { defer wg.Done() row := conn.QueryRow(ctx, query)