Monitor Object or Thread-safe passive object pattern

Background - 

An object is an instance of a class, and that class may have many methods. A monitor object makes sure that only one method of the object executes at a time. This helps when many threads access a single object.

The monitor object pattern also serializes access at the method level. This has several benefits. With a monitor object, we can avoid old-style polling (looping on a shared boolean variable), which wastes CPU time. Instead, the monitor object puts the waiting thread to sleep until another concurrently executing method notifies it. In this way, we can also define the execution sequence of the methods of a class.

The good part is that clients, who previously had to poll to find out about a change, no longer have to worry about notification and serialization. The monitor object class takes care of both.
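
To make the contrast with busy-waiting on a shared boolean concrete, here is a minimal sketch. The class ReadyFlag and the function RunReadyFlagDemo are hypothetical names used only for illustration; they are not part of the ProcessQueue code shown later. The waiting thread sleeps inside the monitor until the publishing method notifies it, so the client never loops on a flag itself.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical monitor that hides waiting and notification from its clients.
class ReadyFlag
{
private:
  std::mutex              mLock;
  std::condition_variable cReady;
  bool                    ready = false;
  int                     value = 0;

public:
  // Called by the producing thread: publish a value and wake any waiter.
  void Publish(int v)
  {
    {
      std::lock_guard<std::mutex> lockIt(mLock);
      value = v;
      ready = true;
    }
    cReady.notify_all();
  }

  // Called by the consuming thread: sleeps until Publish() has run.
  // The predicate guards against spurious wake-ups.
  int WaitForValue()
  {
    std::unique_lock<std::mutex> lockIt(mLock);
    cReady.wait(lockIt, [this]() { return ready; });
    return value;
  }
};

// Usage: the consumer blocks inside the monitor while the producer publishes.
int RunReadyFlagDemo()
{
  ReadyFlag flag;
  int received = 0;
  std::thread consumer([&]() { received = flag.WaitForValue(); });
  std::thread producer([&]() { flag.Publish(42); });
  producer.join();
  consumer.join();
  assert(received == 42);
  return received;
}
```

Whichever thread starts first, WaitForValue() returns only after Publish() has run, which is one simple way a monitor can impose an execution sequence on its methods.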


How does the monitor object achieve it?

In OOP, we learned to access the data members of a class only through its API, which means the data members have to be private. Extending this idea, we can also make the real implementation methods private, so that they are executed only through their public counterpart APIs.

This means the monitor object has two layers of execution. The first is the public API, the higher layer, which enforces serialization through a mutex. The second is the private implementation methods, the lower layer, which do not have to bother about the mutex and serialization because the higher layer has already handled them. The low-level methods only have to focus on the real business logic. However, they must not call the higher-level APIs, because a high-level API has already locked the mutex before calling them. Calling another high-level API from there is a bad idea for two reasons. First, locking the same mutex again from the other high-level API is undefined behavior (std::mutex is not recursive). Second, there is a chance of deadlock if that high-level API locks another mutex.
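
The two layers can be sketched on a much smaller class than the one in this post. Inventory, its methods, and RunInventoryDemo are hypothetical names chosen for illustration. The point to notice is the composite method Replace, which locks once and then calls only the private _impl methods.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical monitor object: names are illustrative only.
class Inventory
{
private:
  mutable std::mutex mLock;
  int                stock = 0;

  // Low-level layer: business logic only, assumes mLock is already held.
  void Add_impl(int n) { stock += n; }
  bool Remove_impl(int n)
  {
    assert(n >= 0);
    if (stock < n) return false;
    stock -= n;
    return true;
  }

public:
  // High-level layer: lock, then delegate to the private counterpart.
  void Add(int n)
  {
    std::lock_guard<std::mutex> lockIt(mLock);
    Add_impl(n);
  }

  bool Remove(int n)
  {
    std::lock_guard<std::mutex> lockIt(mLock);
    return Remove_impl(n);
  }

  // A composite operation reuses the _impl methods. Calling the public
  // Remove() and Add() here instead would lock mLock a second time on the
  // same thread, which is undefined behavior for std::mutex.
  bool Replace(int oldCount, int newCount)
  {
    std::lock_guard<std::mutex> lockIt(mLock);
    if (!Remove_impl(oldCount)) return false;
    Add_impl(newCount);
    return true;
  }

  int Stock() const
  {
    std::lock_guard<std::mutex> lockIt(mLock);
    return stock;
  }
};

// Usage: four threads add concurrently, then one composite call adjusts stock.
int RunInventoryDemo()
{
  Inventory inv;
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i)
    workers.emplace_back([&inv]() { for (int j = 0; j < 1000; ++j) inv.Add(1); });
  for (auto& t : workers) t.join();
  if (!inv.Replace(1000, 10)) return -1;
  return inv.Stock();
}
```

Because every public method takes the same lock, the 4000 concurrent Add calls cannot interleave in the middle of an update, and Replace is seen by other threads as a single step.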


Disadvantages - 

  1. Scalability: a single lock handles all serialization, so every method call contends for it. We can solve this by having multiple locks, but that becomes complex to manage.
  2. The monitor object's functionality and its serialization mechanism are tightly coupled, and tight coupling is not good practice.
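
As a rough illustration of the first point, the sketch below splits one monitor into two locks. OrderStats and RunOrderStatsDemo are hypothetical names, and this is only one possible way to arrange it. The two recording methods no longer block each other, but any method that needs both fields now has to acquire both locks carefully, which is where the extra complexity comes from.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical monitor with one lock per independent piece of state.
class OrderStats
{
private:
  mutable std::mutex mPlacedLock;
  mutable std::mutex mShippedLock;
  int                placed  = 0;
  int                shipped = 0;

public:
  // These two methods no longer serialize against each other.
  void RecordPlaced()
  {
    std::lock_guard<std::mutex> lockIt(mPlacedLock);
    ++placed;
  }

  void RecordShipped()
  {
    std::lock_guard<std::mutex> lockIt(mShippedLock);
    ++shipped;
  }

  // Added complexity: a consistent view of both fields needs both locks.
  // std::lock acquires them with a deadlock-avoidance algorithm, and the
  // guards adopt the already-held mutexes so they are released on return.
  int Pending() const
  {
    std::lock(mPlacedLock, mShippedLock);
    std::lock_guard<std::mutex> placedGuard(mPlacedLock, std::adopt_lock);
    std::lock_guard<std::mutex> shippedGuard(mShippedLock, std::adopt_lock);
    return placed - shipped;
  }
};

// Usage: two threads update different fields without contending for one lock.
int RunOrderStatsDemo()
{
  OrderStats stats;
  std::thread t1([&]() { for (int i = 0; i < 500; ++i) stats.RecordPlaced(); });
  std::thread t2([&]() { for (int i = 0; i < 200; ++i) stats.RecordShipped(); });
  t1.join();
  t2.join();
  int pending = stats.Pending();
  assert(pending == 300);
  return pending;
}
```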

Sequence diagram

Monitor object pattern sequence diagram


Class diagram

The class ProcessQueue is the monitor object in the class diagram below.

Monitor object pattern Class diagram


ProcessQueue.h

/***************************************************************************************************
//   Author:    Kunjesh Singh Baghel
//   Web-page:  https://sites.google.com/site/kunjeshsinghbaghel/
***************************************************************************************************/
#ifndef _PROCESS_QUEUE_H_
#define _PROCESS_QUEUE_H_
#pragma once

#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
#include "IMessage.h"
#include "HandleOrder.h"

using namespace IO_MSG;

namespace MonitorObject
{
  class ProcessQueue
  {
  private:
    std::unique_ptr<IHandlePurchaseOrder>             pOrderHandler;
    std::queue<std::shared_ptr<IMessage> >            requestQ;
    short                                             qSize;
    mutable std::mutex                                mLock;
    std::condition_variable                           cNotEmpty;
    std::condition_variable                           cNotFull;

    //_________________________________________private functions for real implementation
    void                                              PlaceOrder_impl(std::shared_ptr<IMessage> pMsg);
    std::shared_ptr<IMessage>                         GetConfirmation_impl();
    bool                                              isEmpty_impl();
    bool                                              isFull_impl();

  public:
    /*Constructor*/                                   ProcessQueue();
    /*Destructor*/                                    ~ProcessQueue();

    //_________________________________________Public APIs
    void                                              PlaceOrder(std::shared_ptr<IMessage> pMsg);
    std::shared_ptr<IMessage>                         GetConfirmation();
    bool                                              isEmpty();
    bool                                              isFull();
  };
}

#endif // !_PROCESS_QUEUE_H_

ProcessQueue.cpp

/***************************************************************************************************
//   Author:    Kunjesh Singh Baghel
//   Web-page:  https://sites.google.com/site/kunjeshsinghbaghel/
***************************************************************************************************/
#include "ProcessQueue.h"
#include "Log.h"
#include "GlobalFactory.h"

using namespace LOG;
using namespace SESSION;

namespace MonitorObject
{
  ProcessQueue::ProcessQueue() : qSize(0)
  {
    LOG_IT(SETUP_INFO, "Construction Process: ProcessQueue is being created ");
    // std::stoi returns int; convert explicitly to the short member type.
    qSize = static_cast<short>(std::stoi(FactoryInstance::GetGlobalFactory()->GetConfigurationManager()->GetValue("Queue_Max_size", VALUE)));
    pOrderHandler = std::make_unique<DomesticOrder>();
  }

  ProcessQueue::~ProcessQueue()
  {
    LOG_IT(SETUP_INFO, "Cleanup Process: of ProcessQueue has begun");
  }

  void ProcessQueue::PlaceOrder(std::shared_ptr<IMessage> pMsg)
  {
    std::unique_lock<std::mutex> lockIt(mLock);
    while (isFull_impl())
    {
      cNotFull.wait(lockIt);
    }
    PlaceOrder_impl(pMsg);
    cNotEmpty.notify_one();
  }

  std::shared_ptr<IMessage> ProcessQueue::GetConfirmation()
  {
    std::unique_lock<std::mutex> lockIt(mLock);
    while (isEmpty_impl())
    {
      cNotEmpty.wait(lockIt);
    }
    std::shared_ptr<IMessage> msg = GetConfirmation_impl();
    cNotFull.notify_one();
    return msg;
  }

  bool ProcessQueue::isEmpty()
  {
    std::lock_guard<std::mutex> lockIt(mLock);
    return isEmpty_impl();
  }

  bool ProcessQueue::isFull()
  {
    std::lock_guard<std::mutex> lockIt(mLock);
    return isFull_impl();
  }

  void ProcessQueue::PlaceOrder_impl(std::shared_ptr<IMessage> pMsg)
  {
    requestQ.push(pMsg);
    pOrderHandler->PlaceOrder(pMsg);
  }

  std::shared_ptr<IMessage> ProcessQueue::GetConfirmation_impl()
  {
    std::shared_ptr<IMessage> msg = requestQ.front();
    pOrderHandler->OrderResponse(msg);
    requestQ.pop();
    return msg;
  }

  bool ProcessQueue::isEmpty_impl()
  {
    return requestQ.empty();
  }

  bool ProcessQueue::isFull_impl()
  {
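    // Full once the queue holds qSize (Queue_Max_size) messages.
    // The cast avoids a signed/unsigned comparison.
    return requestQ.size() >= static_cast<std::size_t>(qSize);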
  }
}
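
The listing above depends on project headers (IMessage.h, Log.h, GlobalFactory.h) that are not shown here, so it cannot be compiled on its own. The following is a simplified stand-in that can be, under stated assumptions: BoundedQueue, Put, Take and RunBoundedQueueDemo are hypothetical names, the payload is a plain int instead of IMessage, and the capacity is passed to the constructor instead of being read from configuration. It keeps the same structure: public methods lock and wait, private _impl methods only inspect the queue.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Simplified stand-in for ProcessQueue: int payloads, capacity passed in.
class BoundedQueue
{
private:
  std::queue<int>         requestQ;
  const std::size_t       capacity;
  std::mutex              mLock;
  std::condition_variable cNotEmpty;
  std::condition_variable cNotFull;

  // Low-level checks: assume mLock is already held by the caller.
  bool isEmpty_impl() const { return requestQ.empty(); }
  bool isFull_impl()  const { return requestQ.size() >= capacity; }

public:
  explicit BoundedQueue(std::size_t cap) : capacity(cap) { assert(cap > 0); }

  // Blocks while the queue is full, then enqueues and wakes one consumer.
  void Put(int v)
  {
    std::unique_lock<std::mutex> lockIt(mLock);
    cNotFull.wait(lockIt, [this]() { return !isFull_impl(); });
    requestQ.push(v);
    cNotEmpty.notify_one();
  }

  // Blocks while the queue is empty, then dequeues and wakes one producer.
  int Take()
  {
    std::unique_lock<std::mutex> lockIt(mLock);
    cNotEmpty.wait(lockIt, [this]() { return !isEmpty_impl(); });
    int v = requestQ.front();
    requestQ.pop();
    cNotFull.notify_one();
    return v;
  }
};

// Usage: one producer and one consumer share a queue of capacity 2.
long RunBoundedQueueDemo()
{
  BoundedQueue q(2);
  long sum = 0;
  std::thread producer([&]() { for (int i = 1; i <= 100; ++i) q.Put(i); });
  std::thread consumer([&]() { for (int i = 0; i < 100; ++i) sum += q.Take(); });
  producer.join();
  consumer.join();
  return sum;
}
```

With a capacity of 2, the producer is regularly put to sleep on cNotFull and the consumer on cNotEmpty, yet neither thread contains any waiting logic of its own.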

ClientRequestHandler.h

/***************************************************************************************************
//   Author:    Kunjesh Singh Baghel
//   Web-page:  https://sites.google.com/site/kunjeshsinghbaghel/
***************************************************************************************************/
#ifndef _CLIENT_REQUEST_HANDLER_H_
#define _CLIENT_REQUEST_HANDLER_H_
#pragma once

#include <memory>
#include <thread>
#include "SocketAcceptor.h"
#include "ProcessQueue.h"

using namespace IO_MSG;
using namespace SERVER_SOCKET;


namespace MonitorObject
{
  //class ClientRequestHandler : std::enable_shared_from_this<ClientRequestHandler>
  class ClientRequestHandler
  {
  private:
    std::unique_ptr<SocketAcceptor>     pConnect;
    std::unique_ptr<ProcessQueue>       pQueue;
    std::shared_ptr<Socket>             pSocket;
    std::thread*                        pListen;
    std::thread*                        pRespond;

  public:
    /* constructor */                   ClientRequestHandler();
    /* destructor */                    ~ClientRequestHandler();

    /*!
    * Description : This function will help to open the server connection for always listening.
    */
    void                                Setup();
    void                                Cleanup();
    void                                PlaceOrder(std::shared_ptr<IMessage> msg);
    void                                SendResponse(std::shared_ptr<IMessage> pMsg);
    /*!
    * Description : This function is the starting point of new thread.
    */
    static void                         Respond(void* ptr);
    static void                         Run(void* ptr);
  };
}
#endif // !_CLIENT_REQUEST_HANDLER_H_

ClientRequestHandler.cpp

/***************************************************************************************************
//   Author:    Kunjesh Singh Baghel
//   Web-page:  https://sites.google.com/site/kunjeshsinghbaghel/
***************************************************************************************************/

#include "ClientRequestHandler.h"
//#include "GlobalFactory.h"

namespace MonitorObject
{
  ClientRequestHandler::ClientRequestHandler() : pListen(nullptr), pRespond(nullptr)
  {
    LOG_IT(SETUP_INFO, "Construction Process: ClientRequestHandler has begun");
    pConnect = std::make_unique<SocketAcceptor>();
    pQueue = std::make_unique<ProcessQueue>();
    if (!pConnect->IsServerSocketCreated())
    {
      LOG_IT(ERROR_MSG, "Construction Process: Setting up the server socket has failed");
    }
    pSocket = std::make_shared<Socket>();
  }

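  ClientRequestHandler::~ClientRequestHandler()
  {
    LOG_IT(SETUP_INFO, "Cleanup Process: of ClientRequestHandler has begun");
    Cleanup();
    if (pListen != nullptr)
    {
      delete pListen;
      pListen = nullptr;
    }
    // pRespond is also allocated in Setup(); join it before deleting, because
    // destroying a joinable std::thread calls std::terminate.
    if (pRespond != nullptr)
    {
      if (pRespond->joinable())
        pRespond->join();
      delete pRespond;
      pRespond = nullptr;
    }
  }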

  void ClientRequestHandler::Setup()
  {
    LOG_IT(SETUP_INFO, " ClientRequestHandler is about to create a new listening thread.");
    pListen = new std::thread(ClientRequestHandler::Run, this);
    pRespond = new std::thread(ClientRequestHandler::Respond, this);
    if (pConnect->IsServerSocketCreated())
    {
      pConnect->MultiClientAccept();
    }
  }

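  void ClientRequestHandler::Cleanup()
  {
    // pListen and pRespond stay null until Setup() runs, so check before dereferencing.
    if (pListen != nullptr && pListen->joinable())
      pListen->join();
    if (pRespond != nullptr && pRespond->joinable())
      pRespond->join();
  }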

  void ClientRequestHandler::Run(void *ptr)
  {
    LOG_IT(SETUP_INFO, " ClientRequestHandler Created new listening thread.");
    ClientRequestHandler* thisPtr = static_cast<ClientRequestHandler *>(ptr);
    Queue<std::shared_ptr<IMessage> >& Q = Socket::GetClientRequests();
    while (true)
    {
      std::shared_ptr<IMessage> pMsg;
      {
        std::unique_lock<std::mutex> lockIt(Q.GetMutex());
        Q.GetNotified().wait(lockIt, [&]() {return  !Q.IsEmpty(); });
        pMsg = Q.Front();
        Q.Remove();
      } // Release the client-request lock before PlaceOrder, which may block while the ProcessQueue is full.
      thisPtr->PlaceOrder(pMsg);
    }
  }

  void ClientRequestHandler::PlaceOrder(std::shared_ptr<IMessage> pMsg)
  {
    LOG_IT(SETUP_INFO, "Placing customer order");
    pQueue->PlaceOrder(pMsg);
    
  }

  void ClientRequestHandler::Respond(void* ptr)
  {
    LOG_IT(SETUP_INFO, " Respond loop has begun ");
    ClientRequestHandler* thisPtr = static_cast<ClientRequestHandler*>(ptr);
    while (true)
    {
      thisPtr->SendResponse(thisPtr->pQueue->GetConfirmation());
    }
  }

  void ClientRequestHandler::SendResponse(std::shared_ptr<IMessage> pMsg)
  {
    LOG_IT(SETUP_INFO, "Sending response back to the customer");
    pSocket->SendToClient(pMsg->GetResponse().c_str(), std::stoi(pMsg->GetClientSocket()));
  }

}

HandleOrder.h

/***************************************************************************************************
//   Author:    Kunjesh Singh Baghel
//   Web-page:  https://sites.google.com/site/kunjeshsinghbaghel/
***************************************************************************************************/
#ifndef _HANDLE_ORDER_H_
#define _HANDLE_ORDER_H_
#pragma once

#include <memory>
#include "WriteOnlyFile.h"
#include "IMessage.h"

using namespace FILE_IO;
using namespace IO_MSG;

namespace MonitorObject
{
  /*!
  * Description : When IHandlePurchaseOrder's concrete implementation completes its execution, it
                  can acquire a write lock on the FutureObject (which the client owns through the
                  ServerProxy::GetConfirmation method) and update its value.
                - Any client thread that is waiting for the result can wake up and see it after this.
  */
  class IHandlePurchaseOrder
  {
  public:
    virtual void                            PlaceOrder(std::shared_ptr<IMessage> msg) = 0;
    virtual void                            OrderResponse(std::shared_ptr<IMessage> msg) = 0;
    virtual                                 ~IHandlePurchaseOrder();
  };

  class DomesticOrder : public IHandlePurchaseOrder
  {
  private:
    void                                    LogOrder(std::shared_ptr<IMessage> msg, std::unique_ptr<IWriteOnlyFile>& pFile);
  public:
    virtual void                            PlaceOrder(std::shared_ptr<IMessage> msg) override;
    virtual void                            OrderResponse(std::shared_ptr<IMessage> msg) override;
  };
  
}
#endif // end of _HANDLE_ORDER_H_

HandleOrder.cpp

/***************************************************************************************************
//   Author:    Kunjesh Singh Baghel
//   Web-page:  https://sites.google.com/site/kunjeshsinghbaghel/
***************************************************************************************************/

#include "HandleOrder.h"
#include "GlobalFactory.h"

using namespace FILE_IO;
using namespace SESSION;

static const std::string RESPONSE = "Your order is placed ";

namespace MonitorObject
{
  IHandlePurchaseOrder::~IHandlePurchaseOrder()
  {
    LOG_IT(SETUP_INFO, "Cleanup Process: of IHandlePurchaseOrder base class.");
  }

  void DomesticOrder::PlaceOrder(std::shared_ptr<IMessage> msg)
  {
    LOG_IT(SETUP_INFO, "placing the order at server end");
    std::string filePath = FactoryInstance::GetGlobalFactory()->GetFileName();
    filePath = filePath.substr(0, filePath.find_last_of("\\"));
    filePath = filePath + "\\" + msg->GetClientCode() + "\\" + msg->GetClientSocket() + ".log";
    std::unique_ptr<IWriteOnlyFile> pFile = std::make_unique<WriteOnlyFile>(filePath, KEEP_OLD_CONTENT_ON_EVERY_RELOAD, false);
    LogOrder(msg, pFile);
  }

  void DomesticOrder::LogOrder(std::shared_ptr<IMessage> msg, std::unique_ptr<IWriteOnlyFile>& pFile)
  {
    LOG_IT(INFO, "Create a folder and file at server, with client's request message");
    std::string response = msg->GetMsg();
    response = RESPONSE + response;
    msg->SetResponse(response);
    pFile->Write(response.c_str());
  }

  void DomesticOrder::OrderResponse(std::shared_ptr<IMessage> msg)
  {
    LOG_IT(SETUP_INFO, "Responding to clients order");
    std::string finalResponse = msg->GetResponse();
    finalResponse = finalResponse + "You will receive it within one day";
    msg->SetResponse(finalResponse);
  }
}

Main.cpp

/***************************************************************************************************
//   Author:    Kunjesh Singh Baghel
//   Web-page:  https://sites.google.com/site/kunjeshsinghbaghel/
***************************************************************************************************/

#include "ClientRequestHandler.h"
#include "GlobalFactory.h"

using namespace MonitorObject;
using namespace SESSION;

void Test_MonitorObject()
{
  std::unique_ptr<ClientRequestHandler> pClientRequestHandler = std::make_unique<ClientRequestHandler>();
  pClientRequestHandler->Setup();
}


int main()
{
  FactoryInstance::GlobalFactoryInit(); // This will initialize all global mandatory modules.
  Test_MonitorObject();
  FactoryInstance::GlobalFactoryCleanUp();

  return 0;
}


Thanks for reading it. To learn more about design patterns and basic design principles, please see my web page.
