![]() |
Selbstgebauter Threadpool funktioniert nicht
Hi,
Habe mir mal gestern mal schnell ne Threadpool Klasse zusammengeschustert. Allerdings gibts da noch einige Probleme.
Delphi-Quellcode:
Das ist so ungefähr mein Grundgerüst. Ich dachte mir aber schon ganz zu Anfang, dass ich GetJob wohl synchronisieren muss, weil es schlecht wäre, wenn mehrere Threads gleichzeitig auf die Job-Queue zugreifen wollen.
procedure TThreadPoolThread.GetJob;
begin if FPool.FJobs.Count > 0 then FJob := FPool.FJobs.Pop else FJob := nil; end; procedure TThreadPoolThread.Execute; begin while not FDead do begin GetJob; if (Assigned(FOnWork)) and (Assigned(FJob)) then FOnWork(Self,FJob); sleep(1); end; end; Bei Synchronize stürzt der Thread ab. Aber soweit ich herausgefunden habe, ist Synchronize auch nicht das richtige in diesem Fall. Habe es dann mit ner CriticalSection probiert:
Delphi-Quellcode:
Das funktioniert soweit ich das mitbekommen habe pro Thread 1 mal und bei jedem weiteren mal bekomme ich ne AV.
procedure TThreadPoolThread.Execute;
var cs: TCriticalSection; begin cs := TCriticalSection.Create; while not FDead do begin cs.Enter; GetJob; cs.Leave; if (Assigned(FOnWork)) and (Assigned(FJob)) then FOnWork(Self,FJob); sleep(1); end; end; Weiß jetzt gerade nicht wie ich das machen muss :mrgreen: FJobs ist übrigens eine TObjectQueue (uses contnrs). Gruß Neutral General |
Re: Selbstgebauter Threadpool funktioniert nicht
Was du gerade tust ist so, als ob jeder einzelne Mensch für die Toilette sein eigenes besetzt-Schild mitbringt und das dann privat auf besetzt stellt. Das kann nicht funktionieren, oder? Du brauchst für den ganzen Pool eine Critical Section. Außerdem ist dein Polling eine ganz schlechte Idee. Nimm eine Semaphore. Oder wenn du es ganz korrekt machen willst: Verwende IO Completion Ports, da ist das gesamte Management schon eingebaut (und die Anzahl der aktiven Threads automatisch auf die CPU-Anzahl optimiert) und deine Liste kannst du dir dann auch schenken.
|
Re: Selbstgebauter Threadpool funktioniert nicht
Hi,
Wie verwendet man die IO Completion Ports APIs denn? Ich blicke da nicht ganz durch. |
Re: Selbstgebauter Threadpool funktioniert nicht
Eigentlich sind diese dafür gedacht, bei asynchronem IO Nachrichten über die Fertigstellung von Operationen zu bekommen. Du kannst aber auch manuell Nachrichten schicken. Die Benutzung ist simpel: Mit CreateIOCompletionPort einen Port erstellen, mit GetQueuedCompletionStatus in den Threadpool-Threads auf Nachrichten warten und von außen mit PostQueuedCompletionStatus Aufträge abschicken. Die Auftragsliste wird automatisch verwaltet, dabei hast du 12 Bytes Speicher pro Auftrag zur Verfügung, das sollte reichen, ohne dass du den Speichermanager bemühen musst.
Das geniale dabei ist, dass wie schon gesagt automatisch die Anzahl der laufenden Threads an die CPU-Zahl angepasst wird - bekanntermaßen sollten diese Zahlen gleich sein, damit weder eine CPU brach liegt noch der Scheduler unnötig belastet wird. Wenn nun ein Threadpool-Thread mit WaitForSingleObject o.Ä. auf ein Event wartet, wird der Completion Port automatisch informiert und ein weiterer Thread wird aus GetQueuedCompletionStatus freigelassen. So ein gutes Scheduling ist sonst absolut unmöglich, weil du nicht merkst, wenn ein Thread anfängt zu warten. |
Re: Selbstgebauter Threadpool funktioniert nicht
Hi,
Habs jetzt folgendermaßen aufgebaut:
Delphi-Quellcode:
Das klappt jetzt prinzipiell schon... Problem ist nur, dass es mehrere Jobs gibt, die ein und diesselbe Objektinstanz repräsentieren, die aber mehr oder weniger gleichzeitig von meinen Threads im Threadpool bearbeitet werden. Und dann kracht es -.-
procedure TThreadPool.AddJob(AJob: TObject);
begin PostQueuedCompletionStatus(FIOComPort,SizeOf(TObject),Cardinal(AJob),nil); end; procedure TThreadPoolThread.GetJob; var bytes: Cardinal; over: POverlapped; ajob: Cardinal; begin if GetQueuedCompletionStatus(FIOComPort,bytes,AJob,over,0) then FJob := TObject(AJob) else FJob := nil; end; procedure TThreadPoolThread.Execute; begin while not FDead do begin GetJob; if (Assigned(FOnWork)) and (Assigned(FJob)) then FOnWork(Self,FJob); sleep(1); end; end; Wenn du noch irgendwelche Infos dazu brauchst, dann sag mir ruhig Bescheid. Danke auf jeden Fall für die Hilfe bisher ;) |
Re: Selbstgebauter Threadpool funktioniert nicht
Hör doch bitte mit dem Polling auf. Setze in GetQueuedCompletionStatus INFINITE als letzten Parameter und lass das Sleep(1) weg. Sonst verbrätst du vollkommen unnötig CPU-Zeit. Dann musst du allerdings eine spezielle Stopp-Nachricht einführen, wofür du eines der ungenutzten Informations-Felder verwenden kannst. Ferner steht auch dwNumberOfBytesTransferred zu deiner freien Verfügung, du musst also nicht SizeOf(TObject) dort einsetzen.
Dein Problem, dass mehrere gleiche Jobs gleichzeitig abgearbeitet werden, ist eigentlich keines. Der Threadpool sollte sich um solche Aufgaben nicht kümmern. Wenn die Jobs nicht gleichzeitig abgearbeitet werden können, muss das der Code regeln, der den Threadpool nutzt, nicht der Threadpool selbst. Am besten erledigst du das wohl, indem der Job sich selbst noch einmal in die Liste einfügt, wenn er abgearbeitet wurde. |
Re: Selbstgebauter Threadpool funktioniert nicht
Hi,
Also ich bins nochmal. Hätte jetzt theoretisch ne Idee wie ich mein Problem lösen könnte aber ich glaube es wäre nicht der richtige Weg. Ich habe das Gefühl, dass ich die Sache mit den IO Completion Ports nicht ganz verstanden habe. Für mich sind das irgendwie nur von Windows verwaltete ![]() Wofür dieser letzte Parameter? Anzahl der Threads. Was geschieht denn da im Hintergrund? |
Re: Selbstgebauter Threadpool funktioniert nicht
IO Completion Ports sind Warteschlangen, genau. Der letzte Parameter steuert genau das spezielle Scheduling, was ich weiter oben bereits erläutert habe:
Zitat:
Könntest du dein Problem etwas genauer erläutern? |
Re: Selbstgebauter Threadpool funktioniert nicht
Also es geht um ein Serverprogramm.
Ich habe von TSocketServer (uses Sockets) abgeleitet und möchte jetzt die von den einzelnen Clients einkommenden Daten asynchron verarbeiten. Dafür soll das ServerSocket mit einem Threadpool ausgestattet werden. Es wird laufend geprüft ob Daten von den einzelnen Clients verfügbar ist. Wenn das der Fall ist, wird das ClientSocket in eine Warteschlange eingereiht und der Threadpool empfängt und verarbeitet diese Daten dann. Soweit mein Plan. Dafür hatte/wollte ich zuerst meine eigene Threadpool Klasse. Jetzt die Threadpool Klasse in Verbindung mit den IO Completion Ports. Aber ich glaube ich bin da aufgrund eines oder mehrerer Misverständnisse aufm Holzweg. |
Re: Selbstgebauter Threadpool funktioniert nicht
Zunächst einmal würde ich nur einen Threadpool instanziieren, nicht einen für jeden Socket. Dazu solltest du dann auch den Threadpool etwas verändern, sodass du nicht mehr ein OnWork-Ereignis hast, sondern einfach als Job einen Methodenzeiger übergibst (evtl. mit einem weitern Kontext-Parameter), der dann ausgeführt wird. Mit dem zusätzlichen Parameter nutzt du dann auch die 12 Byte voll aus.
Zum zweiten würde ich für jeden ClientSocket ein Flag Queued einführen. Dieses setzt du auf True, wenn du ihn in die Warteschlange stellst, und am Ende der Abarbeitungsroutine wieder auf False. So vermeidest du, dass ein Socket mehrfach in die Warteschlange gesetzt wird. Könntest du noch etwas genauer sagen, wo genau du die Daten abrufst und wie du auf neue Daten prüfst? |
Alle Zeitangaben in WEZ +1. Es ist jetzt 07:46 Uhr. |
Powered by vBulletin® Copyright ©2000 - 2025, Jelsoft Enterprises Ltd.
LinkBacks Enabled by vBSEO © 2011, Crawlability, Inc.
Delphi-PRAXiS (c) 2002 - 2023 by Daniel R. Wolf, 2024-2025 by Thomas Breitkreuz