This Blog Is Moving

February 6, 2013 at 10:02 AM by Administrator

All – I am abandoning this blog and will begin posting to its newer version at http://www.codepiphany.com. I’ll leave this blog up for a while so existing links don’t break, but I encourage everyone to start following me at the new address.

Thank you for your interest!

Using SpeakToMe – A First Project

December 3, 2012 at 2:51 PM by Administrator

As I announced some time ago, I have released my natural language processor to Codeplex. The documentation on the site is fairly complete, but I wanted to take a different approach on my blog and go at it in more of a tutorial fashion, starting with downloading the binaries and integrating them into your project. From there, I will occasionally post tips and tricks that show how flexible the engine is and how to get the most out of it. Let’s have a look at how to get going with the engine.

Obtaining the Binaries

The first step in building a new project with SpeakToMe is to obtain the binaries containing the types we’ll need. To get them, go to the Codeplex site’s download page and download the binaries release (SpeakToMe_bin_2012_10_19 as of this writing). Once the zip file is downloaded, extract the files to a place where you can find them later. The zip file should contain three assemblies: Microsoft.Practices.Prism.dll, SpeakToMe.Core.dll and SpeakToMe.Speech.dll.

Create a New Project

Start Visual Studio and create a new WPF project. The New Project dialog should look similar to the one below.

[Screenshot: New Project dialog for the sample WPF project]

Click OK to generate the project.

We need to set references to the SpeakToMe assemblies. To do that, create a subfolder in your new project (I called mine ThirdParty) and copy the three assemblies mentioned above into it. Next, add references to the three assemblies from the project’s References node.

Supporting Users and Conversations

There are a couple of pieces of plumbing we need to build before we get into the fun stuff. One of these is a data layer that knows how to access information about users and conversations. Why is this necessary? SpeakToMe has built-in support for multiple users and multiple ways for users to access the engine. If multiple conversations are going on, the engine needs to be able to figure out how to get a response back to the correct user. If one user is accessing the system via email and another via chat, there must be a way to ensure the correct channel is used to send each response. So, to track this information, some form of data storage must be implemented.

SpeakToMe does not dictate how you do this.  Instead, it provides an interface describing the needed functionality which you must implement.  There are also types the engine expects to work with which are defined for you.  Let’s see how to pull this all together by building the necessary parts.
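
To see why the engine needs this information, here is a minimal TypeScript sketch of the routing problem, assuming each conversation records its user, the channel it arrived on and the sender’s address. The `Channel` values, the `senders` table and `routeReply` are hypothetical names for illustration and are not part of SpeakToMe.

```typescript
// Hypothetical channels; SpeakToMe's real modes live in its ConversationType enum.
type Channel = "email" | "im" | "test";

// What the data layer must remember about each conversation.
interface ConversationRecord {
  userId: number;
  channel: Channel;
  address: string; // email address, IM handle, etc.
}

// One sender per channel. A real system would send mail or an IM here;
// these just describe what would be sent.
const senders: { [key in Channel]: (address: string, text: string) => string } = {
  email: (address, text) => "email to " + address + ": " + text,
  im: (address, text) => "IM to " + address + ": " + text,
  test: (_address, text) => "test window: " + text,
};

// Route a reply back through the channel the conversation came in on.
function routeReply(conversation: ConversationRecord, reply: string): string {
  return senders[conversation.channel](conversation.address, reply);
}

const aliceByEmail: ConversationRecord = { userId: 1, channel: "email", address: "alice@example.com" };
const bobByIm: ConversationRecord = { userId: 2, channel: "im", address: "bob@example.com" };

console.log(routeReply(aliceByEmail, "Hello Alice")); // email to alice@example.com: Hello Alice
console.log(routeReply(bobByIm, "Hello Bob"));        // IM to bob@example.com: Hello Bob
```

Without a stored record of channel and address, `routeReply` would have nothing to dispatch on, which is the gap the data layer fills.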

Creating a Data Store

The sample code in the SpeakToMe source download includes an embedded database implementation. We’ll use the same schema to create a new database on a local instance of SQL Server Express.

I have created a new database named SpeakToMe and have created three tables in that database named “Users”, “Conversations” and “ConversationHistory”.  The schema looks like this.

[Diagram: SpeakToMe database schema]

Let’s take a moment to discuss the data that will be stored here and what each column holds. First of all, each table has an auto-incrementing integer key column. Starting with the Users table, the columns are pretty self-explanatory: the user name is for display in tooling, and the first and last names can be used anywhere. Next, look at the Conversations table. Each entry in this table describes a single conversation with a single user. The Initiated and Active columns record when the conversation started and whether or not it is still active. In some cases we are able to determine whether the user has ended the conversation, and in other situations we cannot. If a user ends an IM session with us, we can assume the conversation is over. If we get an email from a user, we can’t determine whether or not we’ll receive another, so conversations over this type of channel tend to just stay open.

NOTE: I won’t be discussing building functionality for communicating over email and IM in this article, but stay tuned!

The Mode and Address columns identify the channel the communication came from. Mode holds a value of the ConversationType enumeration defined in the Core assembly, so we know how we got the message. However, what if a single user initiates two distinct conversations from two different email accounts? We include the address here to distinguish between the two. Lastly, UserId identifies the user we’re talking to.
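
The way Mode and Address work together can be sketched as a lookup keyed on user, mode and address. This is an in-memory TypeScript stand-in for the Conversations table, not SpeakToMe code; the `Mode` values and the `findOrCreate` helper are assumptions for illustration.

```typescript
// Hypothetical mode values; the real ones are defined by SpeakToMe's ConversationType enum.
enum Mode { Test = 0, Email = 1, Im = 2 }

// One row of the Conversations table.
interface ConversationRow {
  conversationId: number;
  userId: number;
  mode: Mode;
  address: string | null;
  active: boolean;
  initiated: Date;
}

class ConversationTable {
  private rows: ConversationRow[] = [];
  private nextId = 1; // plays the role of the auto-incrementing key

  // A conversation matches only if user, mode AND address all match.
  find(userId: number, mode: Mode, address: string | null): ConversationRow | undefined {
    return this.rows.filter(r => r.userId === userId && r.mode === mode && r.address === address)[0];
  }

  create(userId: number, mode: Mode, address: string | null): ConversationRow {
    const row: ConversationRow = {
      conversationId: this.nextId++,
      userId: userId,
      mode: mode,
      address: address,
      active: true,
      initiated: new Date(),
    };
    this.rows.push(row);
    return row;
  }

  // Reuse the conversation for this user/mode/address if there is one, else start a new one.
  findOrCreate(userId: number, mode: Mode, address: string | null): ConversationRow {
    const existing = this.find(userId, mode, address);
    return existing !== undefined ? existing : this.create(userId, mode, address);
  }
}

// Same user, same mode, two email accounts: two separate conversations.
const table = new ConversationTable();
const work = table.findOrCreate(1, Mode.Email, "me@work.example");
const home = table.findOrCreate(1, Mode.Email, "me@home.example");
const workAgain = table.findOrCreate(1, Mode.Email, "me@work.example");
console.log(work.conversationId, home.conversationId, workAgain.conversationId); // 1 2 1
```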

The ConversationHistory table contains each individual communication between the user and the system.  Message and MessageDateTime contain the text of the message and when it was sent.  UserInitiated is true if the message came from the user and is false if it originated with the system.  Tag and TagType are used to store a piece of state along with a particular message so it can be retrieved and referenced later on in the conversation.
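
As an illustration of the Tag and TagType idea, the sketch below keeps history rows in memory and retrieves the most recent piece of state of a given type. The JSON encoding of the tag and the "Reminder" tag type are assumptions made up for this example; the post does not say how SpeakToMe itself encodes tags.

```typescript
// One row of the ConversationHistory table (simplified).
interface TaggedMessage {
  conversationId: number;
  message: string;
  messageDateTime: Date;
  userInitiated: boolean;
  tag: string | null;     // a piece of serialized state stored with the message
  tagType: string | null; // what kind of state the tag holds
}

// Most recent tag of the given type in a conversation, or null if there is none.
function latestTag(rows: TaggedMessage[], conversationId: number, tagType: string): string | null {
  const matches = rows
    .filter(r => r.conversationId === conversationId && r.tagType === tagType && r.tag !== null)
    .sort((a, b) => b.messageDateTime.getTime() - a.messageDateTime.getTime());
  return matches.length > 0 ? matches[0].tag : null;
}

const historyRows: TaggedMessage[] = [
  { conversationId: 1, message: "Remind me to call Bob", messageDateTime: new Date(2012, 11, 3, 14, 0),
    userInitiated: true, tag: null, tagType: null },
  { conversationId: 1, message: "When should I remind you?", messageDateTime: new Date(2012, 11, 3, 14, 1),
    userInitiated: false, tag: JSON.stringify({ task: "call Bob" }), tagType: "Reminder" },
  { conversationId: 1, message: "At 5pm", messageDateTime: new Date(2012, 11, 3, 14, 2),
    userInitiated: true, tag: null, tagType: null },
];

// When "At 5pm" arrives, the earlier system message supplies the missing context.
console.log(latestTag(historyRows, 1, "Reminder")); // {"task":"call Bob"}
```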

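Be sure to add foreign keys between the tables (Conversations.UserId to Users, and ConversationHistory.ConversationId to Conversations) so the generated entities get navigation properties you can use to reach related rows. I find that creating a Database Diagram is an easy way to do this.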

[Diagram: database diagram showing the foreign keys between the tables]

Creating a Data Access Project

For this example, I’m going to use the Entity Framework to access data, so I need to create a class library project that will contain my data access code and a conceptual model.

Add a new class library project to the solution you started. I’m calling mine “SpeakToMe.Sample.Data”. Next, delete the Class1.cs file that is generated for you.

Now we need to add the Entity Framework to our data project. We’ll use NuGet to do this by right-clicking the References node in the project and selecting “Manage NuGet Packages…”. When the dialog appears, enter “entity framework” into the search box. Once the list populates, select the entry titled “EntityFramework” and click the Install button.

[Screenshot: installing the EntityFramework package from the NuGet dialog]

After the libraries have been installed into your project, close the NuGet dialog. Now we need to create our model from the database. Right-click your project and select Add->New Item. Select “ADO.NET Entity Data Model” and specify a name for your model. I named mine “SpeakToMe”. Click the “Add” button.

[Screenshot: Add New Item dialog with ADO.NET Entity Data Model selected]

In the wizard, select “Generate from database” and click Next. Now click the Create New Connection button and select your server (local for me) and the database you created.

[Screenshot: Connection Properties dialog]

Click the “Test Connection” button to make sure the information is correct, and if it is, click OK. As a convenience, you might want to leave the Save Connection String checkbox checked. This creates the connection string entry in your config file for use later.

[Screenshot: saving the connection string]

Click Next and wait for the database information to populate in the next screen.  Now, drill down to the tables you created and select all of them.

[Screenshot: selecting the tables to include in the model]

Click “Finish” and your model will be created.

Implementing the IUserData Interface

Next, we need a class that implements the IUserData interface. In your data project, add a class named UserData, give it the attributes shown below, and let Visual Studio generate stubs for the interface members.

UserData Class
    using SpeakToMe.Core.Interfaces;
    using System;
    using System.Collections.Generic;
    using System.ComponentModel.Composition;

    namespace SpeakToMe.Sample.Data
    {
        [Export(typeof(IUserData))]
        [PartCreationPolicy(System.ComponentModel.Composition.CreationPolicy.Shared)]
        public class UserData : IUserData
        {
            public Core.Models.ConversationHistory AddConversationHistory(int conversationId, string text, string tagString, string tagType, bool userInitiated)
            {
                throw new NotImplementedException();
            }

            public Core.Models.Conversation CreateConversation(int userId, Core.Enums.ConversationType type, string address)
            {
                throw new NotImplementedException();
            }

            public Core.Models.User CreateUser(string userName, string firstName, string lastName)
            {
                throw new NotImplementedException();
            }

            public Core.Models.Conversation GetConversation(int userId, Core.Enums.ConversationType type, string address)
            {
                throw new NotImplementedException();
            }

            public List<Core.Models.ConversationHistory> GetConversationHistory(int conversationId)
            {
                throw new NotImplementedException();
            }

            public Core.Models.User GetUserById(int id)
            {
                throw new NotImplementedException();
            }
        }
    }

Note that you’ll need to set references to SpeakToMe.Core and System.ComponentModel.Composition to compile this class. The attributes on the class are for the Managed Extensibility Framework (MEF). SpeakToMe uses MEF as an IoC container, and the engine will use it to obtain an instance of this class when it needs one. CreationPolicy.Shared tells MEF to create a single instance and hand that same instance to everything that imports it.
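
If MEF is new to you, the TypeScript sketch below shows roughly what an export with a shared creation policy amounts to: a contract name mapped to a factory, where a shared part is created once and then reused. `TinyContainer`, `register` and `resolve` are made-up names for illustration; this is not how MEF is implemented.

```typescript
// A registration: how to build the part and whether it is shared.
interface Registration {
  create: () => unknown;
  shared: boolean;
  hasInstance: boolean;
  instance?: unknown;
}

class TinyContainer {
  private registrations: { [contract: string]: Registration } = {};

  // Roughly what [Export(typeof(Contract))] plus [PartCreationPolicy(...)] declare.
  register<T>(contract: string, create: () => T, shared: boolean): void {
    this.registrations[contract] = { create: create, shared: shared, hasInstance: false };
  }

  // Roughly what happens when the engine asks the container for a contract.
  resolve<T>(contract: string): T {
    const reg = this.registrations[contract];
    if (!reg) throw new Error("No export registered for " + contract);
    if (!reg.shared) return reg.create() as T; // non-shared: a new instance per request
    if (!reg.hasInstance) {                    // shared: build once, then reuse
      reg.instance = reg.create();
      reg.hasInstance = true;
    }
    return reg.instance as T;
  }
}

interface UserLookup { nameOf(id: number): string | undefined; }

let instancesCreated = 0;
const container = new TinyContainer();
container.register<UserLookup>("IUserData", () => {
  instancesCreated++;
  return { nameOf: (id: number) => (id === 1 ? "Alice" : undefined) };
}, true);

const first = container.resolve<UserLookup>("IUserData");
const second = container.resolve<UserLookup>("IUserData");
console.log(first === second, instancesCreated); // true 1
```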

Now, we need to implement the methods on the interface.  Below is my implementation.

Completed UserData Class
    using SpeakToMe.Core.Interfaces;
    using System;
    using System.Collections.Generic;
    using System.ComponentModel.Composition;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;

    namespace SpeakToMe.Sample.Data
    {
        [Export(typeof(IUserData))]
        [PartCreationPolicy(System.ComponentModel.Composition.CreationPolicy.Shared)]
        public class UserData : IUserData
        {
            public Core.Models.ConversationHistory AddConversationHistory(int conversationId, string text, string tagString, string tagType, bool userInitiated)
            {
                using (var ctx = new SpeakToMeEntities())
                {
                    //create new database entry using entity defined by EF
                    SpeakToMe.Sample.Data.ConversationHistory ch = new ConversationHistory
                    {
                        ConversationId = conversationId,
                        Message = text,
                        MessageDateTime = DateTime.Now,
                        Tag = tagString,
                        TagType = tagType,
                        UserInitiated = userInitiated
                    };

                    ctx.ConversationHistories.Add(ch);
                    ctx.SaveChanges();

                    //return an instance of the entity.  Note that this is not the entity defined by EF, but the one defined in SpeakToMe.Core which is the only one the engine would know about.
                    SpeakToMe.Core.Models.ConversationHistory conversationHistory = new Core.Models.ConversationHistory
                    {
                        ID = ch.ConversationHistoryId, //the new row's own key (filled in by SaveChanges), as in GetConversationHistory
                        Message = ch.Message,
                        MessageDateTime = ch.MessageDateTime,
                        Tag = ch.Tag,
                        TagType = ch.TagType,
                        UserInitiated = ch.UserInitiated
                    };

                    return conversationHistory;
                }
            }

            public Core.Models.Conversation CreateConversation(int userId, Core.Enums.ConversationType type, string address)
            {
                using (var ctx = new SpeakToMeEntities())
                {
                    Conversation conv = new Conversation
                    {
                        Active = true, //must be true if we're creating a conversation
                        Address = address,
                        Initiated = DateTime.Now,
                        Mode = (int)type,
                        UserId = userId
                    };

                    ctx.Conversations.Add(conv);
                    ctx.SaveChanges();

                    Core.Models.Conversation conversation = new Core.Models.Conversation
                    {
                        Active = conv.Active,
                        Address = conv.Address,
                        ID = conv.ConversationId,
                        Initiated = conv.Initiated,
                        Mode = (Core.Enums.ConversationType)conv.Mode,
                        UserId = conv.UserId
                    };

                    return conversation;
                }
            }

            public Core.Models.User CreateUser(string userName, string firstName, string lastName)
            {
                using (var ctx = new SpeakToMeEntities())
                {
                    User usr = new User
                    {
                        FirstName = firstName,
                        LastName = lastName,
                        UserName = userName
                    };

                    ctx.Users.Add(usr);
                    ctx.SaveChanges();

                    Core.Models.User user = new Core.Models.User
                    {
                        FirstName = usr.FirstName,
                        LastName = usr.LastName,
                        ID = usr.UserId,
                        UserName = usr.UserName
                    };

                    return user;
                }
            }

            public Core.Models.Conversation GetConversation(int userId, Core.Enums.ConversationType type, string address)
            {
                using (var ctx = new SpeakToMeEntities())
                {
                    Core.Models.Conversation conversation = ctx.Conversations.Where(c => c.UserId == userId && c.Mode == (int)type && c.Address == address).Select(c => new Core.Models.Conversation
                    {
                        Active = c.Active,
                        Address = c.Address,
                        ID = c.ConversationId,
                        Initiated = c.Initiated,
                        Mode = (Core.Enums.ConversationType)c.Mode,
                        UserId = c.UserId
                    }).FirstOrDefault();

                    return conversation;
                }
            }

            public List<Core.Models.ConversationHistory> GetConversationHistory(int conversationId)
            {
                List<Core.Models.ConversationHistory> messages = new List<Core.Models.ConversationHistory>();
                using (var ctx = new SpeakToMeEntities())
                {
                    var msgs = ctx.ConversationHistories.Where(c => c.ConversationId == conversationId);

                    msgs.ToList().ForEach(m =>
                    {
                        messages.Add(new Core.Models.ConversationHistory
                        {
                            ID = m.ConversationHistoryId,
                            Message = m.Message,
                            MessageDateTime = m.MessageDateTime,
                            Tag = m.Tag,
                            TagType = m.TagType,
                            UserInitiated = m.UserInitiated
                        });
                    });

                    return messages;
                }
            }

            public Core.Models.User GetUserById(int id)
            {
                using (var ctx = new SpeakToMeEntities())
                {
                    var usr = ctx.Users.Where(u => u.UserId == id).FirstOrDefault();

                    if (usr == null)
                        return null;

                    return new Core.Models.User
                    {
                        FirstName = usr.FirstName,
                        LastName = usr.LastName,
                        ID = usr.UserId,
                        UserName = usr.UserName
                    };
                }
            }
        }
    }
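
Every method above follows the same pattern: save or query through the EF context, then copy the entity into the matching SpeakToMe.Core model before returning it. Stripped of EF, that mapping step looks like the TypeScript sketch below; the two shapes are simplified stand-ins for the generated entity and the Core model, not their actual definitions.

```typescript
// Simplified stand-in for the EF-generated ConversationHistory entity.
interface HistoryEntity {
  conversationHistoryId: number; // the row's own key
  conversationId: number;        // foreign key to Conversations
  message: string;
  messageDateTime: Date;
  tag: string | null;
  tagType: string | null;
  userInitiated: boolean;
}

// Simplified stand-in for SpeakToMe.Core.Models.ConversationHistory.
interface HistoryModel {
  id: number;
  message: string;
  messageDateTime: Date;
  tag: string | null;
  tagType: string | null;
  userInitiated: boolean;
}

// Copy only what the engine knows about. As in GetConversationHistory,
// the model's id is the history row's key, not the conversation's.
function toModel(e: HistoryEntity): HistoryModel {
  return {
    id: e.conversationHistoryId,
    message: e.message,
    messageDateTime: e.messageDateTime,
    tag: e.tag,
    tagType: e.tagType,
    userInitiated: e.userInitiated,
  };
}

const saved: HistoryEntity = {
  conversationHistoryId: 42, conversationId: 7, message: "hello",
  messageDateTime: new Date(2012, 11, 3), tag: null, tagType: null, userInitiated: true,
};
console.log(toModel(saved).id); // 42
```

Keeping the engine-facing model separate from the persistence entity is what lets you swap EF for any other storage without the engine noticing.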

Presence

With SpeakToMe, a particular presence is a mode of communication.  In this post, I have already mentioned email and IM.  You could also implement SMS or any other communication protocol you desire.  In this sample, we’re going to create a simple test presence that will serve as the channel between our WPF application and SpeakToMe.  In a later post, I’ll describe how to build an XMPP (IM) presence.

Let’s create another class library project to hold our presence class. I’m calling mine SpeakToMe.Sample.Presence. Delete the Class1.cs file that is created by default and create a new class called TestPresence. Now add references to SpeakToMe.Core and System.ComponentModel.Composition and make your class declaration look like this.

Code Snippet
    using SpeakToMe.Core;
    using SpeakToMe.Core.Interfaces;
    using System;
    using System.ComponentModel.Composition;

    namespace SpeakToMe.Sample.Presence
    {
        [Export(typeof(IPresence))]
        [PartCreationPolicy(CreationPolicy.Shared)]
        public class TestPresence : IPresence
        {
            public void Initialize()
            {
                throw new NotImplementedException();
            }

            public bool IsConnected
            {
                get { throw new NotImplementedException(); }
            }

            public void ProcessCommand(string command, int userId, ISmartHomeServiceCallback callback)
            {
                throw new NotImplementedException();
            }

            public void OnImportsSatisfied()
            {
                throw new NotImplementedException();
            }
        }
    }

A couple of things to note here. First, the class attributes tell you that SpeakToMe is going to use MEF to load an instance of this class when it’s needed. Also, this class implements another interface provided in SpeakToMe.Core, IPresence. Any class that serves as a presence for the system must be exported this way and must implement this same interface. This approach lets you add as many communication mechanisms as you like without touching the library’s source code.
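
The plug-in contract can be sketched in TypeScript to show the call order a presence should expect. This is only an illustration: the real IPresence is a C# interface in SpeakToMe.Core, the member names below are transliterations, and `EchoPresence` replies directly instead of going through the engine.

```typescript
// Stand-in for the callback object ProcessCommand receives.
interface ReplyCallback { returnResult(reply: string): void; }

// Sketch of the presence contract.
interface Presence {
  initialize(): void;            // open the channel (mail server, XMPP server, ...)
  readonly isConnected: boolean; // false when the channel is down
  processCommand(command: string, userId: number, callback: ReplyCallback | null): void;
  onImportsSatisfied(): void;    // dependencies supplied; safe to subscribe to events
}

// A test presence has no connection to open, so it is always connected.
class EchoPresence implements Presence {
  private importsReady = false;

  get isConnected(): boolean { return true; }

  initialize(): void {
    // nothing to connect to for an in-process test channel
  }

  onImportsSatisfied(): void {
    this.importsReady = true;
  }

  processCommand(command: string, userId: number, callback: ReplyCallback | null): void {
    if (!this.importsReady) throw new Error("dependencies not supplied yet");
    // Stand-in for the engine: reply immediately instead of via an event.
    if (callback) callback.returnResult("user " + userId + " said: " + command);
  }
}

const replies: string[] = [];
const presence: Presence = new EchoPresence();
presence.initialize();
presence.onImportsSatisfied(); // in SpeakToMe, MEF makes this call
presence.processCommand("hello", 1, { returnResult: r => { replies.push(r); } });
console.log(replies[0]); // user 1 said: hello
```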

When implementing the interface, use the Initialize method to set up everything needed to listen on a channel and send messages through it, for example connecting to an email server or an XMPP server. The IsConnected property should report whether or not the channel is open for communication; if a connection is lost, for example, it should return false. Sending a message into the SpeakToMe engine is handled in the ProcessCommand method. This method has a void return type because we will be notified via an event when the response is ready to be sent back to the user. Lastly, OnImportsSatisfied is called when MEF has supplied all the instances we have requested of it. More on this in a second. Now, set references to the SpeakToMe.Core, SpeakToMe.Speech and Microsoft.Practices.Prism assemblies we copied into the ThirdParty folder earlier. Here is my implementation of the TestPresence class.

Completed TestPresence Class
    using Microsoft.Practices.Prism.Events;
    using SpeakToMe.Core;
    using SpeakToMe.Core.Enums;
    using SpeakToMe.Core.Interfaces;
    using System.ComponentModel.Composition;
    //also add using directives for the SpeakToMe namespaces that declare ReplyToChannelEvent and CommandProcessor

    namespace SpeakToMe.Sample.Presence
    {
        [Export(typeof(IPresence))]
        [PartCreationPolicy(CreationPolicy.Shared)]
        public class TestPresence : IPresence
        {
            //supplied by MEF; not available until OnImportsSatisfied is called
            [Import]
            public IEventAggregator EventAggregator { get; set; }

            //the test channel is in-process, so it is always connected
            public bool IsConnected
            {
                get { return true; }
            }

            public void Initialize()
            {
                //do nothing
            }

            private void ReplyToChannelEventHandler(ReplyToChannelEventArgs args)
            {
                //only handle replies meant for the test channel
                if (args.Mode == ConversationType.Test)
                {
                    //ConversationData.CreateConversationHistory(args.ConversationId, args.Reply, args.TagString, args.Tag, false);

                    if (args.Callback != null)
                    {
                        args.Callback.ReturnResult(args.Reply);
                    }
                }
            }

            public void OnImportsSatisfied()
            {
                this.EventAggregator.GetEvent<ReplyToChannelEvent>().Subscribe(ReplyToChannelEventHandler);
            }

            public void ProcessCommand(string command, int userId, ISmartHomeServiceCallback callback)
            {
                //hand the message to the engine; the reply arrives through ReplyToChannelEventHandler
                var processor = ServiceLocator.GetInstance<CommandProcessor>();
                processor.ProcessCommand(command, userId, ConversationType.Test, "", callback);
            }
        }
    }

Right off the bat, you’ll notice the EventAggregator property and its [Import] attribute. The attribute is one way to ask MEF for an instance of a type. In the OnImportsSatisfied method, we subscribe to the ReplyToChannelEvent. We do this setup here because it uses the EventAggregator instance, and we want to be sure MEF has supplied it before we use the reference. For our sample, we’ll call ProcessCommand directly from our client, and this method passes the message on to the NLP engine for processing. When processing is complete, ReplyToChannelEventHandler is called; it checks that the reply is meant for the Test mode and invokes the callback that was passed in to send the reply back to the caller. For other communication types, we might send an email or IM message instead of calling the callback.
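
The publish/subscribe flow can be sketched without Prism. In the TypeScript below, `ReplyEvent` is a hypothetical stand-in for a Prism event, not Prism’s API: one side publishes a reply, and each presence handles only the replies for its own mode, the way ReplyToChannelEventHandler checks for ConversationType.Test.

```typescript
// Hypothetical modes for the sketch.
enum ChannelMode { Test, Email }

interface ReplyArgs {
  mode: ChannelMode;
  reply: string;
  callback?: (reply: string) => void; // only the test channel supplies one
}

// Minimal event: a list of subscribers, each called on publish.
class ReplyEvent {
  private handlers: Array<(args: ReplyArgs) => void> = [];
  subscribe(handler: (args: ReplyArgs) => void): void { this.handlers.push(handler); }
  publish(args: ReplyArgs): void { this.handlers.forEach(h => h(args)); }
}

const replyEvent = new ReplyEvent();
const emailed: string[] = [];

// Test presence: answer through the callback the caller supplied.
replyEvent.subscribe(args => {
  if (args.mode === ChannelMode.Test && args.callback) args.callback(args.reply);
});

// Email presence: would send mail; here it just records the reply.
replyEvent.subscribe(args => {
  if (args.mode === ChannelMode.Email) emailed.push(args.reply);
});

const shown: string[] = [];
replyEvent.publish({ mode: ChannelMode.Test, reply: "Hello Alice", callback: r => shown.push(r) });
replyEvent.publish({ mode: ChannelMode.Email, reply: "Hello Bob" });
console.log(shown.join(", "), "|", emailed.join(", ")); // Hello Alice | Hello Bob
```

Because every presence sees every reply, the mode check in each handler is what keeps a test reply from being emailed and vice versa.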

The Bootstrapper

I mentioned earlier that SpeakToMe uses MEF to load types. MEF provides an easy way to extend a system without having to put explicit code dependencies in place. If you’re not familiar with MEF and how it works, I’d suggest taking a bit of time to read its documentation.

In a nutshell, MEF works from a catalog of types. In this case, we point MEF at the assemblies we want it to examine. It goes through the types in each assembly and identifies any that have an Export attribute on them, and these are added to the catalog. Then, when code needs an instance of a type, it can ask MEF for one from the catalog.

There are many ways to tell MEF which assemblies to include. In our case, we’ll do it in code. SpeakToMe includes a BootStrapperBase class that automatically loads the assemblies it knows about. But we just added the data access and presence projects, and we need a way to tell MEF about them. This is what the Bootstrapper class is for. Note that implementing the bootstrapper is required even if you have no additional assemblies to load, because without it the default assemblies are never loaded.
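
The split between BootStrapperBase and your own Bootstrapper resembles a template method: the base class assembles an aggregate catalog from what it knows about and calls a hook for the rest. The TypeScript below sketches only that idea; `BootstrapperSketch`, the contents of `defaultCatalog` and the string contracts are hypothetical and do not reflect SpeakToMe’s actual implementation.

```typescript
// One exported part: the contract it satisfies and a descriptive name.
interface Part { contract: string; name: string; }

// Stands in for one assembly's worth of exported types.
type PartCatalog = Part[];

abstract class BootstrapperSketch {
  readonly catalogs: PartCatalog[] = [];

  initialize(): void {
    this.catalogs.push(this.defaultCatalog()); // the library's own exports
    this.addCustomCatalogs(this.catalogs);     // the application's extension point
  }

  // Hypothetical default contents; the real base class decides what it loads.
  protected defaultCatalog(): PartCatalog {
    return [{ contract: "CommandProcessor", name: "CommandProcessor" }];
  }

  protected abstract addCustomCatalogs(catalogs: PartCatalog[]): void;

  // All exports for a contract across every catalog (the "aggregate" view).
  exportsOf(contract: string): Part[] {
    return this.catalogs
      .reduce<Part[]>((all, catalog) => all.concat(catalog), [])
      .filter(p => p.contract === contract);
  }
}

// The application's subclass adds the data and presence "assemblies".
class SampleBootstrapper extends BootstrapperSketch {
  protected addCustomCatalogs(catalogs: PartCatalog[]): void {
    catalogs.push([{ contract: "IUserData", name: "UserData" }]);
    catalogs.push([{ contract: "IPresence", name: "TestPresence" }]);
  }
}

const bootstrapper = new SampleBootstrapper();
bootstrapper.initialize();
console.log(bootstrapper.exportsOf("IPresence").map(p => p.name).join(", ")); // TestPresence
```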

In the WPF application you created at the beginning of this exercise, create a new class and call it Bootstrapper. You’ll need to set a reference from that project to SpeakToMe.Speech. Now, have the new class inherit from BootStrapperBase and override the AddCustomAssembliesToCatalog method.

Bootstrapper
    public class Bootstrapper : BootStrapperBase
    {
        protected override void AddCustomAssembliesToCatalog(System.ComponentModel.Composition.Hosting.AggregateCatalog catalog)
        {
            base.AddCustomAssembliesToCatalog(catalog);
        }
    }

It is in this overridden method that we will add our custom assemblies like so.

Adding Assemblies To Catalog
    catalog.Catalogs.Add(new AssemblyCatalog(typeof(UserData).Assembly));
    catalog.Catalogs.Add(new AssemblyCatalog(typeof(TestPresence).Assembly));

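You’ll need to add references from the WPF project to your data and presence projects, plus a using directive for System.ComponentModel.Composition.Hosting (where AssemblyCatalog lives), in order to get a clean compile.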

The Client

Now that we’ve added all the required plumbing, we can start working on the client we’re going to use to communicate with the engine.  I’m going to start with the default MainWindow.xaml file that was created when we created our WPF project and add a ListBox, TextBox and Button to it like this.

[Screenshot: MainWindow with a ListBox, a TextBox and a Button]

The ListBox at the top will contain our conversation. The TextBox at the bottom is where we type our input, and the button sends that input to the engine. Now let’s name the controls and clean up a bit. Here is the resulting XAML.

Main Window XAML
    <Window x:Class="SpeakToMe.Sample.MainWindow"
            xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
            xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
            Title="MainWindow" Height="350" Width="525">
        <Grid>
            <ListBox HorizontalAlignment="Left" Height="248" Margin="10,10,0,0" VerticalAlignment="Top" Width="497" x:Name="Conversation"/>
            <TextBox HorizontalAlignment="Left" Height="34" Margin="25,275,0,0" TextWrapping="Wrap" Text="" VerticalAlignment="Top" Width="397" x:Name="Message"/>
            <Button Content="Say" HorizontalAlignment="Left" Margin="432,275,0,0" VerticalAlignment="Top" Width="75" Height="34" x:Name="SayButton"/>
        </Grid>
    </Window>

Let’s do some work in the code behind of this window, now.

First, let’s add some code to the constructor to create and initialize our MEF catalog using the bootstrapper class we created.

Code Snippet
    public MainWindow()
    {
        Bootstrapper bs = new Bootstrapper();
        bs.Initialize();
        InitializeComponent();
    }

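Now we need a handler for the button’s Click event so we can do something when it’s clicked. Add one through the designer or the Properties window; either way, the Button element in the XAML must end up with Click="SayButton_Click_1" so it points at the handler below, or clicking the button will do nothing.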

Here is the class after adding the event handler.

Code Snippet
    using SpeakToMe.Core;
    using SpeakToMe.Core.Interfaces;
    using SpeakToMe.Sample.Presence;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using System.Windows;
    using System.Windows.Controls;
    using System.Windows.Data;
    using System.Windows.Documents;
    using System.Windows.Input;
    using System.Windows.Media;
    using System.Windows.Media.Imaging;
    using System.Windows.Navigation;
    using System.Windows.Shapes;

    namespace SpeakToMe.Sample
    {
        /// <summary>
        /// Interaction logic for MainWindow.xaml
        /// </summary>
        public partial class MainWindow : Window
        {
            public MainWindow()
            {
                Bootstrapper bs = new Bootstrapper();
                bs.Initialize();
                InitializeComponent();
            }

            private void SayButton_Click_1(object sender, RoutedEventArgs e)
            {
                this.Dispatcher.Invoke(new Action(() => this.Conversation.Items.Add(Message.Text)));
                IPresence presence = ServiceLocator.GetInstance<TestPresence>();
                presence.ProcessCommand(Message.Text, 1, new CallbackWrapper(new Action<string>((msg) =>
                {
                    this.Dispatcher.Invoke(new Action(() =>
                    {
                        Conversation.Items.Add(msg);
                    }));
                })));
            }
        }
    }

After the button is clicked, the first thing we do is add the message we’re sending to the system to the ListBox. Next, we use the ServiceLocator to get an instance of our presence class. The ServiceLocator is a class we can use to explicitly request an instance of a class from MEF. If you recall, we exported this class, so it should be in our catalog. Once we have the instance, we call ProcessCommand on it and pass in the message, a user ID and a callback to be invoked when a response is ready. The callback, wrapped in a CallbackWrapper, simply adds the system’s response to the ListBox (through the Dispatcher, so the update happens on the UI thread) so we can observe it.
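
As used here, CallbackWrapper adapts a plain lambda to the callback interface that ProcessCommand expects. The TypeScript sketch below shows that adapter pattern with hypothetical names (`ServiceCallback`, `FunctionCallback`, `fakeEngine`); it does not reproduce SpeakToMe’s CallbackWrapper, and the fake engine only imitates the “Hello” plus first name reply described in the Testing section.

```typescript
// Stand-in for the callback interface ProcessCommand expects.
interface ServiceCallback { returnResult(result: string): void; }

// Adapter: turns any function into a ServiceCallback.
class FunctionCallback implements ServiceCallback {
  constructor(private readonly onResult: (result: string) => void) {}
  returnResult(result: string): void { this.onResult(result); }
}

// Fake engine that answers a greeting with the user's first name.
const firstNames: { [userId: number]: string } = { 1: "Alice" };
function fakeEngine(command: string, userId: number, callback: ServiceCallback): void {
  if (command.toLowerCase() === "hello") callback.returnResult("Hello " + firstNames[userId]);
}

// Mirrors the click handler: show the input, then show the reply when it arrives.
const conversationItems: string[] = [];
function say(text: string): void {
  conversationItems.push(text);
  fakeEngine(text, 1, new FunctionCallback(reply => conversationItems.push(reply)));
}

say("hello");
console.log(conversationItems.join(" / ")); // hello / Hello Alice
```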

Initializing the Data in the Database

The last task before testing is to add some data to the database. Use whatever tool you prefer to add a single user to the Users table. You can specify any values you like for the columns; just make sure the user ID passed in the ProcessCommand call (1 in the code above) matches your user’s ID.

We also need to manually create an active conversation for this test. Create a new record in the Conversations table with Initiated set to any date/time value, Active set to true, Mode set to 0, Address set to null and UserId set to the ID of the user you created.

One last thing. When we created our entity model, a connection string was placed in the App.config file of the data project. You’ll need to copy this connection string into the App.config file of your WPF app, because at run time the configuration of the executing application is the one that gets read.

Testing

That completes the coding for the test.  Be sure to set the WPF application as the startup project and press F5.  The main window should open.  Type “hello” into the textbox and press the button.  After a second or two, you should see “Hello” followed by your user’s first name appear in the listbox.  This is the response from the system.  Congratulations!  You have just created an application that makes use of the SpeakToMe NLP engine.

Conclusion

Actually, this is more of a beginning than a conclusion. I will be publishing several posts about the SpeakToMe engine in the coming months. I’ll cover authoring new token classes, building rules and creating an XMPP presence for the engine. Along the way, I’ll be pointing out specific best practices and tips. Hope to have you join me.

Once again, I’d like to let everyone know that I welcome and would greatly appreciate any community participation in this effort.  Contact me through comments on this blog or the SpeakToMe Codeplex site for details on getting started.

Posted in: Natural Language Processing | MEF | .Net | Database | SpeakToMe

6 Ways You Win With TypeScript

October 29, 2012 at 10:50 AM by Administrator

Perhaps I’m a bit behind the wave in posting about TypeScript, but I wanted to take the time to get as familiar as possible with the language and tools before weighing in. After spending some time with it, I have to say I see no downside to using it in your web applications, and I’ve identified the following ways you win by putting TypeScript to use in your projects.

No Unlearning Of JavaScript or Learning a New Language

TypeScript is a superset of JavaScript, which is a different approach to the problem from the one taken by something like CoffeeScript. This means you can keep using everything you already know about JavaScript and only need to learn what TypeScript adds to the language. In the end, all your TypeScript compiles down to regular JavaScript that closely follows the structure of your source, which makes debugging much easier.
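
To make the superset point concrete: in the illustrative function below (not taken from the template), everything except the type annotations is ordinary JavaScript. The compiler checks the annotations and then erases them from its output.

```typescript
// Plain JavaScript plus two kinds of annotation: parameter types and a return type.
function formatTotal(prices: number[], currency: string): string {
  var total = 0;
  for (var i = 0; i < prices.length; i++) {
    total += prices[i];
  }
  return currency + total.toFixed(2);
}

console.log(formatTotal([1.5, 2.25], "$")); // $3.75

// formatTotal("1.5", "$"); // rejected at compile time: a string is not a number[]
```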

Familiar OO Concepts

There are some very talented web developers out there. Many have been using JavaScript for a very long time, have learned the idiosyncrasies of the language, and know how to make it mimic some object-oriented behaviors using closures and the like. Recently, however, Microsoft has been nudging those of us who develop with its tools toward HTML5 and JavaScript. Many of us are coming from an OO background where these concepts are familiar, and we’re going to be more productive using them.
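
For comparison, here is a small counter written both ways: first with the closure idiom experienced JavaScript developers use for private state, then with TypeScript’s class syntax. The example is illustrative and not from the TypeScript template. One difference worth knowing: `private` in TypeScript is enforced by the compiler only, while the closure’s variable is genuinely hidden at run time.

```typescript
// Closure idiom: `count` is private because only these functions can see it.
function makeClosureCounter() {
  var count = 0;
  return {
    increment: function (): number { count++; return count; },
    current: function (): number { return count; },
  };
}

// Class syntax: the same behavior, declared the way a C# developer would expect.
class ClassCounter {
  private count = 0; // checked by the compiler; not hidden at run time

  increment(): number {
    this.count++;
    return this.count;
  }

  current(): number {
    return this.count;
  }
}

const closureCounter = makeClosureCounter();
const classCounter = new ClassCounter();
closureCounter.increment();
closureCounter.increment();
classCounter.increment();
classCounter.increment();
console.log(closureCounter.current(), classCounter.current()); // 2 2
```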

Consider the following example, taken from the default script file of the TypeScript project template.

TypeScript Example
    class Greeter {
        element: HTMLElement;
        span: HTMLElement;
        timerToken: number;

        constructor (element: HTMLElement) {
            this.element = element;
            this.element.innerText += "The time is: ";
            this.span = document.createElement('span');
            this.element.appendChild(this.span);
            this.span.innerText = new Date().toUTCString();
        }

        start() {
            this.timerToken = setInterval(() => this.span.innerText = new Date().toUTCString(), 500);
        }

        stop() {
            clearTimeout(this.timerToken);
        }

    }

    window.onload = () => {
        var el = document.getElementById('content');
        var greeter = new Greeter(el);
        greeter.start();
    };

 

This example uses the class keyword, and the similarity to an OO language is evident. Note that the class can also contain a constructor. Combined with the module keyword, which we'll look at in a minute, this lets me keep the one-class-per-file convention I'm familiar with. Notice, too, how the start and stop methods are defined; again, this is familiar to a C# developer.

Compare the above with the below JavaScript which is the result after compiling.

JavaScript Example
    var Greeter = (function () {
        function Greeter(element) {
            this.element = element;
            this.element.innerText += "The time is: ";
            this.span = document.createElement('span');
            this.element.appendChild(this.span);
            this.span.innerText = new Date().toUTCString();
        }
        Greeter.prototype.start = function () {
            var _this = this;
            this.timerToken = setInterval(function () {
                return _this.span.innerText = new Date().toUTCString();
            }, 500);
        };
        Greeter.prototype.stop = function () {
            clearTimeout(this.timerToken);
        };
        return Greeter;
    })();
    window.onload = function () {
        var el = document.getElementById('content');
        var greeter = new Greeter(el);
        greeter.start();
    };

It’s possible to create a construct in pure JavaScript that behaves the same as the Typescript class, but look at how much more complicated the syntax is. The class has become an immediately invoked function that returns a constructor function, the methods are attached using the prototype pattern, and the arrow function in start is rewritten to capture this in a _this variable.

ECMAScript 6 Features Today

I have seen negative feedback about Typescript from people who are pushing back on changes to their beloved JavaScript. The truth is that many of the additions Typescript makes to the language, such as classes, arrow functions (lambdas) and modules, are already proposed for ECMAScript 6, and Typescript makes them available for developers to use today.
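As a small illustration of the arrow-function behavior, here is a minimal sketch; the Scaler class and its members are hypothetical. The arrow function passed to map has no this of its own, so this.factor refers to the Scaler instance that scaleAll was called on.

```typescript
// Hypothetical class, used only to illustrate how an arrow function captures `this`.
class Scaler {
    constructor(private factor: number) {}

    scaleAll(values: number[]): number[] {
        // The arrow function has no `this` of its own, so `this` below is the
        // Scaler instance that scaleAll was called on.
        return values.map(v => v * this.factor);
    }
}

const tripler = new Scaler(3);
const scaled = tripler.scaleAll([1, 2, 5]); // [3, 6, 15]
```

When compiled to older JavaScript, the arrow function should be rewritten with a _this variable, the same pattern visible in the compiled Greeter output above.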

Code Organization

I’ve mentioned the module keyword a couple of times already. This is a powerful construct that gives the developer much more flexibility in how the script files in a project are organized. Below is an example of defining a module and exporting a class from it.

Module with Export
    module Sayings {
        export class Greeter {
            greeting: string;
            constructor (message: string) {
                this.greeting = message;
            }
            greet() {
                return "Hello, " + this.greeting;
            }
        }
    }
    var greeter = new Sayings.Greeter("world");

    var button = document.createElement('button');
    button.innerText = "Say Hello";
    button.onclick = function() {
        alert(greeter.greet());
    };

    document.body.appendChild(button);

Here, a module called “Sayings” is defined and a class named “Greeter” is exported from it. This means that we can access the Greeter class by specifying “Sayings.Greeter”. This is what Typescript calls an “internal module”: a module that is explicitly declared in the code. The second kind of module is an “external module”. These are modules that are implicitly defined by a separate file and are referred to by the file name. Here is an example from the Typescript documentation.

In file main.ts:

    import log = module("log");
    log.message("hello");

In file log.ts:

    export function message(s: string) {
        console.log(s);
    }

In the log.ts file, a single function is exported (made visible outside the module). In main.ts, log.ts is imported as a module and the exported function is called. Importing an external module does not copy its code into the importing file; instead, the compiler emits a module-loader call (CommonJS require or AMD define, depending on the --module compiler option) that loads the compiled log.js at runtime. Note that the log module is imported by file name. In fact, the module name can contain an absolute or relative path such as “C/myproject/script/logger/log” or “./logger/log”, where a path beginning with “.” or “..” is treated as relative. This means that you can organize your source files in a hierarchical folder structure like you would in C# and then import them by relative path, much as you would use a namespace.

Better Tooling Support

IntelliSense for a dynamic language is hard to do. Since you don't need to specify a type when declaring a variable, what should IntelliSense show you when you type the variable name followed by a dot? Over the past few versions of Visual Studio, Microsoft has tried to populate IntelliSense in useful ways, but there are many scenarios where it's simply impossible to make even an educated guess about what members a variable might have. In Typescript, suppose I enter the following:

Intelli-1

After I type the dot following the variable name “foo”, there is no attempt to display IntelliSense. After all, what is foo and what members does it have? However, if I specify a type for the variable, as Typescript allows, I get much more useful tooling.

Intelli-2

Not only do I now have IntelliSense, it also displays the members of the string type, which are the ones that apply in this situation. It isn't necessary to specify types for everything. The Typescript compiler goes to great lengths to infer the types in your code where they are not specified. For example, if you assign the result of a function that returns a string to a variable, that variable will be considered a string. Likewise, if I had initialized the foo variable to a value of “”, it would also have been inferred to be a string.
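A minimal sketch of the two inference cases just described; describeUser is a hypothetical helper. Neither label nor foo carries an annotation, yet an editor should report both as string, and the commented-out assignments would be compile-time errors.

```typescript
// Hypothetical helper with an explicit return type.
function describeUser(userName: string): string {
    return "User: " + userName;
}

// No annotation: inferred as string from describeUser's return type.
let label = describeUser("ada");

// No annotation: inferred as string from the "" initializer.
let foo = "";

// Uncommenting either line produces a compile-time error, because
// a number is not assignable to a string:
// label = 42;
// foo = 42;

// String members are offered by IntelliSense and checked by the compiler.
foo = label.toUpperCase();
```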

Type Checking at Compile Time

JavaScript does not check declared types at runtime, and Typescript does not add such checks; its type annotations are erased when the code is compiled. However, at compile time, if the Typescript compiler can determine the type of a variable, either because the type was explicitly specified or because it was inferred, every use of that variable is checked against that type. This check happens once, when your .ts files are compiled. You can still shoot yourself in the foot at runtime using the dynamic features of JavaScript, but the type checking the compiler gives you brings you one step closer to bug-free code.
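The following sketch shows where compile-time checking stops. Point and sum are hypothetical names. A literal with the wrong property type is rejected by the compiler, but a value arriving through JSON.parse (typed as any) passes unchecked, and because the annotations are erased, nothing catches the mismatch at runtime either.

```typescript
interface Point {
    x: number;
    y: number;
}

function sum(p: Point): number {
    return p.x + p.y;
}

// Rejected by the compiler, because "2" is not a number:
// sum({ x: 1, y: "2" });

// JSON.parse returns `any`, so the compiler cannot check this value, and no
// check is emitted into the JavaScript, so nothing catches it at runtime either.
const fromJson: Point = JSON.parse('{"x": 1, "y": "2"}');

// y is really a string at runtime, so + concatenates instead of adding.
const total = sum(fromJson); // declared as number, actually the string "12"
```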

 

Conclusion

This has not been an attempt to exhaustively cover the features of Typescript but simply a post to point out some of the advantages of using it.  I hope to post more about specific features and their use in the near future.

Posted in: .Net | ASP.Net | JavaScript | Typescript


My Natural Language Processing Engine is Now Open Source

October 20, 2012 at 1:16 PM | Administrator

After a lot of thought, I have decided to make my Natural Language Engine available as an open source project on Codeplex. My reservations about doing so in the past had to do with concerns over the quality of the code and the possibility of monetizing the project at some point. This processor was written alongside my smart home system, so it was both a bit sloppy, as prototypes tend to be, and tightly integrated with the smart home code.

My change of heart came when I realized that a stand-alone NLP engine is of little use to anyone, but it can be worth quite a bit once it becomes the interface to a more capable, feature-rich system. To me, it seems better to put the NLP portion of my hobby project out there for the community to improve and innovate on, so we can all reap the rewards in the projects we build. I’ve spent some time cleaning up the code and creating clean extension points. Please have a look at http://SpeakToMe.Codeplex.com

Roadmap

  • First of all, I welcome anyone who would like to contribute to the project. Please contact me at ghostwrtrone at yahoo dot com.
  • My goal for the project is both to harden the existing code and to extend it with new features. Among the areas I’d like to see better implemented are the following:
      · Better rule matching logic. Currently, token types must match the rule method signatures exactly. I’d like a fuzzier matching algorithm so that variations in input can still be matched to the proper rule.
      · Expanded library of tokens. There is no way to predict what domain the application will be used in, so custom tokens will always be required. Adding more general-purpose tokens, however, would shorten the path to a completed application.
      · Better implementation of questions. Currently, the library lets the system ask the user questions to clarify their statements. The implementation places a question object in a collection along with a list of the tokens expected in the answer and a callback that is executed when the question is answered. When an answer arrives, the matching question is located, its callback is executed and the question is removed from the collection. This works well in many situations, but it has flaws. First, if the answer to one question prompts the system to ask another, the resulting nested callbacks make the code structure atrocious. Second, if a user has more than one pending question expecting the same token list in the answer, the system has no way to distinguish between them.
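To make the second flaw concrete, here is a minimal TypeScript sketch of a question collection along the lines described above. It is not SpeakToMe's actual C# implementation; PendingQuestion, QuestionRegistry, the "Room" token type and the exact-match rule are assumptions for illustration only.

```typescript
// Hypothetical model of the question mechanism described above.
interface PendingQuestion {
    text: string;
    expectedTokens: string[];              // token types expected in the answer
    onAnswer: (tokens: string[]) => void;  // callback run when answered
}

class QuestionRegistry {
    private pending: PendingQuestion[] = [];

    ask(question: PendingQuestion): void {
        this.pending.push(question);
    }

    // Finds the first pending question whose expected token types exactly
    // match the answer's, runs its callback and removes it from the collection.
    answer(tokens: string[]): PendingQuestion | undefined {
        for (let i = 0; i < this.pending.length; i++) {
            const q = this.pending[i];
            if (q.expectedTokens.join("|") === tokens.join("|")) {
                this.pending.splice(i, 1);
                q.onAnswer(tokens);
                return q;
            }
        }
        return undefined;
    }
}

const registry = new QuestionRegistry();
const handled: string[] = [];
registry.ask({ text: "Which room?", expectedTokens: ["Room"], onAnswer: () => { handled.push("lights question"); } });
registry.ask({ text: "Which room should the music play in?", expectedTokens: ["Room"], onAnswer: () => { handled.push("music question"); } });

// Even if the user meant to answer the music question, both questions expect
// a Room token, so the answer is routed to whichever was asked first.
const matched = registry.answer(["Room"]);
```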

I hope you enjoy the library. If you create a really cool project using it, please shoot me an email and tell me about it. Also, to those who wish to join in the effort, welcome. Let’s keep the project moving forward for all.

Posted in: .Net | Natural Language Processing


Getting Started With TPL Dataflow–Part 5

October 12, 2012 at 7:53 AM | Administrator

To get caught up to where we are, you can read about the Dataflow architecture, Buffering Blocks, Executor Blocks and Joining Blocks in my previous posts.

In this post, I’d like to take a look at how to configure a block and what options are available.

DataflowBlockOptions

When configuring a dataflow block, you have two options. You can accept the block's default behavior, which is usually to process one item at a time, or you can pass an optional second parameter to the constructor that specifies configuration options. This parameter is of type DataflowBlockOptions or one of its descendants (ExecutionDataflowBlockOptions or GroupingDataflowBlockOptions). You specify your desired options by setting properties on a new instance of one of these option classes. Below are some of the available properties and their significance.

TaskScheduler

Each block is responsible for scheduling its own work, and this work runs on a TaskScheduler, which represents an execution resource in the TPL. By default, a block schedules its work to TaskScheduler.Default, which targets the .NET ThreadPool. To override this behavior, set the TaskScheduler property to an instance of a class derived from the abstract TaskScheduler class, and the block will use that instance to schedule its work.

.NET Framework 4 includes two built-in task schedulers: the default one, and one that targets the current SynchronizationContext, which allows work to be scheduled on the UI thread. .NET Framework 4.5 adds a third option, ConcurrentExclusiveSchedulerPair, which provides a pair of schedulers that work much like a reader/writer lock. Also have a look at the Parallel Extensions Extras project, which includes close to a dozen additional TaskScheduler implementations.

MaxDegreeOfParallelism

By default, a block processes one item at a time and buffers any messages not yet processed. However, to exploit the real power of Dataflow, you'll probably want at least some blocks to process items in parallel. This property (available on ExecutionDataflowBlockOptions) defaults to 1; if it is set to a value greater than 1, up to that many items can be processed concurrently by the block. If DataflowBlockOptions.Unbounded (-1) is specified, any number of items can be processed concurrently, and the actual maximum is determined by the TaskScheduler the block uses. Again, this property specifies a maximum; there is no guarantee that the specified number of threads will be used.

MaxMessagesPerTask

Remember that a block wraps one or more Task instances. By default, a task can process an unlimited number of items. Consider a system that contains two blocks. The first has data posted to it and begins processing. When the data has been exhausted, the tasks the block used to process it complete, and the block waits for more data. Meanwhile, the second block receives a huge amount of data and saturates the machine. If the first block now receives more data, it needs to spin up new Task instances to process it, but the machine is saturated. Block one must either wait until block two runs out of data or risk oversubscribing the machine. This situation can be mitigated by setting MaxMessagesPerTask (which defaults to unbounded) to an explicit positive value. This property tells the block how many items it can process with a given Task before letting that Task end and starting a new one. In the above example, setting MaxMessagesPerTask to, say, 2 would make block two's tasks give up their threads every 2 items, so block one would wait for block two to process no more than a couple of items before it could continue processing its own queue.

MaxNumberOfGroups

The grouping blocks can track how many groups they have produced. By default, they can produce as many as are required, since this property (available on GroupingDataflowBlockOptions) defaults to unbounded. By specifying a number greater than 0, the developer limits the block to producing that number of groups; once the maximum has been produced, the block declines any further messages.

CancellationToken

The concept of a cancellation token is common throughout the TPL, and this property allows it to be used with Dataflow as well. A CancellationToken is a conduit through which a Task can be told that it should stop processing. The developer sets this property to a CancellationToken, typically obtained from a CancellationTokenSource. The block monitors the token to see whether cancellation has been requested (by calling Cancel on the source). If it has, the block lets any currently executing work finish and accepts no more items. The block then completes: the Task exposed by its Completion property ends in the Canceled state.

Greedy

This Boolean property (found on GroupingDataflowBlockOptions) determines how a target block behaves when accepting data from a source it is linked to. As noted in the post about Joining Blocks, this setting can have huge repercussions for how the block behaves. In general, a greedy block accepts any data offered to it, while a non-greedy block postpones accepting data until certain conditions are met; what those conditions are depends on the block. By default, blocks are greedy. You can override this behavior by setting this property to false.

BoundedCapacity

This property represents the maximum number of items a block can have buffered and in flight at once. Once the capacity is reached, the block postpones or declines further items until space becomes available. By default, this value is Unbounded, meaning the block can buffer an unlimited number of items. For stability, it's desirable to limit the amount of memory any block can occupy with its buffered items; setting an explicit maximum number of items a block can hold at any one time enforces such a limit.

Another interesting case is a single source block linked to several target blocks. If all the targets are greedy and have unbounded capacity, the source will offer every message to the first target, which will happily accept them all, starving the other targets. By giving each target a relatively small BoundedCapacity, you make messages spill over to the next target whenever one is full, which balances the load.
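The starvation scenario can be sketched with a small synchronous TypeScript model. This is not the TPL Dataflow API (which is .NET); SimTarget and distribute are hypothetical names, and the model simplifies offering to "try each target in link order; the first one with room takes the message". Targets never drain in this sketch, to keep it short.

```typescript
// Hypothetical, simplified model of a source offering messages to linked targets.
class SimTarget {
    readonly buffer: number[] = [];

    // A negative capacity means unbounded, like DataflowBlockOptions.Unbounded (-1).
    constructor(readonly capacity: number) {}

    // Accept the message if there is room; otherwise decline it.
    offer(message: number): boolean {
        if (this.capacity >= 0 && this.buffer.length >= this.capacity) {
            return false;
        }
        this.buffer.push(message);
        return true;
    }
}

// Offer each message to the targets in link order; the first target that
// accepts it wins. Messages nobody accepts stay with the source.
function distribute(messages: number[], targets: SimTarget[]): number[] {
    const leftover: number[] = [];
    for (const m of messages) {
        let taken = false;
        for (const t of targets) {
            if (t.offer(m)) {
                taken = true;
                break;
            }
        }
        if (!taken) leftover.push(m);
    }
    return leftover;
}

const unbounded = [new SimTarget(-1), new SimTarget(-1), new SimTarget(-1)];
distribute([1, 2, 3, 4, 5, 6], unbounded);
// unbounded[0] took all six messages; the other targets are starved.

const bounded = [new SimTarget(1), new SimTarget(1), new SimTarget(1)];
const waiting = distribute([1, 2, 3, 4, 5, 6], bounded);
// Each bounded target holds one message; 4, 5 and 6 wait in the source
// until a target frees up space.
```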

Static Methods

Beyond the functionality exposed by the ITargetBlock and ISourceBlock interfaces, there are common methods developers use when working with Dataflow. The following static and extension methods are exposed on the DataflowBlock class.

Choose

Choose allows multiple sources to be specified along with an action delegate for each. It will atomically accept one and only one message across all of the sources, executing the associated action delegate with the received data.

Encapsulate

Creates a propagator block from a target block and a source block, so that the two can be treated as a single block.

LinkTo

The ISourceBlock<TOutput> interface provides a LinkTo method for connecting the source to a target, but extension methods are provided for additional control over linking. This includes being able to add a filter predicate to a link in order to control which data is propagated across which links, and what should happen if the predicate is not met (e.g. should such a message simply be dropped, or should it be declined and offered to other targets, etc.).
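Predicate filtering can be sketched as a tiny TypeScript model. FilteredLink and propagate are hypothetical names, not the TPL Dataflow API: each item goes to the first link whose predicate accepts it, and an item that matches nothing stays with the source. In the real library, such an unmatched message can sit at the head of the source's buffer and hold up the messages behind it, which is why a final catch-all link (for example to DataflowBlock.NullTarget<T>()) is commonly added.

```typescript
// Hypothetical model of predicate-filtered links (not the TPL Dataflow API).
interface FilteredLink<T> {
    predicate: (item: T) => boolean;
    deliver: (item: T) => void;
}

// Offer an item to the links in the order they were created. Returns false
// when no predicate matches, meaning the item stays with the source.
function propagate<T>(item: T, links: FilteredLink<T>[]): boolean {
    for (const link of links) {
        if (link.predicate(item)) {
            link.deliver(item);
            return true;
        }
    }
    return false;
}

const evens: number[] = [];
const odds: number[] = [];
const parityLinks: FilteredLink<number>[] = [
    { predicate: n => n % 2 === 0, deliver: n => { evens.push(n); } },
    { predicate: n => n % 2 !== 0, deliver: n => { odds.push(n); } },
];
[1, 2, 3, 4].forEach(n => propagate(n, parityLinks));

// With only the "evens" link, an odd number matches nothing and is not delivered.
const evensOnly: FilteredLink<number>[] = [parityLinks[0]];
const delivered = propagate(5, evensOnly);
```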

OutputAvailableAsync

An extension method that asynchronously informs a consumer when data is available on a source block or when no more data will be available. Data is not removed from the source.

Post

An extension method that posts data to the target block without waiting for it to be processed. It returns immediately, reporting whether the data was accepted, and it does not allow the target to consume the message at a later time.

SendAsync

An extension method that asynchronously sends data to a target block while supporting buffering. A Post operation on a target is asynchronous, but if the target wants to postpone the offered data, there is nowhere for the data to be buffered, so the target is forced to decline instead. SendAsync enables asynchronous posting with buffering: if the target postpones, it can later retrieve the postponed data from the temporary buffer used for this one asynchronously sent message.

Receive

An extension method that synchronously receives data from a source, blocking until data is available or the block informs the operation that no more data will be available. The wait can be bounded with a timeout or canceled through parameters passed to the Receive method.

ReceiveAsync

The same as Receive, except asynchronously. Rather than returning instances of TOutput, ReceiveAsync returns instances of Task<TOutput>.

AsObservable

An extension method that creates an IObservable<T> for an ISourceBlock<T>. Any IObserver<T> instances subscribed to the IObservable<T> will be broadcast data removed from the source block.

AsObserver

An extension method that creates an IObserver<T> for an ITargetBlock<T>. OnNext calls on the observer will result in the data being sent to the target, OnError calls will result in the exception faulting the target, and OnCompleted calls will result in Complete called on the target.

There are a plethora of interesting extension methods that could be built on top of the ISourceBlock<TOutput> and ITargetBlock<TInput> interfaces. The methods exposed from DataflowBlock represent a common subset and are those expected to be most useful to developers building solutions that incorporate dataflow blocks.

Next time we’ll take a look at debugging tips.

Posted in: .Net | TPL Dataflow


Getting Started with TPL Dataflow–Part 4

October 12, 2012 at 7:50 AM | Administrator

Up till now, in this series we’ve looked at the architecture of TPL Dataflow, BufferingBlocks and ExecutorBlocks.  Continuing to look at the blocks supplied with the library, today we’ll look at JoiningBlocks.

Joining Blocks

In trying to think of a good way to explain how the various parts of Dataflow work in terms of something the reader may already know, I find that it resembles Windows Workflow: we build our process as distinct blocks of functionality and then compose them into a flow over which we can exert some degree of control. However, when one considers the TransformManyBlock and the joining blocks we'll look at below, I can also see similarities to a multi-threaded application in which new threads are started and then rejoined to the main thread.

The TransformManyBlock takes a piece of data and returns any number of data items, thereby splitting execution across many instances of the same type of block or possibly across different block types. The developer may also need to join these branches back into a single (possibly synchronous) pipeline, and that's where joining blocks come into play.

BatchBlock

This block takes a number of input items and batches them into a single output item, an array. Said another way, this block is an ITargetBlock<T> and an ISourceBlock<T[]>. When an instance is constructed, the developer specifies a batch size. Once the block has received enough items to fill a batch of that size, it outputs that batch. If the block is told to complete (by calling Complete on it), whatever items it still has buffered are output as a final, smaller batch.

The block can be configured to be greedy or not. If greedy, the block accepts every message offered by any of its sources. In non-greedy mode, the block accepts no items at all until it has been offered enough items, across all its sources, to complete a batch, at which point it takes the items and outputs the batch. As another configuration option, the developer can specify a maximum number of batches (MaxNumberOfGroups), and once this maximum has been reached, the block accepts no more items.

Batches of 100 items
    var batchRequests = new BatchBlock<Request>(batchSize: 100);
    var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));
    batchRequests.LinkTo(sendToDb);

 

Batch per Second
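    var batch = new BatchBlock<T>(batchSize: Int32.MaxValue);
    // TriggerBatch emits whatever is buffered, producing one batch per second.
    // Keep a reference to the timer so it isn't garbage collected.
    var timer = new Timer(_ => batch.TriggerBatch(), null, 1000, 1000);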
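The batching rules can be modeled in a few lines of TypeScript. This is a sketch, not the BatchBlock API: SimBatcher is hypothetical, and its triggerBatch and complete methods only mirror, as I understand them, what TriggerBatch and Complete do on the real block (emit a short batch now, or flush the remainder as a final batch).

```typescript
// Hypothetical, simplified model of BatchBlock's grouping rules (not the real API).
class SimBatcher<T> {
    private pending: T[] = [];
    readonly output: T[][] = [];

    constructor(private readonly batchSize: number) {}

    // Buffer the item; emit a batch as soon as batchSize items are waiting.
    post(item: T): void {
        this.pending.push(item);
        if (this.pending.length === this.batchSize) {
            this.flush();
        }
    }

    // Like TriggerBatch: emit whatever is buffered now, even if it is short.
    triggerBatch(): void {
        if (this.pending.length > 0) this.flush();
    }

    // Like Complete: any remaining items become a final, smaller batch.
    complete(): void {
        this.triggerBatch();
    }

    private flush(): void {
        this.output.push(this.pending);
        this.pending = [];
    }
}

const batcher = new SimBatcher<number>(3);
[1, 2, 3, 4, 5, 6, 7].forEach(n => batcher.post(n));
batcher.complete();
// batcher.output is [[1, 2, 3], [4, 5, 6], [7]]
```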

 

JoinBlock

JoinBlock is intended to join together data from multiple sources. It is not itself a target block; instead, it exposes properties (Target1, Target2 and, for three inputs, Target3) that are targets for the specified types. For example, one could define a JoinBlock<string, int, DateTime>, and it would expose properties of types ITargetBlock<string>, ITargetBlock<int> and ITargetBlock<DateTime>. The block accepts values on all these inputs and outputs a Tuple containing one value from each. In the above example, the block would also be an ISourceBlock<Tuple<string, int, DateTime>>. This block can be configured to be either greedy or non-greedy. In greedy mode, the block accepts data from all sources as it is offered, even if it doesn't yet have all the inputs required to build a tuple. When configured to be non-greedy, it postpones accepting data until data is available on all inputs, at which point a two-phase commit protocol is used to accept all the inputs atomically. Currently, this block can be defined only as a JoinBlock<T1, T2> or a JoinBlock<T1, T2, T3>.

JoinBlock
    // A pool of 10 ExpensiveObject instances throttles how many requests run at once.
    // Requests are posted to throttle.Target2.
    var throttle = new JoinBlock<ExpensiveObject, Request>();
    for (int i = 0; i < 10; i++)
        throttle.Target1.Post(new ExpensiveObject());

    // Each request is paired with a free resource; the resource is handed back afterwards.
    var processor =
        new TransformBlock<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
        {
            var request = pair.Item2;
            var resource = pair.Item1;
            request.ProcessWith(resource);
            return resource;
        });
    throttle.LinkTo(processor);
    processor.LinkTo(throttle.Target1);
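The pairing behavior of a greedy join, and how the example above uses it for throttling, can be sketched in TypeScript. SimJoin and its methods are hypothetical and only model greedy mode (inputs accepted immediately, pairs formed in FIFO order); the non-greedy two-phase commit is not modeled.

```typescript
// Hypothetical model of a greedy two-input join (not the TPL Dataflow API).
class SimJoin<T1, T2> {
    private first: T1[] = [];
    private second: T2[] = [];
    readonly output: [T1, T2][] = [];

    postFirst(item: T1): void {
        this.first.push(item);
        this.tryJoin();
    }

    postSecond(item: T2): void {
        this.second.push(item);
        this.tryJoin();
    }

    // Greedy behaviour: inputs are accepted as soon as they arrive, and a pair is
    // emitted whenever at least one item is waiting on each side (FIFO order).
    private tryJoin(): void {
        while (this.first.length > 0 && this.second.length > 0) {
            this.output.push([this.first.shift()!, this.second.shift()!]);
        }
    }
}

// Throttling: two resources limit how many requests can be paired at once.
const pool = new SimJoin<string, number>();
pool.postFirst("resource-A");
pool.postFirst("resource-B");
[101, 102, 103].forEach(id => pool.postSecond(id));
const pairedBeforeReturn = pool.output.length; // 2: request 103 is still waiting

// Processing request 101 finishes and its resource is posted back, freeing 103.
pool.postFirst("resource-A");
```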

 

BatchedJoinBlock

This block is a combination of the above two. JoinBlock aggregates multiple inputs into a single tuple, and BatchBlock collects multiple inputs into an array. BatchedJoinBlock takes multiple items on each input and batches them into a tuple of lists. So a BatchedJoinBlock<string, int> would expose properties (Target1 and Target2) of types ITargetBlock<string> and ITargetBlock<int>. It accepts inputs on both of these and, once the given batch size is reached, outputs a Tuple<IList<string>, IList<int>>. The batch size counts the items received across all inputs combined, so the two lists together contain batch-size entries.

BatchedJoinBlock
    var batchedJoin = new BatchedJoinBlock<string, Exception>(10);
    for (int i = 0; i < 10; i++)
    {
        Task.Factory.StartNew(() =>
        {
            // Each work item reports either a result (Target1) or an error (Target2).
            try { batchedJoin.Target1.Post(DoWork()); }
            catch (Exception e) { batchedJoin.Target2.Post(e); }
        });
    }
    var results = await batchedJoin.ReceiveAsync();
    foreach (string s in results.Item1) Console.WriteLine(s);
    foreach (Exception e in results.Item2) Console.WriteLine(e);
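The counting rule is easy to get wrong, so here is a hypothetical TypeScript model of it (SimBatchedJoin is not the real API). As I understand the real block, the batch size counts items from all inputs combined, which is why the C# example above gets all ten outcomes back from a single receive, split between results and exceptions.

```typescript
// Hypothetical model of BatchedJoinBlock's counting rule (not the real API):
// the batch size counts items across *both* inputs combined.
class SimBatchedJoin<T1, T2> {
    private list1: T1[] = [];
    private list2: T2[] = [];
    readonly output: [T1[], T2[]][] = [];

    constructor(private readonly batchSize: number) {}

    post1(item: T1): void { this.list1.push(item); this.check(); }
    post2(item: T2): void { this.list2.push(item); this.check(); }

    // Emit a tuple of both lists once their combined length reaches batchSize.
    private check(): void {
        if (this.list1.length + this.list2.length === this.batchSize) {
            this.output.push([this.list1, this.list2]);
            this.list1 = [];
            this.list2 = [];
        }
    }
}

// Ten work items, each reporting either a result or an error.
const outcomes = new SimBatchedJoin<string, Error>(10);
for (let i = 0; i < 10; i++) {
    if (i % 3 === 0) {
        outcomes.post2(new Error("work item " + i + " failed"));
    } else {
        outcomes.post1("result " + i);
    }
}
const [successes, failures] = outcomes.output[0];
// 6 successes and 4 failures: together they make one batch of 10.
```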

 

The join blocks are useful for gathering the results of work done by other blocks and aggregating them.

Next time we’ll look at configuration, in more detail, and debugging support.

Posted in: .Net | TPL Dataflow


Getting Started with TPL Dataflow–Part 3

October 10, 2012 at 5:54 AM | Administrator

So far in this series, we’ve looked at the architecture of TPL Dataflow at a high level and at buffering blocks, one of the block types included in the Dataflow library. We saw that buffering blocks are both sources and targets and provide a way to buffer data between linked executor blocks or to control how data is distributed to them. Today, let’s look at the executor blocks in more detail.

What are Executor Blocks?

Executor blocks are where the actual work is executed. When creating a block of this type, the developer specifies the type of data the block expects and the logic to be executed on that data. Executor blocks must implement the ITargetBlock interface and may also implement the ISourceBlock interface. That is to say, all executor blocks must be able to receive data to act upon, but depending on the block type, they may also pass results on to another block in the network. Let’s start by looking at the simplest of the executor blocks, the ActionBlock.

ActionBlock

The ActionBlock is a target-only block. That is, it receives data and acts on it, but cannot pass data on to another block. When creating an ActionBlock, the developer specifies a delegate that will be executed on the data. This delegate can be either an Action<T> or a Func<T, Task>. If the former is specified, processing of an item is considered complete when the delegate returns. If the latter is specified, processing of an item is considered complete when the returned Task completes. In this way, the ActionBlock can execute both synchronously and asynchronously. Data is sent to an ActionBlock the same way it is sent to a buffering block: either by posting to the block explicitly or by virtue of the block being linked to another block. When a piece of data is received by the block, it is buffered until the block is ready to process it. By default, the ActionBlock processes a single piece of data at a time, but at construction the developer can specify options that modify that behavior. Below are some examples.

Sequential and Synchronous
    var downloader = new ActionBlock<string>(url =>
    {
        // Download returns byte[]
        byte[] imageData = Download(url);
        Process(imageData);
    });

    downloader.Post("http://msdn.com/concurrency");
    downloader.Post("http://blogs.msdn.com/pfxteam");

 

Sequential and Asynchronous
    var downloader = new ActionBlock<string>(async url =>
    {
        byte[] imageData = await DownloadAsync(url);
        Process(imageData);
    });

    downloader.Post("http://msdn.com/concurrency");
    downloader.Post("http://blogs.msdn.com/pfxteam");

Note the use of async and await: because the delegate returns a Task, each URL counts as processed only when that Task completes.

Throttling async to 5 maximum
    var downloader = new ActionBlock<string>(async url =>
    {
        byte[] imageData = await DownloadAsync(url);
        Process(imageData);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 }); // at most 5 downloads at once

    downloader.Post("http://msdn.com/concurrency");
    downloader.Post("http://blogs.msdn.com/pfxteam");

 

Load Balancing
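    // Each worker can hold only one message at a time, so a busy worker declines
    // new messages and the source offers them to the next linked worker.
    var dbo = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };

    var workers = (from i in Enumerable.Range(0, N)
                   select new ActionBlock<int>(DoWork, dbo)).ToArray();

    ISourceBlock<int> dataSource = new BufferBlock<int>(); // illustrative: any ISourceBlock<int> works
    foreach (var worker in workers) dataSource.LinkTo(worker);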

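N instances of the ActionBlock are created, and each is linked to dataSource. Because every worker has a BoundedCapacity of 1, a busy worker declines further messages, and the source offers them to the next worker in link order. The data is therefore spread across the workers instead of all going to the first one.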

Run on UI Thread
    var ab = new ActionBlock<Bitmap>(image =>
    {
        panel.Add(image);
        txtStatus.Text = "Added image #" + panel.Items.Count;
    }, new ExecutionDataflowBlockOptions
    {
        TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
    });

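This example shows how to make the ActionBlock’s delegate run on the UI thread. Note that FromCurrentSynchronizationContext must be called on the UI thread, since it captures that thread’s synchronization context.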

 

TransformBlock

The TransformBlock is both a source and a target.  That is to say, it receives data to process and then returns some value after processing is complete.  This block wraps a delegate of either Func<TIn, TOut> or Func<TIn, Task<TOut>>.  In the former case, execution is considered complete when the delegate returns and in the latter case, when the Task completes.  Like the ActionBlock, this allows the block to execute either synchronously or asynchronously.  This block type buffers both its inputs and its outputs.  The output buffer behaves just like a BufferBlock.

By default, this block processes one piece of data at a time, in FIFO order. However, at construction the developer can allow it to process multiple items concurrently (via MaxDegreeOfParallelism). Interestingly, regardless of how many items the block processes at once, the order of its output is guaranteed to match the order of its input.
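One way to reconcile concurrent processing with ordered output is a reorder buffer, sketched below in TypeScript. I don't know how the block is implemented internally; ReorderBuffer is a hypothetical illustration of the technique: each input gets a sequence number, results may finish in any order, and they are released strictly in input order.

```typescript
// Hypothetical reorder buffer: results finish in any order but are released in input order.
class ReorderBuffer<T> {
    private results: T[] = [];
    private done: boolean[] = [];
    private nextToRelease = 0;
    readonly released: T[] = [];

    // Record that input number `seq` finished processing with `result`.
    complete(seq: number, result: T): void {
        this.results[seq] = result;
        this.done[seq] = true;
        // Release results only while the oldest unreleased input has finished.
        while (this.done[this.nextToRelease]) {
            this.released.push(this.results[this.nextToRelease]);
            this.nextToRelease++;
        }
    }
}

const reorder = new ReorderBuffer<string>();
// Suppose inputs 0 to 3 were processed concurrently and finished in this order:
reorder.complete(2, "c");
reorder.complete(0, "a");                        // releases "a"; input 1 is still running
const releasedSoFar = reorder.released.length;   // 1
reorder.complete(3, "d");
reorder.complete(1, "b");                        // releases "b", "c" and "d"
```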

Concurrent Pipeline
    var compressor = new TransformBlock<byte[], byte[]>(
        input => Compress(input));
    var encryptor = new TransformBlock<byte[], byte[]>(
        input => Encrypt(input));
    compressor.LinkTo(encryptor);

A powerful use of the TransformBlock is to chain them into a pipeline where data is processed incrementally, as in the above example.

 

TransformManyBlock

The TransformManyBlock is very similar to the TransformBlock, with one major difference. Where the TransformBlock returns one output for each input, the TransformManyBlock can return any number of outputs (zero or more) for each input. Like the previous two blocks, it can execute either synchronously or asynchronously: as a synchronous block, it runs a delegate of type Func<TIn, IEnumerable<TOut>>, and as an asynchronous block, a Func<TIn, Task<IEnumerable<TOut>>>. Also like the other two blocks, it defaults to processing one item at a time but can be configured to process several items concurrently.

Async Web Crawler
var downloader = new TransformManyBlock<string, string>(async url =>
{
    Console.WriteLine("Downloading " + url);
    // DownloadContents and ParseLinks are helper methods defined elsewhere
    try { return ParseLinks(await DownloadContents(url)); }
    catch { }
    return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);

The above example demonstrates recursive behavior: because the block is linked to itself, every link it parses out of a page is fed back in as a new URL to download.

 

Expand an array
var expanded = new TransformManyBlock<T[], T>(array => array);

Takes an array as input and returns its members individually.

Filtering
public IPropagatorBlock<T, T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
    // Emit the item if it passes the filter, otherwise emit nothing
    return new TransformManyBlock<T, T>(item =>
        filter(item) ? new[] { item } : Enumerable.Empty<T>());
}
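Here's a small sketch of the 0-or-more behavior that combines the expanding and filtering ideas above: each sentence becomes zero or more words. WordSplitterDemo and SplitAsync are hypothetical names, and the System.Threading.Tasks.Dataflow package is assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class WordSplitterDemo
{
    // Splits each sentence into words and keeps only words of at least minLength characters,
    // so one input can produce many outputs, a single output, or none at all.
    public static async Task<List<string>> SplitAsync(IEnumerable<string> sentences, int minLength)
    {
        var splitter = new TransformManyBlock<string, string>(sentence =>
            sentence.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
                    .Where(word => word.Length >= minLength));

        foreach (string s in sentences)
            splitter.Post(s);
        splitter.Complete();

        // Collect everything the block emits until it completes.
        var words = new List<string>();
        while (await splitter.OutputAvailableAsync())
            words.Add(await splitter.ReceiveAsync());
        return words;
    }
}
```

For the inputs "the quick brown fox", "" and "a dataflow block" with a minimum length of 4, the empty sentence contributes nothing and the short words are dropped.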

 

Conclusion

With these three executor blocks, most scenarios can be covered.  In the next post, we’ll look at Joining Blocks and then, later, at configuration options.

Posted in: .Net | TPL Dataflow


Getting Started With TPL DataFlow–Part 2

October 10, 2012 at 3:23 AM | Administrator

In the last post, I talked a bit about what Dataflow is, why it’s useful and a bit about its architecture.  Today,  I’d like to start looking at some of the types that are included in the Library and how they’re used.  Specifically, I’d like to look at the buffering blocks that are supplied.

What’s a Buffering Block?

Suppose you have a system that is retrieving URLs for RSS feeds from a database.  For each one that’s read, a call is made to that URL to retrieve the RSS document with a list of entries.  Each of these entries is then passed to a second piece of code (running asynchronously) for further processing.  If each RSS feed contains the last 10 entries, the first piece of code will run once for each feed while the second will run 10 times.  For any given system, there could also be significant differences in processing speed between the two.  In these cases, we don’t want either process to have to wait for the other.  We need some way to accumulate data between the pieces of the system so that both can continue processing to the greatest extent possible.  Buffering accomplishes this, and the Dataflow library includes three standalone buffering blocks: BufferBlock<T>, BroadcastBlock<T> and WriteOnceBlock<T>.

BufferBlock<T>

The purpose of the BufferBlock<T> is to store a first-in-first-out collection of objects of type T.  This collection can be either bounded or unbounded and can be accessed synchronously or asynchronously.  Data can be posted to or received from the block either explicitly or via links to other blocks (which we’ll discuss later).  The following examples are taken from the documentation.

Synchronous Producer/Consumer
// Hand-off through a BufferBlock<T>
private static BufferBlock<int> m_buffer = new BufferBlock<int>();

// Producer
private static void Producer()
{
    while(true)
    {
        int item = Produce();
        m_buffer.Post(item);
    }
}

// Consumer
private static void Consumer()
{
    while(true)
    {
        int item = m_buffer.Receive();
        Process(item);
    }
}

// Main
public static void Main()
{
    var p = Task.Factory.StartNew(Producer);
    var c = Task.Factory.StartNew(Consumer);
    Task.WaitAll(p,c);
}

 

Asynchronous Producer/Consumer
// Hand-off through a BufferBlock<T>
private static BufferBlock<int> m_buffer = new BufferBlock<int>();

// Producer
private static void Producer()
{
    while(true)
    {
        int item = Produce();
        m_buffer.Post(item);
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        int item = await m_buffer.ReceiveAsync();
        Process(item);
    }
}

// Main
public static void Main()
{
    var p = Task.Factory.StartNew(Producer);
    var c = Consumer();
    Task.WaitAll(p,c);
}

 

Throttling Producer
// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> m_buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer (returns Task rather than void so that Run can await it)
private static async Task Producer()
{
    while(true)
    {
        // SendAsync waits asynchronously whenever the buffer is full
        await m_buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await m_buffer.ReceiveAsync());
    }
}

// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

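Accessing the BufferBlock<T> is thread safe, so it can be posted to and received from on multiple threads.  As the throttling example shows, you can also cap how large the buffer can grow with BoundedCapacity.  A bounded buffer never discards data: once it is full, Post returns false, and SendAsync returns a task that completes only when there is room again, which is what throttles the producer.  When this block is linked to a single target block, it passes each instance to that target only once.  When more than one target is linked to the buffer, each message still goes to exactly one target: messages are offered to the targets in the order they were linked, and the first target that accepts a message gets it.  You get round-robin style load balancing only when the targets are bounded (for example, BoundedCapacity = 1); an unbounded first target will accept everything.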
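The bounded behavior is easy to check directly.  This sketch (a hypothetical BoundedBufferDemo class, assuming the System.Threading.Tasks.Dataflow package) shows Post declining an item once the buffer is full, and SendAsync waiting for room instead of failing.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedBufferDemo
{
    // Posts each item into a buffer that holds at most `capacity` items and that nothing reads from.
    // Returns what each Post reported: true = accepted, false = declined because the buffer was full.
    public static bool[] PostAll(int capacity, params int[] items)
    {
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = capacity });
        var accepted = new bool[items.Length];
        for (int i = 0; i < items.Length; i++)
            accepted[i] = buffer.Post(items[i]);
        return accepted;
    }

    // SendAsync on a full buffer does not fail; it returns a task that waits for room.
    public static async Task<(bool completedWhileFull, bool accepted)> SendWhenFullAsync()
    {
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });
        buffer.Post(1);                                // the buffer is now full
        Task<bool> pending = buffer.SendAsync(2);      // the offer is postponed, not refused
        bool completedWhileFull = pending.IsCompleted;
        buffer.Receive();                              // frees the only slot
        bool accepted = await pending;                 // the postponed item is now consumed
        return (completedWhileFull, accepted);
    }
}
```

This is the difference the throttling example relies on: a producer using Post has to handle a false result itself, while a producer awaiting SendAsync is simply slowed down to the consumer's pace.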

 

BroadcastBlock<T>

Where BufferBlock’s purpose is to hold on to data until a single target takes it, BroadcastBlock is used to send a copy of each instance it receives to EACH target that is linked to it.  The constructor takes a Func<T,T> that will be executed to create copies of the original instance.  If you pass null instead, the original instance will be passed on to all targets.  One thing to remember is that if the original instance is passed to more than one target, each target will then hold a reference to the same instance.  If the instance is not thread safe, a change made through one target can show up unexpectedly on another thread.

Another behavior of this block to watch out for is how it passes instances to each target.  Each instance will be offered to all targets once and only once.  Once it has done so, it moves on to the next instance.  This means that if a target is busy and does not accept a piece of data immediately, it will not get another chance to receive it.  Therefore, you’ll probably want to make sure all targets are eager, meaning they immediately accept any data offered.  We’ll talk more about which blocks have eager behavior a little later.

Here are some code examples.

Save and Display Image
var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);

var saveToDisk = new ActionBlock<ImageData>(item =>
    item.Image.Save(item.Path));

// ActionBlock takes ExecutionDataflowBlockOptions, which carries the TaskScheduler
var showInUi = new ActionBlock<ImageData>(item =>
    imagePanel.AddImage(item.Image),
    new ExecutionDataflowBlockOptions { TaskScheduler = ui });

bb.LinkTo(saveToDisk);
bb.LinkTo(showInUi);

Here, a BroadcastBlock is instantiated along with two ActionBlocks (which we’ll look at next time).  Both ActionBlocks are then attached as targets to the BroadcastBlock.  The developer can now Post() instances of ImageData to the BroadcastBlock, and each instance will be passed to both actions, so each image is both saved to disk and displayed in the UI.  Notice, also, that the ActionBlock that adds the image to the UI is configured with a TaskScheduler created by TaskScheduler.FromCurrentSynchronizationContext().  When that call is made on the UI thread, the action will always run on the UI thread, which avoids the problem of trying to update the UI from a background thread.  The ActionBlock will still run asynchronously, however.
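As a sketch of the one-to-many behavior, the following links two BufferBlock<int> targets to a BroadcastBlock<int> and checks that each receives every value.  Unbounded BufferBlocks accept everything they are offered, so they act as the eager targets described above.  BroadcastDemo is a hypothetical name, and the System.Threading.Tasks.Dataflow package is assumed.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastDemo
{
    // Broadcasts the values to two targets and returns what each target received.
    public static async Task<(List<int> first, List<int> second)> BroadcastAsync(params int[] values)
    {
        // Identity cloning function: both targets get the same value (harmless for immutable ints).
        var broadcast = new BroadcastBlock<int>(v => v);
        var first = new BufferBlock<int>();
        var second = new BufferBlock<int>();

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        broadcast.LinkTo(first, link);
        broadcast.LinkTo(second, link);

        foreach (int v in values)
            broadcast.Post(v);
        broadcast.Complete();

        return (await DrainAsync(first), await DrainAsync(second));
    }

    // Receives items until the source completes and has nothing left to give.
    private static async Task<List<int>> DrainAsync(ISourceBlock<int> source)
    {
        var items = new List<int>();
        while (await source.OutputAvailableAsync())
            items.Add(await source.ReceiveAsync());
        return items;
    }
}
```

Swap one of the BufferBlocks for a slow, bounded block and it will start missing values, which is exactly the pitfall described above.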

 

WriteOnceBlock<T>

In C#, you can declare a readonly field.  Such a field can be set only in its declaration or in a constructor, and from that point on it acts like a constant.  The WriteOnceBlock behaves in very much the same way.  With this block, you can post a single value to it.  From that point on, that value never changes, and any further posts are declined.  Like the BroadcastBlock, all targets linked to the block will receive a copy of the original value.
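A quick sketch of the write-once semantics (a hypothetical WriteOnceDemo class, assuming the System.Threading.Tasks.Dataflow package).  In the released library the WriteOnceBlock<T> constructor takes a cloning function; passing null means receivers get the stored value itself.

```csharp
using System.Threading.Tasks.Dataflow;

public static class WriteOnceDemo
{
    // Posts twice, then reads twice, to show that only the first value sticks.
    public static (bool firstPost, bool secondPost, int read1, int read2) Run()
    {
        var once = new WriteOnceBlock<int>(null);
        bool firstPost = once.Post(42);  // accepted: the block now holds 42
        bool secondPost = once.Post(7);  // declined: the value has already been set
        int read1 = once.Receive();      // receiving does not remove the value...
        int read2 = once.Receive();      // ...so every reader sees the same 42
        return (firstPost, secondPost, read1, read2);
    }
}
```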

ReceiveFromAny
public T ReceiveFromAny<T>(params ISourceBlock<T>[] sources)
{
    var wob = new WriteOnceBlock<T>(null);
    // MaxMessages = 1 removes each link after it has delivered one message
    foreach (var source in sources)
        source.LinkTo(wob, new DataflowLinkOptions { MaxMessages = 1 });
    return wob.Receive();
}

 

A Dataflow Const of T
// c_item is a WriteOnceBlock<T> shared by this code, e.g. new WriteOnceBlock<T>(null)
// Code that ensures c_item is initialized
c_item.Post(/* lazily retrieved value */);
// Code that relies on c_item always being the same
T value = await c_item.ReceiveAsync();

 

Splitting a Task's Outputs
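public static void SplitIntoBlocks<T>(this Task<T> task,
    out IPropagatorBlock<T, T> result,
    out IPropagatorBlock<Exception, Exception> exception)
{
    var resultBlock = new WriteOnceBlock<T>(i => i);
    var exceptionBlock = new WriteOnceBlock<Exception>(i => i);
    result = resultBlock;
    exception = exceptionBlock;

    // async methods can't have out parameters, so the awaiting happens in a helper
    PostOutcome(task, resultBlock, exceptionBlock);
}

private static async void PostOutcome<T>(Task<T> task,
    ITargetBlock<T> result, ITargetBlock<Exception> exception)
{
    try { result.Post(await task); }
    catch (Exception exc) { exception.Post(exc); }
}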

 

Request and Response
// with WriteOnceBlock<T>
var request = …;
var response = new WriteOnceBlock<TResponse>(null);
target.Post(Tuple.Create(request, response));
var result = await response.ReceiveAsync();

// with TaskCompletionSource<T>
var request = …;
var response = new TaskCompletionSource<TResponse>();
target.Post(Tuple.Create(request, response));
var result = await response.Task;

 

Conclusion

That covers the Buffering Blocks included in the TPL Dataflow Library.  Next time, we’ll start looking at the Executor blocks.

Posted in: .Net | TPL Dataflow


Visual Studio Launch Event 10/13

October 4, 2012 at 7:37 AM | Administrator

On October 13th, Magenic’s Atlanta Office will be presenting a Visual Studio 2012 Launch Event.  I’ll be presenting on some of the new features available in this version for web and cloud developers.  See you there!

You can register for the event at http://visual_studio_2012_launch_atlanta.eventbrite.com/.

 

Posted in: Events


Getting Started with TPL DataFlow–Part 1

October 3, 2012 at 6:34 AM | Administrator

I recently posted a series on using TPL DataFlow and AlchemyAPI to classify blog posts.  In those posts, I promised to circle back and take a deeper look at TPL DataFlow.  In this series, I’ll talk about what TPL DataFlow is and how to use it in situations where a data-driven or agent-based approach fits the problem domain.

What is TPL DataFlow

The DataFlow API contains a series of predefined “dataflow blocks” that wrap TPL Tasks to make integrating the Tasks together easier.  It also utilizes Concurrent Collections and more.  The TPL (Task Parallel Library) has become the go-to technology in the .Net space for asynchronous and parallel programming because it presents an API that makes these types of development much easier to work with.  Dataflow builds on the TPL abstractions to make reacting to the availability of data and chaining tasks together more natural.

IDataflowBlock

All of the blocks in the DataFlow library implement this interface which is defined as below:

IDataflowBlock
public interface IDataflowBlock
{
    void Complete();
    void Fault(Exception error);
    Task Completion { get; }
}

The Complete method tells the block to stop accepting new data and to finish normally once it has processed what it already holds, Fault tells the block to stop processing due to an exception, and the Completion property returns a Task that represents the block’s own completion.  That Task can be awaited, given continuations, or inspected to see whether the block finished normally or faulted.
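Here's a minimal sketch of how Complete, Fault and Completion fit together, using ActionBlock<int> as the block.  CompletionDemo is a hypothetical name, and the System.Threading.Tasks.Dataflow package is assumed.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    // Returns the final status of a block that was completed normally and of one that was faulted.
    public static async Task<(TaskStatus completed, TaskStatus faulted)> RunAsync()
    {
        var normal = new ActionBlock<int>(n => Console.WriteLine("processing " + n));
        normal.Post(1);
        normal.Complete();          // accept no more input, but finish what is already queued
        await normal.Completion;    // resumes once the queued item has been processed

        var failing = new ActionBlock<int>(n => { });
        // The cast makes the call work whether or not the block exposes Fault publicly
        ((IDataflowBlock)failing).Fault(new InvalidOperationException("stop"));
        try
        {
            await failing.Completion;
        }
        catch (InvalidOperationException)
        {
            // awaiting a faulted Completion rethrows the exception passed to Fault
        }
        return (normal.Completion.Status, failing.Completion.Status);
    }
}
```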

ISourceBlock and ITargetBlock

Both of these interfaces inherit from the IDataflowBlock interface, and a block can implement either or both of them, so a block can be a source (provides data), a target (receives data) or both.  ISourceBlock looks like this:

ISourceBlock
public interface ISourceBlock<out TOutput> : IDataflowBlock
{
    IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions);

    bool ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target);
    TOutput ConsumeMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target,
        out bool messageConsumed);
    void ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target);
}

// Sources that can also be read from directly implement this derived interface
public interface IReceivableSourceBlock<TOutput> : ISourceBlock<TOutput>
{
    bool TryReceive(Predicate<TOutput> filter, out TOutput item);
    bool TryReceiveAll(out IList<TOutput> items);
}

The generic type, TOutput, specifies the type of data this source will provide.  A source is expected to obtain or generate objects of type TOutput and make them available for targets to consume.  LinkTo allows the developer to link this source block to a target, with DataflowLinkOptions controlling details such as whether completion is propagated and how many messages the link carries before removing itself.  The next three methods let a target negotiate for a message in a reservation-based process: it can reserve a message and then either consume it or release the reservation.  In cases where a single source is supplying data to multiple targets, this allows the targets to obtain data without duplication or contention.  TryReceive and TryReceiveAll, which pull items straight out of a source instead of going through a link, live on the derived IReceivableSourceBlock<TOutput> interface in the released library.
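For the pull side, here's a small sketch against an unlinked BufferBlock<int>.  It assumes the released System.Threading.Tasks.Dataflow package, where BufferBlock<T> implements IReceivableSourceBlock<T> and the one-argument TryReceive used here is an extension method on the DataflowBlock class.  ReceiveDemo is a hypothetical name.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class ReceiveDemo
{
    // Posts three items into an unlinked BufferBlock, then pulls them out directly.
    public static (int first, IList<int> rest) Run()
    {
        var buffer = new BufferBlock<int>();
        buffer.Post(1);
        buffer.Post(2);
        buffer.Post(3);

        // Removes and returns the oldest item (returns false if the buffer is empty).
        buffer.TryReceive(out int first);

        // Removes and returns everything that is currently buffered.
        buffer.TryReceiveAll(out IList<int> rest);
        return (first, rest);
    }
}
```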

ITargetBlock
public interface ITargetBlock<in TInput> : IDataflowBlock
{
    DataflowMessageStatus OfferMessage(
        DataflowMessageHeader messageHeader, TInput messageValue,
        ISourceBlock<TInput> source, bool consumeToAccept);
}

The ITargetBlock adds the ability for a source to offer data to the target.
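To make the offer protocol concrete, this sketch calls OfferMessage by hand on a BufferBlock<int>, which is roughly what the Post extension method does for you.  The message header ids (1 and 2) are arbitrary choices; OfferDemo is a hypothetical name, and the System.Threading.Tasks.Dataflow package is assumed.

```csharp
using System.Threading.Tasks.Dataflow;

public static class OfferDemo
{
    // Offers one message before and one after completing the block, returning both answers.
    public static (DataflowMessageStatus beforeComplete, DataflowMessageStatus afterComplete) Run()
    {
        ITargetBlock<int> target = new BufferBlock<int>();

        // No source and consumeToAccept: false, i.e. "here is the value, take it or leave it".
        var before = target.OfferMessage(new DataflowMessageHeader(1), 42, null, consumeToAccept: false);

        target.Complete(); // from now on the block refuses any further input
        var after = target.OfferMessage(new DataflowMessageHeader(2), 43, null, consumeToAccept: false);
        return (before, after);
    }
}
```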

Blocks that are both targets and sources implement IPropagatorBlock, which simply combines the two interfaces above without adding any functionality of its own.

IPropagatorBlock
public interface IPropagatorBlock<in TInput, out TOutput> :
    ITargetBlock<TInput>, ISourceBlock<TOutput> { }
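Rather than implementing IPropagatorBlock by hand, one option in the released library is DataflowBlock.Encapsulate, which presents a target block and a source block as a single propagator.  A minimal sketch, with the hypothetical name EncapsulateDemo and the System.Threading.Tasks.Dataflow package assumed:

```csharp
using System.Threading.Tasks.Dataflow;

public static class EncapsulateDemo
{
    // Wraps a two-block pipeline so callers see a single IPropagatorBlock<string, int>.
    public static IPropagatorBlock<string, int> CreateParseAndDouble()
    {
        var parse = new TransformBlock<string, int>(s => int.Parse(s));
        var doubler = new TransformBlock<int, int>(n => n * 2);
        parse.LinkTo(doubler, new DataflowLinkOptions { PropagateCompletion = true });

        // Input goes to `parse`; output (and completion) comes from `doubler`.
        return DataflowBlock.Encapsulate(parse, doubler);
    }
}
```

Callers can Post strings to the result and Receive ints from it without knowing there are two blocks inside.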

What’s next?

In the next post, I’ll be introducing the buffer blocks that are part of the DataFlow library and demonstrating how they are used.

Posted in: .Net | TPL Dataflow
