Debug swcsa
[SXSI/TextCollection.git] / incbwt / parallel_build.cpp
1 #include <algorithm>
2 #include <cstdlib>
3 #include <fstream>
4 #include <iostream>
5 #include <vector>
6
7 #ifdef MULTITHREAD_SUPPORT
8 #include <omp.h>
9 #endif
10
11 #include "rlcsa_builder.h"
12 #include "misc/utils.h"
13
14
15 using namespace CSA;
16
17
18 double indexParts(std::vector<std::string>& filename, usint threads, Parameters& parameters);
19
20 const int MAX_THREADS = 64;
21
22
23 int
24 main(int argc, char** argv)
25 {
26   std::cout << "Parallel RLCSA builder" << std::endl;
27   if(argc < 3)
28   {
29     std::cout << "Usage: parallel_build [-n] listname output [threads]" << std::endl;
30     std::cout << "  -n   do not merge the indexes" << std::endl;
31     return 1;
32   }
33
34   int list_parameter = 1, output_parameter = 2, threads_parameter = 3;
35   bool do_merge = true;
36   if(std::string("-n").compare(argv[1]) == 0)
37   {
38     list_parameter++; output_parameter++; threads_parameter++;
39     do_merge = false;
40     std::cout << "Option '-n' specified. Partial indexes will not be merged." << std::endl;
41   }
42
43   std::ifstream filelist(argv[list_parameter], std::ios_base::binary);
44   if(!filelist)
45   {
46     std::cerr << "Error opening file list!" << std::endl;
47     return 2;
48   }
49   std::vector<std::string> files;
50   readRows(filelist, files, true);
51   filelist.close();
52   std::cout << "Input files: " << files.size() << std::endl;
53
54   std::string base_name = argv[output_parameter];
55   std::cout << "Output: " << base_name << std::endl;
56
57   usint threads = 1;
58   if(argc > threads_parameter)
59   {
60     threads = std::min(MAX_THREADS, std::max(atoi(argv[threads_parameter]), 1));
61   }
62   std::cout << "Threads: " << threads << std::endl; 
63   std::cout << std::endl;
64
65   std::string parameters_name = base_name + PARAMETERS_EXTENSION;
66   Parameters parameters;
67   parameters.set(RLCSA_BLOCK_SIZE);
68   parameters.set(SAMPLE_RATE);
69   parameters.set(SUPPORT_LOCATE);
70   parameters.set(SUPPORT_DISPLAY);
71   parameters.read(parameters_name);
72   parameters.print();
73
74   double start = readTimer();
75   double megabytes = indexParts(files, threads, parameters);
76
77   RLCSABuilder builder(parameters.get(RLCSA_BLOCK_SIZE), parameters.get(SAMPLE_RATE), 0, threads);
78   if(do_merge)
79   {
80     std::cout << "Phase 2: Merging the indexes" << std::endl;
81     for(std::vector<std::string>::iterator iter = files.begin(); iter != files.end(); iter++)
82     {
83       std::cout << "Increment: " << *iter << std::endl;
84       builder.insertFromFile(*iter);
85     }
86     std::cout << std::endl;
87   
88     RLCSA* index = builder.getRLCSA();
89     if(index != 0 && index->isOk())
90     {
91       index->printInfo();
92       index->reportSize(true);
93       index->writeTo(base_name);
94       parameters.write(parameters_name);
95     }
96     delete index;
97   }
98
99   double stop = readTimer();
100   std::cout << megabytes << " megabytes indexed in " << (stop - start) << " seconds (" << (megabytes / (stop - start)) << " MB/s)." << std::endl;
101   if(do_merge)
102   {
103     std::cout << "Search time:  " << builder.getSearchTime() << " seconds" << std::endl;
104     std::cout << "Sort time:    " << builder.getSortTime() << " seconds" << std::endl;
105     std::cout << "Merge time:   " << builder.getMergeTime() << " seconds" << std::endl;
106   }
107   std::cout << std::endl;
108
109   return 0;
110 }
111
112
113 double
114 indexParts(std::vector<std::string>& filenames, usint threads, Parameters& parameters)
115 {
116   double start = readTimer();
117   std::cout << "Phase 1: Building indexes for input files" << std::endl;
118   usint block_size = parameters.get(RLCSA_BLOCK_SIZE);
119   usint sample_rate = parameters.get(SAMPLE_RATE);
120   usint total_size = 0;
121
122   std::ifstream* input_file;
123   usint size;
124   std::string parameters_name;
125   RLCSA* index;
126   uchar* data;
127   sint i;
128
129   #ifdef MULTITHREAD_SUPPORT
130   omp_set_num_threads(threads);
131   #pragma omp parallel private(input_file, size, parameters_name, index, data)
132   {
133     #pragma omp for schedule(dynamic, 1)
134   #endif
135     for(i = 0; i < (sint)(filenames.size()); i++)
136     {
137       #ifdef MULTITHREAD_SUPPORT
138       #pragma omp critical
139       {
140       #endif
141         size = 0; data = 0;
142         std::cout << "Input: " << filenames[i] << std::endl;
143         input_file = new std::ifstream(filenames[i].c_str(), std::ios_base::binary);
144         if(input_file == 0)
145         {
146           std::cerr << "Error opening input file " << filenames[i] << "!" << std::endl;
147         }
148         else
149         {
150           size = fileSize(*input_file);
151           data = new uchar[size];
152           input_file->read((char*)data, size);
153           delete input_file;
154         }
155       #ifdef MULTITHREAD_SUPPORT
156       }
157       #endif
158
159       if(size > 0)
160       {
161         index = new RLCSA(data, size, block_size, sample_rate, true, true);
162         if(index != 0 && index->isOk()) { index->writeTo(filenames[i]); }
163         delete index;
164
165         #ifdef MULTITHREAD_SUPPORT
166         #pragma omp critical
167         {
168         #endif
169           total_size += size;
170           parameters_name = filenames[i] + PARAMETERS_EXTENSION;
171           parameters.write(parameters_name);
172         #ifdef MULTITHREAD_SUPPORT
173         }
174         #endif
175       }
176       else
177       {
178         #ifdef MULTITHREAD_SUPPORT
179         #pragma omp critical
180         {
181         #endif
182           std::cerr << "Warning: Empty input file " << filenames[i] << "!" << std::endl;
183         #ifdef MULTITHREAD_SUPPORT
184         }
185         #endif
186       }
187     }
188   #ifdef MULTITHREAD_SUPPORT
189   }
190   #endif
191
192   double stop = readTimer();
193   double megabytes = total_size / (double)MEGABYTE;
194   std::cout << "Indexed " << megabytes << " megabytes in " << (stop - start) << " seconds (" << (megabytes / (stop - start)) << " MB/s)." << std::endl;
195   std::cout << std::endl;
196
197   return megabytes;
198 }