linux/unix 多线程拷贝文件示例

为了进一步加深对线程的操作,本文介绍了使用多线程实现拷贝文件的一个案例,网络上虽然有很多多线程拷贝的案例,但是都存在重大 bug。我们独辟蹊径,首先将一个文件分段映射到内存(mmap),随后将每一段映射的内存通知给线程,由线程去对每一段已经映射的内存进行复制。具体实现代码如下:


代码实现

#include <stdio.h>			// printf
#include <pthread.h>		// pthread
#include <sys/types.h>		// fstat
#include <sys/stat.h>		// fstat
#include <fcntl.h>			// open
#include <sys/mman.h>		// mmap
#include <sys/types.h>		// ftruncate
#include <unistd.h>			// sleep ..

// 5 个线程
#define THREAD_COUNT 5

// 用以线程之间传递参数
struct tag_mmap
{
	// 要读取源文件的起始内存位置
	void* r_mem;
	// 要写入目标文件的起始内存位置
	void* w_mem;
	// 要复制的大小
	unsigned long long m_size;
};

void* threadfunc(void* arg)
{
	// 解析结构体内容
	struct tag_mmap* recv = (struct tag_mmap*)arg;
	// 读的内存起始地址
	char* r = recv->r_mem;
	// 写的内存起始地址
	char* w = recv->w_mem;

#if 0
	// debug
	printf("thread id = %x, read = %x, write = %x, size = %lld\n", 
			(unsigned int)pthread_self(),
			(unsigned int)r,
			(unsigned int)w,
			recv->m_size);
#endif

	// 一个字节一个字节的赋值,如果直接 memcpy 呢?
	for (int i = 0; i < recv->m_size; i++)
	{
		*w++ = *r++;
	}

	return (void*)0;
}

int main(int argc, char* argv[])
{
	// 取源文件大小
	int rfd;
	struct stat buf;
	rfd = open(argv[1], O_RDONLY);
	fstat(rfd, &buf);
	printf("src file size = %d\n", (unsigned int)buf.st_size);

	// 创建被写的文件
	int wfd;
	// 创建被写文件,如果不存在则创建,并截断文件
	wfd = open(argv[2], O_RDWR | O_CREAT | O_TRUNC, 0644);
	// 拓展文件
	lseek(wfd, buf.st_size - 1, SEEK_SET);
	// 最后一个字节写入数据,这样才真正拓展
	write(wfd, "\0", 1);

	// 取平均数
	unsigned long long nCount = buf.st_size / THREAD_COUNT;
	// 4096整数倍,用来分割文件
	unsigned long long nSplit = nCount - (nCount % 4096);

	if (nCount < 4096 && nSplit == 0)
	{
		// 如果文件除以线程数以后小于页面的整数倍,那么就直接整个文件都映射进去处理
		pthread_t tid;
		struct tag_mmap single;
		// 读文件映射
		single.r_mem = mmap(NULL, buf.st_size, PROT_READ, MAP_SHARED, rfd, 0);
		if (single.r_mem == MAP_FAILED)
		{
			perror("mmap read");
		}
		// 写文件映射
		single.w_mem = mmap(NULL, buf.st_size, PROT_WRITE, MAP_SHARED, wfd, 0);
		if (single.w_mem == MAP_FAILED)
		{
			perror("mmap write");
		}
		// 映射的长度
		single.m_size = buf.st_size;
		// 一个线程处理复制动作即可
		pthread_create(&tid, NULL, threadfunc, (void*)&single);

		pthread_join(tid, NULL);
		munmap(single.r_mem, buf.st_size);
		munmap(single.w_mem, buf.st_size);
	}
	else
	{

		// 每个mmap后的地址和大小结构体数组
		struct tag_mmap mm[THREAD_COUNT];
		// 线程ID
		pthread_t tid[THREAD_COUNT];
		// 开始映射内存
		for (int i = 0; i < THREAD_COUNT && nCount > 4096; i++)
		{
			// 最后一段所有整数倍映射后剩余的内存大小
			if (i + 1 == THREAD_COUNT)
			{
				// 读内存映射
				mm[i].r_mem = mmap(NULL, buf.st_size - i * nSplit, PROT_READ, MAP_SHARED, rfd, i * nSplit);
				if (mm[i].r_mem == MAP_FAILED)
				{
					perror("mmap read last");
				}
				// 写内存映射
				mm[i].w_mem = mmap(NULL, buf.st_size - i * nSplit, PROT_WRITE, MAP_SHARED, wfd, i * nSplit);
				if (mm[i].w_mem == MAP_FAILED)
				{
					perror("mmap write last");
				}
				// 映射的大小保存到结构体
				mm[i].m_size = buf.st_size - i * nSplit;
				pthread_create(&tid[i], NULL, threadfunc, (void*)&mm[i]);
				// printf("i = %d, %lld ----- %lld mem addr = 0x%x\n", i, i * nSplit, buf.st_size - i * nSplit, (unsigned int)mm[i].r_mem);
			}
			// 前几段符合4096整数倍的
			else
			{
				// 读内存映射
				mm[i].r_mem = mmap(NULL, nSplit, PROT_READ, MAP_SHARED, rfd, i * nSplit);
				if (mm[i].r_mem == MAP_FAILED)
				{
					perror("mmap read fast");
				}
				// 写内存映射
				mm[i].w_mem = mmap(NULL, nSplit, PROT_WRITE, MAP_SHARED, wfd, i * nSplit);
				if (mm[i].w_mem == MAP_FAILED)
				{
					perror("mmap write fast");
				}
				// 映射的大小保存到结构体
				mm[i].m_size = nSplit;
				pthread_create(&tid[i], NULL, threadfunc, (void*)&mm[i]);
				// printf("i = %d, %lld ----- %lld, mem addr = 0x%x\n", i, i * nSplit, nSplit, (unsigned int)mm[i].r_mem);
			}
		}

		// 回收线程
		for (int join = 0; join < THREAD_COUNT; join++)
		{
			pthread_join(tid[join], NULL);
		}

		// 取消内存映射
		for (int un = 0; un < THREAD_COUNT; un++)
		{
			munmap(mm[un].r_mem, mm[un].m_size);
			munmap(mm[un].w_mem, mm[un].m_size);
		}
	}
	// 关闭文件描述符
	close(rfd);
	close(wfd);

	printf("finish...\n");
	return 0;
}

 

 

评论