为了进一步加深对线程的操作,本文介绍了使用多线程实现拷贝文件的一个案例,网络上虽然有很多多线程拷贝的案例,但是都存在重大 bug。我们独辟蹊径,首先将一个文件分段映射到内存(mmap),随后将每一段映射的内存通知给线程,由线程去对每一段已经映射的内存进行复制。具体实现代码如下:
代码实现
#include <stdio.h> // printf #include <pthread.h> // pthread #include <sys/types.h> // fstat #include <sys/stat.h> // fstat #include <fcntl.h> // open #include <sys/mman.h> // mmap #include <sys/types.h> // ftruncate #include <unistd.h> // sleep .. // 5 个线程 #define THREAD_COUNT 5 // 用以线程之间传递参数 struct tag_mmap { // 要读取源文件的起始内存位置 void* r_mem; // 要写入目标文件的起始内存位置 void* w_mem; // 要复制的大小 unsigned long long m_size; }; void* threadfunc(void* arg) { // 解析结构体内容 struct tag_mmap* recv = (struct tag_mmap*)arg; // 读的内存起始地址 char* r = recv->r_mem; // 写的内存起始地址 char* w = recv->w_mem; #if 0 // debug printf("thread id = %x, read = %x, write = %x, size = %lld\n", (unsigned int)pthread_self(), (unsigned int)r, (unsigned int)w, recv->m_size); #endif // 一个字节一个字节的赋值,如果直接 memcpy 呢? for (int i = 0; i < recv->m_size; i++) { *w++ = *r++; } return (void*)0; } int main(int argc, char* argv[]) { // 取源文件大小 int rfd; struct stat buf; rfd = open(argv[1], O_RDONLY); fstat(rfd, &buf); printf("src file size = %d\n", (unsigned int)buf.st_size); // 创建被写的文件 int wfd; // 创建被写文件,如果不存在则创建,并截断文件 wfd = open(argv[2], O_RDWR | O_CREAT | O_TRUNC, 0644); // 拓展文件 lseek(wfd, buf.st_size - 1, SEEK_SET); // 最后一个字节写入数据,这样才真正拓展 write(wfd, "\0", 1); // 取平均数 unsigned long long nCount = buf.st_size / THREAD_COUNT; // 4096整数倍,用来分割文件 unsigned long long nSplit = nCount - (nCount % 4096); if (nCount < 4096 && nSplit == 0) { // 如果文件除以线程数以后小于页面的整数倍,那么就直接整个文件都映射进去处理 pthread_t tid; struct tag_mmap single; // 读文件映射 single.r_mem = mmap(NULL, buf.st_size, PROT_READ, MAP_SHARED, rfd, 0); if (single.r_mem == MAP_FAILED) { perror("mmap read"); } // 写文件映射 single.w_mem = mmap(NULL, buf.st_size, PROT_WRITE, MAP_SHARED, wfd, 0); if (single.w_mem == MAP_FAILED) { perror("mmap write"); } // 映射的长度 single.m_size = buf.st_size; // 一个线程处理复制动作即可 pthread_create(&tid, NULL, threadfunc, (void*)&single); pthread_join(tid, NULL); munmap(single.r_mem, buf.st_size); munmap(single.w_mem, buf.st_size); } else { // 每个mmap后的地址和大小结构体数组 struct tag_mmap mm[THREAD_COUNT]; // 线程ID pthread_t tid[THREAD_COUNT]; // 开始映射内存 for (int i = 0; i < THREAD_COUNT && nCount > 4096; i++) { // 最后一段所有整数倍映射后剩余的内存大小 if (i + 1 == THREAD_COUNT) { // 读内存映射 mm[i].r_mem = mmap(NULL, buf.st_size - i * nSplit, PROT_READ, MAP_SHARED, rfd, i * nSplit); if (mm[i].r_mem == MAP_FAILED) { perror("mmap read last"); } // 写内存映射 mm[i].w_mem = mmap(NULL, buf.st_size - i * nSplit, PROT_WRITE, MAP_SHARED, wfd, i * nSplit); if (mm[i].w_mem == MAP_FAILED) { perror("mmap write last"); } // 映射的大小保存到结构体 mm[i].m_size = buf.st_size - i * nSplit; pthread_create(&tid[i], NULL, threadfunc, (void*)&mm[i]); // printf("i = %d, %lld ----- %lld mem addr = 0x%x\n", i, i * nSplit, buf.st_size - i * nSplit, (unsigned int)mm[i].r_mem); } // 前几段符合4096整数倍的 else { // 读内存映射 mm[i].r_mem = mmap(NULL, nSplit, PROT_READ, MAP_SHARED, rfd, i * nSplit); if (mm[i].r_mem == MAP_FAILED) { perror("mmap read fast"); } // 写内存映射 mm[i].w_mem = mmap(NULL, nSplit, PROT_WRITE, MAP_SHARED, wfd, i * nSplit); if (mm[i].w_mem == MAP_FAILED) { perror("mmap write fast"); } // 映射的大小保存到结构体 mm[i].m_size = nSplit; pthread_create(&tid[i], NULL, threadfunc, (void*)&mm[i]); // printf("i = %d, %lld ----- %lld, mem addr = 0x%x\n", i, i * nSplit, nSplit, (unsigned int)mm[i].r_mem); } } // 回收线程 for (int join = 0; join < THREAD_COUNT; join++) { pthread_join(tid[join], NULL); } // 取消内存映射 for (int un = 0; un < THREAD_COUNT; un++) { munmap(mm[un].r_mem, mm[un].m_size); munmap(mm[un].w_mem, mm[un].m_size); } } // 关闭文件描述符 close(rfd); close(wfd); printf("finish...\n"); return 0; }