Django bulk_create: ignore rows that cause IntegrityError?

Solution 1:

This is now possible as of Django 2.2.

Django 2.2 adds a new ignore_conflicts option to the bulk_create method. From the documentation:

On databases that support it (all except PostgreSQL < 9.5 and Oracle), setting the ignore_conflicts parameter to True tells the database to ignore failure to insert any rows that fail constraints such as duplicate unique values. Enabling this parameter disables setting the primary key on each model instance (if the database normally supports it).

Example:

Entry.objects.bulk_create([
    Entry(headline='This is a test'),
    Entry(headline='This is only a test'),
], ignore_conflicts=True)
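
Note that, as the quoted docs say, ignore_conflicts stops Django from setting primary keys on the instances you pass in, so if you need the saved rows afterwards you have to query for them again. A minimal sketch reusing the headline field from the example above:

entries = [
    Entry(headline='This is a test'),
    Entry(headline='This is only a test'),
]
Entry.objects.bulk_create(entries, ignore_conflicts=True)

# The instances in `entries` won't have pk set, so re-fetch if you need them:
saved = Entry.objects.filter(headline__in=[e.headline for e in entries])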

Solution 2:

(Note: I don't use Django, so there may be more suitable framework-specific answers)

It is not possible for Django to do this by simply ignoring INSERT failures because PostgreSQL aborts the whole transaction on the first error.

Django would need one of these approaches:

  1. INSERT each row in a separate transaction and ignore errors (very slow);
  2. Create a SAVEPOINT before each insert (can have scaling problems; see the sketch after this list);
  3. Use a procedure or query to insert only if the row doesn't already exist (complicated and slow); or
  4. Bulk-insert or (better) COPY the data into a TEMPORARY table, then merge that into the main table server-side.
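
For reference, approach (2) maps directly onto Django: a nested transaction.atomic() block is implemented as a SAVEPOINT, so one failed INSERT only rolls back to the savepoint rather than aborting the whole transaction. A minimal sketch, assuming you're saving ordinary model instances:

from django.db import IntegrityError, transaction

def insert_ignoring_conflicts(objs):
    with transaction.atomic():
        for obj in objs:
            try:
                # The nested atomic() creates a SAVEPOINT; a failed INSERT
                # rolls back to it instead of poisoning the outer transaction.
                with transaction.atomic():
                    obj.save()
            except IntegrityError:
                pass  # skip the conflicting row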

The upsert-like approach (3) seems like a good idea, but upsert and insert-if-not-exists are surprisingly complicated.
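
(Worth noting: on PostgreSQL 9.5+ the insert-if-not-exists half became much simpler with ON CONFLICT DO NOTHING, which is also what Solution 1's ignore_conflicts relies on server-side. A hedged sketch with a hypothetical table and columns:)

from django.db import connection

# PostgreSQL 9.5+ only: the server itself skips rows that would conflict.
with connection.cursor() as cursor:
    cursor.execute(
        "INSERT INTO mytable (id, headline) VALUES (%s, %s) "
        "ON CONFLICT DO NOTHING",
        [42, 'This is a test'],
    )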

Personally, I'd take (4): I'd bulk-insert into a new separate table, probably UNLOGGED or TEMPORARY, then run some manual SQL:

LOCK TABLE realtable IN EXCLUSIVE MODE;

INSERT INTO realtable 
SELECT * FROM temptable WHERE NOT EXISTS (
    SELECT 1 FROM realtable WHERE temptable.id = realtable.id
);

The LOCK TABLE ... IN EXCLUSIVE MODE prevents a concurrent insert from creating a row that would conflict with the INSERT above and make it fail. It does not block concurrent SELECTs, only SELECT ... FOR UPDATE, INSERT, UPDATE, and DELETE, so reads from the table carry on as normal.
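
If you're driving this from Django rather than psql, a minimal sketch of the same merge through a raw cursor (realtable and temptable are the placeholders from the SQL above, and a TEMPORARY table must have been populated on this same connection):

from django.db import connection, transaction

# Run the lock and the merge as a single transaction.
with transaction.atomic():
    with connection.cursor() as cursor:
        cursor.execute("LOCK TABLE realtable IN EXCLUSIVE MODE")
        cursor.execute("""
            INSERT INTO realtable
            SELECT * FROM temptable WHERE NOT EXISTS (
                SELECT 1 FROM realtable WHERE temptable.id = realtable.id
            )
        """)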

If you can't afford to block concurrent writes for too long you could instead use a writable CTE to copy ranges of rows from temptable into realtable, retrying each block if it failed.
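
A hedged sketch of that variant (block size, id-range batching, and the retry policy are illustrative, not from the original answer): the DELETE ... RETURNING inside the WITH is the writable CTE, and because a failed INSERT rolls the whole statement back, the rows stay in temptable and the block can simply be retried:

from django.db import IntegrityError, connection, transaction

def move_block(lo, hi):
    with transaction.atomic(), connection.cursor() as cursor:
        cursor.execute("""
            WITH moved AS (
                DELETE FROM temptable
                WHERE id >= %s AND id < %s
                RETURNING *
            )
            INSERT INTO realtable
            SELECT * FROM moved
            WHERE NOT EXISTS (
                SELECT 1 FROM realtable WHERE realtable.id = moved.id
            )
        """, [lo, hi])

def move_all(max_id, block=1000):
    for lo in range(0, max_id + 1, block):
        try:
            move_block(lo, lo + block)
        except IntegrityError:
            # A concurrent writer won the race; on retry the conflicting
            # row now exists in realtable, so NOT EXISTS skips it.
            move_block(lo, lo + block)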

Solution 3:

One quick-and-dirty workaround for this that doesn't involve manual SQL and temporary tables is to just attempt to bulk insert the data. If it fails, revert to serial insertion.

from django.db import IntegrityError

objs = [Event(...), Event(...), Event(...)]

try:
    # Fast path: insert everything in one query.
    Event.objects.bulk_create(objs)
except IntegrityError:
    # Fall back to saving one row at a time, skipping the conflicting ones.
    # (This assumes autocommit mode; inside an atomic block you would need
    # savepoints, as in Solution 2's approach 2.)
    for obj in objs:
        try:
            obj.save()
        except IntegrityError:
            continue

If you have lots and lots of errors this may not be so efficient (you'll spend more time serially inserting than doing so in bulk), but I'm working through a high-cardinality dataset with few duplicates so this solves most of my problems.

Solution 4:

Or 5. Divide and conquer

I didn't test or benchmark this thoroughly, but it performs pretty well for me. YMMV, depending in particular on how many errors you expect to get in a bulk operation.

import logging

from django.db import IntegrityError, connection

logger = logging.getLogger(__name__)

def psql_copy(records):
    count = len(records)
    if count < 1:
        return True
    try:
        pg.copy_bin_values(records)  # our own COPY ... BINARY helper
        return True
    except IntegrityError:
        if count == 1:
            # Found the culprit!
            logger.error("Integrity error copying record:\n%r",
                         records[0], exc_info=True)
            return False
    finally:
        # On PostgreSQL, COMMIT of an aborted transaction acts as ROLLBACK,
        # so this also clears the failed transaction state.
        connection.commit()

    # There was an integrity error but we had more than one record.
    # Divide and conquer.
    mid = count // 2  # integer division; count / 2 is a float on Python 3
    return psql_copy(records[:mid]) and psql_copy(records[mid:])
    # or just return False
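
Roughly speaking, isolating each conflicting record costs about log2(n) extra COPY attempts, so the recursion stays cheap as long as conflicts are rare.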