Разработчици на C ++ се стремят да изградят стабилни многонишкови Qt приложения, но многопоточността никога не е била лесна при всички тези условия на състезание, синхронизация и мъртви блокировки. За ваш кредит, вие не се отказвате и се оказвате да измиете StackOverflow. Въпреки това избирането на правилното и работещо решение от дузина различни отговори е доста нетривиално, особено като се има предвид, че всяко решение има свои собствени недостатъци.
Многопоточността е широко разпространен модел за програмиране и изпълнение, който позволява съществуването на множество нишки в контекста на един процес. Тези нишки споделят ресурсите на процеса, но могат да се изпълняват независимо. Моделът за програмиране с резба предоставя на разработчиците полезна абстракция от едновременно изпълнение. Многопоточността може да се приложи и към един процес, за да се даде възможност за паралелно изпълнение в многопроцесорна система.
Целта на тази статия е да събере основните знания за едновременното програмиране с Qt рамката, по-специално най-неразбраните теми. Очаква се читателят да има предишен опит в Qt и C ++, за да разбере съдържанието.
QThreadPool
и QThread
Рамката Qt предлага много инструменти за многопоточност. Изборът на правилния инструмент може да бъде предизвикателство в началото, но всъщност дървото за вземане на решения се състои само от две опции: или искате Qt да управлява нишките вместо вас, или искате да управлявате нишките сами. Има обаче и други важни критерии:
Задачи, които не се нуждаят от цикъла на събитията. По-конкретно, задачите, които не използват механизъм сигнал / слот по време на изпълнението на задачата.
Употреба: QtConcurrent и QThreadPool + QRустройство .
Задачи, които използват сигнал / слотове и следователно се нуждаят от цикъла на събитията.
Използване: Работните обекти са преместени в + QThread .
Голямата гъвкавост на Qt рамката ви позволява да заобиколите проблема „липсващ цикъл на събития“ и да добавите такъв към QRunnable
:
class MyTask : public QObject, public QRunnable { Q_OBJECT public: void MyTask::run() { _loop.exec(); } public slots: // you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked... void finishTask() { _loop.exit(); } private: QEventLoop _loop; }
Опитайте се да избягвате подобни „заобикалящи мерки“, тъй като те са опасни и неефективни: ако една от нишките от пула от нишки (работещ MyTask) е блокирана поради изчакване на сигнал, тогава тя не може да изпълнява други задачи от пула.
Можете също така да стартирате QThread
без никакъв цикъл на събития, като замени QThread::run()
метод и това е напълно добре, стига да знаете какво правите. Например, не очаквайте метод quit()
да работи в такъв случай.
Представете си, че трябва да се уверите, че само един екземпляр на задача може да бъде изпълнен наведнъж и всички чакащи заявки за изпълнение на една и съща задача чакат на определена опашка. Това често е необходимо, когато дадена задача има достъп до ексклузивен ресурс, като писане в същия файл или изпращане на пакети чрез TCP сокет.
Да забравим за момент за компютърните науки и модела на производител-потребител и да помислим за нещо тривиално; нещо, което може лесно да се намери в реални проекти.
Наивно решение на този проблем може да бъде използването на QMutex
. Във функцията на задачата можете просто да придобиете mutex, ефективно сериализирайки всички нишки, опитващи се да стартират задачата. Това би гарантирало, че само една нишка в даден момент може да изпълнява функцията. Това решение обаче влияе върху представянето чрез въвеждане висока конкуренция проблем, защото всички тези нишки ще бъдат блокирани (в мютекса), преди да могат да продължат. Ако имате много нишки, които активно използват такава задача и извършват някаква полезна работа между тях, тогава всички тези нишки просто ще спят през повечето време.
void logEvent(const QString & event) { static QMutex lock; QMutexLocker locker(& lock); // high contention! logStream << event; // exclusive resource }
За да избегнем спор, имаме нужда от опашка и работник, който живее в собствената си нишка и обработва опашката. Това е почти класиката производител-потребител модел. Работникът ( консуматор ) би избрал заявки от опашката една по една и всяка продуцент може просто да добави своите заявки в опашката. Отначало звучи просто и може да помислите да използвате QQueue
и QWaitCondition
, но задръжте и нека видим дали можем да постигнем целта без тези примитиви:
QThreadPool
тъй като има опашка от чакащи задачиИли
QThread::run()
защото има QEventLoop
Първият вариант е да се използва QThreadPool
. Можем да създадем QThreadPool
екземпляр и използвайте QThreadPool::setMaxThreadCount(1)
. Тогава можем да използваме QtConcurrent::run()
да планирате заявки:
class Logger: public QObject { public: explicit Logger(QObject *parent = nullptr) : QObject(parent) { threadPool.setMaxThreadCount(1); } void logEvent(const QString &event) { QtConcurrent::run(&threadPool, [this, event]{ logEventCore(event); }); } private: void logEventCore(const QString &event) { logStream << event; } QThreadPool threadPool; };
Това решение има едно предимство: QThreadPool::clear()
ви позволява незабавно отмени всички чакащи заявки, например когато приложението ви трябва бързо да се изключи. Съществува обаче и съществен недостатък, който е свързан с нишка-афинитет : logEventCore
функцията вероятно ще се изпълнява в различни нишки от обаждане до повикване. И знаем, че Qt има някои класове, които изискват нишка-афинитет : QTimer
, QTcpSocket
и евентуално някои други.
Какво казва Qt spec за афинитета на нишките: таймерите, стартирани в една нишка, не могат да бъдат спрени от друга нишка. И само нишката, притежаваща екземпляр на сокет, може да използва този сокет. Това предполага, че трябва да спрете всички работещи таймери в нишката, която ги е стартирала, и трябва да извикате QTcpSocket :: close () в нишката, притежаваща сокета. И двата примера обикновено се изпълняват в деструктори.
По-доброто решение разчита на използването на QEventLoop
предоставено от QThread
. Идеята е проста: ние използваме механизъм сигнал / слот за издаване на заявки и цикълът на събитията, който се изпълнява вътре в нишката, ще служи като опашка, позволяваща да се изпълнява само един слот наведнъж.
// the worker that will be moved to a thread class LogWorker: public QObject { Q_OBJECT public: explicit LogWorker(QObject *parent = nullptr); public slots: // this slot will be executed by event loop (one call at a time) void logEvent(const QString &event); };
Прилагане на LogWorker
конструктор и logEvent
е ясен и следователно не е предоставен тук. Сега се нуждаем от услуга, която ще управлява нишката и работния екземпляр:
// interface class LogService : public QObject { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); ~LogService(); signals: // to use the service, just call this signal to send a request: // logService->logEvent('event'); void logEvent(const QString &event); private: QThread *thread; LogWorker *worker; }; // implementation LogService::LogService(QObject *parent) : QObject(parent) { thread = new QThread(this); worker = new LogWorker; worker->moveToThread(thread); connect(this, &LogService::logEvent, worker, &LogWorker::logEvent); connect(thread, &QThread::finished, worker, &QObject::deleteLater); thread->start(); } LogService::~LogService() { thread->quit(); thread->wait(); }
Нека обсъдим как работи този код:
QThread::finished
сигнал до deleteLater
слот. Също така свързваме метода на прокси LogService::logEvent()
до LogWorker::logEvent()
които ще използват Qt::QueuedConnection
режим поради различни нишки.quit
събитие в опашката на цикъла на събитията. Това събитие ще бъде обработено след всички други събития се обработват. Например, ако сме направили стотици logEvent()
обаждания непосредствено преди извикването на деструктора, регистраторът ще ги обработва всички, преди да извлече събитието за отказ. Това отнема време, разбира се, така че трябва wait()
докато цикълът на събитията излезе. Струва си да се спомене, че всички бъдещи заявки за регистриране са публикувани след събитието за отказ никога няма да бъде обработено.LogWorker::logEvent
) винаги ще се извършва в една и съща нишка, следователно този подход работи добре за класове, изискващи нишка-афинитет . В същото време, LogWorker
конструктор и деструктор се изпълняват в основната нишка (по-специално се изпълнява нишката LogService
) и следователно трябва да бъдете много внимателни относно кода, който изпълнявате там. По-конкретно, не спирайте таймери или използвайте сокети в деструктора на работника, освен ако не можете да стартирате деструктора в същата нишка!Ако вашият работник има работа с таймери или сокети, трябва да се уверите, че деструкторът се изпълнява в същата нишка (нишката, която сте създали за работника и където сте преместили работника). Очевидният начин да се подкрепи това е подкласът QThread
и delete
работник вътре QThread::run()
метод. Обмислете следния шаблон:
template class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { quit(); wait(); } TWorker worker() const { return _worker; } protected: void run() override { QThread::run(); delete _worker; } private: TWorker *_worker; };
Използвайки този шаблон, ние предефинираме LogService
от предишния пример:
// interface class LogService : public Thread { Q_OBJECT public: explicit LogService(QObject *parent = nullptr); signals: void **logEvent**(const QString &event); }; // implementation LogService::**LogService**(QObject *parent) : Thread(new LogWorker, parent) { connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent); }
Нека обсъдим как трябва да работи това:
LogService
да бъде QThread
обект, защото трябваше да приложим потребителския run()
функция. Използвахме частно подкласиране, за да предотвратим достъпа до функциите QThread
, тъй като искаме да контролираме вътрешния жизнен цикъл на нишката.Thread::run()
функция изпълняваме цикъла на събитията, като извикваме по подразбиране QThread::run()
внедряване и унищожете екземпляра на работника веднага след като цикълът на събития е излязъл. Имайте предвид, че деструкторът на работника се изпълнява в същата нишка.LogService::logEvent()
е прокси функцията (сигнал), която ще публикува събитието за регистриране в опашката на събитията на нишката.Друга интересна възможност е да можете да спрете и възобновите нашите персонализирани нишки. Представете си, че вашето приложение извършва някаква обработка, която трябва да бъде спряна, когато приложението бъде сведено до минимум, заключено или просто е загубило мрежовата връзка. Това може да бъде постигнато чрез изграждане на персонализирана асинхронна опашка, която ще задържи всички чакащи заявки, докато работникът не бъде възобновен. Тъй като обаче търсим най-лесните решения, ще използваме (отново) опашката на цикъла на събитията за същата цел.
За да спрем нишка, явно се нуждаем от нея, за да изчакаме при определени условия на изчакване. Ако нишката е блокирана по този начин, нейният цикъл на събития не обработва никакви събития и Qt трябва да постави keep в опашката. След като се възобнови, цикълът на събития ще обработва всички натрупани заявки. За условието на изчакване просто използваме QWaitCondition
обект, който също изисква QMutex
. За да проектираме родово решение, което може да бъде използвано повторно от всеки работник, трябва да поставим цялата логика за спиране / възобновяване в основен клас за многократна употреба. Нека го наречем SuspendableWorker
. Такъв клас трябва да поддържа два метода:
suspend()
би било блокиращо повикване, което задава нишката, която чака в състояние на изчакване. Това ще стане чрез публикуване на заявка за спиране в опашката и изчакване, докато бъде обработена. Почти подобно на QThread::quit()
+ wait()
.resume()
би сигнализирал условието за изчакване да събуди спящата нишка, за да продължи изпълнението си.Нека да прегледаме интерфейса и изпълнението:
// interface class SuspendableWorker : public QObject { Q_OBJECT public: explicit SuspendableWorker(QObject *parent = nullptr); ~SuspendableWorker(); // resume() must be called from the outer thread. void resume(); // suspend() must be called from the outer thread. // the function would block the caller's thread until // the worker thread is suspended. void suspend(); private slots: void suspendImpl(); private: QMutex _waitMutex; QWaitCondition _waitCondition; };
// implementation SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) { _waitMutex.lock(); } SuspendableWorker::~SuspendableWorker() { _waitCondition.wakeAll(); _waitMutex.unlock(); } void SuspendableWorker::resume() { _waitCondition.wakeAll(); } void SuspendableWorker::suspend() { QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl); // acquiring mutex to block the calling thread _waitMutex.lock(); _waitMutex.unlock(); } void SuspendableWorker::suspendImpl() { _waitCondition.wait(&_waitMutex); }
Не забравяйте, че спряната нишка никога няма да получи quit
събитие. Поради тази причина не можем да използваме това безопасно с ванилия QThread
освен ако не възобновим нишката преди публикуване на quit. Нека интегрираме това в нашия обичай Thread
шаблон, за да стане куршумоустойчив.
template class Thread : QThread { public: explicit Thread(TWorker *worker, QObject *parent = nullptr) : QThread(parent), _worker(worker) { _worker->moveToThread(this); start(); } ~Thread() { resume(); quit(); wait(); } void suspend() { auto worker = qobject_cast(_worker); if (worker != nullptr) { worker->suspend(); } } void resume() { auto worker = qobject_cast(_worker); if (worker != nullptr) { worker->resume(); } } TWorker worker() const { return _worker; } protected: void run() override { QThread::*run*(); delete _worker; } private: TWorker *_worker; };
С тези промени ще възобновим нишката, преди да публикуваме събитието за отказ. Също така, Thread
все още позволява да се предаде всякакъв вид работник, независимо дали е SuspendableWorker
или не.
Използването ще бъде както следва:
LogService logService; logService.logEvent('processed event'); logService.suspend(); logService.logEvent('queued event'); logService.resume(); // 'queued event' is now processed.
Това е често неразбрана тема. Повечето хора вярват, че volatile
променливите могат да се използват за обслужване на определени флагове, достъпни от множество нишки, и това запазва от условията на състезанието за данни. Това е невярно и QAtomic*
за тази цел трябва да се използват класове (или std::atomic
).
Нека разгледаме реалистичен пример: a TcpConnection
клас на свързване, който работи в специална нишка, и ние искаме този клас да експортира метод, безопасен за нишки: bool isConnected()
. Вътрешно класът ще слуша събития в сокета: connected
и disconnected
за поддържане на вътрешен булев флаг:
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: // this is not thread-safe! bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: bool _connected; }
Осъществяване _connected
член volatile
няма да реши проблема и няма да направи isConnected()
безопасно за конци. Това решение ще работи в 99% от случаите, но останалите 1% ще направят живота ви кошмарен. За да поправим това, трябва да защитим променливия достъп от множество нишки. Нека използваме QReadWriteLocker
за тази цел:
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { QReadLocker locker(&_lock); return _connected; } private slots: void handleSocketConnected() { QWriteLocker locker(&_lock); _connected = true; } void handleSocketDisconnected() { QWriteLocker locker(&_lock); _connected = false; } private: QReadWriteLocker _lock; bool _connected; }
Това работи надеждно, но не толкова бързо, колкото използването на атомни операции без заключване. Третото решение е едновременно бързо и безопасно за нишки (примерът използва std::atomic
вместо QAtomicInt
, но семантично те са идентични):
// pseudo-code, won't compile class TcpConnection : QObject { Q_OBJECT public: bool isConnected() const { return _connected; } private slots: void handleSocketConnected() { _connected = true; } void handleSocketDisconnected() { _connected = false; } private: std::atomic _connected; }
В тази статия обсъдихме няколко важни притеснения относно едновременното програмиране с Qt рамката и проектирахме решения за справяне с конкретни случаи на употреба. Не сме разглеждали много от простите теми като използване на атомни примитиви, брави за четене / запис и много други, но ако се интересувате от тях, оставете коментара си по-долу и поискайте такъв урок.
Ако се интересувате от проучване на Qmake, също наскоро публикувах Жизненоважното ръководство за Qmake . Това е страхотно четиво!
Този многопоточен модел предоставя на разработчиците полезна абстракция за едновременно изпълнение. Въпреки това, той наистина блести, когато се прилага към един процес: позволява паралелно изпълнение в многопроцесорна система.
Многопоточността е модел за програмиране и изпълнение, който позволява съществуването на множество нишки в контекста на един процес. Тези нишки споделят ресурсите на процеса, но могат да се изпълняват независимо.
Приложение, което е изградено с Qt framework. Qt приложенията често се изграждат с C ++, тъй като самата рамка е изградена с C ++. Има обаче и други езикови обвързвания като Python-Qt.
Задача, базирана на едновременност чрез използване на QThreadPool и QRunnable, и програмиране на нишки с помощта на клас QThread.
Най-често използваните са QMutex, QSemaphore и QReadWriteLock. Има и атомни операции без заключване, осигурени от класовете QAtomic *.