mirror of
https://github.com/zhigang1992/graphql-engine.git
synced 2026-05-18 01:31:18 +08:00
* cli: add --goto <version> flag to migrate command * cli: fix error encoutered in goto, when on version -1 * do down migrations one step down * reorganize code * in case of up "gotos" make sure that all previous migration in migration chain are applied * modify readUp and readDown functions to account for --goto use case * refactor to clean up and adopt a better implementation addressing the comments * remove empty error handling step * return ErrNoChange when no migrations were applied * respect m.stop() can panic so place conditional cases after that and add a couple other small fixes * fix bug caused by early checking of versions * fix to add migration_mode and nil version check * add some more examples for migrate apply command Co-authored-by: Aravind Shankar <face11301@gmail.com> Co-authored-by: Shahidh K Muhammed <muhammedshahid.k@gmail.com>
This commit is contained in:
@@ -54,6 +54,8 @@ func ExecuteMigration(cmd string, t *migrate.Migrate, stepOrVersion int64) error
|
||||
err = mig.UpCmd(t, stepOrVersion)
|
||||
case "down":
|
||||
err = mig.DownCmd(t, stepOrVersion)
|
||||
case "gotoVersion":
|
||||
err = mig.GotoVersionCmd(t, stepOrVersion)
|
||||
case "version":
|
||||
var direction string
|
||||
if stepOrVersion >= 0 {
|
||||
|
||||
@@ -17,8 +17,8 @@ func newMigrateApplyCmd(ec *cli.ExecutionContext) *cobra.Command {
|
||||
EC: ec,
|
||||
}
|
||||
migrateApplyCmd := &cobra.Command{
|
||||
Use: "apply",
|
||||
Short: "Apply migrations on the database",
|
||||
Use: "apply",
|
||||
Short: "Apply migrations on the database",
|
||||
Example: ` # Apply all migrations
|
||||
hasura migrate apply
|
||||
|
||||
@@ -42,6 +42,12 @@ func newMigrateApplyCmd(ec *cli.ExecutionContext) *cobra.Command {
|
||||
|
||||
# Apply only a particular version
|
||||
hasura migrate apply --type up --version "<version>"
|
||||
|
||||
# Apply all up migrations upto version 125, last applied is 100
|
||||
hasura migrate apply --goto 125
|
||||
|
||||
# Apply all down migrations upto version 125, last applied is 150
|
||||
hasura migrate apply --goto 125
|
||||
|
||||
# Rollback a particular version:
|
||||
hasura migrate apply --type down --version "<version>"
|
||||
@@ -58,19 +64,33 @@ func newMigrateApplyCmd(ec *cli.ExecutionContext) *cobra.Command {
|
||||
err := opts.run()
|
||||
opts.EC.Spinner.Stop()
|
||||
if err != nil {
|
||||
return err
|
||||
if err == migrate.ErrNoChange {
|
||||
opts.EC.Logger.Info("nothing to apply")
|
||||
return nil
|
||||
}
|
||||
if e, ok := err.(*os.PathError); ok {
|
||||
// If Op is first, then log No migrations to apply
|
||||
if e.Op == "first" {
|
||||
opts.EC.Logger.Info("nothing to apply")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return errors.Wrap(err, "apply failed")
|
||||
}
|
||||
opts.EC.Logger.Info("migrations applied")
|
||||
return nil
|
||||
},
|
||||
}
|
||||
f := migrateApplyCmd.Flags()
|
||||
f.SortFlags = false
|
||||
|
||||
f.StringVar(&opts.upMigration, "up", "", "apply all or N up migration steps")
|
||||
f.StringVar(&opts.downMigration, "down", "", "apply all or N down migration steps")
|
||||
f.StringVar(&opts.gotoVersion, "goto", "", "apply migration chain up to to the version specified")
|
||||
|
||||
f.StringVar(&opts.versionMigration, "version", "", "only apply this particular migration")
|
||||
f.StringVar(&opts.migrationType, "type", "up", "type of migration (up, down) to be used with version flag")
|
||||
f.BoolVar(&opts.skipExecution, "skip-execution", false, "skip executing the migration action, but mark them as applied")
|
||||
f.StringVar(&opts.migrationType, "type", "up", "type of migration (up, down) to be used with version flag")
|
||||
|
||||
f.String("endpoint", "", "http(s) endpoint for Hasura GraphQL Engine")
|
||||
f.String("admin-secret", "", "admin secret for Hasura GraphQL Engine")
|
||||
@@ -91,11 +111,13 @@ type migrateApplyOptions struct {
|
||||
downMigration string
|
||||
versionMigration string
|
||||
migrationType string
|
||||
skipExecution bool
|
||||
// version up to which migration chain has to be applied
|
||||
gotoVersion string
|
||||
skipExecution bool
|
||||
}
|
||||
|
||||
func (o *migrateApplyOptions) run() error {
|
||||
migrationType, step, err := getMigrationTypeAndStep(o.upMigration, o.downMigration, o.versionMigration, o.migrationType, o.skipExecution)
|
||||
migrationType, step, err := getMigrationTypeAndStep(o.upMigration, o.downMigration, o.versionMigration, o.migrationType, o.gotoVersion, o.skipExecution)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error validating flags")
|
||||
}
|
||||
@@ -106,27 +128,12 @@ func (o *migrateApplyOptions) run() error {
|
||||
}
|
||||
migrateDrv.SkipExecution = o.skipExecution
|
||||
|
||||
err = ExecuteMigration(migrationType, migrateDrv, step)
|
||||
if err != nil {
|
||||
if err == migrate.ErrNoChange {
|
||||
o.EC.Logger.Info("nothing to apply")
|
||||
return nil
|
||||
}
|
||||
if e, ok := err.(*os.PathError); ok {
|
||||
// If Op is first, then log No migrations to apply
|
||||
if e.Op == "first" {
|
||||
o.EC.Logger.Info("No migrations to apply")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return errors.Wrap(err, "apply failed")
|
||||
}
|
||||
return nil
|
||||
return ExecuteMigration(migrationType, migrateDrv, step)
|
||||
}
|
||||
|
||||
// Only one flag out of up, down and version can be set at a time. This function
|
||||
// checks whether that is the case and returns an error is not
|
||||
func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migrationType string, skipExecution bool) (string, int64, error) {
|
||||
func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migrationType, gotoVersion string, skipExecution bool) (string, int64, error) {
|
||||
var flagCount = 0
|
||||
var stepString = "all"
|
||||
var migrationName = "up"
|
||||
@@ -147,6 +154,11 @@ func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migra
|
||||
}
|
||||
flagCount++
|
||||
}
|
||||
if gotoVersion != "" {
|
||||
migrationName = "gotoVersion"
|
||||
stepString = gotoVersion
|
||||
flagCount++
|
||||
}
|
||||
|
||||
if flagCount > 1 {
|
||||
return "", 0, errors.New("Only one migration type can be applied at a time (--up, --down or --goto)")
|
||||
@@ -162,7 +174,7 @@ func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migra
|
||||
|
||||
step, err := strconv.ParseInt(stepString, 10, 64)
|
||||
if err != nil {
|
||||
return "", 0, errors.Wrap(err, "not a valid input for steps")
|
||||
return "", 0, errors.Wrap(err, "not a valid input for steps/version")
|
||||
}
|
||||
return migrationName, step, nil
|
||||
}
|
||||
|
||||
@@ -255,3 +255,7 @@ func SquashCmd(m *migrate.Migrate, from uint64, version int64, name, directory s
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func GotoVersionCmd(m *migrate.Migrate, gotoVersion int64) error {
|
||||
return m.GotoVersion(gotoVersion)
|
||||
}
|
||||
|
||||
@@ -13,6 +13,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/hasura/graphql-engine/cli/migrate/database"
|
||||
"github.com/hasura/graphql-engine/cli/migrate/source"
|
||||
|
||||
@@ -1231,7 +1233,7 @@ func (m *Migrate) versionUpExists(version uint64) error {
|
||||
// versionDownExists checks the source if either the up or down migration for
|
||||
// the specified migration version exists.
|
||||
func (m *Migrate) versionDownExists(version uint64) error {
|
||||
// try up migration first
|
||||
// try down migration first
|
||||
directions := m.sourceDrv.GetDirections(version)
|
||||
if !directions[source.Down] && !directions[source.MetaDown] {
|
||||
return fmt.Errorf("%d down migration not found", version)
|
||||
@@ -1470,3 +1472,228 @@ func (m *Migrate) unlockErr(prevErr error) error {
|
||||
}
|
||||
return prevErr
|
||||
}
|
||||
|
||||
// GotoVersion will apply a version also applying the migration chain
|
||||
// leading to it
|
||||
func (m *Migrate) GotoVersion(gotoVersion int64) error {
|
||||
mode, err := m.databaseDrv.GetSetting("migration_mode")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if mode != "true" {
|
||||
return ErrNoMigrationMode
|
||||
}
|
||||
|
||||
currentVersion, dirty, err := m.Version()
|
||||
currVersion := int64(currentVersion)
|
||||
if err != nil {
|
||||
if err == ErrNilVersion {
|
||||
currVersion = database.NilVersion
|
||||
} else {
|
||||
return errors.Wrap(err, "cannot determine version")
|
||||
}
|
||||
}
|
||||
if dirty {
|
||||
return ErrDirty{currVersion}
|
||||
}
|
||||
|
||||
if err := m.lock(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ret := make(chan interface{})
|
||||
if currVersion <= gotoVersion {
|
||||
go m.readUpFromVersion(-1, gotoVersion, ret)
|
||||
} else if currVersion > gotoVersion {
|
||||
go m.readDownFromVersion(currVersion, gotoVersion, ret)
|
||||
}
|
||||
|
||||
return m.unlockErr(m.runMigrations(ret))
|
||||
|
||||
}
|
||||
|
||||
// readUpFromVersion reads up migrations from `from` limitted by `limit`. (is a modified version of readUp)
|
||||
// limit can be -1, implying no limit and reading until there are no more migrations.
|
||||
// Each migration is then written to the ret channel.
|
||||
// If an error occurs during reading, that error is written to the ret channel, too.
|
||||
// Once readUpFromVersion is done reading it will close the ret channel.
|
||||
func (m *Migrate) readUpFromVersion(from int64, to int64, ret chan<- interface{}) {
|
||||
defer close(ret)
|
||||
var noOfAppliedMigrations int
|
||||
for {
|
||||
if m.stop() {
|
||||
return
|
||||
}
|
||||
if from == to {
|
||||
if noOfAppliedMigrations == 0 {
|
||||
ret <- ErrNoChange
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if from == -1 {
|
||||
firstVersion, err := m.sourceDrv.First()
|
||||
if err != nil {
|
||||
ret <- err
|
||||
return
|
||||
}
|
||||
|
||||
// Check if this version present in DB
|
||||
ok := m.databaseDrv.Read(firstVersion)
|
||||
if ok {
|
||||
from = int64(firstVersion)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if firstVersion files exists (yaml or sql)
|
||||
if err = m.versionUpExists(firstVersion); err != nil {
|
||||
ret <- err
|
||||
return
|
||||
}
|
||||
|
||||
migr, err := m.newMigration(firstVersion, int64(firstVersion))
|
||||
if err != nil {
|
||||
ret <- err
|
||||
return
|
||||
}
|
||||
ret <- migr
|
||||
go migr.Buffer()
|
||||
|
||||
migr, err = m.metanewMigration(firstVersion, int64(firstVersion))
|
||||
if err != nil {
|
||||
ret <- err
|
||||
return
|
||||
}
|
||||
ret <- migr
|
||||
|
||||
go migr.Buffer()
|
||||
from = int64(firstVersion)
|
||||
noOfAppliedMigrations++
|
||||
continue
|
||||
}
|
||||
|
||||
// apply next migration
|
||||
next, err := m.sourceDrv.Next(suint64(from))
|
||||
if err != nil {
|
||||
ret <- err
|
||||
return
|
||||
}
|
||||
|
||||
// Check if this version present in DB
|
||||
ok := m.databaseDrv.Read(next)
|
||||
if ok {
|
||||
from = int64(next)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if next files exists (yaml or sql)
|
||||
if err = m.versionUpExists(next); err != nil {
|
||||
ret <- err
|
||||
return
|
||||
}
|
||||
|
||||
migr, err := m.newMigration(next, int64(next))
|
||||
if err != nil {
|
||||
ret <- err
|
||||
return
|
||||
}
|
||||
|
||||
ret <- migr
|
||||
go migr.Buffer()
|
||||
|
||||
migr, err = m.metanewMigration(next, int64(next))
|
||||
if err != nil {
|
||||
ret <- err
|
||||
return
|
||||
}
|
||||
|
||||
ret <- migr
|
||||
go migr.Buffer()
|
||||
from = int64(next)
|
||||
noOfAppliedMigrations++
|
||||
}
|
||||
}
|
||||
|
||||
// readDownFromVersion reads down migrations from `from` limitted by `limit`. (modified version of readDown)
|
||||
// limit can be -1, implying no limit and reading until there are no more migrations.
|
||||
// Each migration is then written to the ret channel.
|
||||
// If an error occurs during reading, that error is written to the ret channel, too.
|
||||
// Once readDownFromVersion is done reading it will close the ret channel.
|
||||
func (m *Migrate) readDownFromVersion(from int64, to int64, ret chan<- interface{}) {
|
||||
defer close(ret)
|
||||
var err error
|
||||
var noOfAppliedMigrations int
|
||||
for {
|
||||
if m.stop() {
|
||||
return
|
||||
}
|
||||
|
||||
if from == to {
|
||||
if noOfAppliedMigrations == 0 {
|
||||
ret <- ErrNoChange
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
err = m.versionDownExists(suint64(from))
|
||||
if err != nil {
|
||||
ret <- err
|
||||
return
|
||||
}
|
||||
|
||||
prev, ok := m.databaseDrv.Prev(suint64(from))
|
||||
if !ok {
|
||||
// Check if any prev version available in source
|
||||
prev, err = m.sourceDrv.Prev(suint64(from))
|
||||
if os.IsNotExist(err) && to == -1 {
|
||||
// apply nil migration
|
||||
migr, err := m.metanewMigration(suint64(from), -1)
|
||||
if err != nil {
|
||||
ret <- err
|
||||
return
|
||||
}
|
||||
|
||||
ret <- migr
|
||||
go migr.Buffer()
|
||||
|
||||
migr, err = m.newMigration(suint64(from), -1)
|
||||
if err != nil {
|
||||
ret <- err
|
||||
return
|
||||
}
|
||||
|
||||
ret <- migr
|
||||
go migr.Buffer()
|
||||
|
||||
from = database.NilVersion
|
||||
noOfAppliedMigrations++
|
||||
continue
|
||||
} else if err != nil {
|
||||
ret <- err
|
||||
return
|
||||
}
|
||||
ret <- fmt.Errorf("%v not applied on database", prev)
|
||||
return
|
||||
}
|
||||
|
||||
migr, err := m.metanewMigration(suint64(from), int64(prev))
|
||||
if err != nil {
|
||||
ret <- err
|
||||
return
|
||||
}
|
||||
|
||||
ret <- migr
|
||||
go migr.Buffer()
|
||||
|
||||
migr, err = m.newMigration(suint64(from), int64(prev))
|
||||
if err != nil {
|
||||
ret <- err
|
||||
return
|
||||
}
|
||||
|
||||
ret <- migr
|
||||
go migr.Buffer()
|
||||
from = int64(prev)
|
||||
noOfAppliedMigrations++
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user