Multi-threading with TThread
Part III - Snippets
Here is a collection of snippets that solve various tasks using multi-threading.
Run a task on a thread with a variable
| program EX1SingleThread;
{$mode objfpc}{$H+}{$J-}
uses
{$IFDEF UNIX}
cmem, cthreads,
{$ENDIF}
Classes { you can add units after this };
type
// The TThread class encapsulates the native thread support of the OS.
// To create a thread, (1) declare a descendant of the TThread class, ...
TMyThread = class(TThread)
// (with data to work with)
private
aString: string;
protected
// (2) override the Execute method, and ...
procedure Execute; override;
public
// (3) lastly, you may include a constructor to setup variables
// for executing this thread.
constructor Create(isSuspended: boolean; message: string);
end;
constructor TMyThread.Create(isSuspended: boolean; message: string);
begin
// Call parent's constructor
// If the caller passes True, the thread won't start automatically
inherited Create(isSuspended);
// Assign the data to work with.
self.aString:=message;
// Won't free the thread when finished.
// The thread will be freed manually on the main block.
FreeOnTerminate:=False;
end;
procedure TMyThread.Execute;
begin
// Execute thread, and DO SOMETHING in this thread.
// Example: if the thread has a data to work with,
// use it to achieve a goal.
WriteLn('Thread ID ', ThreadID, ' is printing ', self.aString);
// Example: simulate a long running process.
Sleep(1000);
end;
var
myThread: TMyThread;
// Main block --------------------------------------------------
begin
// Create a thread, suspended
myThread:=TMyThread.Create(True, 'Hello, World!');
// Debug line
WriteLn('We are in the main thread');
// Start the thread
myThread.Start;
// Wait until the thread is done before going back to
// the main thread.
myThread.WaitFor;
// Free threads manually.
myThread.Free;
// Debug line.
WriteLn('We are in the main thread again');
// Pause console.
WriteLn('Press enter key to quit');
ReadLn;
end.
|
Run the same task on multiple threads
| program EX2MultiThread;
{
# EX2MultiThread
A simple demo of multi-threading.
## Example of an output
---------------------
Started TThread demo
---------------------
Starting a task from the main thread
Started a task on thread ID 22848
Started a task on thread ID 24428
Completed the task from the main thread
Completed task on thread ID: 22848
Completed task on thread ID: 24428
---------------------
Finished TThread demo
Press Enter to quit
---------------------
}
{$mode objfpc}{$H+}{$J-}
// 2024-02-08 - paweld 🇵🇱 fixed a memory leak issue on the original code.
uses
{$ifdef unix}
cmem, cthreads,
{$endif}
Classes,
SysUtils;
type
// Create a class based on TThread
// TTaskThread
TTaskThread = class(TThread)
protected
// Override the Execute procedure of TThread
procedure Execute; override;
public
// Thread constructor; the thread is NOT freed on terminate (see below)
constructor Create;
end;
// The Execute procedure, simulating a task
procedure TTaskThread.Execute;
begin
WriteLn('Started a task on thread ID ', ThreadID);
Sleep(2000); // Simulating a long-running task.
WriteLn('Completed task on thread ID: ', ThreadID);
end;
// Constructor of TTaskThread
constructor TTaskThread.Create;
begin
// Create as suspended.
inherited Create(True);
// Set Free on Terminate to false, so it won't free itself when completed.
FreeOnTerminate := False;
// Run thread.
Start;
end;
var
task1, task2: TThread;
begin
WriteLn('---------------------');
WriteLn('Started TThread demo');
WriteLn('---------------------');
// Create all threads
task1 := TTaskThread.Create;
task2 := TTaskThread.Create;
// Start a task on the main thread
Writeln('Starting a task from the main thread');
Sleep(2000); // simulate a task
Writeln('Completed the task from the main thread');
// Wait for threads to finish before going back to the main thread.
task1.WaitFor;
task2.WaitFor;
// Free the threads manually
task1.Free;
task2.Free;
WriteLn('---------------------');
WriteLn('Finished TThread demo');
WriteLn('Press Enter to quit');
WriteLn('---------------------');
ReadLn;
end.
|
Sum numbers in an array
Important features of this example:
- Use of TIntegerDynArray, which is a dynamic array of integers.
- The thread class TMyThread inherits from TThread. This class encapsulates the logic for summing a segment of the array within each thread.
- The constructor TMyThread.Create initialises the thread with the input array, start index, and end index. This setup ensures each thread works on a specific portion of the array.
- The Execute method in TMyThread performs the actual summing of numbers within the assigned segment of the array.
- Main program
  - Calculates the segment size for each thread using Math.Ceil((Length(inputArray) + MAX_THREADS - 1) / MAX_THREADS), so that each thread gets an equal or almost equal portion of the array to process. Because the expression both adds MAX_THREADS - 1 and rounds up, the result can be one larger than the exact ceiling (2501 rather than 2500 for 10000 items and 4 threads); the Min call on the end index keeps the last segment within bounds.
  - Creates and starts each thread, passing in the relevant segment of the array.
  - After all threads complete their work, collects the partial sums from each thread and calculates the total sum.
- Manual memory cleanup by setting FreeOnTerminate := False and manually freeing the threads after collecting their results.
- Lastly, the program displays information about each thread's segment and its partial sum.
| program EX3MultiThread;
{$mode objfpc}{$H+}{$J-}
uses
{$IFDEF UNIX}
cmem, cthreads,
{$ENDIF}
Classes,
Types,
Generics.Collections,
Math { you can add units after this };
// --- Custom thread type --------------------------------------
type
// The TThread class encapsulates the native thread support of the OS.
// To create a thread, (1) declare a child of the TThread object, ...
TMyThread = class(TThread)
// (with a data to work with)
private
// Variables to store the input array for this thread to sum, along with
// start index and end index
anArray: TIntegerDynArray;
startIdx: integer;
endIdx: integer;
// The sum of array in the thread
partialSum: integer;
protected
// (2) override the Execute method, and ...
procedure Execute; override;
public
// (3) lastly, you may include a constructor to setup variables
// for executing this thread.
constructor Create(const isSuspended: boolean;
var inputArray: TIntegerDynArray;
const startIndex: integer;
const endIndex: integer);
end;
constructor TMyThread.Create(const isSuspended: boolean;
var inputArray: TIntegerDynArray;
const startIndex: integer;
const endIndex: integer);
begin
// Call parent's constructor
// If the caller passes True, the thread won't start automatically
inherited Create(isSuspended);
// Assign a data to work with.
self.anArray := inputArray;
self.startIdx := startIndex;
self.endIdx := endIndex;
// DO NOT Free thread when finished here.
// The main thread will ...
// 1. collect the results from n threads,
// 2. free n threads from the main thread.
FreeOnTerminate := False;
end;
procedure TMyThread.Execute;
var
index: integer;
begin
// Execute thread, and DO SOMETHING in this thread.
// Initialise partialSum to 0 to start with
self.partialSum := 0;
// partialSum the numbers in the assigned array using for..do loop.
for index := self.startIdx to self.endIdx do
self.partialSum := self.partialSum + self.anArray[index];
// Display user feedback from this thread
WriteLn('Thread ', ThreadID, ' summed up ', self.partialSum);
end;
// const and var for the main block ----------------------------
const
// Specify max number of threads to use.
MAX_THREADS = 4;
// The length of an input array may come from a file.
INPUT_ARRAY_LENGTH = 10000;
var
// Input array containing numbers to sum.
inputArray: TIntegerDynArray;
// Setting an array for the threads.
myThreads: array of TMyThread;
// Size of a segment for each thread
segmentSize: integer;
// total sum -- will collect values from each thread
totalSum: integer;
// Indexes
index, startIndex, endIndex: integer;
// Main block ------------------------------------------------
begin
// Populate input array. This may come from a file.
SetLength(inputArray, INPUT_ARRAY_LENGTH);
for index := 0 to INPUT_ARRAY_LENGTH - 1 do
inputArray[index] := index + 1;
// Calculate segment size for each thread
segmentSize := Math.Ceil((Length(inputArray) + MAX_THREADS - 1) / MAX_THREADS);
// Create and start the threads.
SetLength(myThreads, MAX_THREADS);
for index := 0 to MAX_THREADS - 1 do
begin
// Start index for a thread is index * segmentSize.
startIndex := index * SegmentSize;
// Ensure that each thread processes the correct portion of the array
// without going out of bounds on last iteration.
endIndex := Min((index + 1) * SegmentSize - 1, Length(inputArray) - 1);
// Show user info.
WriteLn('startIndex: ', startIndex, ' ', ' endIndex:', endIndex);
// Create a thread, suspended, so that it only runs once Start is called.
myThreads[index] := TMyThread.Create(True, inputArray, StartIndex, EndIndex);
// Start this new thread.
myThreads[index].Start;
end;
// Wait until a thread is done, sum up and free it.
totalSum := 0;
for index := 0 to MAX_THREADS - 1 do
begin
// Wait until thread index n finishes
myThreads[index].WaitFor;
// Get the partial sum from thread index n
totalSum := totalSum + myThreads[index].partialSum;
// Lastly, free thread index n
myThreads[index].Free;
end;
// Display results
WriteLn('Total sum of array is: ', totalSum);
WriteLn('Press enter key to quit');
ReadLn;
end.
|
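The segment arithmetic from the program above can be tried in isolation. The following is a minimal sketch rather than the program's own code: it assumes the integer form of rounding-up division, (n + N - 1) div N, and the names SegmentSplitSketch, ITEM_COUNT and THREAD_COUNT are hypothetical, with values picked only for illustration.

```pascal
program SegmentSplitSketch;

{$mode objfpc}{$H+}{$J-}

uses
  Math;

const
  // Hypothetical values chosen for illustration only.
  ITEM_COUNT   = 10;
  THREAD_COUNT = 4;

var
  segmentSize, index, startIndex, endIndex: integer;

begin
  // Integer rounding-up division: the smallest segment size such that
  // THREAD_COUNT segments cover all ITEM_COUNT items.
  segmentSize := (ITEM_COUNT + THREAD_COUNT - 1) div THREAD_COUNT;
  WriteLn('segmentSize: ', segmentSize);

  for index := 0 to THREAD_COUNT - 1 do
  begin
    startIndex := index * segmentSize;
    // Clamp the end index so the last segment stays inside the array.
    endIndex := Min((index + 1) * segmentSize - 1, ITEM_COUNT - 1);
    if startIndex > endIndex then
      WriteLn('segment ', index, ': empty')
    else
      WriteLn('segment ', index, ': ', startIndex, '..', endIndex);
  end;
end.
```

With 10 items and 4 threads this prints a segment size of 3 and the ranges 0..2, 3..5, 6..8 and 9..9. The explicit empty-segment check matters when there are fewer items than threads; a for..do loop over such a range simply does not execute.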
Increment a counter using multiple threads
This snippet features:
- Use of TRTLCriticalSection to ensure only one thread at a time can increment a counter variable
program CriticalSectionIncrementCounter;
{
An example adapted from \lazarus\examples\multithreading\multithread_critical
for CLI.
This is a simple example using 4 threads to increase a counter (shared
variable).
To enable critical sections, set isCriticalSectionEnabled to True.
To disable critical sections, set isCriticalSectionEnabled to False.
With critical sections you will always get 4,000,000.
Without you will see different results on each run.
Important: In most Unix-like systems, the cthreads unit must be added
to the uses section of the .lpr file. Further, cmem is likely to
be significantly faster, so add it as well. Due to how the units
work, a sensible order is cmem, cthreads and then perhaps cwstrings.
But note that heaptrc does not work with cmem, so comment it out
while testing/debugging.
}
{$mode objfpc}{$H+}{$J-}
uses
{$IFDEF UNIX}
cmem, cthreads,
{$ENDIF}
Classes,
SysUtils;
type
TCustomThread = class(TThread)
private
finishedState: boolean;
public
procedure Execute; override;
property isFinished: boolean read finishedState write finishedState;
end;
const
// Use Critical Section or not?
isCriticalSectionEnabled: boolean = True;
var
criticalSection: TRTLCriticalSection;
mainCounter: integer;
procedure TCustomThread.Execute;
var
i: integer;
currentCounter: longint;
begin
// Set the finished state to false.
finishedState := False;
// Increment the mainCounter.
// Because the other threads are doing the same, it will frequently happen
// that 2 (or more) threads read the same number, increment it by one and
// write the result back, overwriting the result of the other threads.
for i := 1 to 1000000 do
begin
if isCriticalSectionEnabled then
// 2. Begins the lock.
// When this call returns, the calling thread is the only thread
// running the code between the EnterCriticalSection call and the
// following LeaveCriticalsection call.
EnterCriticalSection(criticalSection);
try
// Read the current mainCounter
currentCounter := mainCounter;
// Increment mainCounter by one
Inc(currentCounter);
// Write the result back the mainCounter variable
mainCounter := currentCounter;
finally
if isCriticalSectionEnabled then
// 3. Releases the lock.
// Signals that the protected code can be executed by other threads.
LeaveCriticalSection(criticalSection);
end;
end;
// Once the task for this thread is done, set the finished state to True.
finishedState := True;
end;
{
This is the routine that increments a counter
}
procedure IncrementCounter;
var
index: integer;
threadList: array[1..4] of TCustomThread;
isAllThreadsFinished:boolean;
begin
mainCounter := 0;
// 1. Initialises a critical section.
// This call must be made before either EnterCriticalSection or
// LeaveCriticalSection is used.
InitCriticalSection(criticalSection);
// Start the threadList
for index := Low(threadList) to High(threadList) do
threadList[index] := TCustomThread.Create(False);
WriteLn('All threads created ...');
// Wait until all threads in threadList have finished
repeat
isAllThreadsFinished := True;
for index := Low(threadList) to High(threadList) do
if not threadList[index].isFinished then isAllThreadsFinished := False;
until isAllThreadsFinished;
WriteLn('All threads completed ...');
// Free the threadList
for index := Low(threadList) to High(threadList) do
threadList[index].Free;
WriteLn('All threads are freed ...');
// 4. Frees the resources associated with a critical section.
// After this call neither EnterCriticalSection nor LeaveCriticalSection
// may be used.
DoneCriticalSection(criticalSection);
// Show the mainCounter
WriteLn('Printing the value of shared variable ...');
WriteLn('Counter = ' + IntToStr(mainCounter));
end;
// Main block ------------------------------------------------------------------
begin
// Print status of Critical Section
if isCriticalSectionEnabled then
WriteLn('Critical Section: Enabled')
else
WriteLn('Critical Section: Disabled');
// Call the routine to increment a counter using multi-threading
IncrementCounter;
// Pause console
WriteLn('Press Enter key to exit.');
ReadLn;
end.
Assign student IDs to student names
This program assigns an ID to each student name from a text file and sorts them by student ID.
This snippet features:
- Use of TFileStream and TStreamReader for reading lines from a text file
- Use of four threads to complete the task
- Use of rounding-up division to split the workload between threads
- Use of TRTLCriticalSection to ensure only one thread can write to the output list
The common.pas of assigning IDs to names
This file holds the common type and variable declarations.
There are two important common variables here:
- startStudentID: int64 = 200000;
  This variable specifies the starting index for student IDs. All threads will read from this variable and increment it by one for other threads to read from. Therefore, reading and incrementing this variable MUST be done within a critical section to avoid race conditions.
- finalStudentList: TStudentList;
  A list of TStudent records containing names and student IDs. All threads will write their output here, so writing to this variable MUST also be done within a critical section.
| unit Common;
{$mode ObjFPC}{$H+}
interface
uses
Classes, SysUtils, Generics.Defaults, Generics.Collections, Math;
type
// A record to hold student information.
TStudent = record
Name: string;
id: integer;
end;
type
// A list to hold student records, along with a comparer for
// sorting the list afterwards.
TStudentList = specialize TList<TStudent>;
TStudentListComparer = specialize TComparer<TStudent>;
type
// A type of string list.
TStrList = specialize TList<string>;
const
// Number of maximum threads to use.
maxThreads: int64 = 4;
var
// This variable specifies the lowest index for student ID.
// All threads will be reading from this variable and increase by one
// for other threads to read from.
// Hence, reading and increment of this variable MUST be done
// from within a critical section to avoid race.
startStudentID: int64 = 200000;
// A list of TStudent records. All threads will write student names and
// student IDs into this variable.
// Hence, writing to this variable MUST be done from within
// a critical section too.
finalStudentList: TStudentList;
// Custom comparison function for sorting by student id - ascending
function CompareID(const LeftItem, RightItem: TStudent): integer;
implementation
// Custom comparison function for sorting by student id - ascending
function CompareID(const LeftItem, RightItem: TStudent): integer;
begin
Result := CompareValue(LeftItem.id, RightItem.id);
end;
end.
|
The customthread.pas of assigning IDs to names
This file defines the implementation of a custom thread based on TThread.
There are important things to note here:
- The implementation includes a destructor to clean up the list used by the thread for the task (declared as Destroy in the class and implemented in TCustomThread.Destroy).
- The thread receives the whole list but only processes the portion between its start and finish indexes, which the main block computes using rounding-up division.
- FreeOnTerminate := False, as the main thread is responsible for freeing all threads (set in the constructor).
- The Execute method updates shared variables within a critical section (between the EnterCriticalSection and LeaveCriticalSection calls).
| unit CustomThread;
{$mode ObjFPC}{$H+}{$J-}
interface
uses
Classes, SysUtils, Common;
// Create a thread class deriving from TThread.
type
// The TThread class encapsulates the native thread support of the OS.
// To create a thread, (1) declare a child of the TThread object, ...
TCustomThread = class(TThread)
// (with a data to work with)
private
// A TStrList to store the input array for this thread, along with
// a pointer to the critical section shared by all threads.
// A pointer is stored (not a copy of the record) so that every thread
// locks the same critical section.
list: TStrList;
cs: PRTLCriticalSection;
protected
// (2) override the Execute method, and ...
procedure Execute; override;
public
// (3) include a constructor to setup variables for executing this thread.
constructor Create(var criticalSection: TRTLCriticalSection;
const listToProcess: TStrList;
const startIndex, finishIndex: int64);
// (4) lastly, include destructor to free the TStrList of this thread.
destructor Destroy; override;
end;
implementation
// Create the Custom Thread with an input list to process.
constructor TCustomThread.Create(var criticalSection: TRTLCriticalSection;
const listToProcess: TStrList;
const startIndex, finishIndex: int64);
var
index: int64;
begin
// Call parent's constructor
// Passing True creates the thread suspended; it runs when Start is called.
inherited Create(True);
// Do not free threads on terminate.
// Threads will be freed from the main thread.
FreeOnTerminate := False;
// Keep the address of the shared critical section
self.cs := @criticalSection;
// Populate the internal list for the Execute procedure
self.list := TStrList.Create;
for index := startIndex to finishIndex do
begin
self.list.Add(listToProcess[index]);
end;
// User feedback
WriteLn('Thread created with id: ', ThreadID);
end;
destructor TCustomThread.Destroy;
begin
// Free the TStrList.
self.list.Free;
// Call parent's Destroy.
inherited Destroy;
end;
// Enter and leave Critical Section here.
procedure TCustomThread.Execute;
var
index: int64;
student: TStudent;
begin
for index := 0 to self.list.Count - 1 do
begin
EnterCriticalSection(cs^); // -------------------------------- enter cs
try
// Add student - ID pair as TStudent, then add into TStudentList
// 1. Get the name from the list with allocated index
student.Name := list[index];
// 2. Get the current student ID from the shared startStudentID variable
student.id := startStudentID;
// 3. Add TStudent into TStudentList (the main block does the init)
finalStudentList.Add(student);
// After a student - ID pair is added, increment the current student ID by 1
startStudentID := startStudentID + 1
finally
LeaveCriticalSection(cs^); // ------------------------------ leave cs
end;
end;
end;
end.
|
The main program of assigning IDs to names
Key features of the main snippet:
- The text file is read into strList.
- Threads are created and assigned specific subarrays to process.
- Critical sections ensure safe access to shared variables.
- After the threads finish, the results are sorted and printed.
| program AssignStudentIDs;
{
This program assigns student ID to each name from a text file by
using N number of threads.
Pre-requisite
- TThread for managing the threads.
- TRTLCriticalSection for ensuring only one thread can modify a shared
variable at one time.
- Input text file containing a list of names. For example;
Alyssa Morgan
Declan Hayes
Nora Patel
Miles Thompson
Sienna Larson
Kellan Rivera
Camille Chang
Jensen Park
Amara Singh
Holden Myers
Elise Howard
Luca Griffin
Reagan Patel
Kian Gallagher
Mara Nguyen
...
...
Algorithm
- Read the text file into an array.
- All threads will read from the same array, but at differing start and
finish indexes. This depends on the number of max threads.
- Assign workloads to each thread.
- Specify the start and finish indexes to each thread.
- Will use the rounding up division method to ensure near-equal division
of workload for each thread.
- Wait for the threads to finish and then free them.
- Sort the final student list
- Print results on screen.
Sample Output
$ ./AssignStudentIDs.exe student-names.txt
No of students : 200
Max threads : 4
subArray size round up : 51
---------------------------------
Thread created 25040
Thread created 26324
Thread created 11972
Thread created 26028
Starting threads ...
Waiting for threads to finish ...
All threads are done ...
Printing results ...
Alyssa Morgan, 200000
Declan Hayes, 200001
Nora Patel, 200002
Miles Thompson, 200003
Sienna Larson, 200004
Kellan Rivera, 200005
Camille Chang, 200006
Jensen Park, 200007
Amara Singh, 200008
Holden Myers, 200009
Elise Howard, 200010
Luca Griffin, 200011
Reagan Patel, 200012
...
...
}
{$mode objfpc}{$H+}{$J-}
uses
{$IFDEF UNIX}
cmem,
cthreads,
{$ENDIF}
Classes,
SysUtils,
Math,
streamex,
Common,
CustomThread;
// All the variables and procedures to get the job done.
var
// Critical Section preventing threads accessing a variable at the same time.
customCriticalSection: TRTLCriticalSection;
// TStreams for reading files
fileStream: TFileStream;
streamReader: TStreamReader;
// A temporary list to hold student names from a text file
strList: TStrList;
// An array of threads
myThreads: array of TThread;
// Variables for calculating subarray size for each thread
subArraySize, index: int64;
// Main block ////////////////////////////////////////////////////////////////
begin
try
// 1. Read input text file and populate input array from a text file
if not FileExists(ParamStr(1)) then
begin
WriteLn(Format('%s does not exist.', [ParamStr(1)]));
Exit;
end;
strList := TStrList.Create;
finalStudentList := TStudentList.Create;
try
fileStream := TFileStream.Create(ParamStr(1), fmOpenRead);
try
streamReader := TStreamReader.Create(fileStream, 65536, False);
try
while not streamReader.EOF do
begin
// Add each line into a list
strList.Add(streamReader.ReadLine);
end;
finally
streamReader.Free;
end;
finally
fileStream.Free
end;
// 2. Init Critical Section as we have threads writing to a shared variable
InitCriticalSection(customCriticalSection);
// 3a. set the number of threads in array of TThread
SetLength(myThreads, maxThreads);
try
{
3b. Now we add threads & assign workloads, using rounding up division --
Ceil((totalElements + N - 1) / N).
- When we divide totalElements by N, we get the quotient of the
division. However, if totalElements is not evenly divisible by N,
there might be a remainder.
- In the context of splitting an array into subarrays, we want
each subarray to have approximately the same number of elements.
Therefore, we want to ensure that any remaining elements after
dividing totalElements by N are still assigned to a subarray to
avoid losing data.
- The expression Ceil((totalElements + N - 1) / N) rounds the
subarray size up, so N subarrays always cover all elements.
**The last subarray takes whatever is left, so it may be shorter
than the others**.
}
subArraySize := Math.Ceil((strList.Count + maxThreads - 1) / maxThreads);
// Show user feedback
WriteLn('No of students : ', IntToStr(strList.Count));
WriteLn('Max threads : ', IntToStr(maxThreads));
WriteLn('subArray size round up : ', IntToStr(subArraySize));
WriteLn('---------------------------------');
{
3c. Assign workload to each thread by using the following info;
- source list
- start index and
- finish index for a thread
}
for index := 1 to maxThreads do
begin
myThreads[index - 1] := TCustomThread.Create(customCriticalSection,
strList,
((index - 1) * subArraySize),
Math.Min(index * subArraySize - 1, strList.Count - 1));
end;
// 4. Start all threads
WriteLn('Starting threads ...');
for index := 0 to High(myThreads) do
myThreads[index].Start;
// 5. Wait for all threads to finish and free them
WriteLn('Waiting for threads to finish ...');
for index := 0 to High(myThreads) do
begin
myThreads[index].WaitFor;
myThreads[index].Free;
end;
WriteLn('All threads are done ...');
// 6. Sort by student ID
finalStudentList.Sort(TStudentListComparer.construct(@CompareID));
// 7. Show results
WriteLn('Printing results ...');
for index := 0 to finalStudentList.Count - 1 do
WriteLn(finalStudentList[index].Name, ', ', finalStudentList[index].id);
// 8. Show user feedback
WriteLn('---------------------------------');
WriteLn(Format('Output list contains %d items', [finalStudentList.Count]));
WriteLn('---------------------------------');
finally
// 9. Free Critical Section
DoneCriticalSection(customCriticalSection);
end;
finally
finalStudentList.Free;
strList.Free;
end;
except
on E: Exception do
WriteLn('Error: ' + E.Message);
end;
end.
|
A Large Text File Parser
This example demonstrates how to parse a text file using multi-threading. The code reads the text file, divides it into chunks, and assigns each chunk to a different thread. Each thread then prints the number of bytes it processed and the first line of its chunk.
| program LargeTextFileParser;
{
Description
This program is a simple example of parsing a text file using N threads.
The code reads and divides the text file into chunks and assigns these chunks
to different threads, ensuring that no chunk splits a line.
Workflow
1. File Reading - Read the file in 12MB chunks.
2. Data Integrity - Ensure chunks do not split lines by
adjusting the file pointer based on the last newline
character.
3. Thread Management - Use up to N threads to process chunks in parallel.
4. Display Output - Print the size of each chunk and the first line of each chunk.
}
{$mode objfpc}{$H+}{$J-}
uses
{$IFDEF UNIX}
cmem, cthreads,
{$ENDIF}
Classes,
SysUtils,
Math;
const
MAX_THREADS = 8; // Max threads to use
CHUNK_SIZE = 12 * 1024 * 1024; // Size of chunk for each thread to process (12MB)
type
// The TThread class encapsulates the native thread support of the OS.
TFileChunkProcessor = class(TThread)
private
FData: array of char; // Chunk of data to work with
FDataSize: integer; // Length of data to the nearest new line
protected
procedure Execute; override;
public
constructor Create(const AData: array of char; ADataSize: integer);
end;
constructor TFileChunkProcessor.Create(const AData: array of char; ADataSize: integer);
begin
// Call parent's constructor and don't start thread automatically
inherited Create(True);
// Assign the data size to work with
FDataSize := ADataSize;
// Allocate memory for AData and copy it
// Since Char in Free Pascal can be 1 or 2 bytes (depending on whether
// it's an ANSI or Unicode character), using SizeOf(Char) ensures
// the correct number of bytes are moved
SetLength(FData, FDataSize);
Move(AData[0], FData[0], FDataSize * SizeOf(char));
// Do not free thread automatically when finished.
FreeOnTerminate := False;
end;
procedure TFileChunkProcessor.Execute;
var
line: string;
index: integer;
begin
// Example processing: print the chunk size and the first line
line := '';
for index := 0 to FDataSize - 1 do
begin
if FData[index] = #10 then
begin
Writeln('Processed chunk of size: ', FDataSize, ' bytes');
Writeln('First line: ', line);
Break;
end
else if FData[index] <> #13 then
begin
line := line + FData[index];
end;
end;
end;
{
This routine reads a text file in chunks and processes each chunk using
separate threads. It ensures each thread processes a chunk up to the
nearest newline, without breaking a line in two.
}
procedure ReadFileInChunks(const AFileName: string);
var
fStream: TFileStream;
buffer: array of char;
bufferSize: integer;
index, lastNewLine: integer;
threadList: array of TFileChunkProcessor;
activeThreads: integer;
begin
fStream := TFileStream.Create(AFileName, fmOpenRead);
try
// Initialise variables for buffer, threads and threads counter.
SetLength(buffer, CHUNK_SIZE);
SetLength(threadList, MAX_THREADS);
activeThreads := 0;
// Read the file until the file pointer reaches the end of the file
while fStream.Position < fStream.Size do
begin
// Determine the buffer size to read and ensuring that the buffer size
// for reading does not exceed the size of the remaining data in the file.
bufferSize := Min(CHUNK_SIZE, (fStream.Size - fStream.Position) div SizeOf(char));
// Read data into buffer
fStream.Read(buffer[0], bufferSize);
// Find the index of the last newline character in the buffer
lastNewLine := -1;
for index := bufferSize - 1 downto 0 do
begin
if buffer[index] = #10 then
begin
lastNewLine := index;
Break;
end;
end;
if lastNewLine = -1 then
lastNewLine := bufferSize - 1;
// Create a thread to process the chunk and its size (index of \n + 1)
threadList[activeThreads] := TFileChunkProcessor.Create(buffer, lastNewLine + 1);
threadList[activeThreads].Start;
{
Next, adjust file position to the character after the last newline.
---
Explanation
---
Let's say we have a buffer size of 1000 characters, and the last
newline character is found at index 950.
The buffer contains 49 characters after the last newline
(indexes 951 to 999).
bufferSize = 1000
lastNewLine = 950
bufferSize - lastNewLine - 1 = 1000 - 950 - 1 = 49
Assuming SizeOf(Char) = 1 (for simplicity),
```pascal
fStream.Position := fStream.Position - 49 * SizeOf(Char);
```
This means the file position is moved back by 49 bytes, so the next
read operation will start at the character that sat at index 951 in
the buffer, which is the character immediately following the last newline.
This ensures that no line is split between two chunks and maintains
the integrity of the data being processed.
}
fStream.Position := fStream.Position - (bufferSize - lastNewLine - 1) * SizeOf(char);
// Increment active thread counter
Inc(activeThreads);
// If max threads are active, wait for them to finish and free them
if activeThreads = MAX_THREADS then
begin
for index := 0 to MAX_THREADS - 1 do
begin
threadList[index].WaitFor;
threadList[index].Free; // FreeOnTerminate is False, so free manually
end;
activeThreads := 0;
end;
end; // -- End of the `while fStream.Position < fStream.Size do` loop.
// Wait for any remaining threads to complete, then free them
for index := 0 to activeThreads - 1 do
begin
threadList[index].WaitFor;
threadList[index].Free;
end;
finally
fStream.Free; // Clean up the file stream
end;
end;
// MAIN block ----------------------------------------------------------------
begin
// Check if the input file exists
if not FileExists(ParamStr(1)) then
begin
WriteLn('File not found. Does ', ParamStr(1), ' exist?');
Exit;
end;
// Parse the text file using multi-threading
try
ReadFileInChunks(ParamStr(1));
except
on E: Exception do
Writeln(E.ClassName, ': ', E.Message);
end;
end.
|
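The chunk-boundary step in ReadFileInChunks can be tried on a small in-memory buffer. This is a minimal sketch under stated assumptions, not the program's own code: the names ChunkBoundarySketch, LastNewLineIndex, sample, chunkSize and carryOver are hypothetical, and it assumes SizeOf(char) = 1, as the explanation inside the program does.

```pascal
program ChunkBoundarySketch;

{$mode objfpc}{$H+}{$J-}

// Returns the index of the last newline (#10) within the first
// dataSize characters of buffer, or -1 if there is none.
function LastNewLineIndex(const buffer: array of char; dataSize: integer): integer;
var
  index: integer;
begin
  Result := -1;
  for index := dataSize - 1 downto 0 do
    if buffer[index] = #10 then
    begin
      Result := index;
      Break;
    end;
end;

var
  sample: string;
  buffer: array of char;
  index, lastNewLine, chunkSize, carryOver: integer;

begin
  // Hypothetical sample data: two complete lines and a partial third.
  sample := 'alpha'#10'beta'#10'gam';

  // Copy the sample text into a character buffer, as a file read would.
  SetLength(buffer, Length(sample));
  for index := 0 to High(buffer) do
    buffer[index] := sample[index + 1];

  lastNewLine := LastNewLineIndex(buffer, Length(buffer));
  // Without any newline, treat the whole buffer as one chunk.
  if lastNewLine = -1 then
    lastNewLine := Length(buffer) - 1;

  // The chunk handed to a thread ends at the last newline (inclusive).
  chunkSize := lastNewLine + 1;
  // Characters after the last newline are re-read with the next chunk.
  carryOver := Length(buffer) - lastNewLine - 1;

  WriteLn('chunk size: ', chunkSize);
  WriteLn('carry-over: ', carryOver);
end.
```

For the 14-character sample the last newline sits at index 10, so the sketch reports a chunk size of 11 and a carry-over of 3, the three characters of the unfinished last line. In the file-based program that carry-over is the amount by which the stream position is moved back before the next read.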