SQLITE to SQLALCHEMY swap [duplicate]

Solution 1:

Your problem is that filter_by takes keyword arguments, but filter takes expressions. So expanding a dict for filter_by **mydict will work. With filter, you normally pass it one argument, which happens to be an expression. So when you expand your **filters dict to filter, you pass filter a bunch of keyword arguments that it doesn't understand.

If you want to build up a set of filters from a dict of stored filter args, you can use the generative nature of the query to keep applying filters. For example:

# assuming a model class, User, with attributes, name_last, name_first
my_filters = {'name_last':'Duncan', 'name_first':'Iain'}
query = session.query(User)
for attr,value in my_filters.iteritems():
    query = query.filter( getattr(User,attr)==value )
# now we can run the query
results = query.all()

The great thing about the above pattern is you can use it across multiple joined columns, you can construct 'ands' and 'ors' with and_ and or_, you can do <= or date comparisons, whatever. It's much more flexible than using filter_by with keywords. The only caveat is that for joins you have to be a bit careful you don't accidentally try to join a table twice, and you might have to specify the join condition for complex filtering. I use this in some very complex filtering over a pretty involved domain model and it works like a charm, I just keep a dict going of entities_joined to keep track of the joins.

Solution 2:

I have a similar issue, tried to filter from a dictionary:

filters = {"field": "value"}

Wrong:

...query(MyModel).filter(**filters).all()

Good:

...query(MyModel).filter_by(**filters).all()

Solution 3:

FWIW, There's a Python library designed to solve this exact problem: sqlalchemy-filters

It allows to dynamically filter using all operators, not only ==.

from sqlalchemy_filters import apply_filters


# `query` should be a SQLAlchemy query object

filter_spec = [{'field': 'name', 'op': '==', 'value': 'name_1'}]
filtered_query = apply_filters(query, filter_spec)

more_filters = [{'field': 'foo_id', 'op': 'is_not_null'}]
filtered_query = apply_filters(filtered_query, more_filters)

result = filtered_query.all()

Solution 4:

class Place(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    search_id = db.Column(db.Integer, db.ForeignKey('search.id'), nullable=False)

    @classmethod
    def dynamic_filter(model_class, filter_condition):
        '''
        Return filtered queryset based on condition.
        :param query: takes query
        :param filter_condition: Its a list, ie: [(key,operator,value)]
        operator list:
            eq for ==
            lt for <
            ge for >=
            in for in_
            like for like
            value could be list or a string
        :return: queryset
        '''
        __query = db.session.query(model_class)
        for raw in filter_condition:
            try:
                key, op, value = raw
            except ValueError:
                raise Exception('Invalid filter: %s' % raw)
            column = getattr(model_class, key, None)
            if not column:
                raise Exception('Invalid filter column: %s' % key)
            if op == 'in':
                if isinstance(value, list):
                    filt = column.in_(value)
                else:
                    filt = column.in_(value.split(','))
            else:
                try:
                    attr = list(filter(lambda e: hasattr(column, e % op), ['%s', '%s_', '__%s__']))[0] % op
                except IndexError:
                    raise Exception('Invalid filter operator: %s' % op)
                if value == 'null':
                    value = None
                filt = getattr(column, attr)(value)
            __query = __query.filter(filt)
        return __query

Execute like:

places = Place.dynamic_filter([('search_id', 'eq', 1)]).all()