Asynchronous locking based on a key
I'm attempting to figure out an issue that has been raised with my ImageProcessor library, where I am getting intermittent file access errors when adding items to the cache:
System.IO.IOException: The process cannot access the file 'D:\home\site\wwwroot\app_data\cache\0\6\5\f\2\7\065f27fc2c8e843443d210a1e84d1ea28bbab6c4.webp' because it is being used by another process.
I wrote a class designed to perform an asynchronous lock based on a key generated from a hashed URL, but it seems I have missed something in the implementation.
My locking class
public sealed class AsyncDuplicateLock
{
/// <summary>
/// The collection of semaphore slims.
/// </summary>
private static readonly ConcurrentDictionary<object, SemaphoreSlim> SemaphoreSlims
= new ConcurrentDictionary<object, SemaphoreSlim>();
/// <summary>
/// Locks against the given key.
/// </summary>
/// <param name="key">
/// The key that identifies the current object.
/// </param>
/// <returns>
/// The <see cref="IDisposable"/> that releases the lock when disposed.
/// </returns>
public IDisposable Lock(object key)
{
DisposableScope releaser = new DisposableScope(
key,
s =>
{
SemaphoreSlim locker;
if (SemaphoreSlims.TryRemove(s, out locker))
{
locker.Release();
locker.Dispose();
}
});
SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));
semaphore.Wait();
return releaser;
}
/// <summary>
/// Asynchronously locks against the given key.
/// </summary>
/// <param name="key">
/// The key that identifies the current object.
/// </param>
/// <returns>
/// The disposable <see cref="Task"/>.
/// </returns>
public Task<IDisposable> LockAsync(object key)
{
DisposableScope releaser = new DisposableScope(
key,
s =>
{
SemaphoreSlim locker;
if (SemaphoreSlims.TryRemove(s, out locker))
{
locker.Release();
locker.Dispose();
}
});
Task<IDisposable> releaserTask = Task.FromResult(releaser as IDisposable);
SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));
Task waitTask = semaphore.WaitAsync();
return waitTask.IsCompleted
? releaserTask
: waitTask.ContinueWith(
(_, r) => (IDisposable)r,
releaser,
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
/// <summary>
/// The disposable scope.
/// </summary>
private sealed class DisposableScope : IDisposable
{
/// <summary>
/// The key
/// </summary>
private readonly object key;
/// <summary>
/// The close scope action.
/// </summary>
private readonly Action<object> closeScopeAction;
/// <summary>
/// Initializes a new instance of the <see cref="DisposableScope"/> class.
/// </summary>
/// <param name="key">
/// The key.
/// </param>
/// <param name="closeScopeAction">
/// The close scope action.
/// </param>
public DisposableScope(object key, Action<object> closeScopeAction)
{
this.key = key;
this.closeScopeAction = closeScopeAction;
}
/// <summary>
/// Disposes the scope.
/// </summary>
public void Dispose()
{
this.closeScopeAction(this.key);
}
}
}
Usage (within an HttpModule):
private readonly AsyncDuplicateLock locker = new AsyncDuplicateLock();
using (await this.locker.LockAsync(cachedPath))
{
// Process and save a cached image.
}
Can anyone spot where I have gone wrong? I'm worried that I am misunderstanding something fundamental.
The full source for the library is stored on GitHub.
As the other answerer noted, the original code removes the SemaphoreSlim from the ConcurrentDictionary before it releases the semaphore. So you've got too much semaphore churn going on: semaphores are removed from the dictionary while they could still be in use (not yet acquired, but already retrieved from the dictionary).
The problem with this kind of "mapping lock" is that it's difficult to know when the semaphore is no longer necessary. One option is to never dispose the semaphores at all; that's the easy solution, but it may not be acceptable in your scenario. Another option, if the semaphores are actually related to object instances and not values (like strings), is to attach them using ephemerons (ConditionalWeakTable in .NET); however, I believe this option would also not be acceptable in your scenario.
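For comparison, the two simpler options might look roughly like this. This is a minimal sketch, not production code: the class names NeverRemovedKeyedLock and EphemeronKeyedLock and their WaitAsync/Release methods are hypothetical, and the ephemeron variant relies on ConditionalWeakTable, which compares keys by reference rather than by value.

```csharp
using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Option 1 (hypothetical name): never remove the semaphores. Simple, but one
// SemaphoreSlim per distinct key stays in memory for the life of the process.
public sealed class NeverRemovedKeyedLock
{
    private readonly ConcurrentDictionary<object, SemaphoreSlim> _semaphores =
        new ConcurrentDictionary<object, SemaphoreSlim>();

    public Task WaitAsync(object key) =>
        _semaphores.GetOrAdd(key, _ => new SemaphoreSlim(1, 1)).WaitAsync();

    // The entry is never removed, so this always finds the semaphore that was waited on.
    public void Release(object key) => _semaphores[key].Release();
}

// Option 2 (hypothetical name): ephemerons, exposed in .NET as ConditionalWeakTable.
// A semaphore lives only while its key object is reachable. Keys are compared by
// reference, so equal strings that are different instances get different
// semaphores, which is why this does not suit a hashed-URL key.
public sealed class EphemeronKeyedLock
{
    private readonly ConditionalWeakTable<object, SemaphoreSlim> _semaphores =
        new ConditionalWeakTable<object, SemaphoreSlim>();

    public Task WaitAsync(object key) =>
        _semaphores.GetValue(key, _ => new SemaphoreSlim(1, 1)).WaitAsync();

    public void Release(object key)
    {
        if (!_semaphores.TryGetValue(key, out SemaphoreSlim semaphore))
            throw new InvalidOperationException("Unknown key.");
        semaphore.Release();
    }
}
```

Option 1 is reasonable when the set of keys is small and bounded; with one key per cached image it grows without limit.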
So, we do it the hard way. :)
There are a few different approaches that would work. I think it makes sense to approach it from a reference-counting perspective (reference-counting each semaphore in the dictionary). Also, we want to make the decrement-count-and-remove operation atomic, so I just use a single lock (making the concurrent dictionary superfluous):
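using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public sealed class AsyncDuplicateLock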
{
private sealed class RefCounted<T>
{
public RefCounted(T value)
{
RefCount = 1;
Value = value;
}
public int RefCount { get; set; }
public T Value { get; private set; }
}
private static readonly Dictionary<object, RefCounted<SemaphoreSlim>> SemaphoreSlims
= new Dictionary<object, RefCounted<SemaphoreSlim>>();
private SemaphoreSlim GetOrCreate(object key)
{
RefCounted<SemaphoreSlim> item;
lock (SemaphoreSlims)
{
if (SemaphoreSlims.TryGetValue(key, out item))
{
++item.RefCount;
}
else
{
item = new RefCounted<SemaphoreSlim>(new SemaphoreSlim(1, 1));
SemaphoreSlims[key] = item;
}
}
return item.Value;
}
public IDisposable Lock(object key)
{
GetOrCreate(key).Wait();
return new Releaser { Key = key };
}
public async Task<IDisposable> LockAsync(object key)
{
await GetOrCreate(key).WaitAsync().ConfigureAwait(false);
return new Releaser { Key = key };
}
private sealed class Releaser : IDisposable
{
public object Key { get; set; }
public void Dispose()
{
RefCounted<SemaphoreSlim> item;
lock (SemaphoreSlims)
{
item = SemaphoreSlims[Key];
--item.RefCount;
if (item.RefCount == 0)
SemaphoreSlims.Remove(Key);
}
item.Value.Release();
}
}
}
For a given key:
- Thread 1 calls GetOrAdd, adds a new semaphore, and acquires it via Wait.
- Thread 2 calls GetOrAdd, gets the existing semaphore, and blocks on Wait.
- Thread 1 releases the semaphore, but only after having called TryRemove, which removed the semaphore from the dictionary.
- Thread 2 now acquires the semaphore.
- Thread 3 calls GetOrAdd for the same key as threads 1 and 2. Thread 2 is still holding the semaphore, but the semaphore is no longer in the dictionary, so thread 3 creates a new semaphore, and both threads 2 and 3 access the same protected resource.
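The interleaving above can be replayed deterministically on a single thread, with each "thread" reduced to a step in program order. This is an illustrative sketch (the ChurnReplay class is hypothetical); it mirrors the question's release order, TryRemove followed by Release, but omits the Dispose call to keep the replay simple.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ChurnReplay
{
    // Returns true if threads 2 and 3 both end up inside for the same key.
    public static bool BothInside()
    {
        var semaphores = new ConcurrentDictionary<object, SemaphoreSlim>();
        SemaphoreSlim Factory(object key) => new SemaphoreSlim(1, 1);

        // Thread 1: adds a new semaphore and acquires it.
        SemaphoreSlim s1 = semaphores.GetOrAdd("key", Factory);
        s1.Wait();

        // Thread 2: gets the same semaphore and starts waiting on it.
        SemaphoreSlim s2 = semaphores.GetOrAdd("key", Factory);
        Task thread2Wait = s2.WaitAsync();

        // Thread 1 disposes its scope: TryRemove first, then Release.
        semaphores.TryRemove("key", out SemaphoreSlim removed);
        removed.Release();
        thread2Wait.Wait(); // thread 2 now holds the removed semaphore

        // Thread 3: the key is gone, so GetOrAdd creates a brand-new semaphore.
        SemaphoreSlim s3 = semaphores.GetOrAdd("key", Factory);
        bool thread3Entered = s3.Wait(0);

        return thread2Wait.IsCompleted && thread3Entered && !ReferenceEquals(s2, s3);
    }
}
```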
You need to adjust your logic. The semaphore should only be removed from the dictionary when it has no waiters.
Here is one potential solution, minus the async part:
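using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
public sealed class AsyncDuplicateLock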
{
private class LockInfo
{
private SemaphoreSlim sem;
private int waiterCount;
public LockInfo()
{
sem = null;
waiterCount = 1;
}
// Lazily create the semaphore
private SemaphoreSlim Semaphore
{
get
{
var s = sem;
if (s == null)
{
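s = new SemaphoreSlim(0, 1);
// Publish our semaphore only if no other thread has published one yet
var original = Interlocked.CompareExchange(ref sem, s, null);
// If someone else already created a semaphore, discard ours and return that one
if (original != null)
{
s.Dispose();
return original;
}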
}
return s;
}
}
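// Returns true if the lock was acquired through this info; false if this info was already retired (its count had dropped to zero)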
public bool Enter()
{
if (Interlocked.Increment(ref waiterCount) > 1)
{
Semaphore.Wait();
return true;
}
return false;
}
// Returns true if this lock info is now ready for removal
public bool Exit()
{
if (Interlocked.Decrement(ref waiterCount) <= 0)
return true;
// There was another waiter
Semaphore.Release();
return false;
}
}
private static readonly ConcurrentDictionary<object, LockInfo> activeLocks = new ConcurrentDictionary<object, LockInfo>();
public static IDisposable Lock(object key)
{
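// Get the current info or create a new one.
// Caution: ConcurrentDictionary calls the update delegate outside its internal locks
// and may call it more than once under contention, so the side effects of Enter()
// are not guaranteed to run exactly once per call to Lock.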
var info = activeLocks.AddOrUpdate(key,
(k) => new LockInfo(),
(k, v) => v.Enter() ? v : new LockInfo());
DisposableScope releaser = new DisposableScope(() =>
{
if (info.Exit())
{
// Only remove this exact info, in case another thread has
// already put its own info into the dictionary
((ICollection<KeyValuePair<object, LockInfo>>)activeLocks)
.Remove(new KeyValuePair<object, LockInfo>(key, info));
}
});
return releaser;
}
private sealed class DisposableScope : IDisposable
{
private readonly Action closeScopeAction;
public DisposableScope(Action closeScopeAction)
{
this.closeScopeAction = closeScopeAction;
}
public void Dispose()
{
this.closeScopeAction();
}
}
}
Here is a KeyedLock class that is less convenient and more error-prone, but also allocates less than Stephen Cleary's AsyncDuplicateLock. It internally maintains a pool of SemaphoreSlim instances that can be reused by any key after they are released by the previous key. The capacity of the pool is configurable, and by default is 10.
This class is not allocation-free, because the SemaphoreSlim class allocates memory (quite a lot, actually) every time the semaphore cannot be acquired synchronously because of contention.
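One way to observe the two paths is to compare the task returned by an uncontended WaitAsync with one returned under contention. This sketch (WaitAsyncPaths is a hypothetical name) only observes task completion; it does not measure allocations directly.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class WaitAsyncPaths
{
    public static (bool UncontendedCompleted, bool ContendedCompleted, bool CompletedAfterRelease) Run()
    {
        var semaphore = new SemaphoreSlim(1, 1);

        Task first = semaphore.WaitAsync();  // free: the returned task is already completed
        Task second = semaphore.WaitAsync(); // held: a pending waiter task is queued
        bool uncontended = first.IsCompleted;
        bool contended = second.IsCompleted;

        semaphore.Release();                 // hands the semaphore to the queued waiter
        bool afterRelease = second.Wait(1000);
        semaphore.Release();
        return (uncontended, contended, afterRelease);
    }
}
```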
The lock can be requested both synchronously and asynchronously, and can also be requested with cancellation and timeout. These features are provided by exploiting the existing functionality of the SemaphoreSlim class.
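For illustration, here is the SemaphoreSlim behavior being reused, shown against a plain semaphore (the TimeoutAndCancellation class is hypothetical): a timed-out wait completes with false, while a canceled wait ends with an OperationCanceledException.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutAndCancellation
{
    public static async Task<(bool Entered, bool Canceled)> RunAsync()
    {
        var semaphore = new SemaphoreSlim(1, 1);
        await semaphore.WaitAsync(); // hold the semaphore so later waits must block

        // Timeout: the wait gives up after 50 ms and reports false instead of throwing.
        bool entered = await semaphore.WaitAsync(50);

        // Cancellation: the token fires after 50 ms and the wait throws.
        bool canceled = false;
        using (var cts = new CancellationTokenSource(50))
        {
            try { await semaphore.WaitAsync(Timeout.Infinite, cts.Token); }
            catch (OperationCanceledException) { canceled = true; }
        }

        semaphore.Release();
        return (entered, canceled);
    }
}
```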
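using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
public class KeyedLock<TKey>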
{
private readonly Dictionary<TKey, (SemaphoreSlim, int)> _perKey;
private readonly Stack<SemaphoreSlim> _pool;
private readonly int _poolCapacity;
public KeyedLock(IEqualityComparer<TKey> keyComparer = null, int poolCapacity = 10)
{
_perKey = new Dictionary<TKey, (SemaphoreSlim, int)>(keyComparer);
_pool = new Stack<SemaphoreSlim>(poolCapacity);
_poolCapacity = poolCapacity;
}
public async Task<bool> WaitAsync(TKey key, int millisecondsTimeout,
CancellationToken cancellationToken = default)
{
var semaphore = GetSemaphore(key);
bool entered = false;
try
{
entered = await semaphore.WaitAsync(millisecondsTimeout,
cancellationToken).ConfigureAwait(false);
}
finally { if (!entered) ReleaseSemaphore(key, entered: false); }
return entered;
}
public Task WaitAsync(TKey key, CancellationToken cancellationToken = default)
=> WaitAsync(key, Timeout.Infinite, cancellationToken);
public bool Wait(TKey key, int millisecondsTimeout,
CancellationToken cancellationToken = default)
{
var semaphore = GetSemaphore(key);
bool entered = false;
try { entered = semaphore.Wait(millisecondsTimeout, cancellationToken); }
finally { if (!entered) ReleaseSemaphore(key, entered: false); }
return entered;
}
public void Wait(TKey key, CancellationToken cancellationToken = default)
=> Wait(key, Timeout.Infinite, cancellationToken);
public void Release(TKey key) => ReleaseSemaphore(key, entered: true);
private SemaphoreSlim GetSemaphore(TKey key)
{
SemaphoreSlim semaphore;
lock (_perKey)
{
if (_perKey.TryGetValue(key, out var entry))
{
int counter;
(semaphore, counter) = entry;
_perKey[key] = (semaphore, ++counter);
}
else
{
lock (_pool) semaphore = _pool.Count > 0 ? _pool.Pop() : null;
if (semaphore == null) semaphore = new SemaphoreSlim(1, 1);
_perKey[key] = (semaphore, 1);
}
}
return semaphore;
}
private void ReleaseSemaphore(TKey key, bool entered)
{
SemaphoreSlim semaphore; int counter;
lock (_perKey)
{
if (_perKey.TryGetValue(key, out var entry))
{
(semaphore, counter) = entry;
counter--;
if (counter == 0)
_perKey.Remove(key);
else
_perKey[key] = (semaphore, counter);
}
else
{
throw new InvalidOperationException("Key not found.");
}
}
if (entered) semaphore.Release();
if (counter == 0)
{
Debug.Assert(semaphore.CurrentCount == 1);
lock (_pool) if (_pool.Count < _poolCapacity) _pool.Push(semaphore);
}
}
}
Usage example:
var locker = new KeyedLock<string>();
await locker.WaitAsync("Hello");
try
{
await DoSomethingAsync();
}
finally
{
locker.Release("Hello");
}
The implementation uses tuples and tuple deconstruction (C# 7) and the default literal (C# 7.1), so it requires at least C# 7.1.
The KeyedLock class could easily be modified to become a KeyedSemaphore that allows more than one concurrent operation per key. It would just need a maximumConcurrencyPerKey parameter in the constructor, which would be stored and passed to the constructor of the SemaphoreSlim instances.
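A simplified sketch of that idea, assuming a hypothetical KeyedSemaphore<TKey> that keeps the reference counting of KeyedLock but drops the semaphore pool (and the synchronous and timeout overloads) for brevity:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical KeyedSemaphore<TKey>: each key maps to a reference-counted
// SemaphoreSlim that admits up to maximumConcurrencyPerKey holders.
public class KeyedSemaphore<TKey>
{
    private readonly Dictionary<TKey, (SemaphoreSlim Semaphore, int RefCount)> _perKey;
    private readonly int _maximumConcurrencyPerKey;

    public KeyedSemaphore(int maximumConcurrencyPerKey,
        IEqualityComparer<TKey> keyComparer = null)
    {
        if (maximumConcurrencyPerKey < 1)
            throw new ArgumentOutOfRangeException(nameof(maximumConcurrencyPerKey));
        _maximumConcurrencyPerKey = maximumConcurrencyPerKey;
        _perKey = new Dictionary<TKey, (SemaphoreSlim Semaphore, int RefCount)>(keyComparer);
    }

    public async Task WaitAsync(TKey key, CancellationToken cancellationToken = default)
    {
        SemaphoreSlim semaphore = AddReference(key);
        try
        {
            await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        }
        catch
        {
            RemoveReference(key); // not entered: drop the reference without releasing
            throw;
        }
    }

    public void Release(TKey key) => RemoveReference(key).Release();

    private SemaphoreSlim AddReference(TKey key)
    {
        lock (_perKey)
        {
            if (_perKey.TryGetValue(key, out var entry))
            {
                _perKey[key] = (entry.Semaphore, entry.RefCount + 1);
                return entry.Semaphore;
            }
            var semaphore = new SemaphoreSlim(_maximumConcurrencyPerKey, _maximumConcurrencyPerKey);
            _perKey[key] = (semaphore, 1);
            return semaphore;
        }
    }

    // Decrements the count and removes the entry when it reaches zero.
    private SemaphoreSlim RemoveReference(TKey key)
    {
        lock (_perKey)
        {
            if (!_perKey.TryGetValue(key, out var entry))
                throw new InvalidOperationException("Key not found.");
            if (entry.RefCount == 1)
                _perKey.Remove(key);
            else
                _perKey[key] = (entry.Semaphore, entry.RefCount - 1);
            return entry.Semaphore;
        }
    }
}
```

As with KeyedLock, every successful WaitAsync must be paired with exactly one Release for the same key.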
Note: When misused, the SemaphoreSlim class throws a SemaphoreFullException. This happens when the semaphore is released more times than it has been acquired. The KeyedLock implementation of this answer behaves differently in case of misuse: it throws an InvalidOperationException("Key not found."). This happens because when a key is released as many times as it has been acquired, the associated semaphore is removed from the dictionary. If this implementation ever throws a SemaphoreFullException, it would be an indication of a bug.
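The SemaphoreSlim misuse case can be reproduced in a few lines (MisuseDemo is a hypothetical name): releasing a semaphore that is already at its maximum count throws.

```csharp
using System.Threading;

public static class MisuseDemo
{
    public static bool ReleaseWithoutWaitThrows()
    {
        var semaphore = new SemaphoreSlim(1, 1); // one slot, currently free
        try
        {
            semaphore.Release();                 // count would become 2, above maxCount 1
            return false;
        }
        catch (SemaphoreFullException)
        {
            return true;
        }
    }
}
```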