Python Queue队列 优先级

文章展示了如何使用Python的queue.Queue和threading.Lock实现多线程生产者-消费者模式。通过自定义队列大小,用锁保证线程安全,创建多个线程从队列中按顺序取出数据,同时主线程向队列中批量添加数据,直到全部处理完毕。代码包含线程创建、加锁、非阻塞获取、队列大小控制等关键操作。

作者:zhuge···预计阅读 19 分钟·629 阅读·0 评论
Python Queue队列 优先级

实现了锁,并且自定义队列的大小,排列往队列中写入数据并且按先后顺序取出执行

import datetime
import threading
import queue
import time
from time import sleep

class MyThread(threading.Thread):     def init(self, threadId, name, q):         threading.Thread.init(self)         self.threadId = threadId         self.name = name         self.q = q

    def run(self):         print(f'{self.name}--start')         process_data(self.name, self.q)         print(f'{self.name}--exit')

def process_data(threadName, q):     while not exitFlag: #and not putFlag:         queueLock.acquire()         if not workQueue.empty():             data = q.get()             print(f'{threadName} -- > get -->  {data} ')             queueLock.release()         else:             queueLock.release()         sleep(0.01)

def addDataToQueue():     nameList = ['name' + str(i) for i in range(100000)]     workQueue.queue.clear()     print('put data start -->', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))     putFlag=True     startIndex = 0     while putFlag:         blank_count = workQueue.maxsize - workQueue.qsize()         if blank_count > 0:             endIndex = (startIndex + blank_count) if (startIndex + blank_count) < len(nameList) else len(nameList)             queueLock.acquire()             for x in range(startIndex, endIndex):                 workQueue.put(nameList[x])

                if (x >= len(nameList) - 1):                     print('put ok,wait -->', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))                     putFlag = False

            startIndex = x + 1             print('wait process -->', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))             queueLock.release()             time.sleep(0.01)

    print('put data end -->', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

def main():     global exitFlag     exitFlag = 0     global putFlag     threadListName = ['Th-' + str(i) for i in range(100)]     threads = []

    # 创建线程     for index, item in enumerate(threadListName, 1):         t = MyThread(index, item, workQueue)         t.start()         threads.append(t)

    # add_data = threading.Thread(target=addDataToQueue)     # add_data.start()     # threads.append(add_data)

    addDataToQueue()

    while not workQueue.empty(): pass     exitFlag = 1     for t in threads:         t.join()

    print('Exit All Thread')

if name == 'main':     queueLock = threading.Lock()     workQueue = queue.Queue(100)     main()


相关文章

评论

加载中...