通用排序服务daemon,模型示例。
情景描述:
1. 需要排序的数据以 key=>value 的形式组织
2. 数据随时到达,尽可能地做到实时排序
3. 只需保留 Top n 的数据
典型应用:各种排行榜,搜索排行,点击排行等等
算法细节:
1. 数据使用 udp 包传送 (示例中还没有加上网络操作部分)
2. 使用 mmap 将文件映射到内存。daemon 只负责排序部分的工作,排序结果的输出由其他程序读取该文件完成
3. 使用结构体数组存储 Top n 的已排序部分
4. 新数据到达时,使用插入排序
Todo list:
1. 增加网络接收部分
2. 增加网络输出 daemon
3. 使用链表存储 Top n ,方便插入排序。当需要输出时使用另外一个线程或子进程将链表拷贝到 mmap 内存区
4. 多 domain 测试
5. 支持 name 为字符串的情况,key 为 name 的 hash
6. 增量 value ,需要保存所有的数据,而不是 Top n
7. 配置文件支持
/**
* SinaSorter
*
* by tangfulin#126.com
*
*/
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#define MAXNAMELEN 31
#define MAXDOMAIN 31
#define DEBUG 1
/* One ranking entry; each domain's mapped file is accessed as an array of these. */
typedef struct {
/* Identifier of the ranked item; findRecord() matches on it. */
unsigned long key;
/* Score of the item; findPosition() walks the table while the stored value is larger. */
unsigned long value;
/* Disabled field: a textual name for the entry. */
//char name[MAXNAMELEN];
} RECORD;
/* Size of one RECORD in bytes. */
int sizer = sizeof(RECORD);
/* Number of domains in use; init() resets it to 1. */
int domainTotal = 1;
/*
 * Backing file name of each domain, filled in by init().
 * Explicit {{0}} because an empty "= {}" initializer is not valid ISO C
 * before C23.
 */
char domainName[MAXDOMAIN][MAXNAMELEN] = {{0}};
/* Number of RECORD slots in each domain's table. */
int totalRecord[MAXDOMAIN] = {0};
/* First RECORD of each domain's mapping. */
RECORD *head[MAXDOMAIN] = {NULL};
/* Last RECORD of each domain's mapping. */
RECORD *tail[MAXDOMAIN] = {NULL};
/* Descriptor used to map each domain's file; init() closes it right after mmap(). */
int file[MAXDOMAIN] = {0};
int init()
{
FILE *fp;
RECORD *r;
int i;
if(DEBUG) printf("(init)RECORD size:%d\n", sizeof(RECORD));
domainTotal = 1;
for (i=0; i
{
if(DEBUG) printf("(init)domain:%d\n", i);
sprintf(domainName[i], "domain_%d.dat", i);
totalRecord[i] = 200;
head[i] = NULL;
if(DEBUG) printf("(init)file:%s\n", domainName[i]);
if(DEBUG) printf("(init)totalRecord:%d\n", totalRecord[i]);
//file init
fp = fopen(domainName[i], "r");
if (fp == NULL)
{// file not exist
if(DEBUG) printf("(init)file dose not exist, creat ... \n");
r = (RECORD *) calloc(totalRecord[i], sizeof(RECORD));
fp = fopen(domainName[i], "w+");
fwrite(r, sizeof(RECORD), totalRecord[i], fp);
fclose(fp);
free(r);
}
else
{
fclose(fp);
}
file[i] = open(domainName[i], O_RDWR);
head[i] = (RECORD *)mmap(0, totalRecord[i] * sizeof(RECORD),
PROT_READ | PROT_WRITE, MAP_SHARED, file[i], 0);
close(file[i]);
tail[i] = head[i] + totalRecord[i] - 1;
if(DEBUG) printf("(init)mmap at addr: %x end at: %x\n", (int)head[i], (int)tail[i]);
}
return 0;
}
int clear()
{
int i;
for (i=0; i
{
if (-1 == msync((void *)head[i], totalRecord[i] * sizeof(RECORD), MS_SYNC))
{
fprintf(stderr, "(clear)msync return error with errorno: %d (%s)\n", errno, strerror(errno));
}
if (-1 == munmap((void *)head[i], totalRecord[i] * sizeof(RECORD)))
{
fprintf(stderr, "(clear)munmap return error with errorno: %d (%s)\n", errno, strerror(errno));
}
//close(file[i]);
}
if(DEBUG) printf("(clear) done ... \n");
return 0;
}
/**
 * Fill *rp with a pseudo-random key and value, each in the range 0..999.
 *
 * The generator is seeded once, on the first call, with srandom() so that
 * it matches random(). Re-seeding from time() on every call made all calls
 * within the same second return the same record.
 *
 * @param rp  record to fill; must not be NULL.
 * @return always 0.
 */
int getRecord(RECORD* rp)
{
    static int seeded = 0;

    if (!seeded)
    {
        srandom((unsigned int) time(NULL));
        seeded = 1;
    }
    rp->key = (unsigned long)(random() % 1000);
    rp->value = (unsigned long)(random() % 1000);
    return 0;
}
RECORD* findPosition(const int domain, const int value)
{
RECORD *rp;
rp = head[domain];
while(rp->value > value)
{
rp += 1;
}
return rp;
}
/**
 * Look for an existing record with the given key, starting at tpos.
 *
 * Slots from tpos up to, but not including, tail[domain] are examined.
 *
 * @param domain  index of the domain to search.
 * @param tpos    first slot to examine.
 * @param key     key to look for.
 * @return the first matching slot; if none matches, the position where the
 *         scan stopped (tail[domain] when tpos lies before it).
 */
RECORD* findRecord(const int domain, RECORD* const tpos, const int key)
{
    RECORD *const last = tail[domain];
    RECORD *cur;

    for (cur = tpos; cur < last; cur++)
    {
        if (cur->key == key)
        {
            break;
        }
    }
    return cur;
}
/**
 * Insert *rp into the table of `domain`, which is kept ordered by
 * descending value.
 *
 * A record whose value is smaller than the value in the last slot is
 * dropped. Otherwise the slots between the target position and the old
 * position of the same key (or the last slot when the key is not found)
 * are shifted down by one and the record is written at the target position.
 *
 * NOTE: the key is only searched from the target position downwards, so an
 * entry with the same key that currently ranks above the target position
 * is not found and stays in the table.
 *
 * @param domain  index of the domain to update.
 * @param rp      record to insert; must not be NULL.
 * @return always 0.
 */
int insertRecord(int domain, RECORD* rp)
{
    RECORD *fpos, *tpos;
    RECORD *endpos;
    void *to, *from;
    size_t size;

    endpos = tail[domain];
    if (rp->value < endpos->value)
    {
        if (DEBUG) printf("(insertRecord)value too small: %lu < %lu \n", rp->value, endpos->value);
        return 0;
    }
    // target position for this RECORD
    tpos = findPosition(domain, rp->value);
    // origin position of this key, the last slot if it does not exist
    // search from tpos
    fpos = findRecord(domain, tpos, rp->key);
    if (fpos == tpos)
    {
        // the key already sits at its target slot: overwrite in place
        if (DEBUG) printf("(insertRecord)fpos equal to tpos: %p \n", (void *)fpos);
        tpos->key = rp->key;
        tpos->value = rp->value;
        return 0;
    }
    if (DEBUG) printf("(insertRecord)target tpos: %p origin fpos: %p (endpos:%p)\n", (void *)tpos, (void *)fpos, (void *)endpos);
    // shift [tpos, fpos) down by one slot; the record at fpos is overwritten
    to = (void *)(tpos + 1);
    from = (void *)tpos;
    size = (size_t)(fpos - tpos) * sizeof(RECORD);
    memmove(to, from, size);
    if (DEBUG) printf("(insertRecord)memmove: from %p to: %p bytes:%zu\n", from, to, size);
    tpos->key = rp->key;
    tpos->value = rp->value;
    return 0;
}
int showdomain(int domain)
{
int j, ret;
RECORD *rp;
ret = msync((void *)head[domain], totalRecord[domain] * sizeof(RECORD), MS_SYNC);
printf("(showdomain)domain:%d\n", domain);
printf("(showdomain)msync return:%d\n", ret);
rp = head[domain];
for(j=0; j
{
printf("(showdomain)pos:%d %x key:%ld value:%ld\n", j, (int)rp, rp->key, rp->value);
rp += 1;
}
return 0;
}
/**
 * Demo driver: map domain 0, dump it, then for every character read from
 * stdin generate one record, insert it into domain 0 and dump the table
 * again. At end of input the mappings are flushed and released.
 *
 * @return 0 on normal termination, 1 if init() reports a failure.
 */
int main(void)
{
    RECORD rec;

    if (init() != 0)
    {
        return 1;
    }
    showdomain(0);
    while (getchar() != EOF)
    {
        getRecord(&rec);
        if (DEBUG) printf("\n(main)get RECORD: %lu %lu\n", rec.key, rec.value);
        insertRecord(0, &rec);
        showdomain(0);
    }
    clear();
    return 0;
}
您还没有登录,请登录后继续操作。