为了进一步加深对线程的操作,本文介绍了使用多线程实现拷贝文件的一个案例,网络上虽然有很多多线程拷贝的案例,但是都存在重大 bug。我们独辟蹊径,首先将一个文件分段映射到内存(mmap),随后将每一段映射的内存通知给线程,由线程去对每一段已经映射的内存进行复制。具体实现代码如下:


代码实现

#include <stdio.h>// printf
#include <pthread.h>// pthread
#include <sys/types.h>// fstat
#include <sys/stat.h>// fstat
#include <fcntl.h>// open
#include <sys/mman.h>// mmap
#include <sys/types.h>// ftruncate
#include <unistd.h>// sleep ..

// 5 个线程
#define THREAD_COUNT 5

// 用以线程之间传递参数
struct tag_mmap
{
// 要读取源文件的起始内存位置
void* r_mem;
// 要写入目标文件的起始内存位置
void* w_mem;
// 要复制的大小
unsigned long long m_size;
};

void* threadfunc(void* arg)
{
// 解析结构体内容
struct tag_mmap* recv = (struct tag_mmap*)arg;
// 读的内存起始地址
char* r = recv->r_mem;
// 写的内存起始地址
char* w = recv->w_mem;

#if 0
// debug
printf(“thread id = %x, read = %x, write = %x, size = %lld\n”,
(unsigned int)pthread_self(),
(unsigned int)r,
(unsigned int)w,
recv->m_size);
#endif

// 一个字节一个字节的赋值,如果直接 memcpy 呢?
for (int i = 0; i < recv->m_size; i++)
{
*w++ = *r++;
}

return (void*)0;
}

int main(int argc, char* argv[])
{
// 取源文件大小
int rfd;
struct stat buf;
rfd = open(argv[1], O_RDONLY);
fstat(rfd, &buf);
printf(“src file size = %d\n”, (unsigned int)buf.st_size);

// 创建被写的文件
int wfd;
// 创建被写文件,如果不存在则创建,并截断文件
wfd = open(argv[2], O_RDWR O_CREAT O_TRUNC, 0644);
// 拓展文件
lseek(wfd, buf.st_size - 1, SEEK_SET);
// 最后一个字节写入数据,这样才真正拓展
write(wfd, “\0”, 1);

// 取平均数
unsigned long long nCount = buf.st_size / THREAD_COUNT;
// 4096整数倍,用来分割文件
unsigned long long nSplit = nCount - (nCount % 4096);

if (nCount < 4096 && nSplit == 0)
{
// 如果文件除以线程数以后小于页面的整数倍,那么就直接整个文件都映射进去处理
pthread_t tid;
struct tag_mmap single;
// 读文件映射
single.r_mem = mmap(NULL, buf.st_size, PROT_READ, MAP_SHARED, rfd, 0);
if (single.r_mem == MAP_FAILED)
{
perror(“mmap read”);
}
// 写文件映射
single.w_mem = mmap(NULL, buf.st_size, PROT_WRITE, MAP_SHARED, wfd, 0);
if (single.w_mem == MAP_FAILED)
{
perror(“mmap write”);
}
// 映射的长度
single.m_size = buf.st_size;
// 一个线程处理复制动作即可
pthread_create(&tid, NULL, threadfunc, (void*)&single);

pthread_join(tid, NULL);
munmap(single.r_mem, buf.st_size);
munmap(single.w_mem, buf.st_size);
}
else
{

// 每个mmap后的地址和大小结构体数组
struct tag_mmap mm[THREAD_COUNT];
// 线程ID
pthread_t tid[THREAD_COUNT];
// 开始映射内存
for (int i = 0; i < THREAD_COUNT && nCount > 4096; i++)
{
// 最后一段所有整数倍映射后剩余的内存大小
if (i + 1 == THREAD_COUNT)
{
// 读内存映射
mm[i].r_mem = mmap(NULL, buf.st_size - i * nSplit, PROT_READ, MAP_SHARED, rfd, i * nSplit);
if (mm[i].r_mem == MAP_FAILED)
{
perror(“mmap read last”);
}
// 写内存映射
mm[i].w_mem = mmap(NULL, buf.st_size - i * nSplit, PROT_WRITE, MAP_SHARED, wfd, i * nSplit);
if (mm[i].w_mem == MAP_FAILED)
{
perror(“mmap write last”);
}
// 映射的大小保存到结构体
mm[i].m_size = buf.st_size - i * nSplit;
pthread_create(&tid[i], NULL, threadfunc, (void*)&mm[i]);
// printf(“i = %d, %lld —– %lld mem addr = 0x%x\n”, i, i * nSplit, buf.st_size - i * nSplit, (unsigned int)mm[i].r_mem);
}
// 前几段符合4096整数倍的
else
{
// 读内存映射
mm[i].r_mem = mmap(NULL, nSplit, PROT_READ, MAP_SHARED, rfd, i * nSplit);
if (mm[i].r_mem == MAP_FAILED)
{
perror(“mmap read fast”);
}
// 写内存映射
mm[i].w_mem = mmap(NULL, nSplit, PROT_WRITE, MAP_SHARED, wfd, i * nSplit);
if (mm[i].w_mem == MAP_FAILED)
{
perror(“mmap write fast”);
}
// 映射的大小保存到结构体
mm[i].m_size = nSplit;
pthread_create(&tid[i], NULL, threadfunc, (void*)&mm[i]);
// printf(“i = %d, %lld —– %lld, mem addr = 0x%x\n”, i, i * nSplit, nSplit, (unsigned int)mm[i].r_mem);
}
}

// 回收线程
for (int join = 0; join < THREAD_COUNT; join++)
{
pthread_join(tid[join], NULL);
}

// 取消内存映射
for (int un = 0; un < THREAD_COUNT; un++)
{
munmap(mm[un].r_mem, mm[un].m_size);
munmap(mm[un].w_mem, mm[un].m_size);
}
}
// 关闭文件描述符
close(rfd);
close(wfd);

printf(“finish…\n”);
return 0;
}