Einzelnen Beitrag anzeigen

AJ_Oldendorf

Registriert seit: 12. Jun 2009
486 Beiträge
 
Delphi 12 Athens
 
#70

AW: schnelle Server Client Verbindung ohne Verluste

  Alt 9. Apr 2025, 13:44
Also wir lassen jetzt mal die Struktur des Codes und alle "Nebensächlichkeiten" unberücksichtigt und gucken nur auf das Empfangsproblem beim Client.

Client:
Delphi-Quellcode:
unit Unit1;

interface

uses
  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,
  Vcl.Controls, Vcl.Forms, Vcl.Dialogs, IdIOHandler, IdIOHandlerSocket,
  IdIOHandlerStack, IdBaseComponent, IdComponent, IdTCPConnection, IdTCPClient,
  System.SyncObjs, IdContext, IdGlobal, System.Generics.Collections,
  System.Diagnostics, Vcl.StdCtrls, Vcl.ExtCtrls;

type
  TMyTCPClient = class;

  TDataRec = record
    Daten : TIdBytes;
    Context : TIdContext;
  end;

  TReceiveEvent = procedure(Sender: TObject; aData : TDataRec) of Object;

  TDataQueue = class
  private
    FQueue: TQueue<TDataRec>;
    FLock: TCriticalSection;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Enqueue(const Data: TDataRec);
    function Dequeue: TDataRec;
  end;

  TProcessingThread = class(TThread)
  private
    FDataQueue: TDataQueue;

    Anz : LongWord;
  protected
    procedure Execute; override;
  public
    OnReceive : TReceiveEvent;

    constructor Create(ADataQueue: TDataQueue);
  end;

  TReceiveThread = class(TThread)
  private
    FDataQueue: TDataQueue;
    FParent : TMyTCPClient;
    PrtGes : Boolean;

    Anz : LongWord;
  protected
    procedure Execute; override;
  public
    constructor Create(aParent : TMyTCPClient; ADataQueue: TDataQueue);
  end;

  TMyTCPClient = class
  private
    FDataQueue : TDataQueue;
    FProcessingThread: TProcessingThread;

    FReceiveThread: TReceiveThread;

    FParentClient : TIdTCPClient;
    FForm : TForm;

    procedure OnClientReadData(Sender: TObject; aData : TDataRec);
  public
    constructor Create(aForm : TForm);
    destructor Destroy; override;
    procedure MyConnect(const AHost: string; APort: Integer);
    procedure Disconnect;
    procedure SendData(const Data: TDataRec);
  end;

  TForm1 = class(TForm)
    IdTCPClient1: TIdTCPClient;
    IdIOHandlerStack1: TIdIOHandlerStack;
    Memo1: TMemo;
    UpdateTimer: TTimer;
    Button1: TButton;
    procedure UpdateTimerTimer(Sender: TObject);
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
    procedure Button1Click(Sender: TObject);
  private
    { Private-Deklarationen }
    MyClient: TMyTCPClient;
    SL : TStringList;
  public
    { Public-Deklarationen }
    procedure Log(aStr : String);
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}

{ TDataQueue }

constructor TDataQueue.Create;
begin
  FQueue := TQueue<TDataRec>.Create;
  FLock := TCriticalSection.Create;
end;

destructor TDataQueue.Destroy;
begin
  FQueue.Free;
  FLock.Free;
  inherited;
end;

function TDataQueue.Dequeue: TDataRec;
begin
  FLock.Acquire;
  try
    if FQueue.Count > 0 then
      Result := FQueue.Dequeue
    else
    begin
      SetLength(Result.Daten, 0);
      Result.Context := Nil;
    end;
  finally
    FLock.Release;
  end;
end;

procedure TDataQueue.Enqueue(const Data: TDataRec);
begin
  FLock.Acquire;
  try
    FQueue.Enqueue(Data);
  finally
    FLock.Release;
  end;
end;

{ TProcessingThread }

constructor TProcessingThread.Create(ADataQueue: TDataQueue);
begin
  FDataQueue := ADataQueue;
  Anz := 0;
  inherited Create(False);
end;

procedure TProcessingThread.Execute;
var
  Data: TDataRec;
begin
  while not Terminated do
  begin
    Data := FDataQueue.Dequeue;
    if Length(Data.Daten) > 0 then
    begin
      if Assigned(OnReceive) then
        OnReceive(Self, Data);
    end
    else
      Sleep(1);
  end;
end;

{ TReceiveThread }

constructor TReceiveThread.Create(aParent: TMyTCPClient; ADataQueue: TDataQueue);
begin
  FDataQueue := ADataQueue;
  FParent := aParent;

  if FParent.FParentClient.UseNagle then
    Sleep(1);

  PrtGes := True;

  Anz := 0;
  inherited Create(False);
end;

procedure TReceiveThread.Execute;
var
  Buffer : TIdBytes;
  RecData : TDataRec;
begin
  while not Terminated do
  begin
    if Assigned(FParent) and Assigned(FParent.FParentClient) then
    begin
      FParent.FParentClient.IOHandler.ReadBytes(Buffer, -1, False); // blocks and wait, no need to Ssleep()

      if Length(Buffer) > 0 then
      begin
        RecData.Daten := Buffer;
        RecData.Context := Nil;
        FDataQueue.Enqueue(RecData);

        Inc(Anz, Length(Buffer));

        TThread.Queue(nil,
          procedure
          begin
            TForm1(FParent.FForm).Log('Received ' + Length(Buffer).ToString + ' bytes');
          end
        );
      end;

      {
      if FParent.FParentClient.UseNagle then
        TForm1(FParent.FForm).Log('01-Client(TReceiveThread): UseNagle aktiv');

      if FParent.FParentClient.IOHandler.InputBuffer.Size > 0 then
      begin
        while FParent.FParentClient.IOHandler.InputBuffer.Size > 0 do
        begin
          SetLength(Buffer, FParent.FParentClient.IOHandler.InputBuffer.Size);
          FParent.FParentClient.IOHandler.ReadBytes(Buffer, Length(Buffer), False);

          //Daten in Verarbeitungsliste aufnehmen
          RecData.Daten  := Buffer;
          RecData.Context := Nil;

          FDataQueue.Enqueue(RecData);
        end;
      end
      else
        Sleep(1);
        }

    end;
  end;
end;

{ TMyTCPClient }

procedure TMyTCPClient.MyConnect(const AHost: string; APort: Integer);
begin
  FParentClient.Host := AHost;
  FParentClient.Port := APort;
  FParentClient.ConnectTimeout := 5000; // 5 Sekunden Timeout
  FParentClient.ReadTimeout := 5000; // 5 Sekunden Timeout für Lesevorgänge
  FParentClient.UseNagle := False;
  FParentClient.Connect;
  TForm1(FForm).Log('Verbunden mit ' + AHost + ':' + APort.ToString);
end;

constructor TMyTCPClient.Create(aForm : TForm);
begin
  FForm := aForm;

  FParentClient := TForm1(FForm).IdTCPClient1;

  if FParentClient.UseNagle then
    Sleep(1);

  FDataQueue := TDataQueue.Create;

  //wird nur beim Slave genutzt
  FProcessingThread := TProcessingThread.Create(FDataQueue);
  FProcessingThread.OnReceive := OnClientReadData;

  FReceiveThread := TReceiveThread.Create(Self, FDataQueue);
end;

destructor TMyTCPClient.Destroy;
begin
  if Assigned(FReceiveThread) then
    FreeAndNil(FReceiveThread);

  if Assigned(FProcessingThread) then
    FreeAndNil(FProcessingThread);

  if Assigned(FDataQueue) then
    FreeAndNil(FDataQueue);

  Disconnect;
  inherited;
end;

procedure TMyTCPClient.Disconnect;
begin
  if FParentClient.Connected then
  begin
    FParentClient.Disconnect;
    TForm1(FForm).Log('Verbindung getrennt.');
  end;
end;

procedure TMyTCPClient.SendData(const Data: TDataRec);
begin
  if FParentClient.Connected then
  begin
    if FParentClient.UseNagle then
      TForm1(FForm).Log('01-Client(SendData): UseNagle aktiv');

    FParentClient.IOHandler.WriteDirect(Data.Daten);
    //TForm1(FForm).Log(Now, ' Gesendet: ', Length(Data), ' Bytes');
  end
  else
  begin
    FParentClient.Connect;
    //TForm1(FForm).Log('Fehler: Nicht verbunden.');
  end;
end;

procedure TMyTCPClient.OnClientReadData(Sender: TObject; aData : TDataRec);
var
  IData : AnsiString;
begin
  if not Assigned(FParentClient) then
    Exit;

  SetLength(IData,Length(aData.Daten));
  Move(aData.Daten[0],IData[1],Length(aData.Daten));

  //irgendwas mit den Daten machen...
end;

procedure TForm1.Button1Click(Sender: TObject);
var
  TestData: TDataRec;
  Anz : LongWord;
begin
  if not Assigned(MyClient) then
    Exit;

  var sw3 := TStopwatch.StartNew;
  var t3 : Int64;

  SetLength(TestData.Daten, 61000); //1024
  FillChar(TestData.Daten[0], Length(TestData.Daten), 65);

  TestData.Context := Nil;

  Anz := 0;

  for var i := 1 to 200 do
  begin
    Inc(Anz, Length(TestData.Daten));

    MyClient.SendData(TestData);
  end;

  t3 := sw3.ElapsedMilliseconds; //Zeitmessung stoppen
  Log('Zeitdauer: ' + t3.ToString + ' ms');

  Log('Gesamtlänge: ' + Anz.ToString + ' Bytes');
end;

procedure TForm1.FormCreate(Sender: TObject);
begin
  SL := TStringList.Create;
  Memo1.Clear;

  IdTCPClient1.UseNagle := False;

  try
    MyClient := TMyTCPClient.Create(Self);
    try
      MyClient.MyConnect('127.0.0.1', 5000);
    finally

    end;
  except
    on E: Exception do
      Log('Fehler: ' + E.Message);
  end;
end;

procedure TForm1.FormDestroy(Sender: TObject);
begin
  MyClient.Disconnect;
  FreeAndNil(MyClient);
  FreeAndNil(SL);
end;

procedure TForm1.Log(aStr : String);
begin
  Exit;

  SL.Add(aStr);

  if UpdateTimer.Enabled then
    Exit;

  UpdateTimer.Enabled := True;
end;

procedure TForm1.UpdateTimerTimer(Sender: TObject);
begin
  Exit;

  UpdateTimer.Enabled := False;

  Memo1.Lines.Text := SL.Text;
end;

end.
VCL Zugriffe sind deaktiviert!
Es erfolgt trotzdem kein Aufruf am Breakpoint in Funktion procedure TReceiveThread.Execute; bei if Length(Buffer) > 0 then
Hier der Server

Server:
Delphi-Quellcode:
unit Unit1;

interface

uses
  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants,
  System.Classes, Vcl.Graphics, Vcl.Controls, Vcl.Forms, Vcl.Dialogs,
  IdServerIOHandler, IdServerIOHandlerSocket, IdServerIOHandlerStack,
  IdBaseComponent, IdComponent, IdCustomTCPServer, IdTCPServer, System.SyncObjs,
  System.Generics.Collections, System.Diagnostics, IdGlobal, IdContext,
  Vcl.StdCtrls, Vcl.ExtCtrls;

type
  TMyTCPServer = class;

  TDataRec = record
    Daten : TIdBytes;
    Context : TIdContext;
  end;

  TReceiveEvent = procedure(Sender: TObject; aData : TDataRec) of Object;

  TDataQueue = class
  private
    FQueue: TQueue<TDataRec>;
    FLock: TCriticalSection;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Enqueue(const Data: TDataRec);
    function Dequeue: TDataRec;
  end;

  TProcessingThread = class(TThread)
  private
    FDataQueue: TDataQueue;

    Anz : LongWord;

    LastPrt : String;

    procedure Log;
  protected
    procedure Execute; override;
  public
    OnReceive : TReceiveEvent;

    constructor Create(ADataQueue: TDataQueue);
  end;

  TSendeThread = class(TThread)
  private
    FDataQueue: TDataQueue;
    FParent : TMyTCPServer;
    PrtGes : Boolean;

    Anz : LongWord;
    LastPrt : String;

    procedure Log;
  protected
    procedure Execute; override;
  public
    constructor Create(aParent : TMyTCPServer; ADataQueue: TDataQueue);
  end;

  TMyTCPServer = class
  private
    FDataQueue: TDataQueue;
    FSendeDataQueue : TDataQueue;
    FParentServer : TIdTCPServer;
    FForm : TForm;

    FProcessingThread: TProcessingThread;
    FSendeThread: TSendeThread;
    FAnzEmpfang : LongWord;
    FBytesEmpfang : LongWord;

    ReadingIsActiv : Boolean;

    LastRecData : TDataRec;

    LastPrt : String;

    LastContext : TIdContext;

    procedure Log;

    procedure OnExecuteHandler(AContext: TIdContext);

    procedure OnServerReadData(Sender: TObject; aData : TDataRec);
  public
    constructor Create(aForm : TForm);
    destructor Destroy; override;
    procedure Start;
    procedure Stop;
  end;

  TForm1 = class(TForm)
    IdTCPServer: TIdTCPServer;
    IdServerIOHandlerStack: TIdServerIOHandlerStack;
    Memo1: TMemo;
    UpdateTimer: TTimer;
    Button1: TButton;
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
    procedure UpdateTimerTimer(Sender: TObject);
    procedure Button1Click(Sender: TObject);
  private
    { Private-Deklarationen }
    MyServer: TMyTCPServer;
    SL : TStringList;
  public
    { Public-Deklarationen }
    procedure Log(aStr : String);
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}

{ TDataQueue }

constructor TDataQueue.Create;
begin
  FQueue := TQueue<TDataRec>.Create;
  FLock := TCriticalSection.Create;
end;

destructor TDataQueue.Destroy;
begin
  FQueue.Free;
  FLock.Free;
  inherited;
end;

function TDataQueue.Dequeue: TDataRec;
begin
  FLock.Acquire;
  try
    if FQueue.Count > 0 then
      Result := FQueue.Dequeue
    else
    begin
      SetLength(Result.Daten, 0);
      Result.Context := Nil;
    end;
  finally
    FLock.Release;
  end;
end;

procedure TDataQueue.Enqueue(const Data: TDataRec);
begin
  FLock.Acquire;
  try
    FQueue.Enqueue(Data);
  finally
    FLock.Release;
  end;
end;

{ TProcessingThread }

constructor TProcessingThread.Create(ADataQueue: TDataQueue);
begin
  FDataQueue := ADataQueue;
  Anz := 0;
  inherited Create(False);
end;

procedure TProcessingThread.Log;
begin
  //TForm1(FParent.FForm).Log(LastPrt);
end;

procedure TProcessingThread.Execute;
var
  Data: TDataRec;
begin
  while not Terminated do
  begin
    Data := FDataQueue.Dequeue;
    if Length(Data.Daten) > 0 then
    begin
      if Assigned(OnReceive) then
        OnReceive(Self, Data);

      //TForm1(FParent.FForm).Log('Empfangen: ', Length(Data), ' Bytes' + '- Anz: ' + Anz.ToString);
    end
    else
      Sleep(1);

    if (FDataQueue.FQueue.Count = 0) then
    begin
      //TForm1(FParent.FForm).Log('Gesamtlänge Empfang: ' + Anz.ToString + ' Bytes');
    end;
  end;
end;

{ TMyTCPServer }

constructor TMyTCPServer.Create(aForm : TForm);
begin
  FDataQueue := TDataQueue.Create;
  FSendeDataQueue := TDataQueue.Create;

  LastContext := Nil;

  FProcessingThread := TProcessingThread.Create(FDataQueue);
  FProcessingThread.OnReceive := OnServerReadData;

  FSendeThread := TSendeThread.Create(Self, FSendeDataQueue);

  FForm := aForm;

  LastRecData.Context := Nil;

  FParentServer := TForm1(FForm).IdTCPServer;
  FParentServer.DefaultPort := 5000;
  FParentServer.OnExecute := OnExecuteHandler;
end;

destructor TMyTCPServer.Destroy;
begin
  Stop;
  FreeAndNil(FSendeThread);
  FreeAndNil(FProcessingThread);
  FreeAndNil(FSendeDataQueue);
  FreeAndNil(FDataQueue);
  inherited;
end;

procedure TMyTCPServer.Log;
begin
  TForm1(FForm).Log(LastPrt);
end;

procedure TMyTCPServer.OnExecuteHandler(AContext: TIdContext);
var
  Buffer : TIdBytes;
  RecData : TDataRec;
begin
  if AContext.Connection.IOHandler.InputBuffer.Size > 0 then
  begin
    LastContext := AContext;

    ReadingIsActiv := True;
    while AContext.Connection.IOHandler.InputBuffer.Size > 0 do
    begin
      Inc(FAnzEmpfang);
      Inc(FBytesEmpfang, AContext.Connection.IOHandler.InputBuffer.Size);

      SetLength(Buffer, AContext.Connection.IOHandler.InputBuffer.Size); //<- so viel einlesen wie im Buffer enthalten ist
      AContext.Connection.IOHandler.ReadBytes(Buffer, Length(Buffer), False);

      //Daten in Verarbeitungsliste aufnehmen
      RecData.Daten := Buffer;
      RecData.Context := AContext;

      FDataQueue.Enqueue(RecData);
    end;
    ReadingIsActiv := False;
  end
  else
  begin
    Sleep(1);

    if (FAnzEmpfang <> 0) or (FBytesEmpfang <> 0) then
    begin
      //TForm1(FForm).Log('Receive-Anzahl: ' + FAnzEmpfang.ToString);
      //TForm1(FForm).Log('Receive-Bytes: ' + FBytesEmpfang.ToString);

      FAnzEmpfang := 0;
      FBytesEmpfang := 0;
    end;
  end;
end;

procedure TMyTCPServer.OnServerReadData(Sender: TObject; aData : TDataRec);
var
  IData : AnsiString;
begin
  if not Assigned(aData.Context) then
  begin
    TForm1(FForm).Log('Receive: ' +
      ' Fehler bei Daten von Client: ungültige Context-Angabe');

    Exit;
  end;

  if not Assigned(aData.Context.Binding) then
  begin
    TForm1(FForm).Log('Receive: ' +
      ' Fehler bei Daten von Client: ungültige Binding-Angabe');

    Exit;
  end;

  SetLength(IData,Length(aData.Daten));
  Move(aData.Daten[0],IData[1],Length(aData.Daten));

  LastRecData := aData;

  //irgendwas mit den Daten machen...
end;

procedure TMyTCPServer.Start;
begin
  FParentServer.Active := True;
end;

procedure TMyTCPServer.Stop;
begin
  FParentServer.Active := False;
end;

{ TSendeThread }

constructor TSendeThread.Create(aParent: TMyTCPServer; ADataQueue: TDataQueue);
begin
  FDataQueue := ADataQueue;
  FParent := aParent;

  PrtGes := True;

  Anz := 0;
  inherited Create(False);
end;

procedure TSendeThread.Log;
begin
  TForm1(FParent.FForm).Log(LastPrt);
end;

procedure TSendeThread.Execute;
var
  Data: TDataRec;
begin
  while not Terminated do
  begin
    if Assigned(FParent) and Assigned(FParent.FParentServer) then
    begin
      Data := FDataQueue.Dequeue;
      if Length(Data.Daten) > 0 then
      begin
        Inc(Anz, Length(Data.Daten));

        if FParent.FParentServer.UseNagle then
        begin
          //TForm1(FParent.FForm).Log('01-Server(TSendeThread): UseNagle aktiv');
        end;

        if FParent.ReadingIsActiv then
        begin
          //TForm1(FParent.FForm).Log('01-Server: Lesevorgang parallel aktiv');
        end;

        {
        if Assigned(Data.Context) and Assigned(Data.Context.Connection) then
        begin
          var sw3 := TStopwatch.StartNew;
          var t3 : Int64;

          if Data.Context.Connection.Connected then
          begin
            Data.Context.Connection.IOHandler.WriteDirect(Data.Daten);

            //TForm1(FParent.FForm).Log('01-Server: Gesendet. Restanzahl: ' + FDataQueue.FQueue.Count.ToString);
          end;

          t3 := sw3.ElapsedMilliseconds; //Zeitmessung stoppen
          if t3 > 50 then
          begin
            //TForm1(FParent.FForm).Log('Zeitdauer Senden: [' + t3.ToString + ']');
          end;

        end;
        }

        if Assigned(FParent.LastContext) and Assigned(FParent.LastContext.Connection) then
        begin
          var sw3 := TStopwatch.StartNew;
          var t3 : Int64;

          if FParent.LastContext.Connection.Connected then
          begin
            FParent.LastContext.Connection.IOHandler.WriteDirect(Data.Daten);

            //TForm1(FParent.FForm).Log('01-Server: Gesendet. Restanzahl: ' + FDataQueue.FQueue.Count.ToString);
          end;

          t3 := sw3.ElapsedMilliseconds; //Zeitmessung stoppen
          if t3 > 50 then
          begin
            //TForm1(FParent.FForm).Log('Zeitdauer Senden: [' + t3.ToString + ']');
          end;

        end;
      end
      else
        Sleep(1);
    end;
  end;
end;

procedure TForm1.Button1Click(Sender: TObject);
var
  TestData: TDataRec;
  tmpInt : Integer;
begin
  if not Assigned(MyServer) then
    Exit;

  for var i := 1 to 100 do
  begin
    tmpInt := Random(60000);
    if tmpInt < 10 then
      tmpInt := 10;

    SetLength(TestData.Daten, 60000);
    FillChar(TestData.Daten[0], Length(TestData.Daten), 65);

    TestData.Context := Nil;
    if Assigned(MyServer.LastRecData.Context) then
      TestData.Context := MyServer.LastRecData.Context;

    MyServer.FSendeDataQueue.Enqueue(TestData);
  end;
end;

procedure TForm1.FormCreate(Sender: TObject);
begin
  Randomize;

  SL := TStringList.Create;
  Memo1.Clear;

  try
    MyServer := TMyTCPServer.Create(Self);
    MyServer.Start;

    Log('Server läuft auf Port 5000');
  except
    on E: Exception do
      Log('Fehler: ' + E.Message);
  end;
end;

procedure TForm1.FormDestroy(Sender: TObject);
begin
  MyServer.Stop;
  FreeAndNil(MyServer);
  FreeAndNil(SL);
end;

procedure TForm1.Log(aStr : String);
begin
  Exit;

  System.TMonitor.Enter(SL);
  try
    SL.Add(aStr);

    if UpdateTimer.Enabled then
      Exit;

    UpdateTimer.Enabled := True;
  finally
    System.TMonitor.Exit(SL);
  end;
end;

procedure TForm1.UpdateTimerTimer(Sender: TObject);
begin
  Exit;

  UpdateTimer.Enabled := False;

  System.TMonitor.Enter(SL);
  try
    Memo1.Lines.Text := SL.Text;
  finally
    System.TMonitor.Exit(SL);
  end;
end;

end.
Bitte mal 1:1 diesen Code testen und sagen, wo das Problem beim Receive ist. Ich habe die Funktion im Receive-Thread exakt wie von Ihnen beschrieben eingebaut. Trotzdem kommt der Server nur 5x in den Aufruf (ich weiß, weil der Buffer beim Client voll ist) FParent.LastContext.Connection.IOHandler.WriteDirect(Data.Daten); und der Client empfängt nichts.
  Mit Zitat antworten Zitat