1. Introduction

dogpile is an open-source caching library for Python that is used by the OpenStack project. Official documentation: https://dogpilecache.sqlalchemy.org/en/latest/

dogpile provides the concept of a “dogpile lock”, a control structure which allows a single thread of execution to be selected as the “creator” of some resource, while allowing other threads of execution to refer to the previous version of this resource as the creation proceeds; if there is no previous version, then those threads block until the object is available.

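In other words, dogpile implements a locking mechanism in which only one thread at a time is allowed to create a given resource. Under concurrency, if an older version of the resource exists, the other threads return that older version; otherwise they block until the resource becomes available.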

1. It supports multiple cache backends, such as file, memory, memcached, and redis.
2. By default it uses pickle to serialize objects into the cache backend.

2. Stored data structure

The data that dogpile actually serializes to the backend consists of two parts: a payload and metadata.

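The payload is the actual cached data. The metadata contains ct (the timestamp taken when the value is stored, i.e. its creation time) and v (a version marker that distinguishes values written by different dogpile versions).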

class CachedValue(tuple):
    """Represent a value stored in the cache.
    :class:`.CachedValue` is a two-tuple of
    ``(payload, metadata)``, where ``metadata``
    is dogpile.cache's tracking information (
    currently the creation time).  The metadata
    and tuple structure is pickleable, if
    the backend requires serialization.
	
    """
    payload = property(operator.itemgetter(0))
    """Named accessor for the payload."""
	
    metadata = property(operator.itemgetter(1))
    """Named accessor for the dogpile.cache metadata dictionary."""
	
    def __new__(cls, payload, metadata):
        return tuple.__new__(cls, (payload, metadata))
    
def _value(self, value):
    """Return a :class:`.CachedValue` given a value."""
    return CachedValue(
        value,
        {
            "ct": time.time(),
            "v": value_version
        })

def set(self, key, value):
    """Place a new value in the cache under the given key."""

    if self.key_mangler:
        key = self.key_mangler(key)
    self.backend.set(key, self._value(value))
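
To make the stored layout concrete, here is a minimal stdlib-only sketch of the round trip through pickle. It does not use dogpile itself: `wrap`, `unwrap`, and `VALUE_VERSION` are hypothetical names, and a plain tuple stands in for `CachedValue`.

```python
import pickle
import time

VALUE_VERSION = 1  # assumption: stand-in for dogpile's internal value_version


def wrap(payload, now=None):
    """Pair a payload with metadata: creation time ("ct") and version ("v")."""
    created = time.time() if now is None else now
    return (payload, {"ct": created, "v": VALUE_VERSION})


def unwrap(blob):
    """Deserialize a stored blob back into (payload, metadata)."""
    payload, metadata = pickle.loads(blob)
    return payload, metadata


# What a pickling backend would write for a key, and what it reads back.
blob = pickle.dumps(wrap({"id": 42, "name": "alice"}, now=1700000000.0))
payload, metadata = unwrap(blob)

print(payload)   # the original object
print(metadata)  # the tracking information stored alongside it
```

The point of the sketch is that the backend never sees the bare value: what gets stored is always the pair, so the creation time travels with the data.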

3. Redis key generation rules

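By default, dogpile provides rules that generate the Redis key either from the function combined with its positional args, or from the function combined with its kwargs.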
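
As a rough illustration of the idea, the sketch below builds keys from a function's identity plus its arguments. The names `args_key_generator` and `kwargs_key_generator` and the exact key format are assumptions made for this example, not dogpile's own implementation.

```python
def args_key_generator(namespace, fn):
    """Build keys from the function identity plus positional arguments."""
    prefix = f"{fn.__module__}:{fn.__name__}"
    if namespace is not None:
        prefix = f"{prefix}|{namespace}"

    def generate_key(*args):
        return prefix + "|" + " ".join(str(a) for a in args)

    return generate_key


def kwargs_key_generator(namespace, fn):
    """Same idea, but keyword arguments are folded in, sorted by name."""
    prefix = f"{fn.__module__}:{fn.__name__}"
    if namespace is not None:
        prefix = f"{prefix}|{namespace}"

    def generate_key(*args, **kwargs):
        parts = [str(a) for a in args]
        parts += [f"{name}={kwargs[name]}" for name in sorted(kwargs)]
        return prefix + "|" + " ".join(parts)

    return generate_key


def get_user(user_id, name=None):
    return {"id": user_id, "name": name}


key_a = args_key_generator("users", get_user)(42)
key_b = kwargs_key_generator(None, get_user)(42, name="alice")
print(key_a)
print(key_b)
```

Sorting the keyword arguments by name keeps the key stable regardless of the order in which the caller passes them.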

4. Expiration time comparison

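1. dogpile cannot proactively expire cached entries. Setting only the region's expiration_time does not expire the key in the backend; the ct stored with the value is simply compared against the current time when the value is fetched.
2. An expiration time can be given when a value is cached; if none is given, the region's expiration time applies.
3. If the backend is Redis, redis_expiration_time can be set, so that Redis itself puts an expiry on each key that is set from then on.
4. Region invalidation comes in two forms, soft and hard. After a soft invalidation, the expired data can still be returned; after a hard invalidation, NO_VALUE is returned.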
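
The comparison described in the points above can be sketched as follows. This is a simplified model under stated assumptions, not dogpile's code: `read`, `soft_at`, and `hard_at` are hypothetical names, and `NO_VALUE` here is a local stand-in for dogpile's sentinel.

```python
NO_VALUE = object()  # assumption: stand-in for dogpile's NO_VALUE sentinel


def read(cached, now, expiration_time, soft_at=None, hard_at=None):
    """Return (value, needs_regeneration) for a cached (payload, ct) pair.

    soft_at / hard_at are hypothetical timestamps of a soft or hard
    region invalidation; None means the region was never invalidated.
    """
    if cached is None:
        return NO_VALUE, True
    payload, ct = cached
    # Hard invalidation: anything created before it is treated as missing.
    if hard_at is not None and ct < hard_at:
        return NO_VALUE, True
    # Soft invalidation: the value is kept but considered expired.
    if soft_at is not None and ct < soft_at:
        return payload, True
    # Ordinary expiry: compare the value's age with expiration_time.
    return payload, (now - ct) > expiration_time


entry = ("report", 100.0)  # payload created at t=100
print(read(entry, now=130.0, expiration_time=60))
print(read(entry, now=200.0, expiration_time=60))
print(read(entry, now=130.0, expiration_time=60, soft_at=120.0))
print(read(entry, now=130.0, expiration_time=60, hard_at=120.0)[0] is NO_VALUE)
```

Note that nothing in this function deletes the entry: expiry is decided purely at read time, which is why a backend-level expiry such as redis_expiration_time is needed if stale keys should actually disappear.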

5. Design of the concurrency lock

def _enter(self):
  value_fn = self.value_and_created_fn

  # Try to fetch the value from the cache
  try:
      value = value_fn()
      value, createdtime = value
  except NeedRegenerationException:
      # Not in the cache, so it has to be regenerated
      log.debug("NeedRegenerationException")
      value = NOT_REGENERATED
      createdtime = -1

  generated = self._enter_create(value, createdtime)

  if generated is not NOT_REGENERATED:
      generated, createdtime = generated
      return generated
  elif value is NOT_REGENERATED:
      # we called upon the creator, and it said that it
      # didn't regenerate.  this typically means another
      # thread is running the creation function, and that the
      # cache should still have a value.  However,
      # we don't have a value at all, which is unusual since we just
      # checked for it, so check again (TODO: is this a real codepath?)
      try:
          value, createdtime = value_fn()
          return value
      except NeedRegenerationException:
          raise Exception(
              "Generation function should "
              "have just been called by a concurrent "
              "thread.")
  else:
      return value

def _enter_create(self, value, createdtime):
  # Use the creation time to decide whether the value has expired
  if not self._is_expired(createdtime):
      # Not expired, so no regeneration is needed
      return NOT_REGENERATED

  _async = False

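  # Use createdtime to decide whether the cache holds a value
  if self._has_value(createdtime):
      has_value = True
      # The cache holds a value, and it has expired.
      # Try to take the lock without blocking; if we get it, we regenerate.
      # If another thread holds it, return the stale value instead of waiting.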
      if not self.mutex.acquire(False):
          log.debug(
              "creation function in progress "
              "elsewhere, returning")
          return NOT_REGENERATED
  else:
      # The cache holds no value
      has_value = False
      log.debug("no value, waiting for create lock")
      # Take the lock; once we have it, we regenerate.
      # With no cached value, competing threads block here until they get the lock.
      self.mutex.acquire()

  try:
      log.debug("value creation lock %r acquired" % self.mutex)
      # The cache held no value when we entered
      if not has_value:
          # Run the getter again to check whether another thread has already generated a new value
          # we entered without a value, or at least with "creationtime ==
          # 0".   Run the "getter" function again, to see if another
          # thread has already generated the value while we waited on the
          # mutex,  or if the caller is otherwise telling us there is a
          # value already which allows us to use async regeneration. (the
          # latter is used by the multi-key routine).
          try:
              value, createdtime = self.value_and_created_fn()
          except NeedRegenerationException:
              # No other thread has generated the value
              # nope, nobody created the value, we're it.
              # we must create it right now
              pass
          else:
              has_value = True
              # The cache now holds a value (regenerated by another thread); check expiry again and regenerate if it has expired
              # caller is telling us there is a value and that we can
              # use async creation if it is expired.
              if not self._is_expired(createdtime):
                  # it's not expired, return it
                  log.debug("Concurrent thread created the value")
                  return value, createdtime
	
              # otherwise it's expired, call creator again
      # If the cache holds a value and an async_creator was given,
      # hand the regeneration over to it and return the stale value for now
      if has_value and self.async_creator:
          # we have a value we can return, safe to use async_creator
          log.debug("Passing creation lock to async runner")

          # so...run it!
          self.async_creator(self.mutex)
          _async = True

          # and return the expired value for now
          return value, createdtime

      # it's expired, and it's our turn to create it synchronously, *or*,
      # there's no value at all, and we have to create it synchronously
      log.debug(
          "Calling creation function for %s value",
          "not-yet-present" if not has_value else
          "previously expired"
      )
      # Regeneration is needed
      return self.creator()
  finally:
      if not _async:
          # Release the lock
          self.mutex.release()
          log.debug("Released creation lock")

The value_and_created_fn and creator callbacks used by the lock above correspond to get_value and gen_value on the region side:

def get_value():
    value = self.backend.get(key)
    # Regenerate if the value is NO_VALUE, was written by a different dogpile version, or the region has been hard-invalidated
    if (value is NO_VALUE or value.metadata['v'] != value_version or
            self.region_invalidator.is_hard_invalidated(
                value.metadata["ct"])):
        raise NeedRegenerationException()
    ct = value.metadata["ct"]
    # If the region has been soft-invalidated, report a creation time that is just past expiry
    if self.region_invalidator.is_soft_invalidated(ct):
        ct = time.time() - expiration_time - .0001

    return value.payload, ct

def gen_value():
    if creator_args:
        created_value = creator(*creator_args[0], **creator_args[1])
    else:
        created_value = creator()
    # Attach the metadata (the ct and v described earlier)
    value = self._value(created_value)

    # Decide whether the value should be cached; by default it is
    if not should_cache_fn or \
            should_cache_fn(created_value):
        # Store the value in the backend
        self.backend.set(key, value)

    return value.payload, value.metadata["ct"]
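
Putting the lock and the two callbacks together, the overall flow can be condensed into a small stdlib-only model. This is a sketch under simplifying assumptions (a single key, an in-process value, no async creator), and `TinyDogpile` is a hypothetical name rather than part of dogpile. Unlike the code above, it re-checks freshness after acquiring the lock in both branches.

```python
import threading
import time


class TinyDogpile:
    """Simplified single-key model of the locking flow shown above."""

    def __init__(self, creator, expiration_time):
        self.creator = creator
        self.expiration_time = expiration_time
        self.mutex = threading.Lock()
        self.value = None
        self.created = None  # None means "nothing cached yet"

    def _fresh(self):
        return (self.created is not None
                and time.monotonic() - self.created <= self.expiration_time)

    def get(self):
        if self._fresh():
            return self.value
        if self.created is not None:
            # A stale value exists: only one thread regenerates, the
            # others return the old value without blocking.
            if not self.mutex.acquire(blocking=False):
                return self.value
        else:
            # No value at all: block until the creating thread is done.
            self.mutex.acquire()
        try:
            # Re-check: another thread may have created it while we waited.
            if self._fresh():
                return self.value
            self.value = self.creator()
            self.created = time.monotonic()
            return self.value
        finally:
            self.mutex.release()


calls = []
results = []


def slow_creator():
    calls.append(1)
    time.sleep(0.1)  # pretend the value is expensive to build
    return "value-%d" % len(calls)


cache = TinyDogpile(slow_creator, expiration_time=60)


def worker():
    results.append(cache.get())


threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(calls), set(results))  # prints: 1 {'value-1'}
```

Five threads ask for a missing value at the same time, yet the creator runs only once: the first thread builds the value while the others wait on the mutex and then pick up the result.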

6. Combining SQLAlchemy with dogpile

http://tutorials.jumpstartlab.com/topics/performance/digest_based_caching.html
https://github.com/sqlalchemy/sqlalchemy/tree/master/examples/dogpile_caching

Problem: pickling Python objects takes up a relatively large amount of memory. For one way to address this, see https://github.com/jvanasco/dogpile_backend_redis_advanced