Pure-Ruby concurrent Hash

Solution 1:

Okay, now that you've specified the actual meaning of 'threadsafe', here are two potential implementations. The following code will run forever in MRI and JRuby. The lockless implementation follows an eventual consistency model where each thread uses its own view of the hash if the master is in flux. There is a little trickery required to make sure that storing all the information in the thread doesn't leak memory, but that is handled and tested: process size does not grow while running this code. Both implementations would need more work to be 'complete' (delete, update, etc. would need some thinking), but either of the two concepts below will meet your requirements.

It's very important for people reading this thread to realize that the whole issue is exclusive to JRuby: in MRI the built-in Hash is sufficient.

module Cash
  def Cash.new(*args, &block)
    env = ENV['CASH_IMPL']
    impl = env ? Cash.const_get(env) : LocklessImpl
    klass = defined?(JRUBY_VERSION) ? impl : ::Hash
    klass.new(*args, &block)
  end

  class LocklessImpl
    def initialize
      @hash = {}
    end

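    def thread_hash
      thread = Thread.current
      cache = (thread[:cash] ||= {})
      key = thread_key
      hash = cache[key]
      return hash if hash

      hash = cache[key] = {}
      # build the finalizer in a class method so the proc does not capture
      # self; a finalizer that references its own object keeps that object
      # from ever being collected
      ObjectSpace.define_finalizer(self, self.class.finalizer(cache, key))
      hash
    end

    def self.finalizer(cache, key)
      proc { cache.delete(key) }
    end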

    def thread_key
      [Thread.current.object_id, object_id]
    end

    def []=(key, val)
      time = Time.now.to_f
      tuple = [time, val]
      @hash[key] = tuple
      thread_hash[key] = tuple
      val
    end

    def [](key)
      # check the master value
      val = @hash[key]

      # someone else is either writing the key or it has never been set. we
      # need to invalidate our own copy in either case
      if val.nil?
        thread_val = thread_hash.delete(key)
        return(thread_val ? thread_val.last : nil)
      end

      # check our own thread local value
      thread_val = thread_hash[key]

      # in this case someone else has written a value that we have never seen,
      # so simply return it
      if thread_val.nil?
        return(val.last)
      end

      # in this case there is a master *and* a thread local value; if the
      # master is newer, junk our own cached copy
      if val.first > thread_val.first
        thread_hash.delete(key)
        return val.last
      else
        return thread_val.last
      end
    end
  end

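  class LockingImpl < ::Hash
    # Sync_m comes from the 'sync' library, which is no longer bundled with
    # Ruby as of 2.7 and has to be installed as a gem there
    require 'sync'

    def initialize(*args, &block)
      super
    ensure
      extend Sync_m
    end

    def sync(*args, &block)
      sync_synchronize(*args, &block)
    end

    # reads take a shared (:SH) lock so they can run concurrently
    def [](key)
      sync(:SH){ super }
    end

    # writes take an exclusive (:EX) lock
    def []=(key, val)
      sync(:EX){ super }
    end
  end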
end
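
The read path of LocklessImpl reduces to a small decision rule over two [timestamp, value] tuples: the master's and the calling thread's. Here is a minimal single-threaded sketch of just that rule. `resolve` is a hypothetical helper name, not part of the code above, and it leaves out the cache invalidation that the real method also performs.

```ruby
# Hypothetical helper: decide which value a read should return, given the
# master tuple and the thread-local tuple (each is [timestamp, value] or nil).
def resolve(master, local)
  # master missing: fall back to our own copy, if we have one
  return(local ? local.last : nil) if master.nil?
  # we never wrote this key ourselves: trust the master
  return master.last if local.nil?
  # both exist: the strictly newer master wins, otherwise keep our copy
  master.first > local.first ? master.last : local.last
end

p resolve(nil, nil)                       # neither side has the key
p resolve([2.0, :theirs], nil)            # only the master has it
p resolve([2.0, :theirs], [1.0, :ours])   # master is newer
p resolve([1.0, :theirs], [1.0, :ours])   # tie: our own copy is kept
```

Note that ties go to the thread-local copy, which matches the `>` comparison in `[]` above.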



if $0 == __FILE__
  iteration = 0

  loop do
    n = 42
    hash = Cash.new

    threads =
      Array.new(10) {
        Thread.new do
          Thread.current.abort_on_exception = true
          n.times do |key|
            hash[key] = key
            raise "#{ key }=nil" if hash[key].nil?
          end
        end
      }

    threads.each{|thread| thread.join}

    puts "THREADSAFE: #{ iteration += 1 }"
  end
end

Solution 2:

Posting the basic/naive solution, just to boost my Stack Overflow cred:

require 'thread'

class ConcurrentHash < Hash
  def initialize
    super
    @mutex = Mutex.new
  end

  def [](*args)
    @mutex.synchronize { super }
  end

  def []=(*args)
    @mutex.synchronize { super }
  end
end
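
A quick way to exercise this kind of wrapper is to have several threads write disjoint key ranges and then check that nothing was lost. The sketch below repeats the class so that it runs on its own; the thread and key counts are arbitrary choices for illustration.

```ruby
class ConcurrentHash < Hash
  def initialize
    super
    @mutex = Mutex.new
  end

  def [](*args)
    @mutex.synchronize { super }
  end

  def []=(*args)
    @mutex.synchronize { super }
  end
end

hash = ConcurrentHash.new

# Ten writers, each owning a disjoint range of 1,000 keys.
threads = Array.new(10) do |t|
  Thread.new do
    1_000.times { |i| hash[t * 1_000 + i] = t }
  end
end
threads.each(&:join)

puts hash.size     # number of keys that actually landed
puts hash[9_999]   # last key written by the tenth thread
```

Only the individual `[]` and `[]=` calls are serialized here. A read followed by a write (for example `hash[k] = hash[k] + 1`) is still two separate critical sections.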

Solution 3:

Yehuda, I think you mentioned ivar setting was atomic? What about a simple copy and swap then?

require 'thread'

class ConcurrentHash
  def initialize
    @reader, @writer = {}, {}
    @lock = Mutex.new
  end

  def [](key)
    @reader[key]
  end

  def []=(key, value)
    @lock.synchronize {
      @writer[key] = value
      @reader, @writer = @writer, @reader
      @writer[key] = value
    }
  end
end
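
To see why the write is applied twice, it helps to trace the three steps with plain local variables standing in for @reader and @writer. This is a single-threaded sketch for illustration only, and the variable names are made up.

```ruby
reader, writer = {}, {}

# Step 1: apply the write to the hash no reader is looking at.
writer[:a] = 1
# Step 2: swap roles; the updated hash becomes the one readers see.
reader, writer = writer, reader
# Step 3: replay the write on the old reader so both copies agree again.
writer[:a] = 1

p reader
p writer

# Without step 3, the next write would publish a hash that lacks :a.
stale_reader, stale_writer = {}, {}
stale_writer[:a] = 1
stale_reader, stale_writer = stale_writer, stale_reader
stale_writer[:b] = 2
stale_reader, stale_writer = stale_writer, stale_reader

p stale_reader.key?(:a)
```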

Solution 4:

This is a wrapper class around Hash that allows concurrent readers, but locks things down for all other types of access (including iterated reads).

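class LockedHash
  def initialize
    @hash = Hash.new
    @lock = ThreadAwareLock.new
  end

  def [](key)
    @lock.lock_read
    begin
      @hash[key]
    ensure
      # release the lock even if the lookup raises
      @lock.unlock_read
    end
  end

  def []=(key, value)
    @lock.lock_write
    begin
      @hash[key] = value
    ensure
      @lock.unlock_write
    end
  end

  # Every other Hash method (each, delete, size, ...) runs under the
  # exclusive block lock.
  def method_missing(method_sym, *arguments, &block)
    return super unless @hash.respond_to?(method_sym)
    @lock.lock_block
    begin
      @hash.send(method_sym, *arguments, &block)
    ensure
      @lock.unlock_block
    end
  end

  def respond_to_missing?(method_sym, include_private = false)
    @hash.respond_to?(method_sym) || super
  end
end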

Here is the locking code it uses:

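class RWLock
  def initialize
    @outer = Mutex.new  # held by a writer for the whole write
    @inner = Mutex.new  # guards @reader_count
    @reader_count = 0
  end
  # Readers only pass through @outer, so they block while a writer holds it.
  def lock_read
    @outer.synchronize{@inner.synchronize{@reader_count += 1}}
  end
  def unlock_read
    @inner.synchronize{@reader_count -= 1}
  end
  # A writer takes @outer to keep new readers out, then busy-waits until the
  # readers that are already inside have left.
  def lock_write
    @outer.lock
    while @reader_count > 0 ;end
  end
  def unlock_write
    @outer.unlock
  end
end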

class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    super
  end
  def lock_block
    lock_write
    @owner = Thread.current.object_id
  end
  def unlock_block
    @owner = nil
    unlock_write
  end
  def lock_read
    super unless my_block?
  end
  def unlock_read
    super unless my_block?
  end
  def lock_write
    super unless my_block?
  end
  def unlock_write
    super unless my_block?
  end
  def my_block?
    @owner == Thread.current.object_id
  end
end

The thread-aware lock lets a thread lock the object once and then call methods that would normally lock, without locking again. You need this because some methods yield into blocks, and those blocks can call locking methods on the same object; without it you would get a deadlock or a double-lock error. You could use a counting lock instead for this.
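
A counting lock of the kind mentioned above could look roughly like this. It is a minimal sketch, not the lock LockedHash uses: `CountingLock` and its methods are hypothetical names, and it only covers exclusive locking, not the reader/writer split.

```ruby
# Hypothetical reentrant lock: the owning thread may lock again, and the
# underlying Mutex is released only when every lock has been matched by
# an unlock.
class CountingLock
  def initialize
    @mutex = Mutex.new
    @owner = nil
    @count = 0
  end

  def lock
    if @owner == Thread.current
      @count += 1            # already ours: just count the nesting level
    else
      @mutex.lock            # blocks until the current owner is done
      @owner = Thread.current
      @count = 1
    end
  end

  def unlock
    raise ThreadError, "lock not owned by this thread" unless @owner == Thread.current
    @count -= 1
    if @count.zero?
      @owner = nil
      @mutex.unlock
    end
  end

  def synchronize
    lock
    begin
      yield
    ensure
      unlock
    end
  end
end

lock = CountingLock.new
lock.synchronize do
  lock.synchronize { puts "re-entered without deadlock" }
end
```

Ruby's standard Monitor class provides the same reentrant behavior if you would rather not maintain your own.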

Here's an attempt to implement bucket-level read-write locks:

class SafeBucket
  def initialize
    @lock = RWLock.new
    @value_pairs = []
  end

  def get(key)
    @lock.lock_read
    begin
      pair = @value_pairs.find{|k, _| k == key}
      pair && pair[1]
    ensure
      @lock.unlock_read
    end
  end

  def set(key, value)
    @lock.lock_write
    begin
      pair = @value_pairs.find{|k, _| k == key}
      if pair
        pair[1] = value
      else
        @value_pairs.push [key, value]
      end
      value
    ensure
      @lock.unlock_write
    end
  end

  # Not locked: other threads may mutate the bucket during iteration.
  def each
    @value_pairs.each{|key, value| yield key, value}
  end

end

class MikeConcurrentHash
  def initialize
    @buckets = []
    100.times {@buckets.push SafeBucket.new}
  end

  def [](key)
    bucket(key).get(key)
  end

  def []=(key, value)
    bucket(key).set(key, value)
  end

  def each
    @buckets.each{|b| b.each{|key, value| yield key, value}}
  end

  def bucket(key)
    @buckets[key.hash % 100]
  end
end

I stopped working on this because it's too slow, so the each method is still unsafe (other threads can mutate the hash during an iteration) and most Hash methods are unsupported.
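
One way to make iteration safe would be to copy the pairs while holding the lock and then yield from the copy. The sketch below is not what the code above does: `SnapshotBucket` is a hypothetical name, and it uses a plain Mutex instead of RWLock to stay self-contained. Copying on every iteration costs time and memory, so this would not help with the speed problem.

```ruby
# Hypothetical bucket whose each iterates over a private snapshot.
class SnapshotBucket
  def initialize
    @lock = Mutex.new
    @value_pairs = []
  end

  def set(key, value)
    @lock.synchronize do
      pair = @value_pairs.find { |k, _| k == key }
      if pair
        pair[1] = value
      else
        @value_pairs.push([key, value])
      end
    end
    value
  end

  def each
    # Copy each pair under the lock; the lock is released before yielding,
    # so the block may call set without deadlocking.
    snapshot = @lock.synchronize { @value_pairs.map(&:dup) }
    snapshot.each { |key, value| yield key, value }
  end
end

bucket = SnapshotBucket.new
bucket.set(:a, 1)
bucket.set(:b, 2)

seen = []
bucket.each do |key, value|
  seen << [key, value]
  bucket.set(key, value * 10)   # mutating during iteration is fine here
end

p seen                           # what the iteration saw
bucket.each { |key, value| p [key, value] }   # what is stored afterwards
```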

And here's a test harness for concurrent hashes:

require 'thread'
class HashHarness
  Keys = [:a, :basic, :test, :harness, :for, :concurrent, :testing, :of, :hashes,
          :that, :tries, :to, :provide, :a, :framework, :for, :designing, :a, :good, :ConcurrentHash,
          :for, :all, :ruby, :implementations]

  def self.go
    h = new
    r = h.writiness_range(20, 10000, 0, 0)
    r.each{|k, v| p k + ' ' + v.map{|pair| pair[1]}.join(' ')}
    return
  end
  def initialize(classes = [MikeConcurrentHash, JoshConcurrentHash, JoshConcurrentHash2, PaulConcurrentHash, LockedHash, Hash])
    @classes = classes
  end
  def writiness_range(basic_threads, ops, each_threads, loops)
    result = {}
    @classes.each do |hash_class|
      res = []
      0.upto 10 do |i|
        writiness = i.to_f / 10
        res.push [writiness,test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)]
      end
      result[hash_class.name] = res
    end
    result
  end
  def test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)
    time = Time.now
    threads = []
    hash = hash_class.new
    populate_hash(hash)
    begin
      basic_threads.times do
        threads.push Thread.new{run_basic_test(hash, writiness, ops)}
      end
      each_threads.times do
        threads.push Thread.new{run_each_test(hash, writiness, loops)}
      end
      threads.each{|t| t.join}
    rescue ThreadError => e
      p [e.message, hash_class.name, basic_threads, ops, each_threads, loops, writiness].join(' ')
      return -1
    end
    elapsed = Time.now - time
    p [hash_class.name, basic_threads, ops, each_threads, loops, writiness, elapsed].join(' ')
    elapsed
  end
  def run_basic_test(hash, writiness, ops)
    ops.times do
      rand < writiness ? hash[choose_key]= rand : hash[choose_key]
    end
  end
  def run_each_test(hash, writiness, loops)
    loops.times do
      hash.each do |k, v|
        if rand < writiness
          each_write_work(hash, k, v)
        else
          each_read_work(k, v)
        end
      end
    end
  end
  def each_write_work(hash, key, value)
    hash[key] = rand
  end
  def each_read_work(key, value)
    key.to_s + ": " + value.to_s
  end
  def choose_key
    Keys[rand(Keys.size)]
  end
  def populate_hash(hash)
    Keys.each{|key| hash[key]=rand}  
  end
end
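
The "writiness" parameter is just the probability that any single operation is a write, decided by `rand < writiness`. A small sketch of that mix, using a seeded generator so the run is repeatable; `count_writes` and the seed are illustrative and not part of the harness.

```ruby
# Count how many of ops operations would be writes for a given writiness.
# Random#rand returns a float in [0, 1), so 0.0 never writes and 1.0
# always writes.
def count_writes(writiness, ops, rng = Random.new(42))
  ops.times.count { rng.rand < writiness }
end

[0.0, 0.5, 1.0].each do |writiness|
  puts "writiness #{writiness}: #{count_writes(writiness, 10_000)} writes out of 10000"
end
```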

Numbers on JRuby (elapsed seconds):

Writiness      0.0   0.1   0.2   0.3   0.4   0.5   0.6   0.7   0.8   0.9   1.0
ConcurrentHash 2.098 3.179 2.971 3.083 2.731 2.941 2.564 2.480 2.369 1.862 1.881
LockedHash     1.873 1.896 2.085 2.058 2.001 2.055 1.904 1.921 1.873 1.841 1.630
Hash           0.530 0.672 0.685 0.822 0.719 0.877 0.901 0.931 0.942 0.950 1.001

And on MRI (elapsed seconds):

Writiness         0.0    0.1    0.2    0.3    0.4    0.5    0.6    0.7    0.8    0.9    1.0
ConcurrentHash  9.214  9.913  9.064 10.112 10.240 10.574 10.566 11.027 11.323 11.837 13.036
LockedHash     19.593 17.712 16.998 17.045 16.687 16.609 16.647 15.307 14.464 13.931 14.146
Hash            0.535  0.537  0.534  0.599  0.594  0.676  0.635  0.650  0.654  0.661  0.692

The MRI numbers are pretty striking: the locking implementations are roughly 15 to 37 times slower than a plain Hash. Locking in MRI really sucks.
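
The gap can be quantified by dividing each MRI timing by the plain Hash timing at the same writiness. The figures below are copied from the MRI table; the script itself is only an illustrative calculation.

```ruby
# MRI timings in seconds, copied from the table above (writiness 0.0 to 1.0).
mri = {
  "ConcurrentHash" => [9.214, 9.913, 9.064, 10.112, 10.240, 10.574,
                       10.566, 11.027, 11.323, 11.837, 13.036],
  "LockedHash"     => [19.593, 17.712, 16.998, 17.045, 16.687, 16.609,
                       16.647, 15.307, 14.464, 13.931, 14.146],
  "Hash"           => [0.535, 0.537, 0.534, 0.599, 0.594, 0.676,
                       0.635, 0.650, 0.654, 0.661, 0.692]
}

baseline = mri.fetch("Hash")

# Slowdown factor relative to plain Hash, one entry per writiness level.
slowdown = mri.reject { |name, _| name == "Hash" }.transform_values do |times|
  times.zip(baseline).map { |time, base| (time / base).round(1) }
end

slowdown.each { |name, ratios| puts "#{name}: #{ratios.join(' ')}" }
```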