/* C# Network Programming by Richard Blum Publisher: Sybex ISBN: 0782141765 */ using System; using System.Net; using System.Net.Sockets; using System.Text; public class NetworkStreamTcpClient { public static void Main() { byte[] data = new byte[1024]; string input, stringData; int recv; IPEndPoint ipep = new IPEndPoint( IPAddress.Parse("127.0.0.1"), 9050); Socket server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); try { server.Connect(ipep); } catch (SocketException e) { Console.WriteLine("Unable to connect to server."); Console.WriteLine(e.ToString()); return; } NetworkStream ns = new NetworkStream(server); if (ns.CanRead) { recv = ns.Read(data, 0, data.Length); stringData = Encoding.ASCII.GetString(data, 0, recv); Console.WriteLine(stringData); } else { Console.WriteLine("Error: Can't read from this socket"); ns.Close(); server.Close(); return; } while(true) { input = Console.ReadLine(); if (input == "exit") break; if (ns.CanWrite) { ns.Write(Encoding.ASCII.GetBytes(input), 0, input.Length); ns.Flush(); } recv = ns.Read(data, 0, data.Length); stringData = Encoding.ASCII.GetString(data, 0, recv); Console.WriteLine(stringData); } Console.WriteLine("Disconnecting from server..."); ns.Close(); server.Shutdown(SocketShutdown.Both); server.Close(); } }
C# Network
Uses a TcpClient to handle HTTP
using System; using System.Text; using System.IO; using System.Net.Sockets; public class TryTcp { public static void Main(String [] args) { TcpClient client = new TcpClient("www.kutayzorlu.com/java2s/com", 80); NetworkStream stream = client.GetStream(); byte[] send = Encoding.ASCII.GetBytes("GET HTTP/1.0 "); stream.Write(send, 0, send.Length); byte[] bytes = new byte[client.ReceiveBufferSize]; int count = stream.Read(bytes, 0, (int)client.ReceiveBufferSize); String data = Encoding.ASCII.GetString(bytes); char[] unused = {(char)data[count]}; Console.WriteLine(data.TrimEnd(unused)); stream.Close(); client.Close(); } }
Cisco Router
/* C# Network Programming by Richard Blum Publisher: Sybex ISBN: 0782141765 */ using System.Net; using System.Net.Sockets; using System; using System.Drawing; using System.IO; using System.Threading; using System.Windows.Forms; public class CiscoRouter : Form { private TextBox host; private TextBox community; private ListBox results; private Thread monitor; private FileStream fs; private StreamWriter sw; public CiscoRouter() { Text = "Cisco Router Utilization"; Size = new Size(400, 380); Label label1 = new Label(); label1.Parent = this; label1.Text = "Host:"; label1.AutoSize = true; label1.Location = new Point(10, 30); host = new TextBox(); host.Parent = this; host.Size = new Size(170, 2 * Font.Height); host.Location = new Point(40, 27); Label label2 = new Label(); label2.Parent = this; label2.Text = "Community:"; label2.AutoSize = true; label2.Location = new Point(10, 60); community = new TextBox(); community.Parent = this; community.Size = new Size(170, 2 * Font.Height); community.Location = new Point(75, 57); results = new ListBox(); results.Parent = this; results.Location = new Point(10, 85); results.Size = new Size(360, 18 * Font.Height); Button start = new Button(); start.Parent = this; start.Text = "Start"; start.Location = new Point(250, 52); start.Size = new Size(5 * Font.Height, 2 * Font.Height); start.Click += new EventHandler(ButtonStartOnClick); Button stop = new Button(); stop.Parent = this; stop.Text = "Stop"; stop.Location = new Point(320, 52); stop.Size = new Size(5 * Font.Height, 2 * Font.Height); stop.Click += new EventHandler(ButtonStopOnClick); } void ButtonStartOnClick(Object obj, EventArgs ea) { monitor = new Thread(new ThreadStart(checkRouter)); monitor.IsBackground = true; monitor.Start(); } void ButtonStopOnClick(Object obj, EventArgs ea) { monitor.Abort(); sw.Close(); fs.Close(); } void checkRouter() { int commlength, miblength, datastart, cpuUtil; SNMP conn = new SNMP(); byte[] response = new byte[1024]; DateTime time; string logFile = "routerlog.txt"; fs = new FileStream(logFile, FileMode.OpenOrCreate, FileAccess.ReadWrite); sw = new StreamWriter(fs); while (true) { response = conn.get("get", host.Text, community.Text, "1.3.6.1.4.1.9.2.1.58.0"); if (response[0] == 0xff) { results.Items.Add("No reponse from host"); sw.WriteLine("No response from host"); sw.Flush(); break; } commlength = Convert.ToInt16(response[6]); miblength = Convert.ToInt16(response[23 + commlength]); datastart = 26 + commlength + miblength; cpuUtil = Convert.ToInt16(response[datastart]); time = DateTime.Now; results.Items.Add(time + " CPU Utilization: " + cpuUtil + "%"); sw.WriteLine("{0} CPU Utilization: {1}%", time, cpuUtil); sw.Flush(); Thread.Sleep(5 * 60000); } } public static void Main() { Application.Run(new CiscoRouter()); } } class SNMP { public SNMP() { } public byte[] get(string request, string host, string community, string mibstring) { byte[] packet = new byte[1024]; byte[] mib = new byte[1024]; int snmplen; int comlen = community.Length; string[] mibvals = mibstring.Split('.'); int miblen = mibvals.Length; int cnt = 0, temp, i; int orgmiblen = miblen; int pos = 0; // Convert the string MIB into a byte array of integer values // Unfortunately, values over 128 require multiple bytes // which also increases the MIB length for (i = 0; i < orgmiblen; i++) { temp = Convert.ToInt16(mibvals[i]); if (temp > 127) { mib[cnt] = Convert.ToByte(128 + (temp / 128)); mib[cnt + 1] = Convert.ToByte(temp - ((temp / 128) * 128)); cnt += 2; miblen++; } else { mib[cnt] = Convert.ToByte(temp); cnt++; } } snmplen = 29 + comlen + miblen - 1; //Length of entire SNMP packet //The SNMP sequence start packet[pos++] = 0x30; //Sequence start packet[pos++] = Convert.ToByte(snmplen - 2); //sequence size //SNMP version packet[pos++] = 0x02; //Integer type packet[pos++] = 0x01; //length packet[pos++] = 0x00; //SNMP version 1 //Community name packet[pos++] = 0x04; // String type packet[pos++] = Convert.ToByte(comlen); //length //Convert community name to byte array byte[] data = Encoding.ASCII.GetBytes(community); for (i = 0; i < data.Length; i++) { packet[pos++] = data[i]; } //Add GetRequest or GetNextRequest value if (request == "get") packet[pos++] = 0xA0; else packet[pos++] = 0xA1; packet[pos++] = Convert.ToByte(20 + miblen - 1); //Size of total MIB //Request ID packet[pos++] = 0x02; //Integer type packet[pos++] = 0x04; //length packet[pos++] = 0x00; //SNMP request ID packet[pos++] = 0x00; packet[pos++] = 0x00; packet[pos++] = 0x01; //Error status packet[pos++] = 0x02; //Integer type packet[pos++] = 0x01; //length packet[pos++] = 0x00; //SNMP error status //Error index packet[pos++] = 0x02; //Integer type packet[pos++] = 0x01; //length packet[pos++] = 0x00; //SNMP error index //Start of variable bindings packet[pos++] = 0x30; //Start of variable bindings sequence packet[pos++] = Convert.ToByte(6 + miblen - 1); // Size of variable binding packet[pos++] = 0x30; //Start of first variable bindings sequence packet[pos++] = Convert.ToByte(6 + miblen - 1 - 2); // size packet[pos++] = 0x06; //Object type packet[pos++] = Convert.ToByte(miblen - 1); //length //Start of MIB packet[pos++] = 0x2b; //Place MIB array in packet for(i = 2; i < miblen; i++) packet[pos++] = Convert.ToByte(mib[i]); packet[pos++] = 0x05; //Null object value packet[pos++] = 0x00; //Null //Send packet to destination Socket sock = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); sock.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, 5000); IPHostEntry ihe = Dns.Resolve(host); IPEndPoint iep = new IPEndPoint(ihe.AddressList[0], 161); EndPoint ep = (EndPoint)iep; sock.SendTo(packet, snmplen, SocketFlags.None, iep); //Receive response from packet try { int recv = sock.ReceiveFrom(packet, ref ep); } catch (SocketException) { packet[0] = 0xff; } return packet; } public string getnextMIB(byte[] mibin) { string output = "1.3"; int commlength = mibin[6]; int mibstart = 6 + commlength + 17; //find the start of the mib section //The MIB length is the length defined in the SNMP packet // minus 1 to remove the ending .0, which is not used int miblength = mibin[mibstart] - 1; mibstart += 2; //skip over the length and 0x2b values int mibvalue; for(int i = mibstart; i < mibstart + miblength; i++) { mibvalue = Convert.ToInt16(mibin[i]); if (mibvalue > 128) { mibvalue = (mibvalue/128)*128 + Convert.ToInt16(mibin[i+1]); i++; } output += "." + mibvalue; } return output; } }
Trace Route
/* C# Network Programming by Richard Blum Publisher: Sybex ISBN: 0782141765 */ using System; using System.Net; using System.Net.Sockets; using System.Text; public class TraceRoute { public static void Main(string[] argv) { byte[] data = new byte[1024]; int recv, timestart, timestop; Socket host = new Socket(AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp); IPHostEntry iphe = Dns.Resolve(argv[0]); IPEndPoint iep = new IPEndPoint(iphe.AddressList[0], 0); EndPoint ep = (EndPoint)iep; ICMP packet = new ICMP(); packet.Type = 0x08; packet.Code = 0x00; packet.Checksum = 0; Buffer.BlockCopy(BitConverter.GetBytes(1), 0, packet.Message, 0, 2); Buffer.BlockCopy(BitConverter.GetBytes(1), 0, packet.Message, 2, 2); data = Encoding.ASCII.GetBytes("test packet"); Buffer.BlockCopy(data, 0, packet.Message, 4, data.Length); packet.MessageSize = data.Length + 4; int packetsize = packet.MessageSize + 4; UInt16 chcksum = packet.getChecksum(); packet.Checksum = chcksum; host.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, 3000); int badcount = 0; for (int i = 1; i < 50; i++) { host.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.IpTimeToLive, i); timestart = Environment.TickCount; host.SendTo(packet.getBytes(), packetsize, SocketFlags.None, iep); try { data = new byte[1024]; recv = host.ReceiveFrom(data, ref ep); timestop = Environment.TickCount; ICMP response = new ICMP(data, recv); if (response.Type == 11) Console.WriteLine("hop {0}: response from {1}, {2}ms", i, ep.ToString(), timestop-timestart); if (response.Type == 0) { Console.WriteLine("{0} reached in {1} hops, {2}ms.", ep.ToString(), i, timestop-timestart); break; } badcount = 0; } catch (SocketException) { Console.WriteLine("hop {0}: No response from remote host", i); badcount++; if (badcount == 5) { Console.WriteLine("Unable to contact remote host"); break; } } } host.Close(); } } class ICMP { public byte Type; public byte Code; public UInt16 Checksum; public int MessageSize; public byte[] Message = new byte[1024]; public ICMP() { } public ICMP(byte[] data, int size) { Type = data[20]; Code = data[21]; Checksum = BitConverter.ToUInt16(data, 22); MessageSize = size - 24; Buffer.BlockCopy(data, 24, Message, 0, MessageSize); } public byte[] getBytes() { byte[] data = new byte[MessageSize + 9]; Buffer.BlockCopy(BitConverter.GetBytes(Type), 0, data, 0, 1); Buffer.BlockCopy(BitConverter.GetBytes(Code), 0, data, 1, 1); Buffer.BlockCopy(BitConverter.GetBytes(Checksum), 0, data, 2, 2); Buffer.BlockCopy(Message, 0, data, 4, MessageSize); return data; } public UInt16 getChecksum() { UInt32 chcksm = 0; byte[] data = getBytes(); int packetsize = MessageSize + 8; int index = 0; while ( index < packetsize) { chcksm += Convert.ToUInt32(BitConverter.ToUInt16(data, index)); index += 2; } chcksm = (chcksm >> 16) + (chcksm & 0xffff); chcksm += (chcksm >> 16); return (UInt16)(~chcksm); } }
Listening For Sockets
/* * C# Programmers Pocket Consultant * Author: Gregory S. MacBeth * Email: gmacbeth@comporium.net * Create Date: June 27, 2003 * Last Modified Date: * Version: 1 */ using System; using System.Net; using System.Text; using System.Net.Sockets; namespace Client.Chapter_14___Networking_and_WWW { public class ListeningForSockets { [STAThread] static void Main(string[] args) { int PortNumber = 10000; TcpListener MyListener = new TcpListener(PortNumber); MyListener.Start(); //Console.WriteLine("Waiting For Connection"); TcpClient MyClient = MyListener.AcceptTcpClient(); Console.WriteLine("Connection Accepted"); NetworkStream MyNetStream = MyClient.GetStream(); String Response = "Connection Has been accepted"; Byte[] SendTheseBytes = Encoding.ASCII.GetBytes(Response); MyNetStream.Write(SendTheseBytes, 0, SendTheseBytes.Length); MyClient.Close(); MyListener.Stop(); } } }
SocketPool encapsulates the list of PooledSockets against one specific host, and contains methods for acquiring or returning PooledSockets.
//Copyright (c) 2007-2008 Henrik Schröder, Oliver Kofoed Pedersen //Permission is hereby granted, free of charge, to any person //obtaining a copy of this software and associated documentation //files (the "Software"), to deal in the Software without //restriction, including without limitation the rights to use, //copy, modify, merge, publish, distribute, sublicense, and/or sell //copies of the Software, and to permit persons to whom the //Software is furnished to do so, subject to the following //conditions: //The above copyright notice and this permission notice shall be //included in all copies or substantial portions of the Software. //THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, //EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES //OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND //NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT //HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, //WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING //FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR //OTHER DEALINGS IN THE SOFTWARE. using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Net; using System.Text; using System.Net.Sockets; using System.Threading; namespace Apollo.Common.Cache { /// <summary> /// The PooledSocket class encapsulates a socket connection to a specified memcached server. /// It contains a buffered stream for communication, and methods for sending and retrieving /// data from the memcached server, as well as general memcached error checking. /// </summary> internal delegate T UseSocket<T>(PooledSocket socket); internal delegate void UseSocket(PooledSocket socket); /// <summary> /// The PooledSocket class encapsulates a socket connection to a specified memcached server. /// It contains a buffered stream for communication, and methods for sending and retrieving /// data from the memcached server, as well as general memcached error checking. /// </summary> internal class PooledSocket : IDisposable { private SocketPool socketPool; private Socket socket; private Stream stream; public readonly DateTime Created; public PooledSocket(SocketPool socketPool, IPEndPoint endPoint, int sendReceiveTimeout) { this.socketPool = socketPool; Created = DateTime.Now; //Set up the socket. socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, sendReceiveTimeout); socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, sendReceiveTimeout); socket.ReceiveTimeout = sendReceiveTimeout; socket.SendTimeout = sendReceiveTimeout; //Do not use Nagle's Algorithm socket.NoDelay = true; //Establish connection socket.Connect(endPoint); //Wraps two layers of streams around the socket for communication. stream = new BufferedStream(new NetworkStream(socket, false)); } /// <summary> /// Disposing of a PooledSocket object in any way causes it to be returned to its SocketPool. /// </summary> public void Dispose() { socketPool.Return(this); } /// <summary> /// This method closes the underlying stream and socket. /// </summary> public void Close() { if (stream != null) { try { stream.Close(); } catch (Exception e) { Console.WriteLine("Error closing stream: " + socketPool.Host); } stream = null; } if (socket != null) { try { socket.Shutdown(SocketShutdown.Both); } catch (Exception e) { Console.WriteLine("Error shutting down socket: " + socketPool.Host); } try { socket.Close(); } catch (Exception e) { Console.WriteLine("Error closing socket: " + socketPool.Host); } socket = null; } } /// <summary> /// Checks if the underlying socket and stream is connected and available. /// </summary> public bool IsAlive { get { return socket != null && socket.Connected && stream.CanRead; } } /// <summary> /// Writes a string to the socket encoded in UTF8 format. /// </summary> public void Write(string str) { Write(Encoding.UTF8.GetBytes(str)); } /// <summary> /// Writes an array of bytes to the socket and flushes the stream. /// </summary> public void Write(byte[] bytes) { stream.Write(bytes, 0, bytes.Length); stream.Flush(); } /// <summary> /// Reads from the socket until the sequence ' ' is encountered, /// and returns everything up to but not including that sequence as a UTF8-encoded string /// </summary> public string ReadLine() { MemoryStream buffer = new MemoryStream(); int b; bool gotReturn = false; while ((b = stream.ReadByte()) != -1) { if (gotReturn) { if (b == 10) { break; } else { buffer.WriteByte(13); gotReturn = false; } } if (b == 13) { gotReturn = true; } else { buffer.WriteByte((byte)b); } } return Encoding.UTF8.GetString(buffer.GetBuffer()); } /// <summary> /// Reads a response line from the socket, checks for general memcached errors, and returns the line. /// If an error is encountered, this method will throw an exception. /// </summary> public string ReadResponse() { string response = ReadLine(); if (String.IsNullOrEmpty(response)) { throw new Exception("Received empty response."); } if (response.StartsWith("ERROR") || response.StartsWith("CLIENT_ERROR") || response.StartsWith("SERVER_ERROR")) { throw new Exception("Server returned " + response); } return response; } /// <summary> /// Fills the given byte array with data from the socket. /// </summary> public void Read(byte[] bytes) { if (bytes == null) { return; } int readBytes = 0; while (readBytes < bytes.Length) { readBytes += stream.Read(bytes, readBytes, (bytes.Length - readBytes)); } } /// <summary> /// Reads from the socket until the sequence ' ' is encountered. /// </summary> public void SkipUntilEndOfLine() { int b; bool gotReturn = false; while ((b = stream.ReadByte()) != -1) { if (gotReturn) { if (b == 10) { break; } else { gotReturn = false; } } if (b == 13) { gotReturn = true; } } } /// <summary> /// Resets this PooledSocket by making sure the incoming buffer of the socket is empty. /// If there was any leftover data, this method return true. /// </summary> public bool Reset() { if (socket.Available > 0) { byte[] b = new byte[socket.Available]; Read(b); return true; } return false; } } /// <summary> /// The ServerPool encapsulates a collection of memcached servers and the associated SocketPool objects. /// This class contains the server-selection logic, and contains methods for executing a block of code on /// a socket from the server corresponding to a given key. /// </summary> internal class ServerPool { //Expose the socket pools. private SocketPool[] hostList; internal SocketPool[] HostList { get { return hostList; } } private Dictionary<uint, SocketPool> hostDictionary; private uint[] hostKeys; //Internal configuration properties private int sendReceiveTimeout = 2000; private uint maxPoolSize = 10; private uint minPoolSize = 5; private TimeSpan socketRecycleAge = TimeSpan.FromMinutes(30); internal int SendReceiveTimeout { get { return sendReceiveTimeout; } set { sendReceiveTimeout = value; } } internal uint MaxPoolSize { get { return maxPoolSize; } set { maxPoolSize = value; } } internal uint MinPoolSize { get { return minPoolSize; } set { minPoolSize = value; } } internal TimeSpan SocketRecycleAge { get { return socketRecycleAge; } set { socketRecycleAge = value; } } /// <summary> /// Internal constructor. This method takes the array of hosts and sets up an internal list of socketpools. /// </summary> internal ServerPool(string[] hosts) { hostDictionary = new Dictionary<uint, SocketPool>(); List<SocketPool> pools = new List<SocketPool>(); List<uint> keys = new List<uint>(); foreach (string host in hosts) { //Create pool SocketPool pool = new SocketPool(this, host.Trim()); //Create 250 keys for this pool, store each key in the hostDictionary, as well as in the list of keys. for (int i = 0; i < 250; i++) { uint key = (uint)i; if (!hostDictionary.ContainsKey(key)) { hostDictionary[key] = pool; keys.Add(key); } } pools.Add(pool); } //Hostlist should contain the list of all pools that has been created. hostList = pools.ToArray(); //Hostkeys should contain the list of all key for all pools that have been created. //This array forms the server key continuum that we use to lookup which server a //given item key hash should be assigned to. keys.Sort(); hostKeys = keys.ToArray(); } /// <summary> /// Given an item key hash, this method returns the serverpool which is closest on the server key continuum. /// </summary> internal SocketPool GetSocketPool(uint hash) { //Quick return if we only have one host. if (hostList.Length == 1) { return hostList[0]; } //New "ketama" host selection. int i = Array.BinarySearch(hostKeys, hash); //If not exact match... if (i < 0) { //Get the index of the first item bigger than the one searched for. i = ~i; //If i is bigger than the last index, it was bigger than the last item = use the first item. if (i >= hostKeys.Length) { i = 0; } } return hostDictionary[hostKeys[i]]; } internal SocketPool GetSocketPool(string host) { return Array.Find(HostList, delegate(SocketPool socketPool) { return socketPool.Host == host; }); } /// <summary> /// This method executes the given delegate on a socket from the server that corresponds to the given hash. /// If anything causes an error, the given defaultValue will be returned instead. /// This method takes care of disposing the socket properly once the delegate has executed. /// </summary> internal T Execute<T>(uint hash, T defaultValue, UseSocket<T> use) { return Execute(GetSocketPool(hash), defaultValue, use); } internal T Execute<T>(SocketPool pool, T defaultValue, UseSocket<T> use) { PooledSocket sock = null; try { //Acquire a socket sock = pool.Acquire(); //Use the socket as a parameter to the delegate and return its result. if (sock != null) { return use(sock); } } catch (Exception e) { Console.WriteLine("Error in Execute<T>: " + pool.Host); //Socket is probably broken if (sock != null) { sock.Close(); } } finally { if (sock != null) { sock.Dispose(); } } return defaultValue; } internal void Execute(SocketPool pool, UseSocket use) { PooledSocket sock = null; try { //Acquire a socket sock = pool.Acquire(); //Use the socket as a parameter to the delegate and return its result. if (sock != null) { use(sock); } } catch (Exception e) { Console.WriteLine("Error in Execute: " + pool.Host); //Socket is probably broken if (sock != null) { sock.Close(); } } finally { if (sock != null) { sock.Dispose(); } } } /// <summary> /// This method executes the given delegate on all servers. /// </summary> internal void ExecuteAll(UseSocket use) { foreach (SocketPool socketPool in hostList) { Execute(socketPool, use); } } } /// <summary> /// The SocketPool encapsulates the list of PooledSockets against one specific host, and contains methods for /// acquiring or returning PooledSockets. /// </summary> [DebuggerDisplay("[ Host: {Host} ]")] internal class SocketPool { /// <summary> /// If the host stops responding, we mark it as dead for this amount of seconds, /// and we double this for each consecutive failed retry. If the host comes alive /// again, we reset this to 1 again. /// </summary> private int deadEndPointSecondsUntilRetry = 1; private const int maxDeadEndPointSecondsUntilRetry = 60 * 10; //10 minutes private ServerPool owner; private IPEndPoint endPoint; private Queue<PooledSocket> queue; //Debug variables and properties private int newsockets = 0; private int failednewsockets = 0; private int reusedsockets = 0; private int deadsocketsinpool = 0; private int deadsocketsonreturn = 0; private int dirtysocketsonreturn = 0; private int acquired = 0; public int NewSockets { get { return newsockets; } } public int FailedNewSockets { get { return failednewsockets; } } public int ReusedSockets { get { return reusedsockets; } } public int DeadSocketsInPool { get { return deadsocketsinpool; } } public int DeadSocketsOnReturn { get { return deadsocketsonreturn; } } public int DirtySocketsOnReturn { get { return dirtysocketsonreturn; } } public int Acquired { get { return acquired; } } public int Poolsize { get { return queue.Count; } } //Public variables and properties public readonly string Host; private bool isEndPointDead = false; public bool IsEndPointDead { get { return isEndPointDead; } } private DateTime deadEndPointRetryTime; public DateTime DeadEndPointRetryTime { get { return deadEndPointRetryTime; } } internal SocketPool(ServerPool owner, string host) { Host = host; this.owner = owner; endPoint = getEndPoint(host); queue = new Queue<PooledSocket>(); } /// <summary> /// This method parses the given string into an IPEndPoint. /// If the string is malformed in some way, or if the host cannot be resolved, this method will throw an exception. /// </summary> private static IPEndPoint getEndPoint(string host) { //Parse port, default to 11211. int port = 11211; if (host.Contains(":")) { string[] split = host.Split(new char[] { ':' }); if (!Int32.TryParse(split[1], out port)) { throw new ArgumentException("Unable to parse host: " + host); } host = split[0]; } //Parse host string. IPAddress address; if (IPAddress.TryParse(host, out address)) { //host string successfully resolved as an IP address. } else { //See if we can resolve it as a hostname try { address = Dns.GetHostEntry(host).AddressList[0]; } catch (Exception e) { Console.WriteLine("Unable to resolve host: " + host); return null; } } return new IPEndPoint(address, port); } /// <summary> /// Gets a socket from the pool. /// If there are no free sockets, a new one will be created. If something goes /// wrong while creating the new socket, this pool's endpoint will be marked as dead /// and all subsequent calls to this method will return null until the retry interval /// has passed. /// </summary> internal PooledSocket Acquire() { //Do we have free sockets in the pool? //if so - return the first working one. //if not - create a new one. Interlocked.Increment(ref acquired); lock (queue) { while (queue.Count > 0) { PooledSocket socket = queue.Dequeue(); if (socket != null && socket.IsAlive) { Interlocked.Increment(ref reusedsockets); return socket; } Interlocked.Increment(ref deadsocketsinpool); } } Interlocked.Increment(ref newsockets); //If we know the endpoint is dead, check if it is time for a retry, otherwise return null. if (isEndPointDead) { if (DateTime.Now > deadEndPointRetryTime) { //Retry isEndPointDead = false; } else { //Still dead return null; } } //Try to create a new socket. On failure, mark endpoint as dead and return null. try { PooledSocket socket = new PooledSocket(this, endPoint, owner.SendReceiveTimeout); //Reset retry timer on success. deadEndPointSecondsUntilRetry = 1; return socket; } catch (Exception e) { Interlocked.Increment(ref failednewsockets); Console.WriteLine("Error connecting to: " + endPoint.Address); //Mark endpoint as dead isEndPointDead = true; //Retry in 2 minutes deadEndPointRetryTime = DateTime.Now.AddSeconds(deadEndPointSecondsUntilRetry); if (deadEndPointSecondsUntilRetry < maxDeadEndPointSecondsUntilRetry) { deadEndPointSecondsUntilRetry = deadEndPointSecondsUntilRetry * 2; //Double retry interval until next time } return null; } } /// <summary> /// Returns a socket to the pool. /// If the socket is dead, it will be destroyed. /// If there are more than MaxPoolSize sockets in the pool, it will be destroyed. /// If there are less than MinPoolSize sockets in the pool, it will always be put back. /// If there are something inbetween those values, the age of the socket is checked. /// If it is older than the SocketRrecycleAge, it is destroyed, otherwise it will be /// put back in the pool. /// </summary> internal void Return(PooledSocket socket) { //If the socket is dead, destroy it. if (!socket.IsAlive) { Interlocked.Increment(ref deadsocketsonreturn); socket.Close(); } else { //Clean up socket if (socket.Reset()) { Interlocked.Increment(ref dirtysocketsonreturn); } //Check pool size. if (queue.Count >= owner.MaxPoolSize) { //If the pool is full, destroy the socket. socket.Close(); } else if (queue.Count > owner.MinPoolSize && DateTime.Now - socket.Created > owner.SocketRecycleAge) { //If we have more than the minimum amount of sockets, but less than the max, and the socket is older than the recycle age, we destroy it. socket.Close(); } else { //Put the socket back in the pool. lock (queue) { queue.Enqueue(socket); } } } } } }
Server Pool
//Copyright (c) 2007-2008 Henrik Schröder, Oliver Kofoed Pedersen //Permission is hereby granted, free of charge, to any person //obtaining a copy of this software and associated documentation //files (the "Software"), to deal in the Software without //restriction, including without limitation the rights to use, //copy, modify, merge, publish, distribute, sublicense, and/or sell //copies of the Software, and to permit persons to whom the //Software is furnished to do so, subject to the following //conditions: //The above copyright notice and this permission notice shall be //included in all copies or substantial portions of the Software. //THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, //EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES //OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND //NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT //HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, //WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING //FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR //OTHER DEALINGS IN THE SOFTWARE. using System.Net.Sockets; using System.Net; using System.IO; using System; using System.Threading; using System.Collections.Generic; using System.Text; namespace Apollo.Common.Cache { /// <summary> /// The PooledSocket class encapsulates a socket connection to a specified memcached server. /// It contains a buffered stream for communication, and methods for sending and retrieving /// data from the memcached server, as well as general memcached error checking. /// </summary> internal class PooledSocket : IDisposable { private SocketPool socketPool; private Socket socket; private Stream stream; public readonly DateTime Created; public PooledSocket(SocketPool socketPool, IPEndPoint endPoint, int sendReceiveTimeout) { this.socketPool = socketPool; Created = DateTime.Now; //Set up the socket. socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, sendReceiveTimeout); socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, sendReceiveTimeout); socket.ReceiveTimeout = sendReceiveTimeout; socket.SendTimeout = sendReceiveTimeout; //Do not use Nagle's Algorithm socket.NoDelay = true; //Establish connection socket.Connect(endPoint); //Wraps two layers of streams around the socket for communication. stream = new BufferedStream(new NetworkStream(socket, false)); } /// <summary> /// Disposing of a PooledSocket object in any way causes it to be returned to its SocketPool. /// </summary> public void Dispose() { socketPool.Return(this); } /// <summary> /// This method closes the underlying stream and socket. /// </summary> public void Close() { if (stream != null) { try { stream.Close(); } catch (Exception e) { Console.WriteLine("Error closing stream: " + socketPool.Host); } stream = null; } if (socket != null) { try { socket.Shutdown(SocketShutdown.Both); } catch (Exception e) { Console.WriteLine("Error shutting down socket: " + socketPool.Host); } try { socket.Close(); } catch (Exception e) { Console.WriteLine("Error closing socket: " + socketPool.Host); } socket = null; } } /// <summary> /// Checks if the underlying socket and stream is connected and available. /// </summary> public bool IsAlive { get { return socket != null && socket.Connected && stream.CanRead; } } /// <summary> /// Writes a string to the socket encoded in UTF8 format. /// </summary> public void Write(string str) { Write(Encoding.UTF8.GetBytes(str)); } /// <summary> /// Writes an array of bytes to the socket and flushes the stream. /// </summary> public void Write(byte[] bytes) { stream.Write(bytes, 0, bytes.Length); stream.Flush(); } /// <summary> /// Reads from the socket until the sequence ' ' is encountered, /// and returns everything up to but not including that sequence as a UTF8-encoded string /// </summary> public string ReadLine() { MemoryStream buffer = new MemoryStream(); int b; bool gotReturn = false; while ((b = stream.ReadByte()) != -1) { if (gotReturn) { if (b == 10) { break; } else { buffer.WriteByte(13); gotReturn = false; } } if (b == 13) { gotReturn = true; } else { buffer.WriteByte((byte)b); } } return Encoding.UTF8.GetString(buffer.GetBuffer()); } /// <summary> /// Reads a response line from the socket, checks for general memcached errors, and returns the line. /// If an error is encountered, this method will throw an exception. /// </summary> public string ReadResponse() { string response = ReadLine(); if (String.IsNullOrEmpty(response)) { throw new Exception("Received empty response."); } if (response.StartsWith("ERROR") || response.StartsWith("CLIENT_ERROR") || response.StartsWith("SERVER_ERROR")) { throw new Exception("Server returned " + response); } return response; } /// <summary> /// Fills the given byte array with data from the socket. /// </summary> public void Read(byte[] bytes) { if (bytes == null) { return; } int readBytes = 0; while (readBytes < bytes.Length) { readBytes += stream.Read(bytes, readBytes, (bytes.Length - readBytes)); } } /// <summary> /// Reads from the socket until the sequence ' ' is encountered. /// </summary> public void SkipUntilEndOfLine() { int b; bool gotReturn = false; while ((b = stream.ReadByte()) != -1) { if (gotReturn) { if (b == 10) { break; } else { gotReturn = false; } } if (b == 13) { gotReturn = true; } } } /// <summary> /// Resets this PooledSocket by making sure the incoming buffer of the socket is empty. /// If there was any leftover data, this method return true. /// </summary> public bool Reset() { if (socket.Available > 0) { byte[] b = new byte[socket.Available]; Read(b); return true; } return false; } } internal delegate T UseSocket<T>(PooledSocket socket); internal delegate void UseSocket(PooledSocket socket); /// <summary> /// The ServerPool encapsulates a collection of memcached servers and the associated SocketPool objects. /// This class contains the server-selection logic, and contains methods for executing a block of code on /// a socket from the server corresponding to a given key. /// </summary> internal class ServerPool { //Expose the socket pools. private SocketPool[] hostList; internal SocketPool[] HostList { get { return hostList; } } private Dictionary<uint, SocketPool> hostDictionary; private uint[] hostKeys; //Internal configuration properties private int sendReceiveTimeout = 2000; private uint maxPoolSize = 10; private uint minPoolSize = 5; private TimeSpan socketRecycleAge = TimeSpan.FromMinutes(30); internal int SendReceiveTimeout { get { return sendReceiveTimeout; } set { sendReceiveTimeout = value; } } internal uint MaxPoolSize { get { return maxPoolSize; } set { maxPoolSize = value; } } internal uint MinPoolSize { get { return minPoolSize; } set { minPoolSize = value; } } internal TimeSpan SocketRecycleAge { get { return socketRecycleAge; } set { socketRecycleAge = value; } } /// <summary> /// Internal constructor. This method takes the array of hosts and sets up an internal list of socketpools. /// </summary> internal ServerPool(string[] hosts) { hostDictionary = new Dictionary<uint, SocketPool>(); List<SocketPool> pools = new List<SocketPool>(); List<uint> keys = new List<uint>(); foreach (string host in hosts) { //Create pool SocketPool pool = new SocketPool(this, host.Trim()); //Create 250 keys for this pool, store each key in the hostDictionary, as well as in the list of keys. for (int i = 0; i < 250; i++) { uint key = (uint)i; if (!hostDictionary.ContainsKey(key)) { hostDictionary[key] = pool; keys.Add(key); } } pools.Add(pool); } //Hostlist should contain the list of all pools that has been created. hostList = pools.ToArray(); //Hostkeys should contain the list of all key for all pools that have been created. //This array forms the server key continuum that we use to lookup which server a //given item key hash should be assigned to. keys.Sort(); hostKeys = keys.ToArray(); } /// <summary> /// Given an item key hash, this method returns the serverpool which is closest on the server key continuum. /// </summary> internal SocketPool GetSocketPool(uint hash) { //Quick return if we only have one host. if (hostList.Length == 1) { return hostList[0]; } //New "ketama" host selection. int i = Array.BinarySearch(hostKeys, hash); //If not exact match... if (i < 0) { //Get the index of the first item bigger than the one searched for. i = ~i; //If i is bigger than the last index, it was bigger than the last item = use the first item. if (i >= hostKeys.Length) { i = 0; } } return hostDictionary[hostKeys[i]]; } internal SocketPool GetSocketPool(string host) { return Array.Find(HostList, delegate(SocketPool socketPool) { return socketPool.Host == host; }); } /// <summary> /// This method executes the given delegate on a socket from the server that corresponds to the given hash. /// If anything causes an error, the given defaultValue will be returned instead. /// This method takes care of disposing the socket properly once the delegate has executed. /// </summary> internal T Execute<T>(uint hash, T defaultValue, UseSocket<T> use) { return Execute(GetSocketPool(hash), defaultValue, use); } internal T Execute<T>(SocketPool pool, T defaultValue, UseSocket<T> use) { PooledSocket sock = null; try { //Acquire a socket sock = pool.Acquire(); //Use the socket as a parameter to the delegate and return its result. if (sock != null) { return use(sock); } } catch (Exception e) { Console.WriteLine("Error in Execute<T>: " + pool.Host); //Socket is probably broken if (sock != null) { sock.Close(); } } finally { if (sock != null) { sock.Dispose(); } } return defaultValue; } internal void Execute(SocketPool pool, UseSocket use) { PooledSocket sock = null; try { //Acquire a socket sock = pool.Acquire(); //Use the socket as a parameter to the delegate and return its result. if (sock != null) { use(sock); } } catch (Exception e) { Console.WriteLine("Error in Execute: " + pool.Host); //Socket is probably broken if (sock != null) { sock.Close(); } } finally { if (sock != null) { sock.Dispose(); } } } /// <summary> /// This method executes the given delegate on all servers. /// </summary> internal void ExecuteAll(UseSocket use) { foreach (SocketPool socketPool in hostList) { Execute(socketPool, use); } } } /// <summary> /// The SocketPool encapsulates the list of PooledSockets against one specific host, and contains methods for /// acquiring or returning PooledSockets. /// </summary> internal class SocketPool { /// <summary> /// If the host stops responding, we mark it as dead for this amount of seconds, /// and we double this for each consecutive failed retry. If the host comes alive /// again, we reset this to 1 again. /// </summary> private int deadEndPointSecondsUntilRetry = 1; private const int maxDeadEndPointSecondsUntilRetry = 60 * 10; //10 minutes private ServerPool owner; private IPEndPoint endPoint; private Queue<PooledSocket> queue; //Debug variables and properties private int newsockets = 0; private int failednewsockets = 0; private int reusedsockets = 0; private int deadsocketsinpool = 0; private int deadsocketsonreturn = 0; private int dirtysocketsonreturn = 0; private int acquired = 0; public int NewSockets { get { return newsockets; } } public int FailedNewSockets { get { return failednewsockets; } } public int ReusedSockets { get { return reusedsockets; } } public int DeadSocketsInPool { get { return deadsocketsinpool; } } public int DeadSocketsOnReturn { get { return deadsocketsonreturn; } } public int DirtySocketsOnReturn { get { return dirtysocketsonreturn; } } public int Acquired { get { return acquired; } } public int Poolsize { get { return queue.Count; } } //Public variables and properties public readonly string Host; private bool isEndPointDead = false; public bool IsEndPointDead { get { return isEndPointDead; } } private DateTime deadEndPointRetryTime; public DateTime DeadEndPointRetryTime { get { return deadEndPointRetryTime; } } internal SocketPool(ServerPool owner, string host) { Host = host; this.owner = owner; endPoint = getEndPoint(host); queue = new Queue<PooledSocket>(); } /// <summary> /// This method parses the given string into an IPEndPoint. /// If the string is malformed in some way, or if the host cannot be resolved, this method will throw an exception. /// </summary> private static IPEndPoint getEndPoint(string host) { //Parse port, default to 11211. int port = 11211; if (host.Contains(":")) { string[] split = host.Split(new char[] { ':' }); if (!Int32.TryParse(split[1], out port)) { throw new ArgumentException("Unable to parse host: " + host); } host = split[0]; } //Parse host string. IPAddress address; if (IPAddress.TryParse(host, out address)) { //host string successfully resolved as an IP address. } else { //See if we can resolve it as a hostname try { address = Dns.GetHostEntry(host).AddressList[0]; } catch (Exception e) { Console.WriteLine("Unable to resolve host: " + host); return null; } } return new IPEndPoint(address, port); } /// <summary> /// Gets a socket from the pool. /// If there are no free sockets, a new one will be created. If something goes /// wrong while creating the new socket, this pool's endpoint will be marked as dead /// and all subsequent calls to this method will return null until the retry interval /// has passed. /// </summary> internal PooledSocket Acquire() { //Do we have free sockets in the pool? //if so - return the first working one. //if not - create a new one. Interlocked.Increment(ref acquired); lock (queue) { while (queue.Count > 0) { PooledSocket socket = queue.Dequeue(); if (socket != null && socket.IsAlive) { Interlocked.Increment(ref reusedsockets); return socket; } Interlocked.Increment(ref deadsocketsinpool); } } Interlocked.Increment(ref newsockets); //If we know the endpoint is dead, check if it is time for a retry, otherwise return null. if (isEndPointDead) { if (DateTime.Now > deadEndPointRetryTime) { //Retry isEndPointDead = false; } else { //Still dead return null; } } //Try to create a new socket. On failure, mark endpoint as dead and return null. try { PooledSocket socket = new PooledSocket(this, endPoint, owner.SendReceiveTimeout); //Reset retry timer on success. deadEndPointSecondsUntilRetry = 1; return socket; } catch (Exception e) { Interlocked.Increment(ref failednewsockets); Console.WriteLine("Error connecting to: " + endPoint.Address); //Mark endpoint as dead isEndPointDead = true; //Retry in 2 minutes deadEndPointRetryTime = DateTime.Now.AddSeconds(deadEndPointSecondsUntilRetry); if (deadEndPointSecondsUntilRetry < maxDeadEndPointSecondsUntilRetry) { deadEndPointSecondsUntilRetry = deadEndPointSecondsUntilRetry * 2; //Double retry interval until next time } return null; } } /// <summary> /// Returns a socket to the pool. /// If the socket is dead, it will be destroyed. /// If there are more than MaxPoolSize sockets in the pool, it will be destroyed. /// If there are less than MinPoolSize sockets in the pool, it will always be put back. /// If there are something inbetween those values, the age of the socket is checked. /// If it is older than the SocketRrecycleAge, it is destroyed, otherwise it will be /// put back in the pool. /// </summary> internal void Return(PooledSocket socket) { //If the socket is dead, destroy it. if (!socket.IsAlive) { Interlocked.Increment(ref deadsocketsonreturn); socket.Close(); } else { //Clean up socket if (socket.Reset()) { Interlocked.Increment(ref dirtysocketsonreturn); } //Check pool size. if (queue.Count >= owner.MaxPoolSize) { //If the pool is full, destroy the socket. socket.Close(); } else if (queue.Count > owner.MinPoolSize && DateTime.Now - socket.Created > owner.SocketRecycleAge) { //If we have more than the minimum amount of sockets, but less than the max, and the socket is older than the recycle age, we destroy it. socket.Close(); } else { //Put the socket back in the pool. lock (queue) { queue.Enqueue(socket); } } } } } }