//Copyright (c) 2007-2008 Henrik Schröder, Oliver Kofoed Pedersen

//Permission is hereby granted, free of charge, to any person
//obtaining a copy of this software and associated documentation
//files (the "Software"), to deal in the Software without
//restriction, including without limitation the rights to use,
//copy, modify, merge, publish, distribute, sublicense, and/or sell
//copies of the Software, and to permit persons to whom the
//Software is furnished to do so, subject to the following
//conditions:

//The above copyright notice and this permission notice shall be
//included in all copies or substantial portions of the Software.

//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
//EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
//OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
//NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
//HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
//WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
//FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
//OTHER DEALINGS IN THE SOFTWARE.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Text;
using System.Net.Sockets;
using System.Threading;

namespace Apollo.Common.Cache {
    /// <summary>
    /// Callback that is handed a pooled socket and produces a result of type T.
    /// </summary>
    internal delegate T UseSocket<T>(PooledSocket socket);

    /// <summary>
    /// Callback that is handed a pooled socket and produces no result.
    /// </summary>
    internal delegate void UseSocket(PooledSocket socket);

    /// <summary>
    /// The PooledSocket class encapsulates a socket connection to a specified memcached server.
    /// It contains a buffered stream for communication, and methods for sending and retrieving
    /// data from the memcached server, as well as general memcached error checking.
    /// </summary>
    internal class PooledSocket : IDisposable {
        private SocketPool socketPool;
        private Socket socket;
        private Stream stream;

        /// <summary>
        /// Local time at which this socket object was constructed; used by the pool to recycle old sockets.
        /// </summary>
        public readonly DateTime Created;

        /// <summary>
        /// Creates a TCP socket, applies the send/receive timeout, disables Nagle's algorithm,
        /// connects to the endpoint and wraps the connection in a buffered stream.
        /// </summary>
        /// <param name="socketPool">The pool this socket is returned to when disposed.</param>
        /// <param name="endPoint">The server endpoint to connect to.</param>
        /// <param name="sendReceiveTimeout">Timeout value applied to both send and receive operations.</param>
        public PooledSocket(SocketPool socketPool, IPEndPoint endPoint, int sendReceiveTimeout) {
            this.socketPool = socketPool;
            Created = DateTime.Now;

            //Set up the socket.
            socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            try {
                socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, sendReceiveTimeout);
                socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, sendReceiveTimeout);
                socket.ReceiveTimeout = sendReceiveTimeout;
                socket.SendTimeout = sendReceiveTimeout;

                //Do not use Nagle's Algorithm
                socket.NoDelay = true;

                //Establish connection
                socket.Connect(endPoint);

                //Wraps two layers of streams around the socket for communication.
                stream = new BufferedStream(new NetworkStream(socket, false));
            } catch {
                //The caller never receives this object when construction fails, so nobody
                //else can release the underlying socket handle: do it here, then rethrow.
                socket.Close();
                socket = null;
                throw;
            }
        }

        /// <summary>
        /// Disposing of a PooledSocket object in any way causes it to be returned to its SocketPool.
        /// </summary>
        public void Dispose() {
            socketPool.Return(this);
        }

        /// <summary>
        /// This method closes the underlying stream and socket.
        /// Errors during closing are logged to the console and otherwise ignored.
        /// Calling it more than once is harmless because the fields are nulled after closing.
        /// </summary>
        public void Close() {
            if (stream != null) {
                try {
                    stream.Close();
                } catch (Exception) {
                    Console.WriteLine("Error closing stream: " + socketPool.Host);
                }
                stream = null;
            }
            if (socket != null) {
                try {
                    socket.Shutdown(SocketShutdown.Both);
                } catch (Exception) {
                    Console.WriteLine("Error shutting down socket: " + socketPool.Host);
                }
                try {
                    socket.Close();
                } catch (Exception) {
                    Console.WriteLine("Error closing socket: " + socketPool.Host);
                }
                socket = null;
            }
        }

        /// <summary>
        /// Checks if the underlying socket and stream is connected and available.
        /// </summary>
        public bool IsAlive {
            get { return socket != null && stream != null && socket.Connected && stream.CanRead; }
        }

        /// <summary>
        /// Writes a string to the socket encoded in UTF8 format.
        /// </summary>
        public void Write(string str) {
            Write(Encoding.UTF8.GetBytes(str));
        }

        /// <summary>
        /// Writes an array of bytes to the socket and flushes the stream.
        /// </summary>
        public void Write(byte[] bytes) {
            stream.Write(bytes, 0, bytes.Length);
            stream.Flush();
        }

        /// <summary>
        /// Reads from the socket until the sequence '\r\n' is encountered (or the stream ends),
        /// and returns everything up to but not including that sequence as a UTF8-decoded string.
        /// </summary>
        public string ReadLine() {
            MemoryStream buffer = new MemoryStream();
            int b;
            bool gotReturn = false;
            while ((b = stream.ReadByte()) != -1) {
                if (gotReturn) {
                    if (b == 10) {
                        break;
                    } else {
                        //The previous '\r' was not part of a line terminator, so it is data.
                        buffer.WriteByte(13);
                        gotReturn = false;
                    }
                }
                if (b == 13) {
                    gotReturn = true;
                } else {
                    buffer.WriteByte((byte)b);
                }
            }
            //GetBuffer() exposes the whole backing array, including unused capacity.
            //Decode only the bytes actually written, otherwise the line gets trailing '\0' characters.
            return Encoding.UTF8.GetString(buffer.GetBuffer(), 0, (int)buffer.Length);
        }

        /// <summary>
        /// Reads a response line from the socket, checks for general memcached errors, and returns the line.
        /// If an error is encountered, this method will throw an exception.
        /// </summary>
        /// <exception cref="Exception">The line is empty, or starts with ERROR, CLIENT_ERROR or SERVER_ERROR.</exception>
        public string ReadResponse() {
            string response = ReadLine();

            if (String.IsNullOrEmpty(response)) {
                throw new Exception("Received empty response.");
            }

            if (response.StartsWith("ERROR", StringComparison.Ordinal)
                || response.StartsWith("CLIENT_ERROR", StringComparison.Ordinal)
                || response.StartsWith("SERVER_ERROR", StringComparison.Ordinal)) {
                throw new Exception("Server returned " + response);
            }

            return response;
        }

        /// <summary>
        /// Fills the given byte array with data from the socket. A null array is ignored.
        /// </summary>
        /// <exception cref="EndOfStreamException">The stream ended before the array could be filled.</exception>
        public void Read(byte[] bytes) {
            if (bytes == null) {
                return;
            }

            int readBytes = 0;
            while (readBytes < bytes.Length) {
                int count = stream.Read(bytes, readBytes, (bytes.Length - readBytes));
                if (count <= 0) {
                    //Stream.Read returns 0 at end of stream; without this check the loop would never terminate.
                    throw new EndOfStreamException("Connection closed after " + readBytes + " of " + bytes.Length + " expected bytes.");
                }
                readBytes += count;
            }
        }

        /// <summary>
        /// Reads from the socket until the sequence '\r\n' is encountered (or the stream ends),
        /// discarding everything that was read.
        /// </summary>
        public void SkipUntilEndOfLine() {
            int b;
            bool gotReturn = false;
            while ((b = stream.ReadByte()) != -1) {
                if (gotReturn) {
                    if (b == 10) {
                        break;
                    } else {
                        gotReturn = false;
                    }
                }
                if (b == 13) {
                    gotReturn = true;
                }
            }
        }

        /// <summary>
        /// Resets this PooledSocket by making sure the incoming buffer of the socket is empty.
        /// If there was any leftover data, this method returns true.
        /// </summary>
        public bool Reset() {
            if (socket.Available > 0) {
                byte[] b = new byte[socket.Available];
                Read(b);
                return true;
            }
            return false;
        }
    }

    /// <summary>
    /// The ServerPool encapsulates a collection of memcached servers and the associated SocketPool objects.
    /// This class contains the server-selection logic, and contains methods for executing a block of code on
    /// a socket from the server corresponding to a given key.
    /// </summary>
    internal class ServerPool {
        //Number of points each host contributes to the server key continuum.
        private const int KeysPerHost = 250;

        //Expose the socket pools.
        private SocketPool[] hostList;
        internal SocketPool[] HostList { get { return hostList; } }

        private Dictionary<uint, SocketPool> hostDictionary;
        private uint[] hostKeys;

        //Internal configuration properties
        private int sendReceiveTimeout = 2000;
        private uint maxPoolSize = 10;
        private uint minPoolSize = 5;
        private TimeSpan socketRecycleAge = TimeSpan.FromMinutes(30);
        internal int SendReceiveTimeout { get { return sendReceiveTimeout; } set { sendReceiveTimeout = value; } }
        internal uint MaxPoolSize { get { return maxPoolSize; } set { maxPoolSize = value; } }
        internal uint MinPoolSize { get { return minPoolSize; } set { minPoolSize = value; } }
        internal TimeSpan SocketRecycleAge { get { return socketRecycleAge; } set { socketRecycleAge = value; } }

        /// <summary>
        /// Internal constructor. This method takes the array of hosts and sets up an internal list of socketpools.
        /// </summary>
        /// <param name="hosts">Host strings, each "host" or "host:port"; surrounding whitespace is trimmed.</param>
        /// <exception cref="ArgumentNullException"><paramref name="hosts"/> is null.</exception>
        internal ServerPool(string[] hosts) {
            if (hosts == null) {
                throw new ArgumentNullException("hosts");
            }

            hostDictionary = new Dictionary<uint, SocketPool>();
            List<SocketPool> pools = new List<SocketPool>();
            List<uint> keys = new List<uint>();
            foreach (string host in hosts) {
                //Create pool
                SocketPool pool = new SocketPool(this, host.Trim());

                //Create the keys for this pool, store each key in the hostDictionary, as well as in the list of keys.
                //Each key is a hash of "<host>-<i>", so different hosts land on different points of the continuum.
                //(Using the bare counter as the key gave every host the same keys, leaving only the first host reachable.)
                for (int i = 0; i < KeysPerHost; i++) {
                    uint key = ComputeHostKey(pool.Host + "-" + i);
                    if (!hostDictionary.ContainsKey(key)) {
                        hostDictionary[key] = pool;
                        keys.Add(key);
                    }
                }

                pools.Add(pool);
            }

            //Hostlist should contain the list of all pools that has been created.
            hostList = pools.ToArray();

            //Hostkeys should contain the list of all key for all pools that have been created.
            //This array forms the server key continuum that we use to lookup which server a
            //given item key hash should be assigned to.
            keys.Sort();
            hostKeys = keys.ToArray();
        }

        /// <summary>
        /// Computes a deterministic 32-bit hash (FNV-1a over the UTF8 bytes, followed by extra bit mixing)
        /// used to place a host key on the continuum. It does not depend on process or platform state.
        /// </summary>
        private static uint ComputeHostKey(string value) {
            const uint offsetBasis = 2166136261;
            const uint prime = 16777619;

            uint hash = offsetBasis;
            unchecked {
                foreach (byte b in Encoding.UTF8.GetBytes(value)) {
                    hash ^= b;
                    hash *= prime;
                }

                //Final avalanche so that inputs differing only in their last characters spread over the whole range.
                hash += hash << 13;
                hash ^= hash >> 7;
                hash += hash << 3;
                hash ^= hash >> 17;
                hash += hash << 5;
            }
            return hash;
        }

        /// <summary>
        /// Given an item key hash, this method returns the serverpool which is closest on the server key continuum.
        /// </summary>
        internal SocketPool GetSocketPool(uint hash) {
            //Quick return if we only have one host.
            if (hostList.Length == 1) {
                return hostList[0];
            }

            //New "ketama" host selection.
            int i = Array.BinarySearch(hostKeys, hash);

            //If not exact match...
            if (i < 0) {
                //Get the index of the first item bigger than the one searched for.
                i = ~i;

                //If i is bigger than the last index, it was bigger than the last item = use the first item.
                if (i >= hostKeys.Length) {
                    i = 0;
                }
            }
            return hostDictionary[hostKeys[i]];
        }

        /// <summary>
        /// Returns the pool whose Host string equals the given host, or null if there is none.
        /// </summary>
        internal SocketPool GetSocketPool(string host) {
            return Array.Find(HostList, delegate(SocketPool socketPool) { return socketPool.Host == host; });
        }

        /// <summary>
        /// This method executes the given delegate on a socket from the server that corresponds to the given hash.
        /// If anything causes an error, the given defaultValue will be returned instead.
        /// This method takes care of disposing the socket properly once the delegate has executed.
        /// </summary>
        internal T Execute<T>(uint hash, T defaultValue, UseSocket<T> use) {
            return Execute(GetSocketPool(hash), defaultValue, use);
        }

        /// <summary>
        /// Executes the given delegate on a socket acquired from the given pool and returns its result.
        /// Returns defaultValue when no socket could be acquired or when the delegate throws.
        /// </summary>
        internal T Execute<T>(SocketPool pool, T defaultValue, UseSocket<T> use) {
            PooledSocket sock = null;
            try {
                //Acquire a socket
                sock = pool.Acquire();

                //Use the socket as a parameter to the delegate and return its result.
                if (sock != null) {
                    return use(sock);
                }
            } catch (Exception) {
                Console.WriteLine("Error in Execute<T>: " + pool.Host);

                //Socket is probably broken
                if (sock != null) {
                    sock.Close();
                }
            } finally {
                //Dispose hands the socket back to its pool; a closed socket is discarded there.
                if (sock != null) {
                    sock.Dispose();
                }
            }
            return defaultValue;
        }

        /// <summary>
        /// Executes the given delegate on a socket acquired from the given pool.
        /// Errors are logged to the console and the broken socket is closed.
        /// </summary>
        internal void Execute(SocketPool pool, UseSocket use) {
            PooledSocket sock = null;
            try {
                //Acquire a socket
                sock = pool.Acquire();

                //Use the socket as a parameter to the delegate.
                if (sock != null) {
                    use(sock);
                }
            } catch (Exception) {
                Console.WriteLine("Error in Execute: " + pool.Host);

                //Socket is probably broken
                if (sock != null) {
                    sock.Close();
                }
            } finally {
                //Dispose hands the socket back to its pool; a closed socket is discarded there.
                if (sock != null) {
                    sock.Dispose();
                }
            }
        }

        /// <summary>
        /// This method executes the given delegate on all servers.
        /// </summary>
        internal void ExecuteAll(UseSocket use) {
            foreach (SocketPool socketPool in hostList) {
                Execute(socketPool, use);
            }
        }
    }

    /// <summary>
    /// The SocketPool encapsulates the list of PooledSockets against one specific host, and contains methods for
    /// acquiring or returning PooledSockets.
    /// </summary>
    [DebuggerDisplay("[ Host: {Host} ]")]
    internal class SocketPool {
        /// <summary>
        /// If the host stops responding, we mark it as dead for this amount of seconds,
        /// and we double this for each consecutive failed retry. If the host comes alive
        /// again, we reset this to 1 again.
        /// </summary>
        private int deadEndPointSecondsUntilRetry = 1;
        private const int maxDeadEndPointSecondsUntilRetry = 60 * 10; //10 minutes
        private ServerPool owner;
        private IPEndPoint endPoint;
        private Queue<PooledSocket> queue;

        //Debug variables and properties
        private int newsockets = 0;
        private int failednewsockets = 0;
        private int reusedsockets = 0;
        private int deadsocketsinpool = 0;
        private int deadsocketsonreturn = 0;
        private int dirtysocketsonreturn = 0;
        private int acquired = 0;
        public int NewSockets { get { return newsockets; } }
        public int FailedNewSockets { get { return failednewsockets; } }
        public int ReusedSockets { get { return reusedsockets; } }
        public int DeadSocketsInPool { get { return deadsocketsinpool; } }
        public int DeadSocketsOnReturn { get { return deadsocketsonreturn; } }
        public int DirtySocketsOnReturn { get { return dirtysocketsonreturn; } }
        public int Acquired { get { return acquired; } }
        public int Poolsize { get { return queue.Count; } }

        //Public variables and properties
        public readonly string Host;

        private bool isEndPointDead = false;
        public bool IsEndPointDead { get { return isEndPointDead; } }

        private DateTime deadEndPointRetryTime;
        public DateTime DeadEndPointRetryTime { get { return deadEndPointRetryTime; } }

        /// <summary>
        /// Creates an empty pool for the given host string and resolves its endpoint.
        /// </summary>
        internal SocketPool(ServerPool owner, string host) {
            Host = host;
            this.owner = owner;
            endPoint = getEndPoint(host);
            queue = new Queue<PooledSocket>();
        }

        /// <summary>
        /// This method parses the given string into an IPEndPoint. The port defaults to 11211.
        /// If the port cannot be parsed an ArgumentException is thrown. If the host is neither an
        /// IP address nor a resolvable hostname, the failure is logged and null is returned.
        /// </summary>
        private static IPEndPoint getEndPoint(string host) {
            //Parse port, default to 11211.
            int port = 11211;
            if (host.Contains(":")) {
                string[] split = host.Split(new char[] { ':' });
                if (!Int32.TryParse(split[1], out port)) {
                    throw new ArgumentException("Unable to parse host: " + host);
                }
                host = split[0];
            }

            //Parse host string.
            IPAddress address;
            if (IPAddress.TryParse(host, out address)) {
                //host string successfully resolved as an IP address.
            } else {
                //See if we can resolve it as a hostname
                try {
                    address = Dns.GetHostEntry(host).AddressList[0];
                } catch (Exception) {
                    Console.WriteLine("Unable to resolve host: " + host);
                    return null;
                }
            }

            return new IPEndPoint(address, port);
        }

        /// <summary>
        /// Gets a socket from the pool.
        /// If there are no free sockets, a new one will be created. If something goes
        /// wrong while creating the new socket, this pool's endpoint will be marked as dead
        /// and all subsequent calls to this method will return null until the retry interval
        /// has passed.
        /// </summary>
        internal PooledSocket Acquire() {
            //Do we have free sockets in the pool?
            //if so - return the first working one.
            //if not - create a new one.
            Interlocked.Increment(ref acquired);
            lock (queue) {
                while (queue.Count > 0) {
                    PooledSocket pooled = queue.Dequeue();
                    if (pooled != null && pooled.IsAlive) {
                        Interlocked.Increment(ref reusedsockets);
                        return pooled;
                    }
                    Interlocked.Increment(ref deadsocketsinpool);

                    //The socket leaves the pool for good here, so release its handle instead of dropping it.
                    if (pooled != null) {
                        pooled.Close();
                    }
                }
            }

            Interlocked.Increment(ref newsockets);

            //If we know the endpoint is dead, check if it is time for a retry, otherwise return null.
            if (isEndPointDead) {
                if (DateTime.Now > deadEndPointRetryTime) {
                    //Retry
                    isEndPointDead = false;
                } else {
                    //Still dead
                    return null;
                }
            }

            //Try to create a new socket. On failure, mark endpoint as dead and return null.
            try {
                PooledSocket created = new PooledSocket(this, endPoint, owner.SendReceiveTimeout);

                //Reset retry timer on success.
                deadEndPointSecondsUntilRetry = 1;
                return created;
            } catch (Exception) {
                Interlocked.Increment(ref failednewsockets);

                //endPoint is null when the host could not be resolved; fall back to the host string
                //so that logging cannot throw before the endpoint has been marked as dead.
                Console.WriteLine("Error connecting to: " + (endPoint != null ? endPoint.Address.ToString() : Host));

                //Mark endpoint as dead
                isEndPointDead = true;

                //Retry after the current back-off interval
                deadEndPointRetryTime = DateTime.Now.AddSeconds(deadEndPointSecondsUntilRetry);
                if (deadEndPointSecondsUntilRetry < maxDeadEndPointSecondsUntilRetry) {
                    deadEndPointSecondsUntilRetry = deadEndPointSecondsUntilRetry * 2; //Double retry interval until next time
                }
                return null;
            }
        }

        /// <summary>
        /// Returns a socket to the pool.
        /// If the socket is dead, it will be destroyed.
        /// If there are more than MaxPoolSize sockets in the pool, it will be destroyed.
        /// If there are less than MinPoolSize sockets in the pool, it will always be put back.
        /// If there are something inbetween those values, the age of the socket is checked.
        /// If it is older than the SocketRecycleAge, it is destroyed, otherwise it will be
        /// put back in the pool.
        /// </summary>
        internal void Return(PooledSocket socket) {
            //If the socket is dead, destroy it.
            if (!socket.IsAlive) {
                Interlocked.Increment(ref deadsocketsonreturn);
                socket.Close();
                return;
            }

            //Clean up socket. This runs from Dispose (typically inside a finally block), so a failure
            //while draining leftover data must not escape: treat the socket as dead instead.
            try {
                if (socket.Reset()) {
                    Interlocked.Increment(ref dirtysocketsonreturn);
                }
            } catch (Exception) {
                Console.WriteLine("Error resetting socket: " + Host);
                Interlocked.Increment(ref deadsocketsonreturn);
                socket.Close();
                return;
            }

            //Check pool size and enqueue under the same lock, so the size that is checked is the size that is acted on.
            bool putBack = false;
            lock (queue) {
                bool poolIsFull = queue.Count >= owner.MaxPoolSize;
                bool tooOldToKeep = queue.Count > owner.MinPoolSize && DateTime.Now - socket.Created > owner.SocketRecycleAge;
                if (!poolIsFull && !tooOldToKeep) {
                    //Put the socket back in the pool.
                    queue.Enqueue(socket);
                    putBack = true;
                }
            }

            //Either the pool is full, or we have more than the minimum amount of sockets and this one is
            //older than the recycle age: destroy it (outside the lock, since closing touches the network).
            if (!putBack) {
                socket.Close();
            }
        }
    }
}
Server Pool
//Copyright (c) 2007-2008 Henrik Schröder, Oliver Kofoed Pedersen //Permission is hereby granted, free of charge, to any person //obtaining a copy of this software and associated documentation //files (the "Software"), to deal in the Software without //restriction, including without limitation the rights to use, //copy, modify, merge, publish, distribute, sublicense, and/or sell //copies of the Software, and to permit persons to whom the //Software is furnished to do so, subject to the following //conditions: //The above copyright notice and this permission notice shall be //included in all copies or substantial portions of the Software. //THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, //EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES //OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND //NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT //HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, //WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING //FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR //OTHER DEALINGS IN THE SOFTWARE. using System.Net.Sockets; using System.Net; using System.IO; using System; using System.Threading; using System.Collections.Generic; using System.Text; namespace Apollo.Common.Cache { /// <summary> /// The PooledSocket class encapsulates a socket connection to a specified memcached server. /// It contains a buffered stream for communication, and methods for sending and retrieving /// data from the memcached server, as well as general memcached error checking. /// </summary> internal class PooledSocket : IDisposable { private SocketPool socketPool; private Socket socket; private Stream stream; public readonly DateTime Created; public PooledSocket(SocketPool socketPool, IPEndPoint endPoint, int sendReceiveTimeout) { this.socketPool = socketPool; Created = DateTime.Now; //Set up the socket. 
socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, sendReceiveTimeout); socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, sendReceiveTimeout); socket.ReceiveTimeout = sendReceiveTimeout; socket.SendTimeout = sendReceiveTimeout; //Do not use Nagle's Algorithm socket.NoDelay = true; //Establish connection socket.Connect(endPoint); //Wraps two layers of streams around the socket for communication. stream = new BufferedStream(new NetworkStream(socket, false)); } /// <summary> /// Disposing of a PooledSocket object in any way causes it to be returned to its SocketPool. /// </summary> public void Dispose() { socketPool.Return(this); } /// <summary> /// This method closes the underlying stream and socket. /// </summary> public void Close() { if (stream != null) { try { stream.Close(); } catch (Exception e) { Console.WriteLine("Error closing stream: " + socketPool.Host); } stream = null; } if (socket != null) { try { socket.Shutdown(SocketShutdown.Both); } catch (Exception e) { Console.WriteLine("Error shutting down socket: " + socketPool.Host); } try { socket.Close(); } catch (Exception e) { Console.WriteLine("Error closing socket: " + socketPool.Host); } socket = null; } } /// <summary> /// Checks if the underlying socket and stream is connected and available. /// </summary> public bool IsAlive { get { return socket != null && socket.Connected && stream.CanRead; } } /// <summary> /// Writes a string to the socket encoded in UTF8 format. /// </summary> public void Write(string str) { Write(Encoding.UTF8.GetBytes(str)); } /// <summary> /// Writes an array of bytes to the socket and flushes the stream. 
/// </summary> public void Write(byte[] bytes) { stream.Write(bytes, 0, bytes.Length); stream.Flush(); } /// <summary> /// Reads from the socket until the sequence ' ' is encountered, /// and returns everything up to but not including that sequence as a UTF8-encoded string /// </summary> public string ReadLine() { MemoryStream buffer = new MemoryStream(); int b; bool gotReturn = false; while ((b = stream.ReadByte()) != -1) { if (gotReturn) { if (b == 10) { break; } else { buffer.WriteByte(13); gotReturn = false; } } if (b == 13) { gotReturn = true; } else { buffer.WriteByte((byte)b); } } return Encoding.UTF8.GetString(buffer.GetBuffer()); } /// <summary> /// Reads a response line from the socket, checks for general memcached errors, and returns the line. /// If an error is encountered, this method will throw an exception. /// </summary> public string ReadResponse() { string response = ReadLine(); if (String.IsNullOrEmpty(response)) { throw new Exception("Received empty response."); } if (response.StartsWith("ERROR") || response.StartsWith("CLIENT_ERROR") || response.StartsWith("SERVER_ERROR")) { throw new Exception("Server returned " + response); } return response; } /// <summary> /// Fills the given byte array with data from the socket. /// </summary> public void Read(byte[] bytes) { if (bytes == null) { return; } int readBytes = 0; while (readBytes < bytes.Length) { readBytes += stream.Read(bytes, readBytes, (bytes.Length - readBytes)); } } /// <summary> /// Reads from the socket until the sequence ' ' is encountered. /// </summary> public void SkipUntilEndOfLine() { int b; bool gotReturn = false; while ((b = stream.ReadByte()) != -1) { if (gotReturn) { if (b == 10) { break; } else { gotReturn = false; } } if (b == 13) { gotReturn = true; } } } /// <summary> /// Resets this PooledSocket by making sure the incoming buffer of the socket is empty. /// If there was any leftover data, this method return true. 
/// </summary> public bool Reset() { if (socket.Available > 0) { byte[] b = new byte[socket.Available]; Read(b); return true; } return false; } } internal delegate T UseSocket<T>(PooledSocket socket); internal delegate void UseSocket(PooledSocket socket); /// <summary> /// The ServerPool encapsulates a collection of memcached servers and the associated SocketPool objects. /// This class contains the server-selection logic, and contains methods for executing a block of code on /// a socket from the server corresponding to a given key. /// </summary> internal class ServerPool { //Expose the socket pools. private SocketPool[] hostList; internal SocketPool[] HostList { get { return hostList; } } private Dictionary<uint, SocketPool> hostDictionary; private uint[] hostKeys; //Internal configuration properties private int sendReceiveTimeout = 2000; private uint maxPoolSize = 10; private uint minPoolSize = 5; private TimeSpan socketRecycleAge = TimeSpan.FromMinutes(30); internal int SendReceiveTimeout { get { return sendReceiveTimeout; } set { sendReceiveTimeout = value; } } internal uint MaxPoolSize { get { return maxPoolSize; } set { maxPoolSize = value; } } internal uint MinPoolSize { get { return minPoolSize; } set { minPoolSize = value; } } internal TimeSpan SocketRecycleAge { get { return socketRecycleAge; } set { socketRecycleAge = value; } } /// <summary> /// Internal constructor. This method takes the array of hosts and sets up an internal list of socketpools. /// </summary> internal ServerPool(string[] hosts) { hostDictionary = new Dictionary<uint, SocketPool>(); List<SocketPool> pools = new List<SocketPool>(); List<uint> keys = new List<uint>(); foreach (string host in hosts) { //Create pool SocketPool pool = new SocketPool(this, host.Trim()); //Create 250 keys for this pool, store each key in the hostDictionary, as well as in the list of keys. 
//Claim every one of the 250 candidate keys that no earlier pool has taken yet.
                for (int i = 0; i < 250; i++) {
                    uint key = (uint)i;
                    if (!hostDictionary.ContainsKey(key)) {
                        hostDictionary[key] = pool;
                        keys.Add(key);
                    }
                }

                pools.Add(pool);
            }

            //Hostlist should contain the list of all pools that have been created.
            hostList = pools.ToArray();

            //Hostkeys should contain the list of all keys for all pools that have been created.
            //This array forms the server key continuum that we use to lookup which server a
            //given item key hash should be assigned to.
            keys.Sort();
            hostKeys = keys.ToArray();
        }

        /// <summary>
        /// Given an item key hash, this method returns the serverpool which is closest on the server key continuum.
        /// </summary>
        /// <param name="hash">Hash of the item key.</param>
        /// <returns>The pool registered for the first continuum key that is not smaller than the hash,
        /// wrapping around to the first key when the hash is bigger than every key.</returns>
        internal SocketPool GetSocketPool(uint hash) {
            //Quick return if we only have one host.
            if (hostList.Length == 1) {
                return hostList[0];
            }

            //New "ketama" host selection.
            int i = Array.BinarySearch(hostKeys, hash);

            //A negative result is the bitwise complement of the index of the first bigger key.
            if (i < 0) {
                i = ~i;

                //Past the last key means the hash was bigger than every key = use the first key.
                if (i >= hostKeys.Length) {
                    i = 0;
                }
            }

            return hostDictionary[hostKeys[i]];
        }

        /// <summary>
        /// Returns the first pool whose Host string equals the given host, or null if there is none.
        /// </summary>
        internal SocketPool GetSocketPool(string host) {
            return Array.Find(HostList, delegate(SocketPool socketPool) { return socketPool.Host == host; });
        }

        /// <summary>
        /// This method executes the given delegate on a socket from the server that corresponds to the given hash.
        /// If anything causes an error, the given defaultValue will be returned instead.
        /// This method takes care of disposing the socket properly once the delegate has executed.
        /// </summary>
        internal T Execute<T>(uint hash, T defaultValue, UseSocket<T> use) {
            return Execute(GetSocketPool(hash), defaultValue, use);
        }

        /// <summary>
        /// Executes the given delegate on a socket acquired from the given pool and returns its result.
        /// Returns defaultValue when no socket could be acquired or when the delegate throws.
        /// </summary>
        internal T Execute<T>(SocketPool pool, T defaultValue, UseSocket<T> use) {
            PooledSocket sock = null;
            try {
                //Acquire a socket
                sock = pool.Acquire();

                //Use the socket as a parameter to the delegate and return its result.
                if (sock != null) {
                    return use(sock);
                }
            } catch (Exception) {
                Console.WriteLine("Error in Execute<T>: " + pool.Host);

                //Socket is probably broken
                if (sock != null) {
                    sock.Close();
                }
            } finally {
                //Disposing hands the socket back to its pool.
                if (sock != null) {
                    sock.Dispose();
                }
            }

            return defaultValue;
        }

        /// <summary>
        /// Executes the given delegate on a socket acquired from the given pool.
        /// Errors are logged to the console and swallowed; nothing happens if no socket could be acquired.
        /// </summary>
        internal void Execute(SocketPool pool, UseSocket use) {
            PooledSocket sock = null;
            try {
                //Acquire a socket
                sock = pool.Acquire();

                //Use the socket as a parameter to the delegate.
                if (sock != null) {
                    use(sock);
                }
            } catch (Exception) {
                Console.WriteLine("Error in Execute: " + pool.Host);

                //Socket is probably broken
                if (sock != null) {
                    sock.Close();
                }
            } finally {
                //Disposing hands the socket back to its pool.
                if (sock != null) {
                    sock.Dispose();
                }
            }
        }

        /// <summary>
        /// This method executes the given delegate on all servers.
        /// </summary>
        internal void ExecuteAll(UseSocket use) {
            foreach (SocketPool socketPool in hostList) {
                Execute(socketPool, use);
            }
        }
    }

    /// <summary>
    /// The SocketPool encapsulates the list of PooledSockets against one specific host, and contains methods for
    /// acquiring or returning PooledSockets.
    /// </summary>
    internal class SocketPool {
        /// <summary>
        /// If the host stops responding, we mark it as dead for this amount of seconds,
        /// and we double this for each consecutive failed retry, up to
        /// maxDeadEndPointSecondsUntilRetry. If the host comes alive again, we reset this to 1 again.
        /// </summary>
        private int deadEndPointSecondsUntilRetry = 1;
        private const int maxDeadEndPointSecondsUntilRetry = 60 * 10; //10 minutes

        private readonly ServerPool owner;
        private readonly IPEndPoint endPoint; //null when the host string could not be resolved
        private readonly Queue<PooledSocket> queue; //also used as the lock object guarding itself

        //Debug variables and properties
        private int newsockets = 0;
        private int failednewsockets = 0;
        private int reusedsockets = 0;
        private int deadsocketsinpool = 0;
        private int deadsocketsonreturn = 0;
        private int dirtysocketsonreturn = 0;
        private int acquired = 0;
        public int NewSockets { get { return newsockets; } }
        public int FailedNewSockets { get { return failednewsockets; } }
        public int ReusedSockets { get { return reusedsockets; } }
        public int DeadSocketsInPool { get { return deadsocketsinpool; } }
        public int DeadSocketsOnReturn { get { return deadsocketsonreturn; } }
        public int DirtySocketsOnReturn { get { return dirtysocketsonreturn; } }
        public int Acquired { get { return acquired; } }
        public int Poolsize { get { return queue.Count; } }

        //Public variables and properties
        public readonly string Host;

        private bool isEndPointDead = false;
        public bool IsEndPointDead { get { return isEndPointDead; } }

        private DateTime deadEndPointRetryTime;
        public DateTime DeadEndPointRetryTime { get { return deadEndPointRetryTime; } }

        /// <summary>
        /// Creates an empty pool for the given host string ("host" or "host:port").
        /// </summary>
        internal SocketPool(ServerPool owner, string host) {
            Host = host;
            this.owner = owner;
            endPoint = getEndPoint(host);
            queue = new Queue<PooledSocket>();
        }

        /// <summary>
        /// This method parses the given string into an IPEndPoint.
        /// If the port part cannot be parsed, this method throws an ArgumentException.
        /// If the host part is neither an IP address nor a resolvable hostname, the failure is
        /// logged to the console and null is returned.
        /// </summary>
        private static IPEndPoint getEndPoint(string host) {
            //Parse port, default to 11211.
            int port = 11211;
            if (host.Contains(":")) {
                string[] split = host.Split(new char[] { ':' });
                if (!Int32.TryParse(split[1], out port)) {
                    throw new ArgumentException("Unable to parse host: " + host);
                }
                host = split[0];
            }

            //Parse host string.
            IPAddress address;
            if (!IPAddress.TryParse(host, out address)) {
                //Not an IP address literal, see if we can resolve it as a hostname.
                try {
                    address = Dns.GetHostEntry(host).AddressList[0];
                } catch (Exception) {
                    Console.WriteLine("Unable to resolve host: " + host);
                    return null;
                }
            }

            return new IPEndPoint(address, port);
        }

        /// <summary>
        /// Gets a socket from the pool.
        /// If there are no free sockets, a new one will be created. If something goes
        /// wrong while creating the new socket, this pool's endpoint will be marked as dead
        /// and all subsequent calls to this method will return null until the retry interval
        /// has passed.
        /// </summary>
        /// <returns>A live socket, or null if the endpoint is (still) considered dead.</returns>
        internal PooledSocket Acquire() {
            Interlocked.Increment(ref acquired);

            //Do we have free sockets in the pool? If so - return the first working one.
            lock (queue) {
                while (queue.Count > 0) {
                    PooledSocket pooled = queue.Dequeue();
                    if (pooled != null && pooled.IsAlive) {
                        Interlocked.Increment(ref reusedsockets);
                        return pooled;
                    }

                    Interlocked.Increment(ref deadsocketsinpool);

                    //The dead socket leaves the pool for good, so release its stream and handle now
                    //instead of leaking them until garbage collection.
                    if (pooled != null) {
                        pooled.Close();
                    }
                }
            }

            //Counts every call that found no usable pooled socket, including the ones
            //rejected below because the endpoint is marked as dead.
            Interlocked.Increment(ref newsockets);

            //If we know the endpoint is dead, check if it is time for a retry, otherwise return null.
            if (isEndPointDead) {
                if (DateTime.Now > deadEndPointRetryTime) {
                    //Retry
                    isEndPointDead = false;
                } else {
                    //Still dead
                    return null;
                }
            }

            //Try to create a new socket. On failure, mark endpoint as dead and return null.
            try {
                PooledSocket socket = new PooledSocket(this, endPoint, owner.SendReceiveTimeout);

                //Reset retry timer on success.
                deadEndPointSecondsUntilRetry = 1;
                return socket;
            } catch (Exception) {
                Interlocked.Increment(ref failednewsockets);

                //endPoint is null when the host could not be resolved; fall back to the configured
                //host string so that logging cannot throw before the endpoint is marked as dead.
                Console.WriteLine("Error connecting to: " + (endPoint != null ? endPoint.Address.ToString() : Host));

                //Mark endpoint as dead
                isEndPointDead = true;

                //Retry after the current back-off interval.
                deadEndPointRetryTime = DateTime.Now.AddSeconds(deadEndPointSecondsUntilRetry);

                //Double retry interval until next time, never exceeding the configured maximum.
                deadEndPointSecondsUntilRetry = Math.Min(deadEndPointSecondsUntilRetry * 2, maxDeadEndPointSecondsUntilRetry);
                return null;
            }
        }

        /// <summary>
        /// Returns a socket to the pool.
        /// If the socket is dead, it will be destroyed.
        /// If there are MaxPoolSize or more sockets in the pool, it will be destroyed.
        /// If there are MinPoolSize or fewer sockets in the pool, it will always be put back.
        /// If the pool size is in between those values, the age of the socket is checked.
        /// If it is older than the SocketRecycleAge, it is destroyed, otherwise it will be
        /// put back in the pool.
        /// </summary>
        internal void Return(PooledSocket socket) {
            //If the socket is dead, destroy it.
            if (!socket.IsAlive) {
                Interlocked.Increment(ref deadsocketsonreturn);
                socket.Close();
                return;
            }

            //Clean up socket
            if (socket.Reset()) {
                Interlocked.Increment(ref dirtysocketsonreturn);
            }

            //Decide and enqueue under the same lock, so the pool size that was checked is the
            //pool size the socket is actually added to.
            bool keep;
            lock (queue) {
                if (queue.Count >= owner.MaxPoolSize) {
                    //The pool is full.
                    keep = false;
                } else if (queue.Count > owner.MinPoolSize && DateTime.Now - socket.Created > owner.SocketRecycleAge) {
                    //More than the minimum amount of sockets, and this one is older than the recycle age.
                    keep = false;
                } else {
                    //Put the socket back in the pool.
                    queue.Enqueue(socket);
                    keep = true;
                }
            }

            //Closing touches the network, so it is done after the lock has been released.
            if (!keep) {
                socket.Close();
            }
        }
    }
}
Multi Receive
/*
C# Network Programming
by Richard Blum

Publisher: Sybex
       ISBN: 0782141765
*/
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

/// <summary>
/// Joins the multicast group 224.100.0.1 on UDP port 9050, waits for a single
/// datagram, prints its ASCII text together with the sender, and exits.
/// </summary>
public class MultiRecv
{
    public static void Main()
    {
        Socket receiver = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
        Console.WriteLine("Ready to receive...");

        // Listen on every local interface; the same object is later handed to
        // ReceiveFrom, which replaces it with the sender's endpoint.
        IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, 9050);
        EndPoint sender = localEndPoint;
        receiver.Bind(localEndPoint);

        IPAddress group = IPAddress.Parse("224.100.0.1");
        MulticastOption membership = new MulticastOption(group);
        receiver.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership, membership);

        // Blocks until one datagram arrives.
        byte[] buffer = new byte[1024];
        int received = receiver.ReceiveFrom(buffer, ref sender);

        string text = Encoding.ASCII.GetString(buffer, 0, received);
        Console.WriteLine("received: {0}  from: {1}", text, sender.ToString());

        receiver.Close();
    }
}
Multi Send
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

/// <summary>
/// Sends one ASCII test message as a UDP datagram to the multicast group
/// 224.100.0.1 on port 9050.
/// </summary>
public class MultiSend
{
    public static void Main()
    {
        Socket sender = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);

        // Destination: the multicast group address and port.
        IPAddress group = IPAddress.Parse("224.100.0.1");
        IPEndPoint destination = new IPEndPoint(group, 9050);

        byte[] payload = Encoding.ASCII.GetBytes("This is a test message");
        sender.SendTo(payload, destination);

        sender.Close();
    }
}
New Multi Send
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

/// <summary>
/// Binds a UDP socket to local port 9051, joins the multicast group 224.100.0.1,
/// sets the multicast time-to-live to 50, and sends one ASCII test message to the
/// group on port 9050.
/// </summary>
public class NewMultiSend
{
    public static void Main()
    {
        Socket sender = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);

        IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, 9051);
        IPAddress group = IPAddress.Parse("224.100.0.1");
        IPEndPoint destination = new IPEndPoint(group, 9050);

        sender.Bind(localEndPoint);

        byte[] payload = Encoding.ASCII.GetBytes("This is a test message");

        // Join the group, then set the time-to-live used for outgoing multicast datagrams.
        MulticastOption membership = new MulticastOption(group);
        sender.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership, membership);
        sender.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.MulticastTimeToLive, 50);

        sender.SendTo(payload, destination);
        sender.Close();
    }
}
Creating Socket Connections
/*
 * C# Programmers Pocket Consultant
 * Author: Gregory S. MacBeth
 * Email: gmacbeth@comporium.net
 * Create Date: June 27, 2003
 * Last Modified Date:
 * Version: 1
 */
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

namespace Client.Chapter_14___Networking_and_WWW
{
    /// <summary>
    /// Connects a TcpClient to a host on port 10000, writes one ASCII message,
    /// performs a single read and prints whatever that read returned.
    /// </summary>
    public class CreatingSocketConnections
    {
        [STAThread]
        static void Main(string[] args)
        {
            TcpClient MyClient = new TcpClient();
            try
            {
                // NOTE(review): this argument is a URL (scheme and path), not a host name, so the
                // connect is expected to fail as written - replace it with the intended host.
                MyClient.Connect("http://www.kutayzorlu.com/java2s/com", 10000);
                NetworkStream MyNetStream = MyClient.GetStream();

                if (MyNetStream.CanWrite && MyNetStream.CanRead)
                {
                    // Does a simple write.
                    Byte[] sendBytes = Encoding.ASCII.GetBytes("Is anybody there");
                    MyNetStream.Write(sendBytes, 0, sendBytes.Length);

                    // Reads the NetworkStream into a byte buffer. Read reports how many bytes
                    // it actually stored, which is usually less than the buffer size.
                    byte[] bytes = new byte[MyClient.ReceiveBufferSize];
                    int bytesRead = MyNetStream.Read(bytes, 0, bytes.Length);

                    // Decode only the received bytes, not the unused zero-filled rest of the buffer.
                    string returndata = Encoding.ASCII.GetString(bytes, 0, bytesRead);
                    Console.WriteLine("This is what the host returned to you: " + returndata);
                }
                else if (!MyNetStream.CanRead)
                {
                    Console.WriteLine("You can not read data from this stream");
                }
                else
                {
                    // Only reachable when the stream is readable but not writable.
                    Console.WriteLine("You can not write data to this stream");
                }
            }
            finally
            {
                // Release the connection on every path, including when Connect or the I/O throws.
                MyClient.Close();
            }
        }
    }
}
Socket property
using System;
using System.Net;
using System.Net.Sockets;

/// <summary>
/// Creates a TCP socket, prints several of its properties, switches it to
/// non-blocking mode, binds it to 127.0.0.1:8000 and prints the local endpoint.
/// </summary>
public class SockProp
{
    public static void Main()
    {
        IPAddress loopback = IPAddress.Parse("127.0.0.1");
        IPEndPoint bindTarget = new IPEndPoint(loopback, 8000);

        Socket probe = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        // Properties of the freshly created, unbound socket.
        Console.WriteLine("AddressFamily: {0}", probe.AddressFamily);
        Console.WriteLine("SocketType: {0}", probe.SocketType);
        Console.WriteLine("ProtocolType: {0}", probe.ProtocolType);
        Console.WriteLine("Blocking: {0}", probe.Blocking);

        probe.Blocking = false;
        Console.WriteLine("new Blocking: {0}", probe.Blocking);
        Console.WriteLine("Connected: {0}", probe.Connected);

        // After binding, the socket reports the endpoint it is bound to.
        probe.Bind(bindTarget);
        IPEndPoint boundTo = (IPEndPoint)probe.LocalEndPoint;
        Console.WriteLine("Local EndPoint: {0}", boundTo.ToString());

        probe.Close();
    }
}