User-Defined Data Types in Microsoft SQL Server 2008

A user-defined data type gives you a convenient way to standardize the use of native data types for columns that share the same domain of possible values. Take e-mail addresses, which are often stored in many different tables. Since there is no single, definitive way to store an e-mail address, it is hard to stay consistent: you might store it as a varchar(30) in the Customers table and as a varchar(50) in the Newsletter table. We can, however, create a user-defined data type emailaddress and use it in every table that keeps track of e-mail addresses, so that they all use the same native data type.

Our first user-defined data type

We’ll start by creating a new data type emailaddress:

CREATE TYPE emailaddress FROM varchar(30) NOT NULL;

We can now use the newly created data type as we would use a native data type. When the table is created, internally the emailaddress data type is known to be a varchar(30).

CREATE TABLE customer
(
   Id            smallint       NOT NULL,
   FirstName     varchar(50)    NOT NULL,
   LastName      varchar(50)    NOT NULL,
   Phone         varchar(50)    NOT NULL,
   Email         emailaddress   NOT NULL
)

Behind the scenes

We can gather more information about the columns in our tables by querying the catalog view sys.columns. We’ll use a basic query that shows us two columns in sys.columns, one containing a number that represents the underlying system data type and another containing a number that represents the data type that was used when the table was created.

SELECT column_id, name, system_type_id, user_type_id,
type_name(user_type_id) as user_type_name, max_length
FROM sys.columns WHERE object_id = object_id('customer')

column_id  name       system_type_id  user_type_id  user_type_name  max_length
1          Id         52              52            smallint        2
2          FirstName  167             167           varchar         50
3          LastName   167             167           varchar         50
4          Phone      167             167           varchar         50
5          Email      167             257           emailaddress    30

We can see that the Phone column and the Email column have the same system_type_id, while the user_type_id of the Email column points to our user-defined data type. The user-defined type is resolved when the table is created, and it can’t be dropped or changed as long as a table is using it. Once declared, a user-defined data type is static and immutable, so using one instead of its native counterpart carries no inherent performance penalty.

Using these data types can make your database more consistent and clear. SQL Server implicitly converts between compatible columns of different types. Currently user-defined types don’t support subtyping or inheritance, nor do they allow a DEFAULT value or a CHECK constraint to be declared as part of the user-defined type itself.

30. October 2011 by Jeroen Verhulst

A quick overview of Microsoft SQL Server Metadata

SQL Server has a set of tables that store information about all the objects, data types, constraints, and so on. In SQL Server 2008 these tables are called the system base tables. They exist either only in the master database (where they contain system-wide information) or in every database (where they contain information about the objects and resources of that database). The system base tables are not visible by default: you won’t see them in the Object Explorer of SQL Server Management Studio, and unless you are a system administrator you won’t see them when executing the sp_help stored procedure either. As an administrator you can list the base tables with this query:

USE master;
SELECT name FROM sys.objects
WHERE type_desc = 'SYSTEM_TABLE';

But when you try to query one of these tables you’ll get error 208 (‘Invalid object name’). The only way to see the data is through the DAC (Dedicated Administrator Connection). It is important to note that the system base tables are meant for internal use only and that there is no guarantee of compatibility between versions.

In SQL Server 2008 there are three types of metadata objects: Compatibility Views, Catalog Views and Dynamic Management Objects. The Dynamic Management Objects don’t correspond to physical tables but contain data gathered from internal structures. We’ll explore these in a later post. The Catalog Views and Compatibility Views are actually views built on the system base tables.

Compatibility Views

Prior to SQL Server 2005 you were allowed to see all the information in the system tables. Although this wasn’t intended, many people used these system tables to develop their own reporting tools and applications, providing results that were not available through the supplied system procedures. You might assume that these custom tools will keep working as long as you run them through the Dedicated Administrator Connection (since the system base tables are inaccessible otherwise).

However, you will be disappointed. In SQL Server 2008 many of the names and much of the content have changed, so older code will be completely useless, even within the DAC. The DAC is only intended for emergency access and no other use is supported. To help you, SQL Server offers a set of compatibility views. These views give you access to a subset of the SQL Server 2000 system tables and are accessible from any database.

Some of these compatibility views have names that might be quite familiar to you (such as sysobjects, sysindexes, sysusers and sysdatabases), while others are more exotic (sysmembers and sysmessages). Since it’s all about compatibility, the views in SQL Server 2008 have the same names (and the same column names) as the SQL Server 2000 system tables, so existing code that queries them won’t break.

It is however important to note that there is no guarantee that these views offer exactly the same results as the corresponding tables in SQL Server 2000. It’s also evident that these views won’t contain any information about new features in SQL 2008. These compatibility views should be used for backward compatibility only since they will be removed in a future version of SQL Server.

There are also compatibility views for the pseudotables of SQL Server 2000 (e.g. sysprocesses and syscacheobjects). Pseudotables are tables that are not based on data stored on disk. They are built from internal structures but can be queried exactly as if they were tables. Since SQL Server 2005 they have been replaced by Dynamic Management Objects. It is important to note that there is no one-to-one correspondence between the pseudotables and the Dynamic Management Objects. For example, when you want to retrieve all the information available in sysprocesses you have to access three Dynamic Management Objects: sys.dm_exec_connections, sys.dm_exec_sessions and sys.dm_exec_requests.

Catalog Views

Since SQL Server 2005 there is a set of catalog views that serve as a general interface to the system metadata. All the catalog views (as well as the Dynamic Management Objects and the compatibility views) are in the sys schema, and you must reference this schema name when accessing them. Some of the names are quite similar to the SQL Server 2000 table names. For example, there is a catalog view called objects in the sys schema, so when we want to reference the view, we execute this:

SELECT * FROM sys.objects;

Although the names are similar, the columns in these catalog views can be very different from those in the compatibility views. This is the case with, among others, sys.indexes and sys.databases. You can observe the difference by running these queries:

SELECT * FROM sys.databases;
SELECT * FROM sysdatabases;

The compatibility view (sysdatabases) is located in the sys schema, so it can also be referenced as sys.sysdatabases. For compatibility reasons, the schema name is not required for the compatibility views. It is, however, required for the catalog views. (You can’t just select from a view called databases; you need to use the sys schema as a prefix.)

When we compare the output of the two queries above, we notice that the sys.databases catalog view has a lot more columns. Instead of a bitmap status field that needs to be decoded, each possible database property has its own column in sys.databases. In SQL Server 2000 the stored procedure sp_helpdb decodes all these database options, but because it is a stored procedure it is difficult to filter the results. As a view, sys.databases can be queried and filtered. As a small example, when we want to know which of our databases use the simple recovery model, we can run this code:

SELECT name FROM sys.databases
WHERE recovery_model_desc = 'SIMPLE';

The catalog views are built on an inheritance model, so that common attributes don’t have to be redefined. When we look at sys.objects we can see 12 columns (common to all types of objects). When we look at sys.tables we notice that we have the same 12 columns (in the same order) and an additional 15 columns with additional information that only applies to tables.

Some of the metadata can only be found in the master database and applies to system-wide data such as logins and databases. Other metadata appears in all databases and contains information such as the objects and the permissions.

Since the metadata objects are views, they are based on an underlying Transact-SQL (T-SQL) definition. It is possible to see the definition of these views using the object_definition function. When we want to see the definition of sys.tables, we execute the following:

SELECT object_definition (object_id('sys.tables'));

All the metadata objects whose names start with sys.dm_ (such as sys.dm_exec_cached_plans) are Dynamic Management Objects. We’ll explore these in a later post. Although the catalog views are the preferred way to access metadata, there are some other tools available as well.

Information Schema Views

The Information Schema Views (introduced in SQL Server 7.0) were the original system table-independent views of SQL metadata. These views can be found in a schema called INFORMATION_SCHEMA. Some of the information that can be found in the catalog views can also be found in the information schema views. You should consider using the information schema views when you need to write a portable application that accesses the metadata.

However, you should note that these views only show objects that are compatible with the SQL-92 standard. This means that there are no views for certain features (for example indexes, since these are not defined in the standard). So if your code doesn’t need to be portable, or if you need metadata about features that are not standard (indexes, filegroups, the CLR, SQL Server Service Broker, …), you are better off using the catalog views.

System Functions

Most of the SQL Server system functions are property functions. These property functions give us individual values for many of the SQL Server objects, the SQL Server databases and the SQL Server instance. The values returned by these property functions are scalar, thus they can be used as values returned by SELECT-statements and as values to populate columns in tables. The property functions available in SQL Server 2008 are:

  • SERVERPROPERTY
  • COLUMNPROPERTY
  • DATABASEPROPERTY
  • DATABASEPROPERTYEX
  • INDEXPROPERTY
  • INDEXKEY_PROPERTY
  • OBJECTPROPERTY
  • OBJECTPROPERTYEX
  • SQL_VARIANT_PROPERTY
  • FILEPROPERTY
  • FILEGROUPPROPERTY
  • TYPEPROPERTY
  • CONNECTIONPROPERTY
  • ASSEMBLYPROPERTY

Some of the information returned by these property functions can also be seen using the catalog views. The DATABASEPROPERTYEX function has a property called Recovery, which returns the recovery model of a database. To view the recovery model of a single database you can use the function like this:

SELECT DATABASEPROPERTYEX('msdb', 'Recovery');

To view the recovery model for all databases you can use the sys.databases view:

SELECT name, recovery_model, recovery_model_desc
FROM sys.databases;

In addition to these property functions there are also functions that serve as shortcuts to catalog views. When you want to find the database id of a database called ‘MyDatabase’ you can either query the sys.databases catalog view or use the DB_ID() function. Both statements should return the same result.

SELECT database_id
FROM sys.databases
WHERE name = 'MyDatabase';

SELECT DB_ID('MyDatabase');

Note: When you see a column name ending with _desc you are looking at the ‘friendly named’ column. Such a column is always paired with a column that has more compact (and cryptic) content. Both are available so that you can use whichever suits you best.

System Stored Procedures

The system stored procedures are, together with the system base tables themselves, the original metadata access tool of SQL Server. Most of the initial system stored procedures are still available in the current versions, but the catalog views are a big improvement. You can query the catalog views as if they were tables, whereas with the system stored procedures you have to accept the data the way it is returned. Some system stored procedures accept a limited set of parameters.

For the sp_helpdb stored procedure you can either pass no parameter to see all databases or use a parameter and see only the information for the specified database. However when you want to execute more complex queries you’re stuck. These complex queries are however easy to execute using the catalog views.

24. February 2011 by Jeroen Verhulst

Microsoft SQL Server: SAVE TRANSACTION

Today somebody asked me a question concerning SAVE TRANSACTION, so I decided to write a small blog post about the basic concept. Consider a database test that resides on our SQL Server instance. In this database we have a table called Item which has the columns ItemNo and Description.

We assume that our table is empty and we execute the following T-SQL code:

BEGIN TRANSACTION

INSERT INTO Item Values (1, 'Computer')
INSERT INTO Item Values (2, 'Printer')
DELETE FROM Item Where ItemNo = 1

SAVE TRANSACTION SaveA

INSERT INTO Item Values (3, 'Router')
INSERT INTO Item Values (4, 'Monitor')

SAVE TRANSACTION SaveB

INSERT INTO Item Values (5, 'Keyboard')
UPDATE Item SET Description = 'Wireless Keyboard'
WHERE ItemNo = 5

ROLLBACK TRANSACTION SaveA

UPDATE Item SET Description = 'Mouse'
WHERE ItemNo = 4

COMMIT TRANSACTION

Question: Which item or items are there in the Item table after executing the T-SQL?


In this example you executed Transact-SQL code that uses transactional savepoints. These savepoints allow you to roll back portions of the transaction as needed. To create a savepoint to which you can roll back you use the SAVE TRANSACTION statement and specify a savepoint name.

When you issue a ROLLBACK statement you can specify the savepoint to which you want to roll back. In this example we issued two INSERT statements to insert the first two rows and a DELETE statement that deletes the first row. Then we created a savepoint named SaveA. Next we inserted rows 3 and 4 and created another savepoint named SaveB. Having multiple savepoints lets us roll back to different places in the transaction.

Next we insert row 5 and update its description. Then we issue a rollback to savepoint SaveA. This rolls back all changes made since that savepoint was created, which means rows 3, 4 and 5 are gone. After this rollback we update item 4. However, since the INSERT for this row has been rolled back, the UPDATE affects no rows. Therefore, after we commit the transaction, all we have left is the second row (containing ‘Printer’).

You should note that when rolling back to a savepoint, SQL Server holds resources until the entire transaction is either committed or rolled back. Therefore you should always issue a COMMIT or a complete ROLLBACK, even if you have previously rolled back part of the transaction.

We’ll explore transactions in more detail in a later post.

16. February 2011 by Jeroen Verhulst

It’s all about being lazy (in C#)

Most programming languages we use today employ something called eager evaluation. This means that our program executes all statements and expressions in the order in which they were written by the developer. For example, all arguments of a method call are evaluated before the method itself is called. Sometimes, though, it is useful to delay the execution of some code until its result is actually needed. This may be the case when the result may not be needed at all (but we don’t know in advance whether it will be). It may also be because we don’t want our program to block while computing everything up front; instead we want to run the complex computation only when we actually need the result.
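To make the difference concrete, here is a minimal sketch that passes the same computation once as a plain argument and once wrapped in a Func<int> delegate. All names in it (EagerVersusDeferred, Expensive, Eager, Deferred, Log) are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;

public static class EagerVersusDeferred
{
    // Records what happened, so the order of evaluation can be inspected.
    public static readonly List<string> Log = new List<string>();

    // Stand-in for a computation that would normally take a while.
    static int Expensive(string label)
    {
        Log.Add("evaluated " + label);
        return 42;
    }

    // The argument is an int, so it has already been computed when we get here.
    static void Eager(bool needed, int value)
    {
        if (needed) Log.Add("used " + value);
    }

    // The argument is a delegate, so the computation only runs if we invoke it.
    static void Deferred(bool needed, Func<int> value)
    {
        if (needed) Log.Add("used " + value());
    }

    public static void Run()
    {
        Log.Clear();
        Eager(false, Expensive("eager"));             // Expensive runs although the value is never used
        Deferred(false, () => Expensive("deferred")); // Expensive never runs
    }

    public static void Main()
    {
        Run();
        foreach (string line in Log) Console.WriteLine(line);
    }
}
```

Running this prints only "evaluated eager": the eager argument was computed even though nobody used it, while the deferred one was never touched.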

Although the .NET Framework has its own Lazy<T> class (since version 4), we’ll implement our own Lazy class for educational purposes. It should be easy to switch to using the .NET class.

So how can we write these lazy computations in C#? We’ll start by implementing our own Lazy class that represents this kind of lazy computation. Next we’ll also use this class in some simple examples to demonstrate how this class can be used.

Exploring our Lazy<T> Class

We’ll dive in by writing a generic class that represents the delayed (lazy) computation, Lazy<T>. How can we describe this class? It is just a class that executes a piece of code once somebody asks for the result. It also needs to cache this result, so that we can guarantee that the code is executed at most once. This is pretty easy to represent in C#: for the ‘piece of code’ that we need to execute we can use a Func delegate, and since this ‘piece of code’ can return any .NET type, we’ll use generics.

public class Lazy<T>
{
    private Func<T> func;
    private T result;
    private bool hasValue;

    public Lazy(Func<T> func)
    {

        this.func = func;
        this.hasValue = false;
    }
    public T Value
    {
        get
        {
            if (!this.hasValue)
            {
                this.result = this.func();
                this.hasValue = true;
            }
            return this.result;
        }
    }
}

Our new class has three fields. The func field is a delegate that will be executed when someone first accesses the value. The hasValue field is a Boolean saying whether we have already called the delegate. Finally, the result field stores the result once the delegate has been called. The only logic of the class is in the Value property, which returns the value calculated by the delegate. The getter first tests whether we already have a result and executes the delegate only if we don’t. In either case it returns the cached result (calculated either on an earlier access or just now).

Wouldn’t it be fun if we could actually use this class as well? Let’s find out how we can do that. Our constructor expects a value of type Func<T>, which represents a delegate that takes no arguments and returns a value of type T. The main advantage of using this type is that we can use an anonymous function to write, in a very compact way, the code that will be executed when the value of our Lazy<T> is requested. In our first example we will create a lazy value that prints a string to the console window and then returns a numeric value once it is executed. This lets us trace when the Lazy<T> is executed.

Lazy<int> lazy = new Lazy<int>(() =>
{
    /* This should take a while */
    Console.WriteLine("Working on it...");
    return 42;
});

Console.WriteLine("What is the answer to the ultimate question of life, " +
                  "the universe, and everything?");

Console.WriteLine("The answer is {0}", lazy.Value);           /* First try */
Console.WriteLine("The answer is (again) {0}", lazy.Value);   /* Second try */

In our sample we first create a variable called lazy. This variable represents our lazy value. We print a string to the console and then use the Value property of our lazy value to get the result from our computation (even twice!). You’ll probably expect this result:

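What is the answer to the ultimate question of life, the universe, and everything?
Working on it...
The answer is 42
The answer is (again) 42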
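For comparison, here is a minimal sketch of the same example written against the Lazy<T> class that ships with .NET (System.Lazy<T>) instead of our own class. The wrapper class BuiltInLazyDemo and its Evaluations counter are only there for illustration. Unlike our simple class, the built-in one is thread-safe by default and exposes an IsValueCreated property.

```csharp
using System;

public static class BuiltInLazyDemo
{
    // Counts how often the delegate has actually run.
    public static int Evaluations = 0;

    public static Lazy<int> Create()
    {
        return new Lazy<int>(() =>
        {
            Evaluations++;
            Console.WriteLine("Working on it...");
            return 42;
        });
    }

    public static void Main()
    {
        Lazy<int> lazy = Create();

        Console.WriteLine("Computed yet? {0}", lazy.IsValueCreated);  // nothing has run so far
        Console.WriteLine("The answer is {0}", lazy.Value);           // runs the delegate
        Console.WriteLine("The answer is (again) {0}", lazy.Value);   // served from the cache
        Console.WriteLine("Delegate ran {0} time(s)", Evaluations);
    }
}
```

The constructor and the Value property have the same shape as in our own class, which is why switching between the two takes little more than removing our class definition.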

Taking it to the next level (with C# Type Inference)

Before we move on to more interesting (and more useful) examples, we’ll try to improve our Lazy<T> class. The syntax we currently use for creating a lazy value is somewhat verbose, since we have to write Lazy<int> twice. Fortunately we can simplify this using C# type inference, which makes it possible to omit the type arguments when calling a generic method. To use this technique we write a simple static helper class with a static method for creating a lazy value.

public static class Lazy
{
    public static Lazy<T> New<T>(Func<T> func)
    {
        return new Lazy<T>(func);
    }
}

When using this static class, we can create a lazy value by calling Lazy.New() instead of writing new Lazy<int>(). When we combine this with the var keyword in C# (which tells the compiler to infer the type of the variable from the expression used to initialize it) we get an even more compact syntax. In addition, this helper class lets us use anonymous types, whose names we cannot write out and which therefore can’t be used with new Lazy<...>() directly. This is often useful, as demonstrated in the following example, where we create a lazy value representing both the sum and the product of two numbers.

int a = 22, b = 20;

var lazy = Lazy.New(() =>
{
    Console.WriteLine("Calculating...");
    return new { Prod = a * b, Sum = a + b };
});

Console.WriteLine("The product is {0} and the sum is {1}", lazy.Value.Prod, lazy.Value.Sum);

And of course the output is just as expected:

Calculating...
The product is 440 and the sum is 42

What about a lazy value as an argument of a method?

As mentioned in the introduction, one of the aspects of eager evaluation is that the arguments of a method are evaluated before the method is actually called. This may not be the best thing to do if a calculation takes a long time or if its result may not even be needed. With lazy evaluation, on the other hand, an argument is never evaluated unless its value is actually required in the method body. We can simulate this behavior using our lazy values. In our example we’ll take two lazy values and end up using only one of them.

static Random rnd = new Random();
static void UseRandomArgument(Lazy<int> a0, Lazy<int> a1)
{
    int res;
    if (rnd.Next(2) == 0)
        res = a0.Value;
    else
        res = a1.Value;
    Console.WriteLine("Our result is {0}", res);
}

In order to call this method we need to create two lazy values (using our Lazy.New method). To show how this code behaves we’ll add a call to Console.WriteLine in every anonymous function.

var calc1 = Lazy.New(() =>
{
    Console.WriteLine("Calculating #1"); return 42;
});

var calc2 = Lazy.New(() =>
{
    Console.WriteLine("Calculating #2"); return 44;
});

UseRandomArgument(calc1, calc2);
UseRandomArgument(calc1, calc2);

When you execute this code you’ll see different behavior from run to run. Depending on the random choices, only the first, only the second or both of the lazy arguments are calculated, and because the result is cached each of them is calculated at most once. The important thing to notice is that an argument that is never used is never evaluated.

30. November 2010 by Jeroen Verhulst

IIS 7–Error – Handler PageHandlerFactory-Integrated has a bad module ManagedPipelineHandler in its module list

Whilst deploying a WCF service on a clean Windows Server 2008 R2 with IIS 7.5 I kept having an HTTP Error 500 Internal server error. When browsing to the localhost on the webserver, I discovered the actual error was: “Handler “PageHandlerFactory-Integrated” has a bad module “ManagedPipelineHandler” in its module list”.

 

This issue can easily be resolved by running the following command (replace v4.0.30319 with your own version).

%windir%\Microsoft.NET\Framework\v4.0.30319\aspnet_regiis.exe -i

14. November 2010 by Jeroen Verhulst

Windows Communication Foundation For Dummies

What exactly is Windows Communication Foundation?

Windows Communication Foundation (WCF for short) is a programming interface that’s part of the .NET Framework for building connected, service-oriented applications. Let’s immediately start with an example to make clear what it’s all about:

Let’s say we are consuming (or using) a web service. This service needs a zip code (input) and returns the weather forecast (output). From a consumer’s point of view, all you care about is the input (the zip code) and the output (the forecast), not the specific implementation. A WCF service takes a request and returns a response, and never talks to the client in between. This means that client and service interoperate only on what they have agreed to interoperate upon, i.e. the contract. This contract is the first important piece of a WCF service.

The contract

Your WCF service is based upon a contract. This contract is usually implemented as an interface decorated with the [ServiceContract] attribute. Let’s see what our weather forecasting contract could look like. Our contract will define a method GetForeCast (with the attribute [OperationContract]) that takes a zip code and returns a WeatherForecast object (the implementation of this object is trivial at this point).

[ServiceContract]
public interface IWeatherForecastService
{
    [OperationContract]
    // Interface members are implicitly public, so no access modifier here.
    WeatherForecast GetForeCast(int zipCode);
}

The ABC of Windows Communication Foundation

Of course a WCF service is more than just a contract. When we want to be able to consume this service we’ll need a few more building blocks:

The first block is the Address. This is where your service can be found, usually in a familiar form: scheme://domain[:port]/path (e.g.: http://localhost:8080/MyWCFService ). Secondly we have the Binding, which refers to the protocols that are used by a specific endpoint (e.g.: http, tcp, msmq, …). Last (but not least) we need a Contract. As explained above, this is the core of your service and expresses what your service does.

When we combine this ABC (Address, Binding and Contract), we get an EndPoint. This EndPoint is how a ServiceHost exposes a service so that clients can invoke the operations that are defined in the contract.
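The ABC can be spelled out in code with a self-hosted service. The sketch below assumes a .NET Framework console project that references System.ServiceModel. IForecastTextService and ForecastTextService are hypothetical, simplified stand-ins for the weather contract (they return a string so that the example stays self-contained), and the address is the example address used above. Opening an HTTP listener may require administrator rights or a URL reservation on your machine.

```csharp
using System;
using System.ServiceModel;

// Hypothetical, simplified contract: returns plain text instead of a WeatherForecast.
[ServiceContract]
public interface IForecastTextService
{
    [OperationContract]
    string GetForecastText(int zipCode);
}

public class ForecastTextService : IForecastTextService
{
    public string GetForecastText(int zipCode)
    {
        return "Sunny in " + zipCode;
    }
}

public static class EndpointSketch
{
    public const string Address = "http://localhost:8080/MyWCFService";

    // Builds a host with exactly one endpoint, without opening it yet.
    public static ServiceHost BuildHost()
    {
        var host = new ServiceHost(typeof(ForecastTextService));
        host.AddServiceEndpoint(
            typeof(IForecastTextService),  // C: the contract
            new BasicHttpBinding(),        // B: the binding (SOAP over HTTP)
            Address);                      // A: the address
        return host;
    }

    public static void Main()
    {
        ServiceHost host = BuildHost();
        host.Open();   // starts listening on the address
        Console.WriteLine("Listening at {0}. Press Enter to stop.", Address);
        Console.ReadLine();
        host.Close();
    }
}
```

Each call to AddServiceEndpoint adds one EndPoint, so a host that should also be reachable over another protocol would simply add a second endpoint with a different binding and address for the same contract.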

In the above contract we returned a type called WeatherForecast. This WeatherForecast type is a class we wrote and it represents a business object that conveniently holds all data related to the forecast. When sending this object to a consumer it will be serialized.
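As a rough idea of what such a type could look like, here is a minimal sketch. The properties are invented for the illustration, since the real class is not shown here. The [DataContract] and [DataMember] attributes mark what gets serialized, and the round trip through DataContractSerializer (the serializer WCF uses by default) mimics what happens when the object travels from host to consumer.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;

[DataContract]
public class WeatherForecast
{
    // Hypothetical members; only properties marked [DataMember] are serialized.
    [DataMember] public int ZipCode { get; set; }
    [DataMember] public string Summary { get; set; }
    [DataMember] public double TemperatureCelsius { get; set; }
}

public static class SerializationSketch
{
    // Serializes the forecast to XML in memory and reads it back into a new object.
    public static WeatherForecast RoundTrip(WeatherForecast forecast)
    {
        var serializer = new DataContractSerializer(typeof(WeatherForecast));
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, forecast);
            stream.Position = 0;
            return (WeatherForecast)serializer.ReadObject(stream);
        }
    }

    public static void Main()
    {
        var original = new WeatherForecast { ZipCode = 9000, Summary = "Cloudy", TemperatureCelsius = 12.5 };
        WeatherForecast copy = RoundTrip(original);
        Console.WriteLine("{0}: {1}, {2} degrees", copy.ZipCode, copy.Summary, copy.TemperatureCelsius);
    }
}
```

The copy is a different object with the same data, which is exactly what a consumer of the service ends up with.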

When you combine this type with the EndPoint, we get the MetaData for our service. Given this MetaData, a consumer of the service can create a Proxy.

This Proxy is what the consumer uses to communicate with the host. It is basically an empty type that exposes all operations in a service contract and hides the details of serialization and of sending data over the wire. A single proxy uses a single endpoint.

The client and the host talk to each other through Channels: a channel is what a message passes through on its way between a client and a host, and usually several channels are stacked on top of each other. You may also have various Behaviors associated with the service. These behaviors modify the messages as they flow through the channel stack. A good example of a behavior is authorization.

Conclusions

Let’s first summarize what we’ve learnt above before moving on:

We have a host and a client who agree upon a contract. Our host exposes one or more EndPoints, each of which is a combination of Address, Binding and Contract. On the other side our client uses a Proxy (this proxy is tied to a specific EndPoint and thus also to a particular Contract). The actual communication-related details are abstracted in Channels and Behaviors.

Your first WCF Service

Moving on to the obligatory ‘Hello World’ example, let’s first think how we need to construct this WCF service. Keeping in mind the above process we can easily come up with a basic plan.

   -  First, think of the contract (What am I trying to do?!)
   -  Secondly, think of where you will host it, and what technology you will use to host it.
   -  Finally, think of the other parts like authentication.

Keeping the above plan in mind, it should be pretty clear that we need to start with defining the problem. For our example that’s pretty simple: We want to create a service that accepts a name and returns Hello “name”. So how do we actually create this service?

1. Open Visual Studio 2010 and create a new ‘WCF Service Application’

2. By default, Visual Studio will generate a bunch of files for you. Although these are handy, we’ll delete them for now and focus on writing the code ourselves. We’ll delete both IService1.cs and Service1.svc.

3. We’ll continue by adding a new WCF Service (HelloWorld.svc). Visual Studio will create HelloWorld.svc and IHelloWorld.cs.

4. When we look at IHelloWorld.cs we can see that Visual Studio has generated some code to help us:

namespace HelloWorldService
{
    [ServiceContract]
    public interface IHelloWorld
    {
        [OperationContract]
        void DoWork();
    }
}

Since we don’t need the DoWork method, but do need a sayHello method, we’ll adapt this interface to match our problem definition:

namespace HelloWorldService
{
    [ServiceContract]
    public interface IHelloWorld
    {
        [OperationContract]
        string sayHello(string name);
    }
}

Next we need to alter the code behind HelloWorld.svc (HelloWorld.svc.cs) so that it implements the above interface and the actual method:

namespace HelloWorldService
{
    public class HelloWorld : IHelloWorld
    {
        public string sayHello(string name)
        {
            return "Hello " + name;
        }
    }
}

Since .NET Framework 4, Windows Communication Foundation has some default bindings defined. So although our Web.config file is pretty much empty, we can just run the application and rely on those defaults. When you run the service you’ll notice that Visual Studio opens the “WCF Test Client”. This test client can also be started manually. You can find it here: C:\Program Files (x86)\Microsoft Visual Studio 10.0\Common7\IDE\WcfTestClient.exe

To test the service we click on the sayHello method, fill in an input value and click ‘Invoke’.

You can either look at the formatted information, or click on XML to see the raw information as it’s transmitted by WCF.

The request:

<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">
  <s:Header>
    <Action s:mustUnderstand="1" xmlns="http://schemas.microsoft.com/ws/2005/05/addressing/none">http://tempuri.org/IHelloWorld/sayHello</Action>
  </s:Header>
  <s:Body>
    <sayHello xmlns="http://tempuri.org/">
      <name>Jeroen</name>
    </sayHello>
  </s:Body>
</s:Envelope>

The response:

<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">
  <s:Header />
  <s:Body>
    <sayHelloResponse xmlns="http://tempuri.org/">
      <sayHelloResult>Hello Jeroen</sayHelloResult>
    </sayHelloResponse>
  </s:Body>
</s:Envelope>

So now we have written our first WCF Service! We’ll continue by writing a client that uses this WCF Service instead of using the Visual Studio Test Client.

Writing a WCF Client

So now that we have our ultra valuable ‘Hello World’ service, we’ll need to write some code to consume it. We can accomplish this in two different ways. First, we could use the metadata that is exposed by the EndPoint to generate proxies on the fly. Secondly, we could pre-distribute the metadata as a definition of the interface, or as a dll that contains the IHelloWorld interface. In this post we’ll only explore the first method.

Using the metadata exposed by the EndPoint

We’ll start by creating a new console application and adding a new service reference to our WCF service.

We’ll go ahead and add our service (in our case located at http://localhost:5558/HelloWorld.svc) and call it HelloWorldReference.

image

After adding the above service reference, we’ll write some code that’s pretty self-explanatory:

static void Main(string[] args)
{
    // Create a new HelloWorldClient object 
    HelloWorldReference.HelloWorldClient client = new HelloWorldReference.HelloWorldClient();

    // Write the response from the sayHello method
    Console.WriteLine(client.sayHello("Jeroen"));

    // Close the WCF connection
    client.Close();

    // Just to display the result :-) 
    Console.Read();

}

First make sure your service is running. When you then run the client, you should see the result:

image

03. November 2010 by Jeroen Verhulst

Exploring Reactive Extension’s Subject

image

If you want to create your own implementation of the IObservable<T> interface, you will find that you need to expose some methods to publish items to the subscribers, signal errors and notify when the stream is completed. I hope you notice that this sounds exactly like the methods that are available in the IObserver<T> interface (otherwise, make sure to read my post about IObservable).

Subject<T>

While it sounds pretty strange to implement both interfaces in just one type, it sure would make things easier. This is exactly what Subject<T> does. While you can expose your Subject<T> behind a method that just returns an IObservable<T>, you can internally use the OnNext, OnError and OnCompleted methods to control the data. Let’s dive into a basic example where we’ll create a subject, then subscribe to that subject and at last publish some data to the stream.

class Program
{
    static void Main()
    {
        // Create a new Subject<T> (both an IObservable<T> and an IObserver<T>)
        var subject = new Subject<int>();

        // Use a delegate to subscribe an action to our observable.
        subject.Subscribe(Console.WriteLine);

        // Push some items to the observable.
        subject.OnNext(1);
        subject.OnNext(2);
        subject.OnNext(3);

        // Just to keep the console visible :-) 
        Console.ReadKey();  

    }
}

What actually happens is that the Subscribe method is a part of the IObservable<int> implementation of our Subject<int>. Since we provide a delegate to this method, the delegate’s action will be executed every time a new item is published to its subscribers. It’s also possible to use this Subscribe method to pass combinations of delegates to be invoked for the OnNext, OnCompleted and OnError methods. It doesn’t end with Subject<T>, since Reactive Extensions provides us with some variations. Those offer us slightly different implementations, with some drastic consequences.
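As a small illustration of passing delegates for all three notifications, here is a sketch. It assumes a current System.Reactive package; the class name SubscribeOverloadSketch and the Run method are made up for this example, and the notifications are collected in a list instead of being written to the console.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical helper for this post; not part of Rx.
public static class SubscribeOverloadSketch
{
    // Returns the notifications in the order the subscriber saw them.
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        // One delegate per notification: OnNext, OnError and OnCompleted.
        subject.Subscribe(
            value => log.Add("OnNext: " + value),
            error => log.Add("OnError: " + error.Message),
            () => log.Add("OnCompleted"));

        subject.OnNext(1);
        subject.OnNext(2);
        subject.OnCompleted();

        // Ignored: the stream has already completed.
        subject.OnNext(3);

        return log;
    }
}
```

Calling SubscribeOverloadSketch.Run() should give "OnNext: 1", "OnNext: 2" and "OnCompleted"; the value 3 never arrives because it is pushed after completion.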

ReplaySubject<T>

ReplaySubject<T> differs from Subject<T> in that it remembers all publications that are made. This way, any subscribers that subscribe after publications have been made will still receive all publications. Let’s first alter our previous example and see what happens:

class Program
{
    static void Main()
    {
        // Create a new Subject<T> (both an IObservable<T> and an IObserver<T>)
        var subject = new Subject<int>();

        // Push an item to the observable.
        subject.OnNext(1);

        // Use a delegate to subscribe an action to our observable.
        subject.Subscribe(Console.WriteLine);

        // Push some more items to the observable.
        subject.OnNext(2);
        subject.OnNext(3);

        // Just to keep the console visible :-) 
        Console.ReadKey();

    }
}

You’ll notice that the behavior of this code is pretty much what can be expected. You’ll get 2 and 3 printed in the console, but 1 is not printed since it had been published before we subscribed to the data source. Next we change our Subject<int> to the ReplaySubject<int> and see what impact this has.

class Program
{
    static void Main()
    {
        // Create a new ReplaySubject<T>
        var subject = new ReplaySubject<int>();

        // Push an item to the observable.
        subject.OnNext(1);

        // Use a delegate to subscribe an action to our observable.
        subject.Subscribe(Console.WriteLine);

        // Push some more items to the observable.
        subject.OnNext(2);
        subject.OnNext(3);

        // Just to keep the console visible :-) 
        Console.ReadKey();

    }
}

As you’ll notice, the behavior of the program has changed drastically. By using a ReplaySubject<T> our subscriber gets all publications made, including those that were made before we subscribed to our source.

BehaviorSubject<T>

In a way, the BehaviorSubject<T> is similar to the ReplaySubject<T>. The main difference is that it only remembers the last publication and pushes this to new subscribers. Also, it’s required to provide a default value of T. This means that all subscribers will certainly receive a value when they subscribe to the data. Let’s explore some examples to check out the differences:

class Program
{
    static void Main()
    {
        // Create a new BehaviorSubject<T> 
        var subject = new BehaviorSubject<int>(1);

        // Use a delegate to subscribe an action to our observable.
        subject.Subscribe(Console.WriteLine);

        // Just to keep the console visible :-) 
        Console.ReadKey();

    }
} 

Unsurprisingly, the above code prints the default value that we provided when creating our BehaviorSubject<int>. Up till now the behavior seems rather similar to the ReplaySubject’s behavior. Let’s see what happens when we publish an item on top of the default value, so that two values have been published in total:

class Program
{
    static void Main()
    {
        // Create a new BehaviorSubject<T> 
        var subject = new BehaviorSubject<int>(1);

        // Push an item to the observable.
        subject.OnNext(2);

        // Use a delegate to subscribe an action to our observable.
        subject.Subscribe(Console.WriteLine);

        // Just to keep the console visible :-) 
        Console.ReadKey();

    }
} 

Publishing two items (the default value and the value 2) and then subscribing makes us receive only the last item that was published. Here we see a clear difference with ReplaySubject’s behavior, since there we would have gotten every value that was pushed with OnNext. It seems that BehaviorSubject<T> only remembers the last published value. Let’s test whether this statement is true:

class Program
{
    static void Main()
    {
        // Create a new BehaviorSubject<T> 
        var subject = new BehaviorSubject<int>(1);

        // Push an item to the observable.
        subject.OnNext(2);

        // Use a delegate to subscribe an action to our observable.
        subject.Subscribe(Console.WriteLine);

        // Push some more items to the observable.
        subject.OnNext(3);
        subject.OnNext(4);

        // Just to keep the console visible :-) 
        Console.ReadKey();

    }
}

As we can see, we now get the behavior we expected: we get the value that was last published before we subscribed (in this case ‘2’) and we don’t get the initial value. We do get all values that are pushed after subscribing. Finally let’s see what happens when the stream is completed before we subscribe:

class Program
{
    static void Main()
    {
        // Create a new BehaviorSubject<T> 
        var subject = new BehaviorSubject<int>(1);

        // Push some items to the observable.
        subject.OnNext(2);
        subject.OnNext(3);
        subject.OnNext(4);

        // And mark this observable as completed.   
        subject.OnCompleted();

        // Use a delegate to subscribe an action to our observable.
        subject.Subscribe(Console.WriteLine);

        // Just to keep the console visible :-) 
        Console.ReadKey();

    }
}

As you might have expected, no values will be published since the stream has already completed. Thus nothing is written to the console.

AsyncSubject<T>

The AsyncSubject<T> also has some similarities with the ReplaySubject<T> and BehaviorSubject<T>, but then again with a slightly different implementation. The AsyncSubject<T> will only store the last value, and will only publish this value when the stream is completed. An example:

class Program
{
    static void Main()
    {
        // Create a new AsyncSubject<T> 
        var subject = new AsyncSubject<int>();

        // Push an item to the observable.
        subject.OnNext(1);

        // Use a delegate to subscribe an action to our observable.
        subject.Subscribe(Console.WriteLine);

        // Push some more items to the observable.
        subject.OnNext(2);
        subject.OnNext(3);

        // Just to keep the console visible :-) 
        Console.ReadKey();

    }
}

In the above example there will be no output written to the console since the stream is never completed. Let’s see what happens when we do complete the stream:

class Program
{
    static void Main()
    {
        // Create a new AsyncSubject<T> 
        var subject = new AsyncSubject<int>();

        // Push an item to the observable.
        subject.OnNext(1);

        // Use a delegate to subscribe an action to our observable.
        subject.Subscribe(Console.WriteLine);

        // Push some more items to the observable.
        subject.OnNext(2);
        subject.OnNext(3);

        // Complete our stream
        subject.OnCompleted();

        // Just to keep the console visible :-) 
        Console.ReadKey();

    }
}

As you can see we now get just one value pushed, being the last value before completion.
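To put the four variants side by side, the following sketch runs the same scenario against each of them: push 1 and 2, subscribe late, then push 3 and complete. It assumes a current System.Reactive package, where all four types implement ISubject<int>; the class SubjectComparisonSketch and its method are hypothetical names for this post.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical helper for this post; not part of Rx.
public static class SubjectComparisonSketch
{
    // Pushes 1 and 2, subscribes late, then pushes 3 and completes.
    // Returns the values the late subscriber received.
    public static List<int> LateSubscriberSees(ISubject<int> subject)
    {
        var seen = new List<int>();

        subject.OnNext(1);
        subject.OnNext(2);

        subject.Subscribe(value => seen.Add(value));

        subject.OnNext(3);
        subject.OnCompleted();

        return seen;
    }
}
```

With this scenario, new Subject<int>() should yield just 3, new ReplaySubject<int>() yields 1, 2 and 3, new BehaviorSubject<int>(0) yields 2 and 3, and new AsyncSubject<int>() yields only 3 (published at completion).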

IScheduler?!

When looking at the constructors of all the above variations of Subject<T> you’ll notice that each one also has an overload with an IScheduler. We’ll explore this IScheduler soon in another post in this Rx series.

29. September 2010 by Jeroen Verhulst

Reactive Extensions Unleashed

image

Reactive Extensions (Rx) is a new library for composing asynchronous and event-based programs using observable collections.

Why Reactive Extensions?

When working with data sources today, one will typically use an IEnumerable<T>. IEnumerable<T> is synchronous, meaning that you as a developer have to ask the source for the current element or to move to the next element (Current property and MoveNext method). The implication is that your application could get stuck while performing these actions.

Although IEnumerable<T> has some great advantages, like the ability to query collections using LINQ, the fact remains that things get a lot more complicated when dealing with asynchronous data. IObservable<T> would be the solution to this problem, since it’s the dual form of IEnumerable<T>. To make this duality complete, IObservable<T> should be queryable using LINQ as well. This can be achieved using Reactive Extensions.

The building blocks

IObservable<T>

IObservable<T> is one of the 2 main interfaces for working with Reactive Extensions. It’s not a complicated interface, with only a Subscribe method. Since version 4 of the .NET Framework this interface is included in the BCL. If you’re still using .NET 3.5 you can use this interface as well, since the Rx team has included these interfaces in a separate assembly for this scenario (being System.Observable).

An implementation of this interface can best be described as a collection of elements of type T. The main target of this IObservable<T> interface is to provide elements to the observers (subscribers). Thus, an IObservable<Animal> can be thought of as a collection of Animals, where the Animals will be pushed to the subscribed observers.

IObserver<T>

IObserver<T> is the other main interface for working with Reactive Extensions and has also been included in the BCL since .NET Framework 4. The IObserver<T> interface is meant to be the dual of IEnumerator<T>, just as IObservable<T> is the dual of IEnumerable<T>.

Although you will be exposed to the IObservable<T> interface all the time when using Rx, you will almost never implement these interfaces yourself, since Rx provides a lot of implementations out of the box.
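Just to make the shape of the two interfaces tangible, here is a hand-written sketch that uses nothing but the BCL. FixedValuesObservable<T> and RecordingObserver<T> are hypothetical types invented for this post; in real code you would normally rely on the implementations that Rx provides.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable that pushes a fixed set of values to each subscriber.
public sealed class FixedValuesObservable<T> : IObservable<T>
{
    private readonly T[] values;

    public FixedValuesObservable(params T[] values)
    {
        this.values = values;
    }

    // The only member of IObservable<T>: push everything, then complete.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var value in values)
        {
            observer.OnNext(value);
        }

        observer.OnCompleted();

        // Everything has been pushed already, so there is nothing left to cancel.
        return NoOpSubscription.Instance;
    }

    private sealed class NoOpSubscription : IDisposable
    {
        public static readonly NoOpSubscription Instance = new NoOpSubscription();

        public void Dispose() { }
    }
}

// Hypothetical observer that records every notification it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();

    public void OnNext(T value) { Log.Add("OnNext: " + value); }

    public void OnError(Exception error) { Log.Add("OnError: " + error.Message); }

    public void OnCompleted() { Log.Add("OnCompleted"); }
}
```

Subscribing a RecordingObserver<string> to new FixedValuesObservable<string>("cat", "dog") leaves "OnNext: cat", "OnNext: dog" and "OnCompleted" in its Log, which matches the picture of Animals being pushed to the subscribed observers.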

Overloads of Subscribe

As you know, IObservable exposes only one method, the Subscribe method. The basic extension method is an overload to Subscribe where you pass just an Action<T> that will be performed when OnNext is invoked. The other extensions are also very useful and allow you to pass different combinations of delegates for the different events that could occur when subscribing to the source.

public static class ObservableExtensions
{
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source,
        Action<TSource> onNext);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source,
        Action<TSource> onNext, Action<Exception> onError);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source,
        Action<TSource> onNext, Action onCompleted);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source,
        Action<TSource> onNext, Action<Exception> onError, Action onCompleted);
} 

You will use these overloads a lot, but there are also a lot of other helpful static operators on the Observable static class. But how did we get this syntax? When we explore the Observer static class we find something very familiar:

public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted);

Putting this together with the fact you need to provide an Observer when Subscribing to an Observable, we should get something like this:

Observable.Subscribe(Observer.Create(Action<T> onNext, Action<Exception> onError, Action onCompleted));

Although this would have made sense, actually the Observer.Create isn’t really necessary; so the Rx-team has decided to omit this step.
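The two routes can be compared in a short sketch. It assumes a current System.Reactive package, where Observer.Create lives in the System.Reactive namespace; the class name ObserverCreateSketch is made up, and the notifications are collected in a list so the order is easy to inspect.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical helper for this post; not part of Rx.
public static class ObserverCreateSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = Observable.Range(1, 2);

        // The long way: build an IObserver<int> first, then subscribe with it.
        IObserver<int> observer = Observer.Create<int>(
            value => log.Add("explicit: " + value),
            error => log.Add("explicit error"),
            () => log.Add("explicit done"));
        source.Subscribe(observer);

        // The shorthand: hand the same kind of delegates straight to Subscribe.
        source.Subscribe(
            value => log.Add("shorthand: " + value),
            error => log.Add("shorthand error"),
            () => log.Add("shorthand done"));

        return log;
    }
}
```

Both subscriptions should see the same notifications (1, 2 and the completion), which is why the explicit Observer.Create step can be left out.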

Create and generate observables

The Observable class has plenty of methods that make creating common types of Observables easier. We’ll explore some of these methods below and include an example of how to use them. In our examples I’ll use Subject<T> and its variants; I’ll explain exactly what they do and how they differ in a later post, but I’m sure you can see the concept behind them in the code.

Observable.Empty<T>()

This method returns an IObservable<T> that just publishes an OnCompleted.

// Create an empty Observable
var emptyObservable = Observable.Empty<int>();

Observable.Return<T>(T)

The return-method will return an observable that publishes the value that is provided and then publishes OnCompleted.

// Create our Observable
var returnObservable = Observable.Return<int>(1);

Observable.Never<T>()

This method will just return an IObservable<T> without publishing any events.

// Create our Observable
var neverObservable = Observable.Never<int>();

Observable.Throw<T>(Exception)

This method will only publish the specified exception (through OnError).

// Create our Observable
var exceptionObservable = Observable.Throw<int>(new Exception("This should not happen :-) "));

Observable.Create<T>(Func<IObserver<T>, Action>)

This method (and its signature) is a bit more complicated than the above methods. The main idea is that this method will let you specify a delegate that will be executed when a subscription is made. The IObserver<T> that made the subscription will be passed to the delegate you specify so that you can call the OnNext, OnError and OnCompleted methods as you wish. Your delegate actually is a Func that returns an Action. This action is the method that can be called when a subscriber disposes from their subscription (more on unsubscribing later in this post). Also notice the analogy with the Observer.Create explained above.

var obs = Observable.Create<int>(
        observer =>
        {
            // The parameter is the IObserver<int> that just subscribed
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnCompleted();
            return () => Console.WriteLine("Our observer has unsubscribed");
        });

obs.Subscribe(Console.WriteLine, () => Console.WriteLine("Action completed"));

Observable.Range(int, int)

This method just returns a range of integers. The first parameter is the initial value, the second is the number of values that should be generated. The example will write the 7 values from 5 to 11.

var rangeObservable = Observable.Range(5, 7);
rangeObservable.Subscribe(Console.WriteLine);

Observable.Interval(TimeSpan)

The Interval method will publish values incrementing from 0 every period that you specify. In the example we’ll publish a value every 500 milliseconds.

var intervalObservable = Observable.Interval(TimeSpan.FromMilliseconds(500));
intervalObservable.Subscribe(Console.WriteLine);

This method can be handy when emulating fluctuating data like the stock market.

// Random generator to emulate fluctuations
var random = new Random();

// Last known stock value; we'll start at 10
var lastStock = 10.0;

// Start our fluctuation every 500 milliseconds
var interval = Observable.Interval(TimeSpan.FromMilliseconds(500))
    .Select(i =>
    {
        // Generate the fluctuation
        var fluctuation = random.NextDouble() - 0.5;

        lastStock += fluctuation;

        return lastStock;
    });

// Subscribe to our observable
interval.Subscribe(Console.WriteLine);

Observable.Start

This method has several signatures. Basically the method allows you to turn a Func<T> or an Action into an Observable. When using the overload with the Func<T>, the code will execute and when the Func returns its value, it will be published followed by OnCompleted. An example:

var start = Observable.Start(() =>
{
    Console.Write("Getting some work done");
    for (int i = 0; i < 20; i++)
    {
        Thread.Sleep(100);
        Console.Write(".");
    }

    Console.WriteLine();

    return "My value";
});

start.Subscribe(Console.WriteLine, () => Console.WriteLine("Action completed"));

When using the overload using Action, then the returned Observable will be of type IObservable<Unit>. Unit can best be described as the void for generic types. In this case, it is just used to publish a notice that the Action is complete. However this is not very important since OnCompleted will be published immediately after Unit anyway. Let’s write our above example as an Action:

var start = Observable.Start(() =>
{
    Console.Write("Getting some work done");

    for (int i = 0; i < 20; i++)
    {
        Thread.Sleep(100);
        Console.Write(".");
    }

    Console.WriteLine();

});

start.Subscribe(unit => Console.WriteLine("Unit published"),
    () => Console.WriteLine("Action completed"));

An important thing to note when using Observable.Start is that it starts the work at the moment specified by the Scheduler. By default this will be set to ‘Now’, so the function starts running right away instead of waiting for a subscriber. A small code example that clearly shows this is the following, where we reuse part of the above code but sleep some milliseconds before subscribing. You’ll see that output from the function is already being printed before we subscribe.

var start = Observable.Start(() =>
{
    Console.Write("Getting some work done");
    for (int i = 0; i < 20; i++)
    {
        Thread.Sleep(100);
        Console.Write(".");
    }

    Console.WriteLine();

    return "My value";
});

Thread.Sleep(500);  // Wait before subscribing

Console.WriteLine("Subscribing"); // Make visible when we subscribe

start.Subscribe(Console.WriteLine, () => Console.WriteLine("Action completed"));

ToObservable<T>(this IEnumerable<T>)

This method can be used to convert an IEnumerable<T> into an IObservable<T>. Although this method is pretty straightforward, it also has some great value. By using this method it’s very easy to get some quick mocking done. In addition to this, the ToObservable and ToEnumerable methods are important from a concurrency point of view since they can be used to convert between synchronous and asynchronous.

// Create a new IEnumerable
var enumerate = new List<int>()
                { 1, 2, 3 };

// Convert the IEnumerable to IObservable
var observable = enumerate.ToObservable();

// Subscribe to the IObservable
observable.Subscribe(Console.WriteLine);

ToEnumerable<T>(this IObservable<T>)

This function is analogous to the above function but works in the opposite direction. Given an IObservable<T>, this observable will be converted to an IEnumerable<T>.
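A minimal sketch of the conversion in this direction, assuming a current System.Reactive package (ToEnumerableSketch is a made-up name for this post). Keep in mind that enumerating the result blocks until the observable has delivered its values.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical helper for this post; not part of Rx.
public static class ToEnumerableSketch
{
    public static List<int> Run()
    {
        // Create a new IObservable (1 -> 3)
        IObservable<int> observable = Observable.Range(1, 3);

        // Convert the IObservable to IEnumerable
        IEnumerable<int> enumerable = observable.ToEnumerable();

        // Enumerating pulls the values that the observable pushed
        return enumerable.ToList();
    }
}
```

ToEnumerableSketch.Run() returns a list containing 1, 2 and 3.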

Observable.Generate

Generate is a method for generating sequences of values. Let’s just stick with a basic example and create a sequence starting from 5, incrementing by 5 while the value is less than 50.

var generatedObservable = Observable.Generate(5, i => i < 50, i => i + 5, i => i);
generatedObservable.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));

Check the existence of values

Moving on from methods that create or generate observables, these methods confirm the existence of a published value.

Any<T>(this IObservable<T>)

This method will either check whether the IObservable will publish any value at all, or if you choose to use the overload (taking a Func<T, bool>) then it will check if any of the values satisfy your predicate. One of the interesting things of Any is that it is non-blocking and thus returns an IObservable<bool>. This will only yield one value.

// Create a new observable
var obs = Observable.Range(10, 15);

// Does the observable contain any value?
obs.Any().Subscribe(Console.WriteLine);

// Does the observable contain any value that passes the specified predicate?
obs.Any(i => i > 100).Subscribe(Console.WriteLine);

// Does Observable.Empty contain any value?
Observable.Empty<int>().Any().Subscribe(Console.WriteLine);

Contains<T>(this IObservable<T>, T)

Contains<T> is a method quite similar to Any() but will only accept a value of T. Just like Any() this method returns an IObservable<bool>.

// Create a new observable
var obs = Observable.Range(10, 15);

// Does the observable contain the value 15?
obs.Contains(15).Subscribe(Console.WriteLine);

// Does Observable.Empty contain the value 5?
Observable.Empty<int>().Contains(5).Subscribe(Console.WriteLine);

IsEmpty<T>(this IObservable<T>)

This extension method only checks whether the stream is empty (actually the opposite of Any()). This method also returns an IObservable<bool>.

// Create a new observable
var obs = Observable.Range(10, 15);

// Is the observable empty?
obs.IsEmpty().Subscribe(Console.WriteLine);

// Create a new observable and publish a value
var subject = new ReplaySubject<int>();
subject.OnNext(1);

// Is this observable empty?
subject.IsEmpty().Subscribe(Console.WriteLine);

// Is Observable.Empty really empty?
Observable.Empty<int>().IsEmpty().Subscribe(Console.WriteLine);

Filtering and Aggregating

Next we’ll explore some extension methods that provide some sort of filter to the stream or aggregate the data that is being returned. You’ll notice that these are similar to those used for IEnumerable<T>. We’ll just explore one of those basic methods, and then give a list of other basic methods that don’t need further explanation.

First<T>

// Create a new observable
var obs = Observable.Range(10, 15);

// And get the first element of this observable
Console.WriteLine(obs.First());

// Try to get the first value of our Observable.Empty
// This will throw an InvalidOperationException("Sequence contains no elements.")
Console.WriteLine(Observable.Empty<int>().First());

The following methods are quite self-explanatory when you have experience with IEnumerable<T>. Note that some of them return IObservable<T> instead of T. The reason is that returning an IObservable<T> preserves the asynchronous nature of Rx. If these methods returned just T, the calls would be synchronous and could block. Some methods do just return T (First, FirstOrDefault, Last, LastOrDefault, Single, ...).

  • FirstOrDefault
  • Last
  • LastOrDefault
  • Single
  • Count
  • Min
  • Max
  • Sum
  • Average
  • GroupBy
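As a sketch of aggregates that return an observable instead of a plain value, here are Count, Sum and Max on the same range that is used elsewhere in this post. It assumes a current System.Reactive package; AggregateSketch is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper for this post; not part of Rx.
public static class AggregateSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Create a new observable (10 -> 24)
        var obs = Observable.Range(10, 15);

        // Each aggregate is itself an observable that yields a single value
        obs.Count().Subscribe(count => log.Add("Count: " + count));
        obs.Sum().Subscribe(sum => log.Add("Sum: " + sum));
        obs.Max().Subscribe(max => log.Add("Max: " + max));

        return log;
    }
}
```

For the range 10 to 24 this gives "Count: 15", "Sum: 255" and "Max: 24". Each result arrives through OnNext once the source has completed.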

Where<T>(this IObservable<T>, Func<T, bool> predicate)

When talking about filtering it’s hard to ignore the ultimate filter, Where (which should be pretty familiar). In our example we’ll apply a filter that only prints numbers that are divisible by 5.

// Create a new observable (10 -> 24)
var obs = Observable.Range(10, 15);

// Print only values where modulo 5 == 0
obs.Where(x => x % 5 == 0).Subscribe(Console.WriteLine);

Take<T>(this IObservable<T>, int)

Take will return the first N publications, as specified by the integer value. Take(1) can be thought of as First, with one large difference: it returns IObservable<T>, whereas First just returns T. In our example we’ll only take the first 3 values (being 10, 11 and 12):

// Create a new observable
var obs = Observable.Range(10, 15);

// Take the first 3 items
obs.Take(3).Subscribe(Console.WriteLine);

Skip<T>(this IObservable<T>, int)

Skip will ignore the first N publications, as specified by the integer value. So while Take(3) returned the first 3 values (being 10, 11 and 12), Skip(3) will ignore those and return the rest of the sequence.

// Create a new observable
var obs = Observable.Range(10, 15);

// Ignore the first 3 items
obs.Skip(3).Subscribe(Console.WriteLine);

DistinctUntilChanged<T>(this IObservable<T>)

This method will ignore all publications that have the same value as the previous value. In our example, the values 1, 2 and 3 will only be published once. This method can be very handy when handling user input that has (or can have) duplicates.

// Create a new observable
var obs = new Subject<int>();

// Subscribe to the observable
obs.DistinctUntilChanged().Subscribe(Console.WriteLine);

// Add some values
obs.OnNext(1);
obs.OnNext(1);
obs.OnNext(1);
obs.OnNext(2);
obs.OnNext(2);
obs.OnNext(2);
obs.OnNext(2);
obs.OnNext(3);
obs.OnNext(3);
obs.OnNext(3);

Buffering and time shifting

There are different scenarios where the amount of data that is provided is so large that you don’t need all the data that is being pushed. Luckily Rx provides some great methods to control the rate at which values are published.

BufferWithCount and BufferWithTime

These methods allow you to buffer a range of values, and republish those once the buffer is full. You can choose whether you want to buffer a number of elements (BufferWithCount) or buffer all the values per timespan (BufferWithTime). Let’s see how we use this:

// Create a new observable
var obs = Observable.Range(10, 15);

// Create buffers of 4 values
obs.BufferWithCount(4).Subscribe(
        enumerable =>
        {
            Console.WriteLine("--- Buffer ---");

            foreach (var i in enumerable)
            {
                Console.WriteLine(i);
            }
        }, () => Console.WriteLine("Obs Buffer Completed"));

// Create a new observable
var interval = Observable.Interval(TimeSpan.FromMilliseconds(150));

// Create a buffer of 1 second
interval.BufferWithTime(TimeSpan.FromSeconds(1)).Subscribe(
    enumerable =>
    {
        Console.WriteLine("--- Buffer ---");

        foreach (var i in enumerable)
        {
            Console.WriteLine(i);
        }

    }, () => Console.WriteLine("Interval Buffer Completed"));

// Just to keep the console visible :-) 
Console.ReadKey();
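If you are on a more recent release of Rx, note that, as far as I know, these two operators are both simply called Buffer there. The sketch below assumes a current System.Reactive package and redoes the count-based example with Buffer(4); BufferSketch is a made-up name, and each buffer is rendered as a comma-separated string.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper for this post; not part of Rx.
public static class BufferSketch
{
    // Splits 10 -> 24 into buffers of at most 4 values.
    public static List<string> Run()
    {
        var buffers = new List<string>();

        Observable.Range(10, 15)
            .Buffer(4)
            .Subscribe(buffer => buffers.Add(string.Join(",", buffer)));

        return buffers;
    }
}
```

This produces four buffers: "10,11,12,13", "14,15,16,17", "18,19,20,21" and a final, shorter "22,23,24" that is flushed when the source completes.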

Delay

This method will delay the entire Observable by the TimeSpan specified, or until the specified DateTime. In the example below we’ll subscribe twice to an Observable. In the first subscription we’ll delay the Observable by 5 seconds. In the second subscription we’ll delay it until one minute from now.

// Create a new observable
var obs = Observable.Range(10, 15);

// Add a 5 second delay
obs.Delay(TimeSpan.FromSeconds(5)).Subscribe(Console.WriteLine);

// Delay observable to now + 1 minute
obs.Delay(DateTime.Now.AddMinutes(1)).Subscribe(Console.WriteLine);

Sample<T>(TimeSpan)

You can use the Sample<T> method to take one sample per TimeSpan specified.

// Create a new observable
var obs = Observable.Interval(TimeSpan.FromMilliseconds(150));

// Only print a sample value every 1 second
obs.Sample(TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);

Throttle

The Throttle method is quite similar to the above methods, except that it only publishes a value when no other value follows it within the specified timespan; values that are followed too quickly by another one are ignored. In the first example we have a publisher that publishes a value every 100 milliseconds. Since we have subscribed with a Throttle of 200 milliseconds, no values are printed.

// Create a new observable
var obs = Observable.Interval(TimeSpan.FromMilliseconds(100));

// Subscribe with a Throttle of 200 milliseconds
obs.Throttle(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine);

In the next example we’ll create a new observable (using Observable.Create) that generates new values at varying speeds. Since we still subscribe with a throttle of 200 milliseconds, some values will get printed (namely those that are followed by a Thread.Sleep(..) longer than 200 milliseconds).

// Create a new observable
var obs = Observable.Create<int>(
    o =>
    {
        for (int i = 0; i < 100; i++)
        {
            o.OnNext(i);
            // Values 0-4, 10-14, ... are followed by a short pause, the others by a long one
            Thread.Sleep(i % 10 < 5 ? 100 : 300);
        }
        return () => { };
    });

// Subscribe with a Throttle of 200 milliseconds
obs.Throttle(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine);

This method can be handy when programming an auto-complete-like feature. When using Throttle on an Observable Event (e.g. a textbox), you only want to look up the values when the user stops typing for more than some milliseconds, to decrease the number of lookups. We’ll explore this usage in detail in a later post.
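Until then, here is a rough sketch of the idea without any UI. It assumes a current System.Reactive package; a Subject<string> stands in for the textbox events, and the class name ThrottleSketch as well as the typed strings are made up for this post.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical helper for this post; not part of Rx.
public static class ThrottleSketch
{
    // Returns the texts for which a lookup would have been started.
    public static string[] Run()
    {
        // Throttle publishes from a timer thread, so use a thread-safe collection
        var lookups = new ConcurrentQueue<string>();

        // Stand-in for the text changes of a textbox
        var keystrokes = new Subject<string>();

        using (keystrokes
            .Throttle(TimeSpan.FromMilliseconds(200))
            .Subscribe(text => lookups.Enqueue(text)))
        {
            // The user types quickly...
            keystrokes.OnNext("r");
            keystrokes.OnNext("re");
            keystrokes.OnNext("rea");

            // ...pauses for a while...
            Thread.Sleep(1000);

            // ...and continues typing before pausing again.
            keystrokes.OnNext("reac");
            keystrokes.OnNext("react");
            Thread.Sleep(1000);
        }

        return lookups.ToArray();
    }
}
```

With pauses this generous, only "rea" and "react" should end up in the result: two lookups instead of five.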

Unsubscribing

Up till now we have discovered the basics of the Reactive Framework allowing us to create an IObservable, subscribe to it and perform some basic operations such as filtering, aggregating, … In addition to this, we explored some methods dealing with buffering and time shifting. A logical next step would be to look how we can unsubscribe from a subscription.

When you start looking for an Unsubscribe method, you’ll notice that such a method does not exist in the Reactive Framework. Instead of using an Unsubscribe method, Reactive Extensions will return an IDisposable every time a subscription is made. The best way to think of this IDisposable is as the subscription itself, thus disposing of the subscription will effectively unsubscribe as well. This way of working is (when you think of it) actually more logical than the unsubscribe-approach.

It’s important to note that when you call Dispose on the result of a subscribe call, this will not affect the underlying IObservable<T>, but just the instance of the subscription. This allows us to add and remove subscriptions to our IObservable<T> as we wish. In our example we start with 2 subscribers and then we dispose of one of the subscribers.

// Create a new observable
var obs = Observable.Interval(TimeSpan.FromMilliseconds(100));

// Add the first subscription
var SubscriptionOne = obs.Subscribe(
    value => Console.WriteLine("Subscription 1 received: {0}", value));

// Add the second subscription
var SubscriptionTwo = obs.Subscribe(
    value => Console.WriteLine("Subscription 2 received: {0}", value));

// Let’s wait a bit
Thread.Sleep(500);

// Dispose of the first subscription
SubscriptionOne.Dispose();

// Write a notice to the console
Console.WriteLine("SubscriptionOne is disposed");

// Just to keep the console visible :-) 
Console.ReadLine();

When executing the code you should get output like the one below, making it clear that once we dispose of SubscriptionOne, the second subscription is still subscribed and still receives data from our IObservable:

image

In the example above it seems that all values of Interval are generated by the same OnNext call (a single call serving both subscriptions). When we expand our sample, though, we can see that each subscription gets its own subscription on the Observable.Interval().

// Create a new observable
var obs = Observable.Interval(TimeSpan.FromMilliseconds(100));

// Add the first subscription
var SubscriptionOne = obs.Subscribe(value => Console.WriteLine("Subscription 1 received: {0}", value));

// Let’s wait a bit
Thread.Sleep(500);

// Add the second subscription
var SubscriptionTwo = obs.Subscribe(value => Console.WriteLine("Subscription 2 received: {0}", value));

// Let’s wait a bit
Thread.Sleep(500);

// Dispose of the first subscription
SubscriptionOne.Dispose();

// Write a notice to the console
Console.WriteLine("SubscriptionOne is disposed");

// Just to keep the console visible :-) 
Console.ReadLine();

When we look at the corresponding output, it’s clear that each subscription has its own values.

image

So what are the benefits of using an IDisposable instead of creating a new type or providing an unsubscribe method? The obvious advantage is that it’s an existing type that most people already know and understand. Secondly, an unsubscribe method would mean that you have to keep an eye on the state yourself, as with event handlers. That is something you don’t need to worry about when using IDisposable, since it is done for you.
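One consequence of the subscription being a plain IDisposable is that it also works with C#'s using statement. The following is only a minimal sketch that uses nothing but the base class library: TinySource, ActionDisposable and CollectingObserver are hypothetical helper types written for this illustration, they are not part of Rx.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal observable, written only for this sketch (not an Rx type).
class TinySource : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        // The returned handle is the subscription: disposing it removes the observer.
        return new ActionDisposable(() => _observers.Remove(observer));
    }

    public void Publish(int value)
    {
        // Iterate over a copy so observers may unsubscribe while being notified.
        foreach (var observer in _observers.ToArray())
            observer.OnNext(value);
    }
}

// Hypothetical helper: runs an action once when disposed.
class ActionDisposable : IDisposable
{
    private Action _onDispose;

    public ActionDisposable(Action onDispose) { _onDispose = onDispose; }

    public void Dispose()
    {
        // Run the action at most once, so a second Dispose call is harmless.
        var action = _onDispose;
        _onDispose = null;
        if (action != null) action();
    }
}

// Hypothetical observer that simply remembers what it received.
class CollectingObserver : IObserver<int>
{
    public readonly List<int> Received = new List<int>();
    public void OnNext(int value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

static class UsingDemo
{
    public static List<int> Run()
    {
        var source = new TinySource();
        var observer = new CollectingObserver();

        using (source.Subscribe(observer))
        {
            source.Publish(1);
            source.Publish(2);
        } // leaving the block disposes the subscription

        source.Publish(3); // nobody is listening any more
        return observer.Received;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "1, 2"
    }
}
```

No extra bookkeeping is needed to unsubscribe here: the scope of the using block is the lifetime of the subscription.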

Last (but not least), there’s an advantage of compositionality that we get using IDisposable; let’s assume this example:

var Obs1 = Observable.Empty<int>();
var Obs2 = Observable.Empty<int>();
var Obs3 = Observable.Empty<int>();

var MergeObs = Observable.Merge(Obs1, Obs2, Obs3);
var Disposable = MergeObs.Subscribe(Console.WriteLine);
Disposable.Dispose();

In this code we take 3 different Observables and merge those into the MergeObs Observable (more on merging in a later post). By using the compositionality of IDisposable, our Disposable object will be a CompositeDisposable containing the unsubscription handles for all 3 original Observables, and it will handle the disposal of those when we dispose of the Disposable object. Another side effect of Rx’s asynchronous nature is that for some of the Observables the subscription may not have been made yet. Calling Dispose would then mean disposing of a subscription that doesn’t exist yet. This is solved by using a MutableDisposable object, which represents an IDisposable that isn’t known yet but can be disposed before it’s known. Once the IDisposable gets set, it gets disposed immediately.
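To make these two ideas more concrete, here is a simplified sketch of them using only the base class library. SimpleCompositeDisposable, LateBoundDisposable and Flag are hypothetical names invented for this illustration. They are not the Rx types, and they are deliberately not thread-safe, which a real implementation would have to be.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the idea of a composite disposable:
// one handle that disposes a whole group of handles.
class SimpleCompositeDisposable : IDisposable
{
    private readonly List<IDisposable> _items = new List<IDisposable>();
    private bool _disposed;

    public void Add(IDisposable item)
    {
        if (_disposed) { item.Dispose(); return; } // too late: dispose right away
        _items.Add(item);
    }

    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;
        foreach (var item in _items) item.Dispose();
        _items.Clear();
    }
}

// Hypothetical stand-in for a disposable whose inner handle arrives later.
class LateBoundDisposable : IDisposable
{
    private IDisposable _inner;
    private bool _disposed;

    public IDisposable Inner
    {
        set
        {
            // Already disposed: the late handle is disposed as soon as it is set.
            if (_disposed) { if (value != null) value.Dispose(); return; }
            _inner = value;
        }
    }

    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;
        if (_inner != null) _inner.Dispose();
    }
}

// Tiny disposable that records whether it was disposed.
class Flag : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() { IsDisposed = true; }
}

static class CompositionDemo
{
    public static bool[] Run()
    {
        var a = new Flag();
        var b = new Flag();
        var late = new Flag();

        var composite = new SimpleCompositeDisposable();
        composite.Add(a);
        composite.Add(b);
        composite.Dispose();   // disposes a and b together

        var pending = new LateBoundDisposable();
        pending.Dispose();     // disposed before the real handle exists
        pending.Inner = late;  // so the handle is disposed on arrival

        return new[] { a.IsDisposed, b.IsDisposed, late.IsDisposed };
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "True, True, True"
    }
}
```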

OnError and OnCompleted

Both OnError and OnCompleted signify the end of the stream. When a stream publishes one of these, it will be its last publication and no further calls to OnNext will reach the subscribers. Let’s try what happens:

// Create a new observable
var obs = new Subject<int>();

// Subscribe to our observable
obs.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));

// Publish a value
obs.OnNext(1);

// Complete the stream
obs.OnCompleted();

// Try to publish another value
obs.OnNext(2);

// Just to keep the console visible :-) 
Console.ReadLine();

As you’ll notice, our first value gets pushed to us, while the second value (after the OnCompleted) does not get published.
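The rule at work here is that a stream consists of any number of OnNext calls, optionally followed by a single OnError or OnCompleted. As a rough illustration of how such a rule can be enforced, here is a sketch of an observer wrapper that drops everything after the stream has ended. StoppingObserver and LogObserver are hypothetical types for this example, and this is not a claim about how Subject<T> is implemented internally.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical wrapper: forwards notifications until the stream has terminated.
class StoppingObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private bool _stopped;

    public StoppingObserver(IObserver<T> inner) { _inner = inner; }

    public void OnNext(T value)
    {
        if (!_stopped) _inner.OnNext(value);
    }

    public void OnError(Exception error)
    {
        if (_stopped) return;
        _stopped = true;
        _inner.OnError(error);
    }

    public void OnCompleted()
    {
        if (_stopped) return;
        _stopped = true;
        _inner.OnCompleted();
    }
}

// Hypothetical observer that records every notification as text.
class LogObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(int value) { Log.Add("Next " + value); }
    public void OnError(Exception error) { Log.Add("Error " + error.Message); }
    public void OnCompleted() { Log.Add("Completed"); }
}

static class GrammarDemo
{
    public static List<string> Run()
    {
        var log = new LogObserver();
        var guarded = new StoppingObserver<int>(log);

        guarded.OnNext(1);
        guarded.OnCompleted();
        guarded.OnNext(2);                                      // dropped
        guarded.OnError(new InvalidOperationException("late")); // dropped

        return log.Log;
    }

    static void Main()
    {
        // Prints "Next 1" followed by "Completed".
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```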

Want more?

Make sure to check out the Reactive Extensions homepage, the Rx Forum and stay tuned for more posts about Reactive Extensions.

22. September 2010 by Jeroen Verhulst

IEnumerable and IObservable, a story of duality

IEnumerable has been around in the Microsoft .NET Framework since the first versions. Reactive Extensions (Rx) is a new library for composing asynchronous and event-based programs using observable collections. Before we dive into this exciting new world it’s handy to explore the duality that exists between IEnumerable and IObservable. IObservable is one of the new interfaces available since the .NET 4.0 framework. If you’re not familiar with IObservable, I propose you first read my previous post.

Remember IEnumerable?

Basically our IEnumerable<T> object provides us with an enumerator, represented by the IEnumerator<T> interface, that supports simple iteration over a collection of any generic type. This behaviour is possible due to the implementation of the IEnumerator<T> interface. Let’s start with an example that’s just an iteration over a collection of animals where some of them have legs.

IEnumerable<Animal> animals = // I'm sure you get this ;-) 

var results =   from animal in animals
                where animal.Legs > 0
                select animal;

foreach (var a in results)
{
    Console.WriteLine(a.Name);
}

The animals collection above must of course implement IEnumerable<T> (in this case IEnumerable<Animal>). This IEnumerable<T> returns our IEnumerator<T> object. This can be seen in its definition:

public interface IEnumerable<out T> : IEnumerable
{
    IEnumerator<T> GetEnumerator();
}


As you can see, IEnumerable<T> hands out an IEnumerator<T> object each time the GetEnumerator method is called. When you look at the IEnumerator definition you’ll notice that it consists of only three members: the Current property and the MoveNext and Reset methods. The Reset method was put in place primarily for COM interoperability. The Current property keeps returning the same object until MoveNext is called. The MoveNext method advances the enumerator, so that Current refers to the next element if one is available.

public interface IEnumerator<T>
{
    T Current { get; }
    bool MoveNext();
    void Reset();
}

Let’s for a moment apply what we have learned to the first example. Let’s assume we have a zoo with a list of all animals. Due to some undefined magic, all gates are left open, so we need a list of all animals capable of escaping (let’s for the purpose of this example assume that animals without legs can’t escape). The where clause in the query expression keeps only those animals that have legs. In the foreach, we’ll encounter a series of alternating MoveNext and Current calls. This continues until MoveNext returns false (when there is no element left after the last one).
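The series of MoveNext and Current calls can be made visible by writing the loop out by hand. The sketch below shows roughly what a foreach over the query expands to. The Animal class with its Name and Legs properties is an assumption here, since the example above never shows its definition, and the zoo contents are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed shape of the Animal class used in the example above.
class Animal
{
    public string Name { get; set; }
    public int Legs { get; set; }
}

static class ForeachDemo
{
    public static List<string> EscapedAnimals(IEnumerable<Animal> animals)
    {
        var names = new List<string>();
        var results = animals.Where(a => a.Legs > 0);

        // Roughly what the compiler generates for:
        //   foreach (var a in results) { names.Add(a.Name); }
        using (IEnumerator<Animal> e = results.GetEnumerator())
        {
            while (e.MoveNext())        // pull: "is there a next element?"
            {
                Animal a = e.Current;   // pull: "give me that element"
                names.Add(a.Name);
            }
        }

        return names;
    }

    static void Main()
    {
        var zoo = new List<Animal>
        {
            new Animal { Name = "Zebra", Legs = 4 },
            new Animal { Name = "Snake", Legs = 0 },
            new Animal { Name = "Ostrich", Legs = 2 }
        };

        // Prints "Zebra" and "Ostrich".
        foreach (var name in EscapedAnimals(zoo)) Console.WriteLine(name);
    }
}
```

Note that the consumer drives everything here: nothing happens until the loop asks for the next element.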

So what about duality?

As said before, IObservable is ‘dual’ with IEnumerable. What is this thing called duality? Duality is a mathematical concept that can be found in different fields like formal logic (De Morgan’s laws) or category theory (limits and colimits).

IEnumerable and IObservable both deal with collections, but the main difference between them is how they present the elements to the developer. IEnumerable’s approach is to keep asking “give me the next object” (pull), while IObservable on the other hand provides the objects to its subscribers (push). With IEnumerable the consumer is in control of getting the next element, while with IObservable it’s the producer who’s in control.

Let’s take the IEnumerable’s definition to explore the duality in depth:

public interface IEnumerable<out T> : IEnumerable
{
    new IEnumerator<T> GetEnumerator();
}

In order to turn IEnumerable into an IObservable we first need to dualize the methods. In this case the dual of GetEnumerator is Subscribe. Since GetEnumerator takes no arguments, Subscribe will return void.

Since GetEnumerator returns IEnumerator<T>, our Subscribe method will take the dual of that interface, IObserver<T>, as its argument. Applying these changes gets us to the IObservable interface:

public interface IObservable<out T>
{
    void Subscribe(IObserver<T> observer);
}

Next let’s move on to our IEnumerator interface, where we’ll also begin with the basic definition. (Yes, we left out the Reset method since it’s only there for legacy purposes.)

public interface IEnumerator<T>
{
    T Current { get; }
    bool MoveNext();
}

So let’s start our transformation again. The Current property (a getter) will be turned into an OnNext property (with a setter):

public interface IEnumerator<T>
{
    T OnNext { set; }
    bool MoveNext();
}

But wait: since a property is basically just a method, and we’ll only use its setter functionality, the final interface will contain a method OnNext instead.

public interface IEnumerator<T>
{
    void OnNext(T value);
    bool MoveNext();
}

Our second method, MoveNext, raises a few additional questions. In the IEnumerator contract this method returns a bool (which makes perfect sense), but that isn’t the whole story: MoveNext can also throw an exception (e.g. InvalidOperationException). So it effectively has two kinds of results, a bool or an Exception. Because of this, the dual form will actually be two methods: OnError (for the exceptions) and OnCompleted (for when MoveNext returns false). You may be wondering what happens when MoveNext returns true. Think about it carefully: does that case need a notification of its own? No, it does not, because a true result just means that a value is available, and that is already covered by OnNext. So finally we get the following contract:

public interface IObserver<in T>
{
     void OnCompleted();
     void OnError(Exception error);
     void OnNext(T value);
}
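The mapping between the three possible outcomes of MoveNext and the three IObserver methods can be sketched in code. The helper below pulls from an IEnumerable<T> and pushes to an IObserver<T>. Pump, RecordingObserver and the Faulty iterator are hypothetical names made up for this illustration, and the sketch is only meant to show the correspondence, not how Rx converts enumerables.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for this sketch: drives a pull-based source and
// translates each MoveNext outcome into the matching IObserver call.
static class Pump
{
    public static void Push<T>(IEnumerable<T> source, IObserver<T> observer)
    {
        using (IEnumerator<T> e = source.GetEnumerator())
        {
            while (true)
            {
                T value = default(T);
                try
                {
                    if (!e.MoveNext()) break;  // MoveNext returned false
                    value = e.Current;
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);      // MoveNext threw
                    return;
                }
                observer.OnNext(value);        // MoveNext returned true
            }
        }
        observer.OnCompleted();                // reached only via the break above
    }
}

// Hypothetical observer that records every notification as text.
class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(T value) { Log.Add("OnNext " + value); }
    public void OnError(Exception error) { Log.Add("OnError " + error.Message); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

static class PumpDemo
{
    // An iterator whose second MoveNext call throws.
    static IEnumerable<int> Faulty()
    {
        yield return 1;
        throw new InvalidOperationException("boom");
    }

    public static List<string> Run(bool faulty)
    {
        var observer = new RecordingObserver<int>();
        Pump.Push(faulty ? Faulty() : new[] { 1, 2 }, observer);
        return observer.Log;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(" | ", Run(false))); // OnNext 1 | OnNext 2 | OnCompleted
        Console.WriteLine(string.Join(" | ", Run(true)));  // OnNext 1 | OnError boom
    }
}
```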

Now both of our interfaces are taking form, and when we put them together we get:

public interface IObservable<T>
{
    void Subscribe(IObserver<T> observer);
}

public interface IObserver<T>
{
     void OnCompleted();
     void OnError(Exception error);
     void OnNext(T value);
}

Looking back, we notice that we forgot one interface along the way. We have seen step by step how IEnumerable.GetEnumerator turns into IObservable.Subscribe. However, IEnumerator<T> also implements the IDisposable interface, so the object returned by GetEnumerator is both an enumerator and a disposable. We could say that GetEnumerator is really defined as:

IEnumerator<T>, IDisposable IEnumerable<T>.GetEnumerator()

On the other hand, our transformation gave us a Subscribe method but no Unsubscribe method. Just as we want to subscribe an observer to an observable collection, we may want to unsubscribe it again. This is easily resolved by keeping IDisposable as the return type of Subscribe:

public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

public interface IObserver<in T>
{
     void OnCompleted();
     void OnError(Exception error);
     void OnNext(T value);
}

Concluding

The really interesting part here (beyond the theoretical explanation) is that any developer can understand that an event, or any other push-based notification or async technology, can be handled with LINQ in the same way as any IEnumerable object. Of course the whole IObservable story will become even more valuable when using it in combination with Reactive Extensions (Rx), but more on that subject later.
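To give an impression of what “handled with LINQ” can mean for a push-based source, here is a small sketch of a Where operator for IObservable<T>, written against the base class library only. PushLinq, ArrayObservable and ListObserver are hypothetical names for this example. Rx ships its own, far more complete operators, so this is not their implementation. Because the C# compiler translates a where clause into a call to any accessible Where method, the query syntax works as soon as this extension method is in scope.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical LINQ-style operator for push-based sequences.
static class PushLinq
{
    public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate)
    {
        return new WhereObservable<T>(source, predicate);
    }

    private class WhereObservable<T> : IObservable<T>
    {
        private readonly IObservable<T> _source;
        private readonly Func<T, bool> _predicate;

        public WhereObservable(IObservable<T> source, Func<T, bool> predicate)
        {
            _source = source;
            _predicate = predicate;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            // Subscribe a filtering observer to the source on behalf of the caller.
            return _source.Subscribe(new WhereObserver<T>(observer, _predicate));
        }
    }

    private class WhereObserver<T> : IObserver<T>
    {
        private readonly IObserver<T> _inner;
        private readonly Func<T, bool> _predicate;

        public WhereObserver(IObserver<T> inner, Func<T, bool> predicate)
        {
            _inner = inner;
            _predicate = predicate;
        }

        public void OnNext(T value) { if (_predicate(value)) _inner.OnNext(value); }
        public void OnError(Exception error) { _inner.OnError(error); }
        public void OnCompleted() { _inner.OnCompleted(); }
    }
}

// Hypothetical source: pushes a fixed set of items to each new subscriber.
class ArrayObservable<T> : IObservable<T>
{
    private readonly T[] _items;
    public ArrayObservable(params T[] items) { _items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new NoopDisposable(); // nothing left to cancel
    }

    private class NoopDisposable : IDisposable { public void Dispose() { } }
}

// Hypothetical observer that collects the values it receives.
class ListObserver<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

static class PushLinqDemo
{
    public static List<int> Run()
    {
        var numbers = new ArrayObservable<int>(1, 2, 3, 4, 5, 6);

        // Query syntax over a push-based source; compiles to numbers.Where(...).
        var evens = from n in numbers
                    where n % 2 == 0
                    select n;

        var sink = new ListObserver<int>();
        evens.Subscribe(sink);
        return sink.Items;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "2, 4, 6"
    }
}
```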

24. August 2010 by Jeroen Verhulst

The new kid on the block: IObservable

In the Microsoft .NET Framework 4.0 two new interfaces are available, strongly focused on the publish/subscribe pattern (commonly found in mechanisms for push-based notifications). The idea is all about the two new generic interfaces IObservable<T> and IObserver<T>. IObservable provides the functionality of the publisher, and IObserver that of the subscriber. These are also known as provider and observer. Just keep in mind what each of them is supposed to do and you’ll be just fine. The contracts of both interfaces are declared as follows:

public interface IObserver<in T>
{
     void OnCompleted();
     void OnError(Exception error);
     void OnNext(T value);
}
public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

Please, observe me!

The main purpose of the IObservable interface is to provide elements to the observers (subscribers). These observables (the publishers) own just one method, Subscribe, which is used to subscribe an observer. Each time the observable pushes an object to the observer, it is received by the observer’s OnNext method. Let’s jump into an example with a basic class Cyclist:

class Cyclist
{
    private readonly string _name;
    private long _bestTime;
    private long _lastTime;
    private TimeSpan _totalTime;

    public Cyclist(string name)
    {
        _name = name;
        _bestTime = long.MaxValue;
        _totalTime = new TimeSpan();
    }

    public TimeSpan TotalTime
    {
        get
        {
            return _totalTime;
        }
    }

    // Method to update all our statistics
    public void RegisterRound(long roundtime)
    {
        // Saving the last round time
        _lastTime = roundtime;

        // Is the last round the best time?
        _bestTime = Math.Min(roundtime, _bestTime);

        // Total time
        _totalTime += new TimeSpan(roundtime);
    }

    public override string ToString()
    {
        return string.Format("{0} \n- TotalTime: {3}\n- LastTime: {1}\n- BestTime: {2}\n",
            this._name, new TimeSpan(this._lastTime), new TimeSpan(this._bestTime),
                                                                         this._totalTime);
    }
}

In addition to this class we’ll have a class that describes a race (I suppose it isn’t a very common scenario for cyclists to just ride laps, but it’s for a greater purpose).

class Race
{
    private int _totalRounds;
    private int _currentRound;
    private List<Cyclist> _cyclists;

    public Race(int totalRounds)
    {
        _cyclists = new List<Cyclist>();
        _totalRounds = totalRounds;
    }

    public int TotalRounds
    {
        get { return _totalRounds; }
    }

    public int CurrentRound
    {
        get { return _currentRound; }
        set { _currentRound = value; }
    }

    public List<Cyclist> Cyclists
    {
        get { return _cyclists; }
        set { _cyclists = value; }
    }

}

Of course, our goal is to monitor (or ‘observe’) this race. Let’s say our race is called SuperRace. This SuperRace will need to contain a Race object, which in turn contains a generic list of all participants. And what’s a race without some observing journalists? We’ll add two of those as well (named CNN and BBC). The SuperRace will be the observable, and these journalists will become its observers. Our SuperRace is defined as follows:

class SuperRace : IObservable<Race>
{
    List<IObserver<Race>> display = new List<IObserver<Race>>();
    Race race = new Race(40);

    public void Start()
    {
        race.Cyclists = new List<Cyclist>
        {
            new Cyclist("George Shields"),
            new Cyclist("Edward Begay"),
            new Cyclist("Craig Foy")
        };

        new Thread(() =>
        {
            Running();
        }).Start(); // 3.. 2.. 1.. GO!!! 

    }

    private void Running()
    {
        Random rnd = new Random();

        // Round loop
        for (int round = 0; round < race.TotalRounds; round++)
        {

            foreach (var c in race.Cyclists)
            {
                c.RegisterRound(rnd.Next(1000000000, int.MaxValue));
            }

            // Increasing num of rounds done
            race.CurrentRound++;

            // Notify the observers
            foreach (var d in display)
            {
                d.OnNext(race);
            }

            // Wait for a second
            Thread.Sleep(1000);
        }

        // Race completed: notify the observers
        foreach (var d in display)
        {
            d.OnCompleted();
        }

    }

    public IDisposable Subscribe(IObserver<Race> observer)
    {
        // New observer
        display.Add(observer);
        return new Disposable(() => display.Remove(observer));
    }

    public class Disposable : IDisposable
    {
        private Action _disposeAction;

        public Disposable(Action dispose)
        {
            _disposeAction = dispose;
        }

        public void Dispose()
        {
            _disposeAction();

        }
    }
}

The Start method just creates the three cyclists who are going to participate in the race. At the end a new thread is created and the race starts. The thread’s delegate calls a method named Running(), which holds the racing timeline. The total number of rounds is set to 40, so during 40 rounds all three cyclists accumulate elapsed time, and after each lap all the observers are notified. RaceMonitor will display the race results in ascending order of total time. Before the next lap, the thread sleeps for one second in order to add some suspense to the simulated race ;-)
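A side note on the numbers: the round times that Running() passes to RegisterRound end up in the TimeSpan(long) constructor, which interprets its argument as ticks of 100 nanoseconds each. The short sketch below converts the bounds used in rnd.Next(1000000000, int.MaxValue) into seconds. TickDemo and SecondsFor are hypothetical names used only for this illustration.

```csharp
using System;

static class TickDemo
{
    // Converts a tick count into seconds, the same way Cyclist builds its TimeSpans.
    public static double SecondsFor(long ticks)
    {
        return new TimeSpan(ticks).TotalSeconds;
    }

    static void Main()
    {
        Console.WriteLine(TimeSpan.TicksPerSecond);   // ticks in one second
        Console.WriteLine(SecondsFor(1000000000));    // lower bound of a simulated round
        Console.WriteLine(SecondsFor(int.MaxValue));  // exclusive upper bound of a round
    }
}
```

With 10,000,000 ticks per second, every simulated round therefore lasts between 100 seconds and a little under 215 seconds.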

Let me observe you

So we have the first part of our story (the IObservable) all finished up. At the other side of the IObservable we had the IObserver (you know, the journalists). We’ll describe these journalists as a ‘RaceMonitor’ since they are monitoring the race. The class is pretty straightforward:

class RaceMonitor : IObserver<Race>
{
    public void OnCompleted()
    {
        Console.WriteLine("FINISH!");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine(error.Message);
    }

    public void OnNext(Race value)
    {
        Console.Clear();
        Console.WriteLine("Race Round {0}", value.CurrentRound);

        var cyclists = from cyclist in value.Cyclists
                        orderby cyclist.TotalTime
                        select cyclist;

        foreach (var cyclist in cyclists)
        {
            Console.WriteLine(cyclist.ToString());
        }

    }
}

The observers are represented by this RaceMonitor class and subscribe to our SuperRace through the Subscribe method, called from the Main method:

class Program
{
    static void Main(string[] args)
    {
        SuperRace superRace = new SuperRace();

        // Our observers
        RaceMonitor cnn = new RaceMonitor();
        RaceMonitor bbc = new RaceMonitor();

        // Subscribe the observers to the race
        superRace.Subscribe(cnn);
        superRace.Subscribe(bbc);

        // Let race start
        superRace.Start();

        Console.Read();
    }
}

And of course we just run the application to try it out.. and Craig Foy wins the race!

image

Conclusion

As you can see, IObservable and IObserver provide an easy way to create publisher/subscriber based programs. Stay tuned for a post about the duality between IObservable and IEnumerable. When dealing with these patterns one will most likely use Reactive Extensions (Rx), but more on that subject later.

Many thanks to Bart De Smet for providing some great insights about this topic.

18. August 2010 by Jeroen Verhulst