Aug 11

        重叠I/O的基本原理是让应用程序使用重叠的数据结构,投递一个或多个I/O请求。

        在winsock中的重叠数据结构定义是WSAOVERLAPPED,用来完成重叠I/O的方式有两种,1事件通知 2完成实例

        在事件通知方式中,通过WSAOVERLAPPED结构将每个socket和事件关联在一起,通过调用WSARcev等非阻塞函数,将WSAOVERLAPPED结构体与其关联。调用WSAWaitForMultipleEvents函数等待事件,这个函数可以放到另外一个线程中进行处理,当数据到达后会存放到之前WSARcev中的buf中。

        编程顺序如下:

        1 创建一个套接字,开始在指定的端口上侦听

        2 接收一个入站的连接请求

        3 为接受的套接字新建一个WSAOVERLAPPED结构,并为该结构分配一个事件对象句柄。也将该事件对象分配给一个事件数组,以便稍后由WSAWaitForMultipleEvents函数使用。

        4 将WSAOVERLAPPED结构指定参数,在套接字上投递一个异步WSARecv请求。

        5 使用步骤3的数组,调用WSAWaitForMultipleEvents函数,并等待与重叠调用关联在一起的事件。

        6 使用WSAGetOverlappedResult函数,判断重叠调用的返回状态

        7 函数完成后,针对事件数组,调用WSAResetEvent函数,重设事件对象,并对完成的事件请求进行处理。

        8 在套接字上重新投递另一个WSARecv请求

        9 重复步骤5-8

 

#include <winsock2.h>
#include <stdio.h>

#define PORT    5500
#define MSGSIZE 1024

#pragma comment(lib, "ws2_32.lib")

typedef struct
{
	WSAOVERLAPPED overlap;
	WSABUF        Buffer;
	char          szMessage[MSGSIZE];
	DWORD         NumberOfBytesRecvd;
	DWORD         Flags;
}PER_IO_OPERATION_DATA, *LPPER_IO_OPERATION_DATA;

int                     g_iTotalConn = 0;
SOCKET                  g_CliSocketArr[MAXIMUM_WAIT_OBJECTS];
WSAEVENT                g_CliEventArr[MAXIMUM_WAIT_OBJECTS];
LPPER_IO_OPERATION_DATA g_pPerIODataArr[MAXIMUM_WAIT_OBJECTS];

DWORD WINAPI WorkerThread(LPVOID);
void Cleanup(int);

int main()
{
	WSADATA     wsaData;
	SOCKET      sListen, sClient;
	SOCKADDR_IN local, client;
	DWORD       dwThreadId;
	int         iaddrSize = sizeof(SOCKADDR_IN);

	// Initialize Windows Socket library
	WSAStartup(0x0202, &wsaData);

	// Create listening socket
	sListen = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

	// Bind
	local.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
	local.sin_family = AF_INET;
	local.sin_port = htons(PORT);
	bind(sListen, (struct sockaddr *)&local, sizeof(SOCKADDR_IN));

	// Listen
	listen(sListen, 3);

	// Create worker thread
	CreateThread(NULL, 0, WorkerThread, NULL, 0, &dwThreadId);

	while (TRUE)
	{
		// Accept a connection
		sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize);
		printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));

		g_CliSocketArr[g_iTotalConn] = sClient;

		// Allocate a PER_IO_OPERATION_DATA structure
		g_pPerIODataArr[g_iTotalConn] = (LPPER_IO_OPERATION_DATA)HeapAlloc(
			GetProcessHeap(),
			HEAP_ZERO_MEMORY,
			sizeof(PER_IO_OPERATION_DATA));
		g_pPerIODataArr[g_iTotalConn]->Buffer.len = MSGSIZE;
		g_pPerIODataArr[g_iTotalConn]->Buffer.buf = g_pPerIODataArr[g_iTotalConn]->szMessage;
		g_CliEventArr[g_iTotalConn] = g_pPerIODataArr[g_iTotalConn]->overlap.hEvent = WSACreateEvent();

		// Launch an asynchronous operation
		WSARecv(
			g_CliSocketArr[g_iTotalConn],
			&g_pPerIODataArr[g_iTotalConn]->Buffer,
			1,
			&g_pPerIODataArr[g_iTotalConn]->NumberOfBytesRecvd,
			&g_pPerIODataArr[g_iTotalConn]->Flags,
			&g_pPerIODataArr[g_iTotalConn]->overlap,
			NULL);

		g_iTotalConn++;
	}

	closesocket(sListen);
	WSACleanup();
	return 0;
}

DWORD WINAPI WorkerThread(LPVOID lpParam)
{
	int   ret, index;
	DWORD cbTransferred;

	while (TRUE)
	{
		ret = WSAWaitForMultipleEvents(g_iTotalConn, g_CliEventArr, FALSE, 1000, FALSE);
		if (ret == WSA_WAIT_FAILED || ret == WSA_WAIT_TIMEOUT)
		{
			continue;
		}

		index = ret - WSA_WAIT_EVENT_0;
		WSAResetEvent(g_CliEventArr[index]);

		WSAGetOverlappedResult(
			g_CliSocketArr[index],
			&g_pPerIODataArr[index]->overlap,
			&cbTransferred,
			TRUE,
			&g_pPerIODataArr[g_iTotalConn]->Flags);

		if (cbTransferred == 0)
		{
			// The connection was closed by client
			Cleanup(index);
		}
		else
		{
			// g_pPerIODataArr[index]->szMessage contains the received data
			g_pPerIODataArr[index]->szMessage[cbTransferred] = '\0';
			send(g_CliSocketArr[index], g_pPerIODataArr[index]->szMessage,cbTransferred, 0);

			// Launch another asynchronous operation
			WSARecv(
				g_CliSocketArr[index],
				&g_pPerIODataArr[index]->Buffer,
				1,
				&g_pPerIODataArr[index]->NumberOfBytesRecvd,
				&g_pPerIODataArr[index]->Flags,
				&g_pPerIODataArr[index]->overlap,
				NULL);
		}
	}

	return 0;
}

void Cleanup(int index)
{
	closesocket(g_CliSocketArr[index]);
	WSACloseEvent(g_CliEventArr[index]);
	HeapFree(GetProcessHeap(), 0, g_pPerIODataArr[index]);

	if (index < g_iTotalConn - 1)
	{
		g_CliSocketArr[index] = g_CliSocketArr[g_iTotalConn - 1];
		g_CliEventArr[index] = g_CliEventArr[g_iTotalConn - 1];
		g_pPerIODataArr[index] = g_pPerIODataArr[g_iTotalConn - 1];
	}

	g_pPerIODataArr[--g_iTotalConn] = NULL;
}
Jul 4

         路由表式用来决定如何将包从一个子网送到另一个子网的,换句话说就是从一个网卡收到的包应该送到哪一个网卡上去

        路由表的每一行至少有目标网络号、netmask、到一个子网应该使用的网卡这3条信息。到一个网卡收到一个包时,他会扫描路由表的每一行,用里面的netmask和包里的目标IP做&逻辑运算,找到目标网络号。如果此网络号和这一行里的网络号相同,就将这条路由保留下来作为备用的路由。如果已经有备用路由了就将这两条备用路由的网络号最长的留下来,另一条丢掉。如此接着扫描下一行直到结束。如果扫描结束任没找到路由就用默认路由。确定路由后直接将包发送到对应的网卡上去。在具体的实现中,路由表可能包含更多的信息为选路由算法的细节所用。

Jun 9

 快速排序详细分析

 

注:REF[n]为参考资料,列于文章结尾。

看了编程珠玑Programming Perls第11章关于快速排序的讨论,发现自己长年用库函数,已经忘了快排怎么写。于是整理下思路和资料,把至今所了解的快排的方方面面记录与此。

 

纲要

  1. 算法描述
  2. 时间复杂度分析
  3. 具体实现细节
    1. 划分
      1. 选取枢纽元
        1. 固定位置
        2. 随机选取
        3. 三数取中
      2. 分割
        1. 单向扫描
        2. 双向扫描
        3. Hoare的双向扫描
        4. 改进的双向扫描
        5. 双向扫描的其他问题
    2. 分治
      1. 尾递归
  4. 参考文献

一、算法描述(Algorithm Description)

快速排序由C.A.R.Hoare于1962年提出,算法相当简单精炼,基本策略是随机分治。
首先选取一个枢纽元(pivot),然后将数据划分成左右两部分,左边的大于(或等于)枢纽元,右边的小于(或等于枢纽元),最后递归处理左右两部分。
分治算法一般分成三个部分:分解、解决以及合并。快排是就地排序,所以就不需要合并了。只需要划分(partition)和解决(递归)两个步骤。因为划分的结果决定递归的位置,所以Partition是整个算法的核心。

对数组S排序的形式化的描述如下(REF[1]):

  1. 如果S中的元素个数是0或1,则返回
  2. 取S中任意一元素v,称之为枢纽元
  3. 将S-{v}(S中其余元素),划分成两个不相交的集合:S1={x∈S-{v}|x<=v} 和 S2={x∈S-{v}|x>=v}
  4. 返回{quicksort(S1) , v , quicksort(S2)}

二、时间复杂度分析(Time Complexity)

快速排序最佳运行时间O(nlogn),最坏运行时间O(N^2),随机化以后期望运行时间O(nlogn),关于这些任何一本算法数据结构书上都有证明,就不写在这了,一下两点很重要:

  1. 选取枢纽元的不同, 决定了快排算法时间复杂度的数量级;
  2. 划分方法的划分方法总是O(n), 所以其具体实现的不同只影响算法时间复杂度的系数。

所以诉时间复杂度的分析都是围绕枢纽元的位置展开讨论的。

三、具体实现细节(Details of Implementaion)

1、划分(Partirion)

void QuickSort(T A[], int p, int q)
{
    if (p < q)
    {
        int q = Partition(A, p, q);
        QuickSort(A, p, q-1);
        QuickSort(A, q+1, r);
    }
}

划分又分成两个步骤:选取枢纽元按枢纽元将数组分成左右两部分

a.选取枢纽元(Pivot Selection)

固定位置

同样是为了方便,将选取枢纽元单独提出来成一个函数:select_pivot(T A[], int p, int q),该函数从A[p...q]中选取一个枢纽元并返回,且枢纽元放置在左端(A[p]的位置)。

int select_pivot(T A[], int p, int q)
{
    return A[p];
}

但是实际应用中,数据往往是部分有序的,如果仍用两端的元素最为枢纽元,则会产生很不好的划分,使算法退化成O(n^2)。所以要采用一些手段避免这种情况,我知道的有“随机选取法”和“三数取中法”。

随机选取
int select_pivot_random(T A[], int p, int q)
{
    int i = randInt(p, q);
    swap(A[p], A[i]);
    return A[p];
}

 

其中randInt(p, q)随机返回[p, q]中的一个数,C/C++里可由stdlib.h中的rand函数模拟。

三数取中
即取三个元素的中间数作为枢纽元,一般是取左端、右断和中间三个数,也可以随机选取。(REF[1])
int select_pivot_median3(T A[], int p, int q)
{
    int m = (p + q)/2;
    /* swap to ensure A[m] <= A[p] <= A[q] */
    if (A[p] < A[m]) swap(A[p], A[m]);
    if (A[q] < A[m]) swap(A[q], A[m]);
    if (A[q] < A[p]) swap(A[q], A[p]);
    return A[p]; 
}

b.按枢纽元将数组分成左右两部分

虽然说分割方法只影响算法时间复杂度的系数,但是一个好系数也是比较重要的。这也就是为什么实际应用中宁愿选择可能退化成O(n^2)的快速排序,也不用稳定的堆排序(堆排序交换次数太多,导致系数很大)。

常见的分割方法有三种:

单向扫描
int partition(T A[], int p, int q)
{
    int x = select_pivot(A, p, q);
    int m = p, j;
    for (int j = p+1; j <= q; ++j)
        /* invariant : A[p+1...m] < A[p] && A[m+1, i-1] >= x[q] */
        if (A[j] <= x)
            swap(A[++m], j) 
    swap(A[p], A[m]); 
    /* A[p...m-1] < A[m] <= A[m+1...u] */
    return m; 

 

顺便废话几句,在看国外的书的时候,发现老外在分析和测试算法尤其是循环时,非常重视不变量(invariant)的使用。确立一个不变量,在循环开始之前和结束之后检查这个不变量,是一个很好的保持算法正确性的手段。

事实上第一种算法需要的交换次数比较多,而且如果采用选取左端元素作为枢纽元的方法,该算法在输入数组中元素全部相同时退化成O(n^2)。第二种方法可以避免这个问题。

双向扫描
int partition(T A[], int p, int q)
{
    int x = select_pivot(A, p, q);
    int i = p, j = q + 1;
    for ( ; ; )
    {
        do ++i; while (i <= q && A[i] < x);
        do --j; while (A[j] > x);
        if (i > j) break;
        swap(A[i], A[j]);
    }
    swap(A[p], A[j]);
    return j;
}

双向扫描可以正常处理所有元素相同的情况,而且交换次数比单向扫描要少。

Hoare的双向扫描
int partition(T A[], int p, int q)
{
    int x = select_pivot(A, p, q);
    int i = p - 1, j = q + 1;
    for ( ; ; )
    {
        do --j; while (A[j] > x);
        do ++i; while (A[i] < x);
        if (i < j) swap(A[i], A[j])
        else return j;
    } 
}

需要注意的是,返回值j并不是枢纽元的位置,但是仍然保证了A[p..j] <= A[j+1...q]。这种方法在效率上于双向扫描差别甚微,只是代码相对更为紧凑,并且用A[p]做哨兵元素减少了内层循环的一个if测试。

改进的双向扫描

枢纽元保存在一个临时变量中,这样左端的位置可视为空闲。j从右向左扫描,直到A[j]小于等于枢纽元,检查i、j是否相交并将A[j]赋给空闲位置A[i],这时A[j]变成空闲位置;i从左向右扫描,直到A[i]大于等于枢纽元,检查i、j是否相交并将A[i]赋给空闲位置A[j],然后A[i]变成空闲位置。重复上述过程,最后直到i、j相交跳出循环。最后把枢纽元放到空闲位置上。

int partition(T A[], int p, int q)
{
    int x = select_pivot(A, p, q);
    int i = p, j = q;
    for ( ; ; )
    {
        while (i < j && A[j] > x) --j;
        A[i] = A[j];
        while (i < j && A[i] < x) ++i;
        A[j] = A[i];
    }
    A[i] = x;  // i == j
    return i;
} 

这种类似迭代的方法,每次只需一次赋值,减少了内存读写次数,而前面几种的方法一次交换需要三次赋值操作。由于没有哨兵元素,不得不在内层循环里判断i、j是否相交,实际上反而增加了很多内存读取操作。但是由于循环计数器往往被放在寄存器了,而如果待排数组很大,访问其元素会频繁的cache miss,所以用计数器的访问次数换取待排数组的访存是值得的。

关于双向扫描的几个问题

1.内层循环中的while测试是用“严格大于/小于”还是”大于等于/小于等于”。

一般的想法是用大于等于/小于等于,忽略与枢纽元相同的元素,这样可以减少不必要的交换,因为这些元素无论放在哪一边都是一样的。但是如果遇到所有元素都一样的情况,这种方法每次都会产生最坏的划分,也就是一边1个元素,令一边n-1个元素,使得时间复杂度变成O(N^2)。而如果用严格大于/小于,虽然两边指针每此只挪动1位,但是它们会在正中间相遇,产生一个最好的划分,时间复杂度为log(2,n)。

另一个因素是,如果将枢纽元放在数组两端,用严格大于/小于就可以将枢纽元作为一个哨兵元素,从而减少内层循环的一个测试。
由以上两点,内层循环中的while测试一般用“严格大于/小于”。

2.对于小数组特殊处理

按照上面的方法,递归会持续到分区只有一个元素。而事实上,当分割到一定大小后,继续分割的效率比插入排序要差。由统计方法得到的数值是50左右(REF[3]),也有采用20的(REF[1]), 这样原先的QuickSort就可以写成这样

void QuickSort(T A[], int p, int q)
{
    if (q - p > cutoff) //cutoff is constant 
    {
        int q = Partition(A, p, q);
        QuickSort(A, p, q-1);
        QuickSort(A, q+1, r);
    }
    else
        InsertionSort(T A[], p, q); //user insertion sort for small arrays
}

二、分治

分治这里看起来没什么可说的,就是一枢纽元为中心,左右递归,实际上也有一些技巧。

1.尾递归(Tail recursion)

 

快排算法和大多数分治排序算法一样,都有两次递归调用。但是快排与归并排序不同,归并的递归则在函数一开始, 快排的递归在函数尾部,这就使得快排代码可以实施尾递归优化。第一次递归以后,变量p就没有用处了, 也就是说第二次递归可以用迭代控制结构代替。虽然这种优化一般是有编译器实施,但是也可以人为的模拟:

void QuickSort(T A[], int p, int q)
{
    while (p < q)
    {
        int m = Partition(A, p, q);
        QuickSort(A, p, m-1);
        p = m + 1;
    } 
}

采用这种方法可以缩减堆栈深度,由原来的O(n)缩减为O(logn)。

 

三、参考文献:

  • [1]Mark Allen Weiss. Data Structures and Algorithms Analysis in C++. Pearson Education, Third Edition, 2006.
  • [2]Thomas H. Cormen, Charles E. Leiserson, Ronald L. Rivest, Clifford Stein. Introduction to Algorithms. MIT Press, 2001, Second Edtion, 2001.
  • [3]Jon Bently. Programming Pearls. Addison Wesley, Second Edition, 2000.

jackaldire.com/200908/quick-sort-analysis/

Jun 9

 随机函数的实现

        在编程过程中经常要用到随机数,通过调用库函数rand()来产生随机数。在调用rand()函数前一般要先调用 srand (unsigned int seed)产生随机种子。seed的选取一般可以采用系统时间来设置。产生随机数的范围在0~RAND_MAX。RAND_MAX定义在stdlib.h,其值为32767。

        假如我们要产生10个0~10之间的随机数,可以通过以下方式

 

#include<stdlib.h>
#include<stdio.h>
#include<time.h>

main()
{
    int i,j;
    srand((int)time(0));
    for(i=0;i<10;i++)
    {
        j=rand()%10;
        printf(" %d ",j);
    }
}

 

        关于srand(unsigned int seed)函数和rand()函数是库函数提供给我们的接口,也可以定义自己的函数来模拟这两个函数的实现

 

#include <time.h>
#include <stdlib.h>
#include <stdio.h>

unsigned long int next = 1;

void LJ_srand(unsigned int seed)
{
	next = seed;
}

int LJ_rand()
{
	next = next*1103515+12345;
	return (unsigned int)(next/65536)%32767;
}

main ()
{
	int i;
LJ_srand( time(NULL) );   
 for( i = 0; i < 10; i++ )      
	 printf( "Random number #%d: %d\n", i,
                 LJ_rand()%10);
}

 

rand()函数和srand()函数之间的通信是通过一个全局变量next来交互的,不调用srand()函数时,默认的next的值为1,那么每次产生的随机数是一样的。

参考c programing language

 

 

 

 

Jun 6

 

  计算大整数的模幂比如 $b^n\,mod\,m$时 $b^n$会非常大,直接去计算不是好的方法。

 

可以把n用二进制展开式表示 n = $\left(a_{k-1}a_{k-2}...a_1a_0\right)_2$

那么问题就变成了$b^n = b^{a_{k-1} \cdot 2^{k-1}+\cdots+a_1\cdot2+a_0} = b^{a_{k-1} \cdot 2^{k-1}}{\cdots}b^{a_1^2}{\cdot}b^{a_0}$

所以要计算$b^n$,就要先算出$b, b^2, (b^2)^2=b^4, (b^4)^2=b^8,\cdots, b^{2^{k-1}}$

再把这些当$a_j = 1$时的$b^{2^j}$的项乘起来。

根据公式$ab\,mood \,m = ((a\,mod\,m)(b\,mod\,m))mod\,m$

所以只需把$a_j = 1$的各项$b^{2^j}$ mod m乘起来再除以m取余就能得到结果了。

算法伪码如下:

procedure modular exponentiation(b: integer, n = $\left(a_{k-1}a_{k-2}...a_1a_0\right)_2$, m: positive integer)

x := 1

power := b mod m

for i := 0 to k-1

begin

          if $a_i$ = 1 then x := (x * power) mod m

          power := (power * power) mod m

end

 

 针对上面的算法来举个例子

 

计算a的b次方对9907取模的值。

输入

第一行有一个正整数T,表示有T组测试数据。

接下来T行,每行有一组测试数据,包含两个整数a和b。

其中T<10000,0<a,b<2^31。

样例输入

3

1 2

2 3

3 4

样例输出

1

8

81

 十进制转二进制函数

int DToB (unsigned long int decimal,int *binary)

{

  if(binary==NULL)

     return -1;

  int i = 0;

  while(decimal>0)

  {

     binary[i] = decimal%2;

      decimal = decimal/2;

     i++;

  }

  return i;

}

主函数代码如下:

 int main()

{

  int T,i;

  unsigned long int a,b;

  int res;

  int binary[32];

  scanf("%d",&T);

  if(T>100000||T<0)

     return -1;

  for (i=0;i<T;i++)

  {

    scanf("%lu %lu",&a,&b);

    if(a>=214783648||a<0)

       return -1;

    if(b>=214783648||b<0)

       return -1;


    res = DToB(b,binary);

    if(res==-1)

       return -1;

    int p = 1;

    for(;res>=0;res--)

    {

      p = (p*p)%9907;

      if(binary[res]==1)

      p = (p*a)%9907;

    }

    printf("%d\n",p);

  }

  return 0;

 }

这个例子中的算法在实现上和上面说的稍微有点出入,一个是从低位到高位计算幂取模,另外一个则是从高位到低位来计算

个人感觉还是从低到高的算法比较容易理解,但两种都收集下。