C++ How to organize multi-threaded code

I’m writing a program that will make use of more than 2 threads. I’ve never done anything like that before, and I’m looking for advice on how to organize those threads.

Is it good practice to keep each thread separated in a different file? Or should I put them all together in a single file, for example main.cpp or MyApp.cpp?
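For context, one arrangement I’ve seen suggested (a minimal sketch; the Worker class and file names are placeholders, not an established convention) is to give each long-running activity its own class in its own header/source pair, so main.cpp only wires the pieces together:

// Worker.hpp : one self-contained activity per class (and per file).
#pragma once

#include <atomic>
#include <thread>

class Worker
{
public:
    void start()
    {
        _running = true;
        _thread = std::thread(&Worker::run, this);
    }

    void stop()
    {
        _running = false;
        if (_thread.joinable())
            _thread.join();
    }

private:
    void run()
    {
        while (_running)
        {
            // ... the thread's actual work goes here ...
        }
    }

    std::atomic<bool> _running{ false };
    std::thread _thread;
};

// main.cpp would then only create, start, and stop the workers:
//     Worker netWorker, diskWorker;
//     netWorker.start(); diskWorker.start();
//     ...
//     netWorker.stop(); diskWorker.stop();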

Proof of lemma from Hong’s article about multi-threaded max flow algorithm

I’m struggling to prove Lemma 3 and Lemma 4 from an article about a parallel version of the push-relabel algorithm: A lock-free multi-threaded algorithm for the maximum flow problem.

Lemma 3. Any trace of two push and/or lift operations is equivalent to either a stage-clean trace or a stage-stepping trace.

And

Lemma 4. For any trace of three or more push and/or lift operations, there exists an equivalent trace consisting of a sequence of non-overlapping traces, each of which is either stage-clean or stage-stepping.

A PDF version of the article can be found here.

Multithreaded parameterized programs with superexponential shared memory size in the number of threads?

In this question, a program means a parameterized multithreaded program with interleaving semantics, a finite number of per-thread states (which may depend on the number of threads), and a finite number of shared states (which may depend on the number of threads).

A shared state is a valuation of the shared memory, and a per-thread (in other terminology, local) state is a valuation of the thread-local memory (we assume no stack). Interleaving semantics means that the actions of the threads are interleaved on a single processor, and a thread has read-write access to the shared memory and its own local memory but no access to the local memories of the other threads. Parameterized means that we consider a family of programs generated from a finite-description template such that the $n$th member of the family has $n$ threads (which typically coincide up to the thread identifier).

To the best of my knowledge, for such a program, the size of the shared state-space is anywhere between constant (e.g., for a single boolean lock variable) and exponential (e.g., the Peterson mutual exclusion protocol) in the number of threads $n$.
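To make the two extremes concrete (a small worked example consistent with the definitions above; the one-flag-per-thread encoding is purely illustrative): a single boolean lock gives $|\mathit{Shared}_n| = 2$ for every $n$ (constant), while one shared flag bit per thread gives $|\mathit{Shared}_n| = 2^n$ (exponential). Since the number of shared states is $2^{b(n)}$ for $b(n)$ bits of shared memory, superexponential growth would require the template to allocate $\omega(n)$ shared bits, e.g. on the order of $n \log n$ bits for $n^n = 2^{n \log n}$ shared states.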

Is there any well-known academic program in which the size of the shared state-space grows superexponentially in $n$?

Is there something that prevents a multithreaded C# compiler implementation?

The C/C++ language family apparently has multithreaded compiler implementations that exploit file-level parallelism, but the Microsoft C# compiler (csc.exe) is single-threaded only. MSBuild supports multithreaded building, but only at the project level, and obviously dependency projects have to be compiled before the projects that depend on them, so there are practical limits to parallelism.

Is there a multithreaded C# compiler that I’m not aware of? If not, is there something about the C# language specification that prevents a multithreaded C# compiler, or is it just “hard” and therefore not worth it?

Multithreaded OpenGL Rendering Pipeline

I’ve decided to move my rendering code to a separate thread; to help facilitate the move, I’ve created a pipeline system for rendering. With this pipeline I can insert commands into a queue, which will be processed later on the rendering thread. Thus far the code has proven to work well, but I would like to know if there is anything I can do better.

There are a few things I would like to know:

  1. Does the code have a reasonably fast locking system? If not, is there any way I can improve it?
  2. Am I properly locking around data accesses, and should I be using more mutexes?
  3. When I pass parameters for the queue I use a structure that can store 8 parameters (8 64-bit integers), and I only store the data needed for the command in the buffer. Is there any way with templates (potentially varargs) that I can make the parameter passing cleaner? (See the sketch after this list.)
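For question 3, a variadic constructor along these lines is the kind of thing I mean (a sketch only; it assumes every argument is convertible to std::uint64_t, as ParameterPack already requires, and the PACK_* macros would still be needed for floats and pointers):

#include <cstdint>

// Hypothetical variadic replacement for the fixed 8-slot constructor.
// Each argument fills one std::uint64_t slot; the rest are zeroed.
struct ParameterPackV final
{
    std::uint64_t p[8];

    template<typename... Args>
    explicit ParameterPackV(Args... args)
        : p{ static_cast<std::uint64_t>(args)... }
    {
        static_assert(sizeof...(Args) <= 8, "At most 8 parameters are supported.");
    }
};

// Usage: ParameterPackV(shaderProgram) or ParameterPackV(mode, first, count)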

I’ve removed a significant number of commands (rendering functions) for brevity.

The RenderingPipeline class header:

#pragma once

#include <cstdint>
#include <thread>
#include <mutex>
#include <Windows.h>

// The various rendering instructions.
// Some have been removed for brevity.
enum class RenderingOpcode : std::uint8_t
{
    FINISH_RENDER = 1, // First opcode set to 1 instead of 0 to allow for faster thread locking.
    LOAD_SHADER_UNIFORM,
    ACTIVATE_SHADER_PROGRAM,
    GL_DRAW_ARRAYS,
    GL_CLEAR_BUFFERS
};

// The various uniform types.
// Some have been removed for brevity.
enum class ShaderUniformType : std::uint8_t
{
    INTEGER,
    FLOAT,
    MAT4F
};

// Some simple macros to make parameter packing easier
#define PACK_FLOAT(__F)  (reinterpret_cast<std::uint32_t&>(__F))
#define PACK_DOUBLE(__D) (reinterpret_cast<std::uint64_t&>(__D))
#define PACK_PTR(__P)    (reinterpret_cast<std::uint64_t>(__P))

/**
 *  A series of parameters for an instruction.
 *
 *  Ideally I would use templates, potentially with varargs.
 */
struct ParameterPack final
{
    std::uint64_t p0, p1, p2, p3, p4, p5, p6, p7;

    ParameterPack(std::uint64_t _p0 = 0, std::uint64_t _p1 = 0, std::uint64_t _p2 = 0, std::uint64_t _p3 = 0,
                  std::uint64_t _p4 = 0, std::uint64_t _p5 = 0, std::uint64_t _p6 = 0, std::uint64_t _p7 = 0)
        : p0(_p0), p1(_p1), p2(_p2), p3(_p3), p4(_p4), p5(_p5), p6(_p6), p7(_p7)
    { }

    ~ParameterPack() = default;

    ParameterPack(const ParameterPack&) = default;
    ParameterPack(ParameterPack&&) = default;

    ParameterPack& operator =(const ParameterPack&) = default;
    ParameterPack& operator =(ParameterPack&&) = default;
};

class RenderingPipeline final
{
private:
    std::uint8_t* _instBuffer;
    std::uint32_t _insertPtr;
    std::uint32_t _instPtr;

    volatile bool _initialReady;
    volatile bool _readyForInsert;
    volatile bool _running;
    std::mutex    _mutex;
    std::thread   _renderThread;
public:
    RenderingPipeline(HDC hdc, HGLRC renderingContext, const std::uint32_t bufferSize = 16384);

    ~RenderingPipeline();

    void pushInstruction(const RenderingOpcode opcode, const ParameterPack&& params);
private:
    void runRenderingCycle();

    void renderThreadFunc(HDC hdc, HGLRC renderingContext);
};

The RenderingPipeline class implementation:

#include <cstring>
#include <GL/glew.h>

#include "RenderingPipeline.hpp"

RenderingPipeline::RenderingPipeline(HDC hdc, HGLRC renderingContext, const std::uint32_t bufferSize)
    : _instBuffer(new std::uint8_t[bufferSize]), _insertPtr(0), _instPtr(0),
      _initialReady(false), _readyForInsert(true), _running(true),
      _renderThread(&RenderingPipeline::renderThreadFunc, this, hdc, renderingContext)
{
    _mutex.lock();
    /* Unload the context on the main thread.
       The context will be loaded on the rendering thread.
       Not entirely sure if this is required.
     */
    wglMakeCurrent(nullptr, nullptr);
    std::memset(_instBuffer, 0, bufferSize);
    _initialReady = true; // Notify the rendering thread that the buffer is cleared.
    _mutex.unlock();
}

RenderingPipeline::~RenderingPipeline()
{
    _mutex.lock();
    _running = false;
    _mutex.unlock();
    _renderThread.join();
    delete[] _instBuffer;
}

void RenderingPipeline::pushInstruction(const RenderingOpcode opcode, const ParameterPack&& params)
{
    /*
     *    After the `RenderingOpcode::FINISH_RENDER` instruction
     *  is inserted we lock insertion until the rendering thread
     *  loops through everything.
     *
     *  This will yield the main thread until the rendering thread finishes.
     */
    do
    {
        _mutex.lock();
        if(_readyForInsert)
        {
            _mutex.unlock();
            break;
        }
        _mutex.unlock();
        std::this_thread::yield();
    } while(true);

    _mutex.lock();
    _instBuffer[_insertPtr++] = static_cast<std::uint8_t>(opcode);

// Macro to copy data into the instruction buffer.
#define LOAD_VALUE(__VAR) std::memcpy(reinterpret_cast<void*>(_instBuffer + _insertPtr), reinterpret_cast<const void*>(&__VAR), sizeof(__VAR)); \
                          _insertPtr += sizeof(__VAR);

    switch(opcode)
    {
        case RenderingOpcode::FINISH_RENDER:
        {
            _readyForInsert = false;
            break;
        }
        case RenderingOpcode::LOAD_SHADER_UNIFORM:
        {
            const ShaderUniformType uniformType = static_cast<ShaderUniformType>(params.p0);
            const std::int32_t uniformID = static_cast<std::int32_t>(params.p1);

            _instBuffer[_insertPtr++] = static_cast<std::uint8_t>(uniformType);
            LOAD_VALUE(uniformID);

            switch(uniformType)
            {
                case ShaderUniformType::INTEGER:
                {
                    LOAD_VALUE(params.p2);
                    break;
                }
                case ShaderUniformType::FLOAT:
                {
                    const std::uint32_t iVal = static_cast<std::uint32_t>(params.p2);
                    const float fVal = reinterpret_cast<const float&>(iVal);
                    LOAD_VALUE(fVal);
                    break;
                }
                case ShaderUniformType::MAT4F:
                {
                    const float* const mat = reinterpret_cast<const float*>(params.p2);
                    LOAD_VALUE(mat);
                    break;
                }
                default: break;
            }
            break;
        }
        case RenderingOpcode::GL_DRAW_ARRAYS:
        {
            const GLenum  drawArraysMode  = static_cast<GLenum>(params.p0);
            const GLint   drawArraysFirst = static_cast<GLint>(params.p1);
            const GLsizei drawArraysCount = static_cast<GLsizei>(params.p2);

            LOAD_VALUE(drawArraysMode);
            LOAD_VALUE(drawArraysFirst);
            LOAD_VALUE(drawArraysCount);
            break;
        }
        case RenderingOpcode::GL_CLEAR_BUFFERS:
        {
            const GLbitfield clearBuffers = static_cast<GLbitfield>(params.p0);
            LOAD_VALUE(clearBuffers);
            break;
        }
        default: break;
    }

#undef LOAD_VALUE

    _mutex.unlock();
}

void RenderingPipeline::runRenderingCycle()
{
// Macro to retrieve values from the instruction buffer.
#define GET_VALUE(__TYPE, __VAR) const __TYPE __VAR = *reinterpret_cast<const __TYPE*>(_instBuffer + _instPtr); \
                                 _instPtr += sizeof(__VAR);

    _instPtr = 0;

    while(true)
    {
        std::uint8_t instByte;

        /*
         *  If there is no instruction ready yield the thread.
         *
         *  This is why the first `RenderingOpcode` has a value of 1 instead of 0.
         */
        while(true)
        {
            _mutex.lock();
            instByte = _instBuffer[_instPtr];
            _mutex.unlock();
            if(instByte != 0) { break; }
            std::this_thread::yield();
        }
        ++_instPtr;

        const RenderingOpcode currOpcode = static_cast<RenderingOpcode>(instByte);

        if(currOpcode == RenderingOpcode::FINISH_RENDER) { break; }

        switch(currOpcode)
        {
            case RenderingOpcode::LOAD_SHADER_UNIFORM:
            {
                GET_VALUE(ShaderUniformType, uniformType);
                GET_VALUE(std::int32_t, uniformID);

                switch(uniformType)
                {
                    case ShaderUniformType::INTEGER:
                    {
                        GET_VALUE(std::uint64_t, iVal);
                        glUniform1i(uniformID, static_cast<GLint>(iVal));
                        break;
                    }
                    case ShaderUniformType::FLOAT:
                    {
                        GET_VALUE(float, fVal);
                        glUniform1f(uniformID, fVal);
                        break;
                    }
                    case ShaderUniformType::MAT4F:
                    {
                        GET_VALUE(float*, mat);
                        glUniformMatrix4fv(uniformID, 1, GL_FALSE, mat);
                        break;
                    }
                    default: break;
                }
                break;
            }
            case RenderingOpcode::ACTIVATE_SHADER_PROGRAM:
            {
                GET_VALUE(GLuint, shaderProgramID);
                glUseProgram(shaderProgramID);
                break;
            }
            case RenderingOpcode::GL_DRAW_ARRAYS:
            {
                GET_VALUE(GLenum, drawArraysMode);
                GET_VALUE(GLint, drawArraysFirst);
                GET_VALUE(GLsizei, drawArraysCount);

                glDrawArrays(drawArraysMode, drawArraysFirst, drawArraysCount);
                break;
            }
            case RenderingOpcode::GL_CLEAR_BUFFERS:
            {
                GET_VALUE(GLbitfield, clearBuffers);
                glClear(clearBuffers);
                break;
            }
            default: break;
        }
    }

    _mutex.lock();
    std::memset(_instBuffer, 0, _instPtr);
    _insertPtr = 0;
    _readyForInsert = true;
    _mutex.unlock();
#undef GET_VALUE
}

void RenderingPipeline::renderThreadFunc(HDC hdc, HGLRC renderingContext)
{
    // Wait for the instruction buffer to be cleared.
    do
    {
        _mutex.lock();
        if(_initialReady)
        {
            _mutex.unlock();
            break;
        }
        _mutex.unlock();
        std::this_thread::yield();
    } while(true);

    // Re-assign the OpenGL rendering context to this rendering thread.
    wglMakeCurrent(hdc, renderingContext);

    // Some basic OpenGL setup.
    glEnable(GL_DEPTH_TEST);
    glEnable(GL_CULL_FACE);
    glCullFace(GL_BACK);
    glFrontFace(GL_CW);

    while(true)
    {
        _mutex.lock();
        if(!_running)
        {
            _mutex.unlock();
            break;
        }
        _mutex.unlock();

        runRenderingCycle();
        SwapBuffers(hdc);
    }
}

An example of how it is supposed to be used:

void exampleUsage(HDC hdc, HGLRC renderingContext, GLuint shaderProgram)
{
    RenderingPipeline rp(hdc, renderingContext);

    while(true)
    {
        rp.pushInstruction(RenderingOpcode::ACTIVATE_SHADER_PROGRAM, ParameterPack(shaderProgram));
        // ...
        rp.pushInstruction(RenderingOpcode::FINISH_RENDER, ParameterPack());
    }
}

How to analyse the throughput of multithreaded client server programs?

I am practicing socket programming in C. I have two programs, a server and a client (based on TCP), on two different laptops. The server forks a new process for every new request. To simulate multiple simultaneous clients, I have used the pthreads library in my client program. I have a lot of files at the server, each of a fixed size (2 MB).

I am calculating two things at the client: throughput and response time. Throughput is the average number of files downloaded per second, and response time is the average time taken to download a file.

I want to see at what number of simultaneous users (threads) the throughput saturates, but I am having trouble analyzing that since the network speed varies widely.
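For reference (assuming a closed system in which each of the $N$ client threads always has exactly one request outstanding), Little's law relates the two metrics: $N = X(N) \cdot R(N)$, i.e. $X(N) = N / R(N)$, where $X(N)$ is the throughput and $R(N)$ the mean response time. The saturation point is then where $X(N)$ stops growing with $N$, equivalently where $R(N)$ starts growing roughly linearly in $N$.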

Can someone suggest a way to analyse that? Maybe some metric other than throughput and response time that does not depend on network speed, because I need to find the number N of simultaneous users at which the server is at maximum load.

If network bandwidth control is necessary, then is there some simple way to limit the maximum upload speed at the server? I am using Ubuntu 18.04. The limit may be imposed on the whole system (for simplicity), since there is no other foreground process running in parallel. For example, my upload speed varies from 3 to 5 MB/s; can it be restricted to some lower value like 2 MB/s?

Or should I instead restrict the download speed at the client rather than the server's upload speed, so that it is constant and always less than the server's upload speed, since the analysis is done by the client program?

How would you design a “multithreaded” LRU cache using C++ (unordered_map and Linkedlist)?

Please note that this is about a thread-safe LRU cache (not simply an LRU cache as specified in https://leetcode.com/problems/lru-cache/description/). It isn’t a duplicate of the LRU cache design question, as there are some tricky aspects of locking the hashtable/linked list (LL) that aren’t addressed in other multithreaded LRU design questions.

The credited approach for making an LRU cache thread-safe in C++ seems to be all over the place. I noticed some links mentioning that locking both the hashtable and the LL is OK, while others don’t directly address the issue in C++ and only vaguely discuss various ways to do it.

  • Can someone describe a functional and efficient approach using locks, including what is locked and when, for implementing a multithreaded version of the LRU cache in C++ (https://leetcode.com/problems/lru-cache/description/) so that all get/set operations are O(1)? (A minimal sketch of the single-lock approach follows this list.)

  • How do caches such as memcached (as used at Facebook) implement multithreading support?
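For reference, this is the single-lock design I keep seeing described (a minimal sketch, not production code: one mutex guards both the hash map and the list, so every operation stays O(1) but all threads serialize on that lock):

#include <list>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <utility>

// Minimal thread-safe LRU cache: a single mutex protects both structures,
// which makes correctness easy to argue but serializes all operations.
class LruCache
{
public:
    explicit LruCache(std::size_t capacity) : _capacity(capacity) { }

    std::optional<int> get(int key)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        auto it = _index.find(key);
        if (it == _index.end())
            return std::nullopt;
        // Move the entry to the front (most recently used). O(1).
        _items.splice(_items.begin(), _items, it->second);
        return it->second->second;
    }

    void put(int key, int value)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        auto it = _index.find(key);
        if (it != _index.end())
        {
            it->second->second = value;
            _items.splice(_items.begin(), _items, it->second);
            return;
        }
        if (_items.size() == _capacity)
        {
            // Evict the least recently used entry (list tail). O(1).
            _index.erase(_items.back().first);
            _items.pop_back();
        }
        _items.emplace_front(key, value);
        _index[key] = _items.begin();
    }

private:
    std::size_t _capacity;
    std::list<std::pair<int, int>> _items; // MRU at front, LRU at back
    std::unordered_map<int, std::list<std::pair<int, int>>::iterator> _index;
    std::mutex _mutex;
};

Higher concurrency usually comes from sharding instead: several independent caches like this one, each with its own lock, with keys hashed to a shard; as far as I understand, that is similar in spirit to how memcached reduces lock contention in its threaded mode.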

Initializing and managing many references in multithreaded control application

I have a large multi-window GUI program (implemented with MFC) that controls a variety of physics experiment apparatus. When the user starts the main function of the program (the “experiment” run), a large worker thread starts which needs references to various singleton objects (generally one per piece of apparatus) to do its work, in particular programming a variety of machines. These objects are in general managed by the windows that contain the GUI elements related to those objects, and because they are tied to the GUI windows, they will always exist as long as the program is running. I’m struggling to figure out the best way to handle the references to these objects. What I’ve done in the past is just use raw pointers to these objects, but this seems like bad style, so I’m looking at improvements.

My current scheme involves class structures like this:

#include "RF_Generator.h" #include "AnalogOutputs.h"  struct MasterThreadInput {     MasterThreadInput(RfWindow& rfWin, AnalogOutputWindow& aoWin) :          objA(winA.getObjA()),          objB(winB.getObjB())     {    }     RfGenerator& rfSys;     AnalogOutputs& aoSys; }  class RfWindow : public CDialog { public:     RfGenerator& getRfSys(); private:     RfGenerator rfSys; }  class AnalogOutputWindow : public CDialog { public:     AnalogOutputs& getAoSys(); private:     AnalogOutputs aoSys; } 

This then gets filled and used like this:

void startThread(RfWindow& rfWin, AnalogOutputWindow& aoWin)
{
    MasterThreadInput* input = new MasterThreadInput(rfWin, aoWin);
    // other checks on the input
    _beginthreadex( NULL, NULL, &MainExperimentThreadProcedure, input, NULL, NULL );
}

unsigned int __stdcall MainExperimentThreadProcedure( void* voidInput )
{
    MasterThreadInput* input = (MasterThreadInput*)voidInput;
    try
    {
        // And then do work.
        input->rfSys.programRfGeneration();
        input->aoSys.programAnalogOutputs();
        // etc.
    }
    catch (...)
    {
        // handle errors
    }
    delete input;
    return 0;
}

This example code should be very representative of the real thing, except that in the real thing there are 5 windows, many more objects to fill (about 15 in total), and a much more complicated thread procedure; it’s a big worker thread. I thought about using smart pointers instead of references, considering the seemingly awkward constructor needed to initialize the references, and since I was using raw pointers before these references. However, considering that I don’t actually need the worker thread to manage the memory for these objects (the objects will exist as long as the windows do, and the windows will exist for the entire program), it seems that I should just try to avoid pointers altogether. Does this code follow best practices regarding managing memory and shared access in multithreaded C++ applications? Are there ways to improve on this organization? It’s a very important part of the control system, so I’d like to get it right.
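One variation I considered (a sketch only, reusing the MasterThreadInput struct from above: the reference members stay, but std::unique_ptr expresses the handoff of the input block to the thread, so there is no bare delete on any path):

#include <memory>
#include <process.h> // _beginthreadex

unsigned int __stdcall MainExperimentThreadProcedure(void* voidInput)
{
    // Immediately re-wrap the raw pointer so the input block is freed
    // on every exit path, including exceptions.
    std::unique_ptr<MasterThreadInput> input(static_cast<MasterThreadInput*>(voidInput));
    try
    {
        input->rfSys.programRfGeneration();
        input->aoSys.programAnalogOutputs();
        // etc.
    }
    catch (...)
    {
        // handle errors
    }
    return 0;
}

void startThread(RfWindow& rfWin, AnalogOutputWindow& aoWin)
{
    auto input = std::make_unique<MasterThreadInput>(rfWin, aoWin);
    // other checks on the input
    // release() hands the raw pointer to the thread, which re-owns it above.
    _beginthreadex(NULL, 0, &MainExperimentThreadProcedure, input.release(), 0, NULL);
}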

Multithreaded local server

I’m writing several local servers which have almost the same code in main.cpp. I’d appreciate your comments, improvement suggestions, and especially notes on potential memory leaks, since the services are supposed to run 24/7 and process large requests. Thanks!

#include <iostream>
#include <fstream>
#include <algorithm>
#include <iterator>
#include <string>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unordered_map>

#include "UdsServer.hpp"
#include "RequestManager.hpp"
#include "MSutils.hpp" // MS::log()

void pullRequests();

const std::string pathToSocket = "/var/run/SomeServer.sock";
const std::string SERVICE_NAME = "SomeServer";

RequestManager service; // Does the actual processing of the request

std::unordered_map<int, std::string> requestQueue;
std::mutex requestQueue_mutex;
std::condition_variable processorsThreadSwitch;
bool gotNewRequests = false;

int main()
{
    UdsServer app; // Server listening on a Unix domain socket

    try
    {
        app.createServer(pathToSocket);
    }
    catch (const std::string & err)
    {
        MS::log(SERVICE_NAME, "Failed to start the service. Error: " + err, MS::MessageType::FatalException);

        return -1;
    }

    unsigned n_concThreads = std::thread::hardware_concurrency();

    if (!n_concThreads) // if the query failed...
    {
        std::ifstream cpuinfo("/proc/cpuinfo");

        n_concThreads = std::count(std::istream_iterator<std::string>(cpuinfo),
                        std::istream_iterator<std::string>(),
                        std::string("processor"));

        if (!n_concThreads)
            n_concThreads = 6; // ~number of CPU cores. TODO: make the number of worker processes/threads configurable using a config file
    }

    for (unsigned i = 0; i < n_concThreads; ++i)
    {
        std::thread t (pullRequests);
        t.detach();
    }

    int clientConnection;

    while ((clientConnection = app.newConnectionEstablished()) > -1) // Uses accept() internally
    {
        std::string command = app.getMsg(clientConnection); // Uses read() internally

        if (command.empty())
            app.closeConnection(clientConnection);
        else if (command == "SHUTDOWN")
        {
            app.closeConnection(clientConnection);

            return 0;
        }
        else
        {
            { // Anonymous scope just to release the lock before notifying a thread
                std::lock_guard<std::mutex> writeLock(requestQueue_mutex);

                requestQueue[clientConnection] = std::move(command);

                gotNewRequests = true;
            }

            processorsThreadSwitch.notify_one(); // Nothing happens here if all threads are busy
        }
    }
}

void pullRequests()
{
    UnixDomainSocket uds;

    std::unique_lock<std::mutex> writeLock(requestQueue_mutex);

    while (true) // Let the thread run "forever"
    {
        while (!gotNewRequests)
            processorsThreadSwitch.wait(writeLock);

        std::unordered_map<int, std::string> queueCopy (std::move(requestQueue));
        requestQueue.clear();

        gotNewRequests = false;

        writeLock.unlock(); // Don't make the other threads wait when this thread no longer needs the shared data

        if (queueCopy.empty())
            continue;
        else if (queueCopy.size() == 1)
        {
            std::string response = service.pullRequests(queueCopy.cbegin()->second);

            if (response.length())
            {
                auto sendResult = uds.sendMsg(queueCopy.cbegin()->first, response);

                if (!sendResult.isValid())
                    MS::log(SERVICE_NAME, "Could not send the response for request: " + queueCopy.begin()->second, MS::MessageType::Error);
            }

            if (!uds.closeConnection(queueCopy.begin()->first))
                MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
        }
        else // Multiplex
        {
            std::unordered_map<std::string, std::vector<int>> multiplexedRequests;

            for (auto & request : queueCopy)
                multiplexedRequests[std::move(request.second)].push_back(request.first);

            for (const auto & request : multiplexedRequests)
            {
                std::string response = service.pullRequests(request.first);

                if (response.length())
                    for (auto socket : request.second)
                    {
                        auto sendResult = uds.sendMsg(socket, response);

                        if (!sendResult.isValid())
                            MS::log(SERVICE_NAME, "Could not send the response for request: " + request.first, MS::MessageType::Error);

                        if (!uds.closeConnection(socket))
                            MS::log(SERVICE_NAME, "Could not close the connection.", MS::MessageType::Error);
                    }
            }
        }

        writeLock.lock();
    }
}
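A minor note on the waiting loop in pullRequests: the predicate overload of std::condition_variable::wait is equivalent to the hand-written while loop and guards against spurious wakeups the same way:

// Equivalent to: while (!gotNewRequests) processorsThreadSwitch.wait(writeLock);
processorsThreadSwitch.wait(writeLock, [] { return gotNewRequests; });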