AGB  ·  Datenschutz  ·  Impressum  







Anmelden
Nützliche Links
Registrieren
Zurück Delphi-PRAXiS Programmierung allgemein Netzwerke IdUDPServer hat manchmal Datenverlust --> wie macht das Wireshark?
Thema durchsuchen
Ansicht
Themen-Optionen

IdUDPServer hat manchmal Datenverlust --> wie macht das Wireshark?

Offene Frage von "jaenicke"
Ein Thema von Poolspieler · begonnen am 2. Feb 2015 · letzter Beitrag vom 3. Feb 2015
Antwort Antwort
Poolspieler

Registriert seit: 9. Aug 2004
165 Beiträge
 
Delphi 10.3 Rio
 
#1

AW: IdUDPServer hat manchmal Datenverlust --> wie macht das Wireshark?

  Alt 2. Feb 2015, 19:30
Hallo jaenicke,
das ist auch ein guter Ansatz.
Aber woher weiß der Empfangsthread, ob der verarbeitende Thread die Daten schon abgeholt hat? Stichwort "Pufferüberlauf"...
Das aktualisieren der Pointer muss aber trotzdem synchronisiert erfolgen oder?
Oder sind Pointer threadsave?

Gruß,

Poolspieler
Andreas
  Mit Zitat antworten Zitat
Benutzerbild von Sir Rufo
Sir Rufo

Registriert seit: 5. Jan 2005
Ort: Stadthagen
9.454 Beiträge
 
Delphi 10 Seattle Enterprise
 
#2

AW: IdUDPServer hat manchmal Datenverlust --> wie macht das Wireshark?

  Alt 2. Feb 2015, 19:58
Hier mal so ein grober Entwurf, der aber schon tut

UPDATE
Eine kurze Erklärung

Der TMessageQueueThread<T> ist selber nicht threadsafe und nur dazu gedacht von einem einzigen Thread gefüttert zu werden. Sollen die Nachrichten von mehreren Threads zusammengeführt werden, so wird zusätzlich noch eine weitere threadsafe Queue benötigt, die dann von den TMessageQueueThread gefüttert wird, jetzt aber entkoppelt vom eigentlichen Tread, der die Nachrichten produziert.

Jede Art der Synchronisierung würde den eigentlichen Thread wieder ausbremsen und das gilt es hier ja ausdrücklich zu vermeiden.

Delphi-Quellcode:
unit Unit1;

interface

uses
  System.SysUtils,
  System.Classes,
  System.SyncObjs,
  System.Generics.Collections;

type
  TMessageQueueThread<T> = class( TThread )
  private
    FQueueProc: TProc<T>;
    FInQueue, FOutQueue: TQueue<T>;
    FEvent: TEvent;
  protected
    procedure Execute; override;
    procedure DoProcessQueue( AQueue: TQueue<T> );
    procedure TerminatedSet; override;
  public
    constructor Create( AQueueProc: TProc<T> );
    destructor Destroy; override;

    procedure Enqueue( const AItem: T );
  end;

  TReceiveThread = class( TThread )
  private
    FMessageQueue: TMessageQueueThread<string>;
    FFMTset: TFormatSettings;
  protected
    procedure Execute; override;
  public
    constructor Create( AProc: TProc<string> );
    destructor Destroy; override;
  end;

implementation

{ TRelayQueueThread<T> }

constructor TMessageQueueThread<T>.Create( AQueueProc: TProc<T> );
begin
  inherited Create( False );
  FEvent := TEvent.Create( nil, False, False, '' );
  FQueueProc := AQueueProc;
  FInQueue := TQueue<T>.Create;
  FOutQueue := TQueue<T>.Create;
end;

destructor TMessageQueueThread<T>.Destroy;
begin
  inherited;
  FInQueue.Free;
  FOutQueue.Free;
  FEvent.Free;
end;

procedure TMessageQueueThread<T>.DoProcessQueue( AQueue: TQueue<T> );
begin
  while AQueue.Count > 0 do
    begin
      FQueueProc( AQueue.Peek );
      AQueue.Dequeue;
    end;
end;

procedure TMessageQueueThread<T>.Enqueue( const AItem: T );
begin
  FInQueue.Enqueue( AItem );
  FEvent.SetEvent;
end;

procedure TMessageQueueThread<T>.Execute;
begin
  inherited;
  while not Terminated do
    begin
      FEvent.WaitFor( );

      if Terminated
      then
        Exit;

      // Queues tauschen
      FOutQueue := TInterlocked.Exchange < TQueue < T >> ( FInQueue, FOutQueue );
      // Queue verarbeiten
      DoProcessQueue( FOutQueue );
      // Queue leeren, nur für alle Fälle, sollte ja eh jetzt leer sein :o)
      FOutQueue.Clear;
    end;
end;

procedure TMessageQueueThread<T>.TerminatedSet;
begin
  inherited;
  FEvent.SetEvent;
end;

{ TReceiveThread }

constructor TReceiveThread.Create( AProc: TProc<string> );
begin
  inherited Create( False );
  FFMTset := TFormatSettings.Create( '' );
  FMessageQueue := TMessageQueueThread<string>.Create( AProc );
end;

destructor TReceiveThread.Destroy;
begin
  inherited;
  FMessageQueue.Free;
end;

procedure TReceiveThread.Execute;
var
  LMsg: string;
begin
  inherited;
  while not Terminated do
    begin
      LMsg := FormatDateTime( 'hh:nn:ss.zzz', Now, FFMTset );
      FMessageQueue.Enqueue( LMsg );
    end;
end;

end.
Der ReceiveThread ist jetzt nur zum Testen des MessageQueueThread da

Und der Test mit der Synchronisierung
Delphi-Quellcode:
unit Form.Main;

interface

uses
  Unit1,

  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,
  Vcl.Controls, Vcl.Forms, Vcl.Dialogs, Vcl.StdCtrls;

type
  TForm1 = class( TForm )
    Button1: TButton;
    ListBox1: TListBox;
    procedure Button1Click( Sender: TObject );
  private
    FTestThread: TReceiveThread;
    procedure ReceiveMessage( Arg: string );
  public
    { Public-Deklarationen }
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}

procedure TForm1.Button1Click( Sender: TObject );
begin
  if Assigned( FTestThread )
  then
    FreeAndNil( FTestThread )
  else
    FTestThread := TReceiveThread.Create( ReceiveMessage );
end;

procedure TForm1.ReceiveMessage( Arg: string );
begin
  TThread.Synchronize( nil,
      procedure
    begin
      ListBox1.Items.Add( Arg + ' - ' + FormatDateTime( 'hh:nn:ss.zzz', Now ) );
    end );
end;

end.
Und nach dem Druck auf den Button, relativ schnell wieder den Button drücken, der Thread haut dir gnadenlos die ListBox voll
Kaum macht man's richtig - schon funktioniert's
Zertifikat: Sir Rufo (Fingerprint: ‎ea 0a 4c 14 0d b6 3a a4 c1 c5 b9 dc 90 9d f0 e9 de 13 da 60)

Geändert von Sir Rufo ( 2. Feb 2015 um 20:17 Uhr)
  Mit Zitat antworten Zitat
OlafSt

Registriert seit: 2. Mär 2007
Ort: Hamburg
284 Beiträge
 
Delphi 10.2 Tokyo Professional
 
#3

AW: IdUDPServer hat manchmal Datenverlust --> wie macht das Wireshark?

  Alt 2. Feb 2015, 22:31
Ich habe so ein ähnliches Problem vor geraumer Zeit gehabt. Als erstes habe ich die Indy-UDP-Komponenten entsorgt und eine Dritt-Implementation von UDP eingesetzt (UdpSockUtil, zu finden auf entwickler-ecke.de).

Im Empfängerthread wird dann eine TQueue mit den Empfangsdaten gefüllt. Nach jedem Empfang wird mit Postmessage der Hauptthread verständigt. Dieser wiederum liest dann solange Daten aus der Queue, bis diese leer ist.

Ich habe die Synchronisation manuell mit TCriticalSections gemacht, so das ich exakte Kontrolle darüber habe, wann wer wie wo was liest oder schreibt - vor allem blockiere ich so die Queue nur für den einzelnen Lese/Schreibvorgang und gebe sie sofort wieder "frei".

Falls die Windows-Messagequeue überzulaufen beginnt, kann man die Postmessages einfach auf jeden zweiten/dritten Eintrag begrenzen. Der Hauptthread arbeitet dann eh alles ab.
  Mit Zitat antworten Zitat
Poolspieler

Registriert seit: 9. Aug 2004
165 Beiträge
 
Delphi 10.3 Rio
 
#4

AW: IdUDPServer hat manchmal Datenverlust --> wie macht das Wireshark?

  Alt 2. Feb 2015, 22:42
Hallo OlafSt,
ich werde mir Deinen Ansatz genau anschauen - und das für mich zutreffende umsetzen. Danke!

Welches Argument spricht gegen Indy?
Bzw. welche Vorteile bietet UdpSockUtil?

Ich persönlich (aber das ist nur meine Meinung...) versuche immer so viel wie möglich mit den Bordmitteln von Delphi zu programmieren.
Ich nutze z.B. TChart Prof. Und da fängt das Problem schon an. Bei jeder neuen Delphiversion muss ich erstmal ein bis zwei Monate warten, bis es eine lauffähige Version von Steema für die neue Delphiversion gibt.
Für manche Projekte habe ich auch schon die TMS-Komponenten genutzt - da ist es nicht viel anders.

Deshalb versuche ich neue (und auch laufende) Projekte möglichst nicht mit Drittanbietern zu "belasten".

Viele Grüße,

Poolspieler
Andreas
  Mit Zitat antworten Zitat
OlafSt

Registriert seit: 2. Mär 2007
Ort: Hamburg
284 Beiträge
 
Delphi 10.2 Tokyo Professional
 
#5

AW: IdUDPServer hat manchmal Datenverlust --> wie macht das Wireshark?

  Alt 2. Feb 2015, 23:30
Die XE4-Implementation der Indy-UDP-Komponenten hat ne Macke. Habe das nicht weiter verfolgt (mir war das Fixen, neukompilieren und neu-Einbinden einfach zu kompliziert und ich hatte reichtlich Termindruck) und das ganze auf UdpSockUtil ungestellt. Die lassen sich problemlos von Version zu Version mitnehmen und gepflegt werden die auch noch

UdpSockUtils arbeitet direkt mit Sockets, direkter am System gehts kaum noch. Womit sich auch das mitschleppen dicker Units erübrigt. Aber - das ist jedem selbst überlassen.

Ansonsten bin ich da ganz bei dir - so wenig Fremdkomponenten wie möglich. Nur: Wirklich toll aussehende Anwendungen kriegt man damit nicht hin
  Mit Zitat antworten Zitat
Poolspieler

Registriert seit: 9. Aug 2004
165 Beiträge
 
Delphi 10.3 Rio
 
#6

AW: IdUDPServer hat manchmal Datenverlust --> wie macht das Wireshark?

  Alt 2. Feb 2015, 23:40
Hallo,
ich gebe Dir in allen Punkten Recht.
Was mit XE4 und Indy war, weiß ich nicht mehr. Aber irgend ein Update gab es damals... Aktuell nutze ich XE7.

Ich werde mein Glück mit Indy noch etwas weiter versuchen (weil es ja prinzipiell schon mal damit läuft) - optional kann ich mir mal UdpSockUtils anschauen.

Zitat:
Ansonsten bin ich da ganz bei dir - so wenig Fremdkomponenten wie möglich. Nur: Wirklich toll aussehende Anwendungen kriegt man damit nicht hin
Stimmt, deswegen nutze ich manchmal z.B. die TMS Komponenten...

Viele Grüße,

Poolspieler
Andreas
  Mit Zitat antworten Zitat
mensch72

Registriert seit: 6. Feb 2008
838 Beiträge
 
#7

AW: IdUDPServer hat manchmal Datenverlust --> wie macht das Wireshark?

  Alt 3. Feb 2015, 00:47
TChart 500T Datenpunkte... wenn die Verarbeitung / Anzeige teilweise nicht hinter her kommt, worauf kommt es denn dann in deiner Anwendung an?

Lieber mal ein paar Daten "weglassen" und dafür aber die Anzeige möglichst immer wieder "realtime" synchron halten, ODER eine Art "Gummiband" mit Dynamischen Puffer, welcher alles "irgendwann" dann eben verzögert darstellt und erst wenn wenig los mal wieder realtime synchron ist.

Fall 1 wird gerne bei Börsendaten genommen, denn da zählt letztendlich nur der aktuelle Kurs, wenn da mal einer zwischendurch fehlt ist das zwar nicht schön, aber kein Kurs ist da teils besser wie ein alter und nun schon falscher Kurs, von dem man nicht weiß, das er "alt" ist.

Meine Lösung: 3 "Block"-Puffer mit 4 möglichen States:"Write","Data","Read","Free"
- nur die StateZugriffe müssen synchronisiert werden, eine einfache kurze CriticalSection reicht hier, sind ja nur 1 oder 2 Variablenzugriffe zu schützen
- der Empfangsthread setzt synchronisiert den Puffer mit "WriteState" auf "DataState" und wechselt zyklisch den anderen Puffer von "Free/Data-State" auf "WriteState", ein Pufferwechsel wird dem WorkThread "signalisiert"
- der WorkThread setzt synchronisiert den Puffer mit "ReadState" auf "FreeState" und setzt den Puffer mit "DataState" auf "ReadState", dann werden die Daten des neuen "ReadState" Puffers beliebig langsam verarbeitet/angezeigt
- wenn die Anzeige sagen wir mal 3sec dauert und der Empfangsthread alle 1sec den Puffer wechselt, dann werden eben mal 2sec keine Empfangsdaten angezeigt, dann aber dafür sofort wieder die aktuellsten... für Visuelle Aufgaben ist das oft die eleganteste Lösung... für vollständige Datenlogs ist das aber ungeeignet, da muss es dann ein dynamischer FiFo sein. => Bei FiFo-Lösungen aber immer einen Empfangs-TimeStamp mit abspeichern! Nur so kann man bei der "späteren" Verarbeitung/Anzeige dann erkennen, das es alte Daten mit Zeitverzug sind

Geändert von mensch72 ( 3. Feb 2015 um 00:51 Uhr)
  Mit Zitat antworten Zitat
Benutzerbild von jaenicke
jaenicke

Registriert seit: 10. Jun 2003
Ort: Berlin
10.057 Beiträge
 
Delphi 12 Athens
 
#8

AW: IdUDPServer hat manchmal Datenverlust --> wie macht das Wireshark?

  Alt 3. Feb 2015, 07:41
Aber woher weiß der Empfangsthread, ob der verarbeitende Thread die Daten schon abgeholt hat? Stichwort "Pufferüberlauf"...
Das muss man prüfen. Da mit den Atomic...-Funktionen die Zugriffe threadsicher passieren, ist aber keine weitere Synchronisation nötig. Und die atomaren Funktionen werden direkt in Assembler umgesetzt, das ist Compilermagic. So entsteht wenig Aufwand für die Synchronisation, auch wenn man noch weitere Sachen prüfen muss.

Hier mal so ein grober Entwurf, der aber schon tut
Auf den ersten Blick würde ich sagen, dass der Austausch der Pointer der beiden Queues in TMessageQueueThread<T>.Execute genau kurz nach dem Assemblerbefehl passieren kann, der den bisherigen Pointer in TReceiveThread.Execute holt. So würde dann Enqueue auf der Queue ausgeführt, die in DoProcessQueue verarbeitet wird. Oder sehe ich das falsch?
Sebastian Jänicke
AppCentral
  Mit Zitat antworten Zitat
Antwort Antwort


Forumregeln

Es ist dir nicht erlaubt, neue Themen zu verfassen.
Es ist dir nicht erlaubt, auf Beiträge zu antworten.
Es ist dir nicht erlaubt, Anhänge hochzuladen.
Es ist dir nicht erlaubt, deine Beiträge zu bearbeiten.

BB-Code ist an.
Smileys sind an.
[IMG] Code ist an.
HTML-Code ist aus.
Trackbacks are an
Pingbacks are an
Refbacks are aus

Gehe zu:

Impressum · AGB · Datenschutz · Nach oben
Alle Zeitangaben in WEZ +1. Es ist jetzt 15:31 Uhr.
Powered by vBulletin® Copyright ©2000 - 2025, Jelsoft Enterprises Ltd.
LinkBacks Enabled by vBSEO © 2011, Crawlability, Inc.
Delphi-PRAXiS (c) 2002 - 2023 by Daniel R. Wolf, 2024-2025 by Thomas Breitkreuz