Zookeeper 是否有用于通知和等待的python库?

o3imoua4  于 2022-12-09  发布在  Apache
关注(0)|答案(2)|浏览(85)

我使用python-zookeeper进行锁定,并且尝试找出一种方法,使执行在监视文件时等待通知,因为zookeeper.exists()会立即返回,而不是阻塞。
基本上,我有下面列出的代码,但我不确定实现notify()wait_for_notification()函数的最佳方式。可以用os.kill()signal.pause()来实现,但我确信如果我以后在一个程序中有多个锁,这可能会导致问题--有没有一个特定的Python库适合这类事情?

def get_lock(zh):
    lockfile = zookeeper.create(zh,lockdir + '/guid-lock-','lock', [ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERAL | zookeeper.SEQUENCE)

    while(True):
        # this won't work for more than one waiting process, fix later
        children = zookeeper.get_children(zh, lockdir)
        if len(children) == 1 and children[0] == basename(lockfile):
            return lockfile

        # yeah, there's a problem here, I'll fix it later
        for child in children:
            if child < basename(lockfile):
                break

        # exists will call notify when the watched file changes
        if zookeeper.exists(zh, lockdir + '/' + child, notify):
            # Process should wait here until notify() wakes it
            wait_for_notification()

def drop_lock(zh,lockfile):
    zookeeper.delete(zh,lockfile)

def notify(zh, unknown1, unknown2, lockfile):
    pass

def wait_for_notification():
    pass
oaxa6hgo

oaxa6hgo1#

Python线程化模块中的Condition变量可能非常适合您要做的事情:
http://docs.python.org/library/threading.html#condition-objects
我扩展了这个例子,使它更明显地说明如何根据您的目的来修改它:

#!/usr/bin/env python

from collections import deque
from threading import Thread,Condition

QUEUE = deque()

def an_item_is_available():
    return bool(QUEUE)

def get_an_available_item():
    return QUEUE.popleft()

def make_an_item_available(item):
    QUEUE.append(item)

def consume(cv):
    cv.acquire()
    while not an_item_is_available():
        cv.wait()
    print 'We got an available item', get_an_available_item()
    cv.release()

def produce(cv):
    cv.acquire()
    make_an_item_available('an item to be processed')
    cv.notify()
    cv.release()

def main():
    cv = Condition()
    Thread(target=consume, args=(cv,)).start()    
    Thread(target=produce, args=(cv,)).start()

if __name__ == '__main__':
    main()
xyhw6mcr

xyhw6mcr2#

我的回答可能与您的问题无关,但与问题标题有关。

from threading import Thread,Event

locker = Event()

def MyJob(locker):
    while True:
        #
        # do some logic here
        #
        locker.clear() # Set event state to 'False'
        locker.wait() # suspend the thread until event state is 'True'

worker_thread = Thread(target=MyJob, args=(locker,))
worker_thread.start()

#
# some main thread logic here
#
locker.set() # This sets the event state to 'True' and thus it resumes the worker_thread

更多信息,请访问:https://docs.python.org/3/library/threading.html#event-objects

相关问题