diff --git a/cli/commands/migrate.go b/cli/commands/migrate.go index 15ddfb1b..77c0ce32 100644 --- a/cli/commands/migrate.go +++ b/cli/commands/migrate.go @@ -54,6 +54,8 @@ func ExecuteMigration(cmd string, t *migrate.Migrate, stepOrVersion int64) error err = mig.UpCmd(t, stepOrVersion) case "down": err = mig.DownCmd(t, stepOrVersion) + case "gotoVersion": + err = mig.GotoVersionCmd(t, stepOrVersion) case "version": var direction string if stepOrVersion >= 0 { diff --git a/cli/commands/migrate_apply.go b/cli/commands/migrate_apply.go index bf49aa25..59953e4e 100644 --- a/cli/commands/migrate_apply.go +++ b/cli/commands/migrate_apply.go @@ -17,8 +17,8 @@ func newMigrateApplyCmd(ec *cli.ExecutionContext) *cobra.Command { EC: ec, } migrateApplyCmd := &cobra.Command{ - Use: "apply", - Short: "Apply migrations on the database", + Use: "apply", + Short: "Apply migrations on the database", Example: ` # Apply all migrations hasura migrate apply @@ -42,6 +42,12 @@ func newMigrateApplyCmd(ec *cli.ExecutionContext) *cobra.Command { # Apply only a particular version hasura migrate apply --type up --version "" + + # Apply all up migrations upto version 125, last applied is 100 + hasura migrate apply --goto 125 + + # Apply all down migrations upto version 125, last applied is 150 + hasura migrate apply --goto 125 # Rollback a particular version: hasura migrate apply --type down --version "" @@ -58,19 +64,33 @@ func newMigrateApplyCmd(ec *cli.ExecutionContext) *cobra.Command { err := opts.run() opts.EC.Spinner.Stop() if err != nil { - return err + if err == migrate.ErrNoChange { + opts.EC.Logger.Info("nothing to apply") + return nil + } + if e, ok := err.(*os.PathError); ok { + // If Op is first, then log No migrations to apply + if e.Op == "first" { + opts.EC.Logger.Info("nothing to apply") + return nil + } + } + return errors.Wrap(err, "apply failed") } opts.EC.Logger.Info("migrations applied") return nil }, } f := migrateApplyCmd.Flags() + f.SortFlags = false f.StringVar(&opts.upMigration, "up", "", "apply all or N up migration steps") f.StringVar(&opts.downMigration, "down", "", "apply all or N down migration steps") + f.StringVar(&opts.gotoVersion, "goto", "", "apply migration chain up to to the version specified") + f.StringVar(&opts.versionMigration, "version", "", "only apply this particular migration") - f.StringVar(&opts.migrationType, "type", "up", "type of migration (up, down) to be used with version flag") f.BoolVar(&opts.skipExecution, "skip-execution", false, "skip executing the migration action, but mark them as applied") + f.StringVar(&opts.migrationType, "type", "up", "type of migration (up, down) to be used with version flag") f.String("endpoint", "", "http(s) endpoint for Hasura GraphQL Engine") f.String("admin-secret", "", "admin secret for Hasura GraphQL Engine") @@ -91,11 +111,13 @@ type migrateApplyOptions struct { downMigration string versionMigration string migrationType string - skipExecution bool + // version up to which migration chain has to be applied + gotoVersion string + skipExecution bool } func (o *migrateApplyOptions) run() error { - migrationType, step, err := getMigrationTypeAndStep(o.upMigration, o.downMigration, o.versionMigration, o.migrationType, o.skipExecution) + migrationType, step, err := getMigrationTypeAndStep(o.upMigration, o.downMigration, o.versionMigration, o.migrationType, o.gotoVersion, o.skipExecution) if err != nil { return errors.Wrap(err, "error validating flags") } @@ -106,27 +128,12 @@ func (o *migrateApplyOptions) run() error { } migrateDrv.SkipExecution = o.skipExecution - err = ExecuteMigration(migrationType, migrateDrv, step) - if err != nil { - if err == migrate.ErrNoChange { - o.EC.Logger.Info("nothing to apply") - return nil - } - if e, ok := err.(*os.PathError); ok { - // If Op is first, then log No migrations to apply - if e.Op == "first" { - o.EC.Logger.Info("No migrations to apply") - return nil - } - } - return errors.Wrap(err, "apply failed") - } - return nil + return ExecuteMigration(migrationType, migrateDrv, step) } // Only one flag out of up, down and version can be set at a time. This function // checks whether that is the case and returns an error is not -func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migrationType string, skipExecution bool) (string, int64, error) { +func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migrationType, gotoVersion string, skipExecution bool) (string, int64, error) { var flagCount = 0 var stepString = "all" var migrationName = "up" @@ -147,6 +154,11 @@ func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migra } flagCount++ } + if gotoVersion != "" { + migrationName = "gotoVersion" + stepString = gotoVersion + flagCount++ + } if flagCount > 1 { return "", 0, errors.New("Only one migration type can be applied at a time (--up, --down or --goto)") @@ -162,7 +174,7 @@ func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migra step, err := strconv.ParseInt(stepString, 10, 64) if err != nil { - return "", 0, errors.Wrap(err, "not a valid input for steps") + return "", 0, errors.Wrap(err, "not a valid input for steps/version") } return migrationName, step, nil } diff --git a/cli/migrate/cmd/commands.go b/cli/migrate/cmd/commands.go index 534793ba..464b21c7 100644 --- a/cli/migrate/cmd/commands.go +++ b/cli/migrate/cmd/commands.go @@ -255,3 +255,7 @@ func SquashCmd(m *migrate.Migrate, from uint64, version int64, name, directory s return } + +func GotoVersionCmd(m *migrate.Migrate, gotoVersion int64) error { + return m.GotoVersion(gotoVersion) +} diff --git a/cli/migrate/migrate.go b/cli/migrate/migrate.go index 94be7800..6f557f3c 100644 --- a/cli/migrate/migrate.go +++ b/cli/migrate/migrate.go @@ -13,6 +13,8 @@ import ( "sync" "time" + "github.com/pkg/errors" + "github.com/hasura/graphql-engine/cli/migrate/database" "github.com/hasura/graphql-engine/cli/migrate/source" @@ -1231,7 +1233,7 @@ func (m *Migrate) versionUpExists(version uint64) error { // versionDownExists checks the source if either the up or down migration for // the specified migration version exists. func (m *Migrate) versionDownExists(version uint64) error { - // try up migration first + // try down migration first directions := m.sourceDrv.GetDirections(version) if !directions[source.Down] && !directions[source.MetaDown] { return fmt.Errorf("%d down migration not found", version) @@ -1470,3 +1472,228 @@ func (m *Migrate) unlockErr(prevErr error) error { } return prevErr } + +// GotoVersion will apply a version also applying the migration chain +// leading to it +func (m *Migrate) GotoVersion(gotoVersion int64) error { + mode, err := m.databaseDrv.GetSetting("migration_mode") + if err != nil { + return err + } + if mode != "true" { + return ErrNoMigrationMode + } + + currentVersion, dirty, err := m.Version() + currVersion := int64(currentVersion) + if err != nil { + if err == ErrNilVersion { + currVersion = database.NilVersion + } else { + return errors.Wrap(err, "cannot determine version") + } + } + if dirty { + return ErrDirty{currVersion} + } + + if err := m.lock(); err != nil { + return err + } + + ret := make(chan interface{}) + if currVersion <= gotoVersion { + go m.readUpFromVersion(-1, gotoVersion, ret) + } else if currVersion > gotoVersion { + go m.readDownFromVersion(currVersion, gotoVersion, ret) + } + + return m.unlockErr(m.runMigrations(ret)) + +} + +// readUpFromVersion reads up migrations from `from` limitted by `limit`. (is a modified version of readUp) +// limit can be -1, implying no limit and reading until there are no more migrations. +// Each migration is then written to the ret channel. +// If an error occurs during reading, that error is written to the ret channel, too. +// Once readUpFromVersion is done reading it will close the ret channel. +func (m *Migrate) readUpFromVersion(from int64, to int64, ret chan<- interface{}) { + defer close(ret) + var noOfAppliedMigrations int + for { + if m.stop() { + return + } + if from == to { + if noOfAppliedMigrations == 0 { + ret <- ErrNoChange + } + return + } + + if from == -1 { + firstVersion, err := m.sourceDrv.First() + if err != nil { + ret <- err + return + } + + // Check if this version present in DB + ok := m.databaseDrv.Read(firstVersion) + if ok { + from = int64(firstVersion) + continue + } + + // Check if firstVersion files exists (yaml or sql) + if err = m.versionUpExists(firstVersion); err != nil { + ret <- err + return + } + + migr, err := m.newMigration(firstVersion, int64(firstVersion)) + if err != nil { + ret <- err + return + } + ret <- migr + go migr.Buffer() + + migr, err = m.metanewMigration(firstVersion, int64(firstVersion)) + if err != nil { + ret <- err + return + } + ret <- migr + + go migr.Buffer() + from = int64(firstVersion) + noOfAppliedMigrations++ + continue + } + + // apply next migration + next, err := m.sourceDrv.Next(suint64(from)) + if err != nil { + ret <- err + return + } + + // Check if this version present in DB + ok := m.databaseDrv.Read(next) + if ok { + from = int64(next) + continue + } + + // Check if next files exists (yaml or sql) + if err = m.versionUpExists(next); err != nil { + ret <- err + return + } + + migr, err := m.newMigration(next, int64(next)) + if err != nil { + ret <- err + return + } + + ret <- migr + go migr.Buffer() + + migr, err = m.metanewMigration(next, int64(next)) + if err != nil { + ret <- err + return + } + + ret <- migr + go migr.Buffer() + from = int64(next) + noOfAppliedMigrations++ + } +} + +// readDownFromVersion reads down migrations from `from` limitted by `limit`. (modified version of readDown) +// limit can be -1, implying no limit and reading until there are no more migrations. +// Each migration is then written to the ret channel. +// If an error occurs during reading, that error is written to the ret channel, too. +// Once readDownFromVersion is done reading it will close the ret channel. +func (m *Migrate) readDownFromVersion(from int64, to int64, ret chan<- interface{}) { + defer close(ret) + var err error + var noOfAppliedMigrations int + for { + if m.stop() { + return + } + + if from == to { + if noOfAppliedMigrations == 0 { + ret <- ErrNoChange + } + return + } + + err = m.versionDownExists(suint64(from)) + if err != nil { + ret <- err + return + } + + prev, ok := m.databaseDrv.Prev(suint64(from)) + if !ok { + // Check if any prev version available in source + prev, err = m.sourceDrv.Prev(suint64(from)) + if os.IsNotExist(err) && to == -1 { + // apply nil migration + migr, err := m.metanewMigration(suint64(from), -1) + if err != nil { + ret <- err + return + } + + ret <- migr + go migr.Buffer() + + migr, err = m.newMigration(suint64(from), -1) + if err != nil { + ret <- err + return + } + + ret <- migr + go migr.Buffer() + + from = database.NilVersion + noOfAppliedMigrations++ + continue + } else if err != nil { + ret <- err + return + } + ret <- fmt.Errorf("%v not applied on database", prev) + return + } + + migr, err := m.metanewMigration(suint64(from), int64(prev)) + if err != nil { + ret <- err + return + } + + ret <- migr + go migr.Buffer() + + migr, err = m.newMigration(suint64(from), int64(prev)) + if err != nil { + ret <- err + return + } + + ret <- migr + go migr.Buffer() + from = int64(prev) + noOfAppliedMigrations++ + } +}