How do I execute inserts and updates in an Alembic upgrade script?

I need to alter data during an Alembic upgrade.

I currently have a 'players' table in a first revision:

def upgrade():
    op.create_table('players',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.Unicode(length=200), nullable=False),
        sa.Column('position', sa.Unicode(length=200), nullable=True),
        sa.Column('team', sa.Unicode(length=100), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )

I want to introduce a 'teams' table. I've created a second revision:

def upgrade():
    op.create_table('teams',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=80), nullable=False)
    )
    op.add_column('players', sa.Column('team_id', sa.Integer(), nullable=False))

I would like the second migration to also add the following data:

  1. Populate teams table:

    INSERT INTO teams (name) SELECT DISTINCT team FROM players;
    
  2. Update players.team_id based on players.team name:

    UPDATE players AS p JOIN teams AS t SET p.team_id = t.id WHERE p.team = t.name;
    

How do I execute inserts and updates inside the upgrade script?


What you are asking for is a data migration, as opposed to the schema migration that is most prevalent in the Alembic docs.

This answer assumes you are using declarative (as opposed to class-Mapper-Table or core) to define your models. It should be relatively straightforward to adapt this to the other forms.

Note that Alembic provides some basic data functions: op.bulk_insert() and op.execute(). If the operations are fairly minimal, use those. If the migration requires relationships or other complex interactions, I prefer to use the full power of models and sessions as described below.
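For the two statements in the question, op.execute() is probably enough. The sketch below shows one portable way to phrase them: the UPDATE ... JOIN form from the question is MySQL syntax, so a correlated subquery is used instead. It runs against an in-memory SQLite database through the standard-library sqlite3 module only so it can be tried without Alembic. The sample rows and the apply_data_migration helper are made up for illustration, and the IS NOT NULL filter is an assumption based on players.team being nullable. Inside a real upgrade() you would pass each string to op.execute() instead.

```python
import sqlite3

# Step 1: one teams row per distinct, non-null team name found in players.
POPULATE_TEAMS = """
    INSERT INTO teams (name)
    SELECT DISTINCT team FROM players WHERE team IS NOT NULL
"""

# Step 2: point each player at the matching team. A correlated subquery
# is used because UPDATE ... JOIN is not accepted by every database.
LINK_PLAYERS = """
    UPDATE players
    SET team_id = (SELECT teams.id FROM teams WHERE teams.name = players.team)
"""


def apply_data_migration(conn):
    """Hypothetical helper; in Alembic this would be two op.execute() calls."""
    conn.execute(POPULATE_TEAMS)
    conn.execute(LINK_PLAYERS)


# Throwaway database with made-up sample rows.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE players (id INTEGER PRIMARY KEY, name TEXT, team TEXT, team_id INTEGER);
    CREATE TABLE teams (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
    INSERT INTO players (name, team) VALUES ('Ann', 'Reds'), ('Bob', 'Blues'), ('Cy', 'Reds');
""")
apply_data_migration(conn)

# Read the result back through the new foreign key.
linked = conn.execute("""
    SELECT players.name, teams.name
    FROM players JOIN teams ON teams.id = players.team_id
    ORDER BY players.id
""").fetchall()
team_count = conn.execute("SELECT COUNT(*) FROM teams").fetchone()[0]
```

Keeping the statements as plain strings means the migration does not depend on any model code, at the cost of having to check the SQL against each database you target.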

The following is an example migration script that sets up some declarative models that will be used to manipulate data in a session. The key points are:

  1. Define the basic models you need, with the columns you'll need. You don't need every column, just the primary key and the ones you'll be using.

  2. Within the upgrade function, use op.get_bind() to get the current connection, and make a session with it.

    • Or use bind.execute() to run SQL statements directly through SQLAlchemy's lower-level connection API. This is useful for simple migrations.
  3. Use the models and session as you normally would in your application.

"""create teams table

Revision ID: 169ad57156f0
Revises: 29b4c2bfce6d
Create Date: 2014-06-25 09:00:06.784170
"""

revision = '169ad57156f0'
down_revision = '29b4c2bfce6d'

from alembic import op
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class Player(Base):
    __tablename__ = 'players'

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String, nullable=False)
    team_name = sa.Column('team', sa.String, nullable=False)
    team_id = sa.Column(sa.Integer, sa.ForeignKey('teams.id'), nullable=False)

    team = orm.relationship('Team', backref='players')


class Team(Base):
    __tablename__ = 'teams'

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String, nullable=False, unique=True)


def upgrade():
    bind = op.get_bind()
    session = orm.Session(bind=bind)

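    # create the teams table and the players.team_id column
    # (some databases reject adding a NOT NULL column to a table that already
    # has rows; if yours does, add the column as nullable and tighten it
    # after the data below is filled in)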
    Team.__table__.create(bind)
    op.add_column('players', sa.Column('team_id', sa.ForeignKey('teams.id'), nullable=False))

    # create teams for each team name
    # query the mapped 'team' column (team_name); each result row is a 1-tuple
    teams = {name: Team(name=name) for (name,) in session.query(Player.team_name).distinct()}
    session.add_all(teams.values())

    # set player team based on team name
    for player in session.query(Player):
        player.team = teams[player.team_name]

    session.commit()

    # don't need team name now that team relationship is set
    op.drop_column('players', 'team')


def downgrade():
    bind = op.get_bind()
    session = orm.Session(bind=bind)

    # re-add the players.team column
    op.add_column('players', sa.Column('team', sa.String, nullable=False))

    # set players.team based on team relationship
    for player in session.query(Player):
        player.team_name = player.team.name

    session.commit()

    op.drop_column('players', 'team_id')
    op.drop_table('teams')

The migration defines separate models because the models in your code represent the current state of the database, while the migrations represent steps along the way. Your database might be in any state along that path, so the models might not sync up with the database yet. Unless you're very careful, using the real models directly will cause problems with missing columns, invalid data, etc. It's clearer to explicitly state exactly what columns and models you will use in the migration.
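As a small illustration of that point, the sketch below maps a migration-local Player model with only two columns onto a wider players table and reads through it. It assumes SQLAlchemy 1.4 or newer (orm.declarative_base and Session as a context manager). The in-memory SQLite table and its single row are made up, and in a real migration the session would be bound to op.get_bind() rather than to an engine.

```python
import sqlalchemy as sa
from sqlalchemy import orm

Base = orm.declarative_base()


class Player(Base):
    """Migration-local model: only the primary key and the column we read."""
    __tablename__ = "players"

    id = sa.Column(sa.Integer, primary_key=True)
    team_name = sa.Column("team", sa.String)


# Throwaway in-memory database standing in for the real one.
engine = sa.create_engine("sqlite://")
with engine.begin() as conn:
    # The actual table has more columns than the model declares.
    conn.execute(sa.text(
        "CREATE TABLE players (id INTEGER PRIMARY KEY, name TEXT NOT NULL, "
        "position TEXT, team TEXT)"
    ))
    conn.execute(sa.text(
        "INSERT INTO players (name, position, team) VALUES ('Ann', 'guard', 'Reds')"
    ))

# The slim model only selects the columns it knows about.
with orm.Session(bind=engine) as session:
    team_names = [player.team_name for player in session.query(Player)]
```

Because the model lists only id and team, later changes to other players columns in the application models do not affect this migration.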


You can also use direct SQL (see the Alembic Operation Reference), as in the following example:

from alembic import op

# revision identifiers, used by Alembic.
revision = '1ce7873ac4ced2'
down_revision = '1cea0ac4ced2'
branch_labels = None
depends_on = None


def upgrade():
    # ### commands made by andrew ###
    op.execute('UPDATE STOCK SET IN_STOCK = -1 WHERE IN_STOCK IS NULL')
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###

I recommend using SQLAlchemy Core statements with an ad-hoc table, as detailed in the official documentation: it lets you write database-agnostic SQL in a Pythonic way, and the migration stays self-contained. SQLAlchemy Core is the best of both worlds for migration scripts.

Here is an example of the concept:

from sqlalchemy.sql import table, column
from sqlalchemy import String
from alembic import op

account = table('account',
    column('name', String)
)
op.execute(
    account.update()
    .where(account.c.name == op.inline_literal('account 1'))
    .values({'name': op.inline_literal('account 2')})
)

# If insert is required
from sqlalchemy.sql import insert
from sqlalchemy import orm

bind = op.get_bind()
session = orm.Session(bind=bind)

data = {
    "name": "John",
}
ret = session.execute(insert(account).values(data))
# for use in other insert calls; lastrowid comes from the DBAPI cursor
# and is not supported by every driver
account_id = ret.lastrowid
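
Applied to the teams and players tables from the question, the ad-hoc table approach could look roughly like the sketch below. It assumes SQLAlchemy 1.4 or newer (select() with positional columns and scalar_subquery()). The in-memory SQLite engine and the sample rows are only there so the statements can be run outside Alembic; in a migration you would pass populate_teams and link_players to op.execute().

```python
import sqlalchemy as sa
from sqlalchemy.sql import column, table

# Ad-hoc tables: only the columns the data migration touches.
players = table(
    "players",
    column("id", sa.Integer),
    column("team", sa.String),
    column("team_id", sa.Integer),
)
teams = table("teams", column("id", sa.Integer), column("name", sa.String))

# INSERT INTO teams (name) SELECT DISTINCT team FROM players
populate_teams = teams.insert().from_select(
    ["name"], sa.select(players.c.team).distinct()
)

# UPDATE players SET team_id = (SELECT id FROM teams WHERE name = players.team)
link_players = players.update().values(
    team_id=sa.select(teams.c.id)
    .where(teams.c.name == players.c.team)
    .scalar_subquery()
)

# Throwaway engine with made-up rows; a migration would call
# op.execute(populate_teams) and op.execute(link_players) instead.
engine = sa.create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(sa.text(
        "CREATE TABLE players "
        "(id INTEGER PRIMARY KEY, name TEXT, team TEXT, team_id INTEGER)"
    ))
    conn.execute(sa.text("CREATE TABLE teams (id INTEGER PRIMARY KEY, name TEXT)"))
    conn.execute(sa.text(
        "INSERT INTO players (name, team) VALUES ('Ann', 'Reds'), ('Bob', 'Blues')"
    ))
    conn.execute(populate_teams)
    conn.execute(link_players)
    # Read the result back through the new foreign key.
    linked = [
        tuple(row)
        for row in conn.execute(sa.text(
            "SELECT p.name, t.name FROM players p "
            "JOIN teams t ON t.id = p.team_id ORDER BY p.id"
        ))
    ]
```

The correlated subquery keeps the UPDATE portable, since the UPDATE ... JOIN form in the question is specific to MySQL.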