Implementing the Actor-Model in C#

After learning the basics (isolated state, communication and computation) of the the Actor-Model I I wrote my own implementation and I'm ready for a through code review.

You will be able to see that in the test code I've created 2 tasks which are simultaneously sends multiple messages to an instance of the Actor class.

What I'm wondering the most is if the use of threading within the actor is correct for processing the incoming messages or should I process messages without a thread.

Full code:

using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks;  namespace ActorModel {     /// <summary>     /// Client/Test Code     /// </summary>     public static class Program     {         private static void Main()         {             var actor = new Actor<string>(x => Console.WriteLine(x.ToUpper())).Start();             var task1 = Task.Factory.StartNew(() => SendMessagesToActor(actor));             var task2 = Task.Factory.StartNew(() => SendMessagesToActor(actor));             Task.WaitAll(new[] {task1, task2});             Console.WriteLine();             Console.WriteLine();             Console.WriteLine("PRESS ENTER TO STOP THE ACTOR");             Console.ReadLine();              actor.Stop();         }          private static void SendMessagesToActor(Actor<string> actor)         {             var counter = 0;             while (counter < 5)             {                 actor.Send(String.Format("message #[{0}] from thread #[{1}]", counter,                                          Thread.CurrentThread.ManagedThreadId));                 Thread.Sleep(100); // To avoid of OutOfMemory issues                 counter++;             }         }     }      /// <summary>     /// Actor-Based Class      /// </summary>     /// <remarks>basics: Immutability, Communication and Computation</remarks>     /// <typeparam name="TMessage"></typeparam>     public class Actor<TMessage> where TMessage : class     {         private volatile bool _started;         private readonly Action<TMessage> _messageHandler;         private readonly ConcurrentQueue<TMessage> _messagesQueue; // Consider replace with TPL workflow         private readonly Task _processingTask;         private readonly CancellationTokenSource _source;          public Actor(Action<TMessage> messageHandler)         {             if (messageHandler == null)             {                 throw new ArgumentNullException("messageHandler");             }             _messagesQueue = new ConcurrentQueue<TMessage>();             _messageHandler = messageHandler;              _source = new CancellationTokenSource();             _processingTask = new Task(() => ProcessMessages(_source.Token), _source.Token, TaskCreationOptions.LongRunning);         }          //---------------------------------------------------------------------------------------------------------------------------------------------------          public Actor<TMessage> Start()         {             if (!_started)             {                 _processingTask.Start();                 _started = true;             }              return this;         }          public void Stop()         {             Console.WriteLine("PROCESSING STOP REQUESTED");             _source.Cancel();         }          //---------------------------------------------------------------------------------------------------------------------------------------------------          public void Send(TMessage message)         {             _messagesQueue.Enqueue(message); // any capacity bounding is required here?         }          //---------------------------------------------------------------------------------------------------------------------------------------------------          private void ProcessMessages(CancellationToken ct)         {             while (true)             {                 if (_messagesQueue.Count > 0)                  {                     TMessage message;                     var hasRemoved = _messagesQueue.TryDequeue(out message);                      if (hasRemoved)                     {                         _messageHandler(message);                     }                      continue;                 }                  if (ct.IsCancellationRequested)                 {                     Console.WriteLine("PROCESSING STOPED");                     return;                 }             }         }     } } 


There is one drawback(?) in your actor, in some cases it can't be interrupted.
Actor can't be stopped untill there are tasks in inner queue.
So if some outer threads will enque new tasks little bit faster then actor can process it, then you wan't be able to stop your actor.
If it's an actors feature - ok, else you can fix it by removing continue or placing cancellation check at the top of the loop.

Category: c# Time: 2016-08-02 Views: 87

Related post

iOS development

Android development

Python development

JAVA development

Development language

PHP development

Ruby development


Front-end development


development tools

Open Platform

Javascript development

.NET development

cloud computing


Copyright (C), All Rights Reserved.

processed in 0.285 (s). 12 q(s)