как лучше организовать работу python3 с sqlite из потоков?

Discussion in 'PHP' started by Дикс, 19 Jul 2011.

  1. Дикс

    Дикс Elder - Старейшина

    Joined:
    16 Apr 2006
    Messages:
    1,194
    Likes Received:
    227
    Reputations:
    26
    на данный момент ситуация такая:
    - главный поток подключается к sqlite3 и использует 1 файл базы.
    - запускается Н потоков, они все подключаются к этому же sqlite-файлу и вносят в него изменения

    сначала пробовал один раз подключиться к базе, а потом юзать соединение из потоков
    sqlite запрещает - пишет что нельзя использовать базу, подключенную в другом потоке

    потом стал создавать новые подключения в каждом потоке
    начал глючить, типа база залочена
    решилось использованием Lock()

    теперь проблема в том, что когда потоков много - лезет ошибка "unable to open database file"

    попробовал сделать очередь Queue, так чтобы потоки кидали в неё задания на изменение базы, а главный поток эти задания обрабатывал и сам работал с базой

    и вот в чем проблема: делать всякие insert, update удобно, но как получать данные из базы в потоки?

    например в базе есть таблица tasks и 100 потоков берут из таблицы рандом-строки, выполняют их и помечают в базе как выполненные

    вариант читать таблицу в память и брать оттуда не подходит - таблиц много, размеры большие, да и теряются преимущества базы по сортировке, выборке - я также мог бы грузить в память текстовый файл.


    т.е получается, для того чтобы брать из базы рандом строки потоками - все же требуется в каждом потоке создавать подключение к файлу базы
    что нам дает "unable to open database file" если потоков много

    можете предложить иной способ работы с бд?
     
  2. Gifts

    Gifts Green member

    Joined:
    25 Apr 2008
    Messages:
    2,494
    Likes Received:
    807
    Reputations:
    614
    Дикс если берете случайную строку - то организовать очередь заданий, в которую поток работающий с БД будет по мере освобождения их записывать. Например, если делать неявно:
    Code:
    DB_QUEUE = Queue.Queue()
    
    class queue_with_db(Queue.Queue):
        def task_done(self):
            Queue.Queue.task_done(self)
            DB_QUEUE.put(('GET_ONE', 0))
    USERS_QUEUE = queue_with_db()
    
    ........
    class db_class(threading.Thread):
        def _get_one(self, parsed=0):
            data = self.curs.execute('SELECT user_id FROM queue WHERE parsed=? LIMIT 1', (parsed, )).fetchone()
            if data:
                USERS_QUEUE.put(data[0])
    
        def run(self):
            self._init_db()
            data = self.curs.execute('SELECT user_id FROM queue WHERE parsed<=0 LIMIT 20')
            if data:
                for row in data.fetchall():
                    USERS_QUEUE.put(row[0])
            while 1:
                task = DB_QUEUE.get()
                if task[0] == 'ADD_ONE':
                    self._add_one(task[1])
                elif task[0] == 'GET_ONE':
                    self._get_one(task[1])
    
    
    Каждый раз когда воркеры будут уведомлять о завершении задания (метод task_done()) - поток с базой данных будет записывать новое задание в очередь. Можно улучшить, например: изначалально записывать в очередь 50 заданий, а потом когда становится меньше 25 - записывать еще 25 в очередь.

    Или можно явно в обработчиках отправлять в очередь запрос.

    Если нужно получить определенный результат, то имхо - та же очередь + потокобезопасный словарь. В очередь кидаем запрос селекта + уникальный идентификатор, а потом опрашиваем словарь о готовности данных по этому идентификатору.
     
    _________________________
    2 people like this.