source: proto/parabix2/pablo_template_multithreads.cpp @ 4355

Last change on this file since 4355 was 2547, checked in by ksherdy, 7 years ago

Updated templates for new perfsec.h

File size: 18.0 KB
RevLine 
[2142]1/*
2
3* OVERALL
4
5Coarse grained software pipeline implementation
6of xmlwf application using the pthreads library.
7
8xmlwf template logic manually partitioned into four hard coded stages. A stage
9implements a portion of the logic of the xmlwf application logic. A stage is broken
10into a number of segment size passes. A pass executes a number of blocks. 
11Passes are manually balanced and execute under separate threads.
12
13Circular queue implementation. Each queue entry contains a 'SEGMENT' of
14parallel data 'stream' structs. The source buffer is a member of the set of
15parallel data structs. Circular queue implemented as hard coded circular
16queue of size 8.
17
18* SYNCHRONIZATION
19
20A pass can progress up to the minimum of tail of the (8 segment) circular queue
21or the prior segment of the previous stage.
22
23BUFFER MANAGMENT / FILE MANAGEMENT
24
25Two buffers: Overlap buffer, Source buffer.
26
27Source Buffer Size: BLOCK_SIZE * (SEGMENT_BLOCKS+1) + OVERLAP_BUFSIZE*2
28
29Source Buffer Layout per Circular Queue Entry:
30
31| OVERLAP_BUFSIZE (16 bytes) | OVERLAP_BUFSIZE (16 bytes) | (BLOCK_SIZE * (SEGMENT_BLOCKS+1)) |
32^                            ^
33|                            |
34srcbuf                       srcptr 
35
36Overlap Buffer Layout:
37
38| OVERLAP_BUFSIZE * 2 (32 bytes)                          |
39
40
41* PROCESSING MODEL
42
43Slurps complete XML file.
44
45Counts the number of complete file segments.
46
47Creates thread for each stage.
48
49Threads process multiple segments as passes of a stage.
50
51*/
52
53
[1051]54#include <stdio.h>
55#include <stdlib.h>
56#include <errno.h>
57#include <sys/types.h>
58#include <sys/stat.h>
59#include <pthread.h>
60#include "../lib/lib_simd.h"
61#define BLOCK_SIZE 128
62#define SEGMENT_BLOCKS 128
63#define SEGMENT_SIZE (BLOCK_SIZE * SEGMENT_BLOCKS)
64#define OVERLAP_BUFSIZE 16
65
[1906]66#include "../lib/builtins.hpp"
[1051]67#include "../lib/carryQ.h"
68#include "xmldecl.h"
69#include "xml_error.c"
70#include "xmldecl.c"
71#include "namechars.h"
72
73#include "../lib/perflib/perfsec.h"
74#include "../lib/s2p.h"
75
[1055]76#include "TagMatcher_multithreads.h"
[1057]77#include "LineColTracker_multithreads.h"
[1051]78BitBlock EOF_mask = simd_const_1(1);
79
80#ifdef BUFFER_PROFILING
81        BOM_Table * parser_timer;
[1058]82#ifdef ANALYZE
83        BOM_Table * T1_total_timer;
84        BOM_Table * T2_total_timer;
85        BOM_Table * T3_total_timer;
86        BOM_Table * T4_total_timer;
87        BOM_Table * T1_work_timer;
88        BOM_Table * T2_work_timer;
89        BOM_Table * T3_work_timer;
90        BOM_Table * T4_work_timer;     
[1051]91#endif
[2547]92/*
93#ifdef BUFFER_PROFILING
94    BOM_Table * parser_timer;
95#elif PAPI
96                #define PAPI_EVENTS_COUNT 2
97                int PAPI_EVENTS[PAPI_EVENTS_COUNT] = {PAPI_TOT_CYC, PAPI_BR_MSP};       
98    CC * parser_timer;
99#else
100    void * parser_timer;
[1058]101#endif
[2547]102*/
103#endif
[1051]104LineColTracker tracker;
105TagMatcher matcher;
106
107int block_base=0;
108int buffer_base=0;
109char * source;
110
111static inline int StreamScan(ScanBlock * stream, int blk_count, int ProcessPos(int)) {
112        int blk;
113        int block_pos = 0;
114
115        for (blk = 0; blk < blk_count; blk++) {
[1906]116                scanword_t s = stream[blk];
[1051]117                while(s) {
[1906]118                        int code = (ProcessPos(scan_forward_zeroes(s) + block_pos));
[1051]119                        if (code) return code;
120                        s = s & (s-1);  // clear rightmost bit.
121                }
122                block_pos += 8 * sizeof(ScanBlock);
123        }
124        return 0;
125}
126
127static inline void ReportError(const char * error_msg, int error_pos_in_block) {
128  int error_line, error_column;
129  tracker.get_Line_and_Column(error_pos_in_block, error_line, error_column);
130  fprintf(stderr, "%s at line %i, column %i\n", error_msg, error_line, error_column);
131}
132
133
134static inline int NameStrt_check(int pos) {
135        int block_pos = block_base + pos;
136        if(XML_10_UTF8_NameStrt_bytes((unsigned char*)&source[block_pos]) == 0){
137              ReportError("name start error", pos);
138              exit(-1);
139        }
140        return 0;
141}
142
143static inline int Name_check(int pos) {
144        int block_pos = block_base + pos;
145        if(XML_10_UTF8_NameChar_bytes((unsigned char*)&source[block_pos]) == 0){
146              ReportError("name error", pos);
147              exit(-1);
148        }
149        return 0;
150}
151
152static inline int PIName_check(int pos) {
153        int block_pos = block_base + pos;
154        int file_pos = block_pos+buffer_base;
155        if (at_XxMmLll<ASCII>((unsigned char*)&source[block_pos]) && (source[block_pos+3]=='?' || source[block_pos+3]<= ' ')) {
156              // "<?xml" legal at start of file.
157              if ((file_pos == 2) && at_XmlDecl_start<ASCII>((unsigned char*)&source[0])) return 0;
158              ReportError("[Xx][Mm][Ll] illegal as PI name", pos);
159              exit(-1);
160        }
161        return 0;
162}
163
164static inline int CD_check(int pos) {
165        int block_pos = block_base + pos;
166        if (!at_CDATA1<ASCII>((unsigned char*)&source[block_pos])){
167              ReportError("CDATA error", pos);
168              exit(-1);
169        }
170        return 0;
171}
172
173static inline int GenRef_check(int pos) {
174        int block_pos = block_base + pos;
175        unsigned char* s = (unsigned char*)&source[block_pos];
176        if (!(at_Ref_gt<ASCII>(s)||at_Ref_lt<ASCII>(s)||at_Ref_amp<ASCII>(s)||at_Ref_quot<ASCII>(s)||at_Ref_apos<ASCII>(s))){
177              ReportError("Undefined reference", pos);
178              exit(-1);
179        }
180        return 0;
181}
182
183static inline int HexRef_check(int pos) {
184        int block_pos = block_base + pos;
185        unsigned char* s = (unsigned char*)&source[block_pos];
186        int ch_val = 0;
187        while(at_HexDigit<ASCII>(s)){
188          ch_val = HexVal<ASCII>(s[0]) + (ch_val<<4);
189          if (ch_val> 0x10FFFF ){
190            ReportError("Illegal character reference", pos);
191            exit(-1);
192          }
193          s++;
194        }
195        if ((ch_val == 0x0) || ((ch_val | 0x7FF) == 0xDFFF)|| ((ch_val | 0x1) == 0xFFFF)){
196          ReportError("Illegal character reference", pos);
197          exit(-1);
198        }
199        else if (((ch_val < 0x20) && (ch_val != 0x9) && (ch_val != 0xD) && (ch_val != 0xA))){
200          ReportError("Illegal XML 1.0 character reference", pos);
201          exit(-1);
202        }
203        return 0;
204}
205
206static inline int DecRef_check(int pos) {
207        int block_pos = block_base + pos;
208        unsigned char* s = (unsigned char*)&source[block_pos];
209        int ch_val = 0;
210        while(at_HexDigit<ASCII>(s)){
211          ch_val = DigitVal<ASCII>(s[0]) + ch_val*10;
212          if (ch_val> 0x10FFFF ){
213            ReportError("Illegal character reference", pos);
214            exit(-1);
215          }
216          s++;
217        }
218        if ((ch_val == 0x0) || ((ch_val | 0x7FF) == 0xDFFF)|| ((ch_val | 0x1) == 0xFFFF)){
219          ReportError("Illegal character reference", pos);
220          exit(-1);
221        }
222        else if (((ch_val < 0x20) && (ch_val != 0x9) && (ch_val != 0xD) && (ch_val != 0xA))){
223          ReportError("Illegal XML 1.0 character reference", pos);
224          exit(-1);
225        }
226        return 0;
227}
228
229static inline int AttRef_check(int pos) {
230        int block_pos = block_base + pos;
231        unsigned char* s = (unsigned char*)&source[block_pos];
232        int ch_val = 0;
233        if(s[0]=='#'){
234          s++;
235          if(s[0]=='x' || s[0]=='X'){
236            s++;
237            while(at_HexDigit<ASCII>(s)){
238              ch_val = HexVal<ASCII>(s[0]) + (ch_val<<4);
239              s++;
240            }
241          }
242          else{
243            while(at_HexDigit<ASCII>(s)){
244              ch_val = DigitVal<ASCII>(s[0]) + ch_val*10;
245              s++;
246            }
247          }
248          if (ch_val==60){
249            ReportError("Attribute values contain '<' characters after reference expansion", pos);
250            exit(-1);
251          }
252        }
253        else if(at_Ref_lt<ASCII>(s)){
254          ReportError("Attribute values contain '<' characters after reference expansion", pos);
255          exit(-1);
256        }
257        return 0;
258}
259
260
261
262@global
263
264static inline void s2p_do_block(BytePack U8[], Basis_bits & basis_bits) {
265  s2p(U8[0], U8[1], U8[2], U8[3], U8[4], U8[5], U8[6], U8[7],
266        basis_bits.bit_0, basis_bits.bit_1, basis_bits.bit_2, basis_bits.bit_3, basis_bits.bit_4, basis_bits.bit_5, basis_bits.bit_6, basis_bits.bit_7);
267}
268
269static inline void s2p_do_final_block(BytePack U8[], Basis_bits & basis_bits, BitBlock EOF_mask) {
270  s2p_do_block(U8, basis_bits);
271  basis_bits.bit_0 = simd_and(basis_bits.bit_0, EOF_mask);
272  basis_bits.bit_1 = simd_and(basis_bits.bit_1, EOF_mask);
273  basis_bits.bit_2 = simd_and(basis_bits.bit_2, EOF_mask);
274  basis_bits.bit_3 = simd_and(basis_bits.bit_3, EOF_mask);
275  basis_bits.bit_4 = simd_and(basis_bits.bit_4, EOF_mask);
276  basis_bits.bit_5 = simd_and(basis_bits.bit_5, EOF_mask);
277  basis_bits.bit_6 = simd_and(basis_bits.bit_6, EOF_mask);
278  basis_bits.bit_7 = simd_and(basis_bits.bit_7, EOF_mask);
279}
280
281static inline void s2p_do_segment(BytePack U8[], Basis_bits basis_bits[]) {
282  for (int i = 0; i < SEGMENT_BLOCKS; i++)
283           s2p_do_block(&U8[8*i], basis_bits[i]);
284}
285
286static inline void postprocess_do_block(Lex & lex, CtCDPI_Callouts & ctCDPI_Callouts, Ref_Callouts & ref_Callouts, Check_streams & check_streams, int chars_avail){
287            tracker.StoreNewlines(lex.LF);
288
289                if (bitblock_has_bit(simd_or(check_streams.non_ascii_name_starts, check_streams.non_ascii_names))) {
290                  StreamScan((ScanBlock *) &check_streams.non_ascii_name_starts, sizeof(BitBlock)/sizeof(ScanBlock), NameStrt_check);
291                  StreamScan((ScanBlock *) &check_streams.non_ascii_names, sizeof(BitBlock)/sizeof(ScanBlock), Name_check);
292                }
293
294                if (bitblock_has_bit(ctCDPI_Callouts.PI_name_starts)){
295                  StreamScan((ScanBlock *) &ctCDPI_Callouts.PI_name_starts, sizeof(BitBlock)/sizeof(ScanBlock), PIName_check);
296                }
297
298                if (bitblock_has_bit(ctCDPI_Callouts.CD_starts)){
299                  StreamScan((ScanBlock *) &ctCDPI_Callouts.CD_starts, sizeof(BitBlock)/sizeof(ScanBlock), CD_check);
300                }
301
302                if (bitblock_has_bit(ref_Callouts.GenRef_starts)){
303                  StreamScan((ScanBlock *) &ref_Callouts.GenRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), GenRef_check);
304                }
305
306                if (bitblock_has_bit(ref_Callouts.DecRef_starts)){
307                  StreamScan((ScanBlock *) &ref_Callouts.DecRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), DecRef_check);
308                }
309
310                if (bitblock_has_bit(ref_Callouts.HexRef_starts)){
311                  StreamScan((ScanBlock *) &ref_Callouts.HexRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), HexRef_check);
312                }
313
314                if (bitblock_has_bit(check_streams.att_refs)){
315                  StreamScan((ScanBlock *) &check_streams.att_refs, sizeof(BitBlock)/sizeof(ScanBlock), AttRef_check);
316                }
317
318                if (bitblock_has_bit(check_streams.error_mask)) {
319                  int errpos = count_forward_zeroes(check_streams.error_mask);
320                  ReportError("error found", errpos);
321              exit(-1);
322                }
323
324                matcher.store_streams(check_streams.tag_marks, check_streams.name_follows, check_streams.misc_mask, chars_avail);
325                tracker.AdvanceBlock();
326}
327
328static inline void postprocess_do_segment(Lex lex[], CtCDPI_Callouts ctCDPI_Callouts[], Ref_Callouts ref_Callouts[], Check_streams check_streams[]){
329
330  for (int i = 0; i < SEGMENT_BLOCKS; i++){
331           postprocess_do_block(lex[i], ctCDPI_Callouts[i], ref_Callouts[i], check_streams[i], SEGMENT_SIZE);
332           block_base += BLOCK_SIZE;
333  }
334
335        matcher.StreamScan(SEGMENT_SIZE);
336        matcher.Advance_segment();
337        tracker.Advance_buffer();
338}
339
340struct Para_data{
341
342          char srcbuf[BLOCK_SIZE * (SEGMENT_BLOCKS+1) + OVERLAP_BUFSIZE*2];
343
344          struct U8 u8[SEGMENT_BLOCKS];
345
346          struct Lex lex[SEGMENT_BLOCKS];
347
348          struct Scope1 scope1[SEGMENT_BLOCKS];
349
350          struct CtCDPI_Callouts ctCDPI_Callouts[SEGMENT_BLOCKS];
351
352          struct Ref_Callouts ref_Callouts[SEGMENT_BLOCKS];
353
354          struct Tag_Callouts tag_Callouts[SEGMENT_BLOCKS];
355
356          struct Basis_bits basis_bits[SEGMENT_BLOCKS];
357
358          struct Check_streams check_streams[SEGMENT_BLOCKS];
359
360          struct Xml_names xml_names[SEGMENT_BLOCKS];
361};
362
[1055]363  Validate_utf8 validate_utf8;
364  Parse_refs parse_refs;
365  Parse_tags parse_tags;
366  Parse_CtCDPI parse_CtCDPI;
367  Do_check_streams do_check_streams;
368  Classify_bytes classify_bytes;
369  Add_scope_streams add_scope_streams;
370  Validate_xml_names validate_xml_names;
[1051]371
[1055]372
[1051]373int file_segs=0;
374FILE *infile;
375FILE *outfile;
376
377template<typename T>
378class spsc_queue
379{
380public:
381  spsc_queue()
382  {
[1058]383          head1_ =  head2_ = head3_ = head4_ = 1;
384      tail_ = 0;
[1051]385  }
386
387  ~spsc_queue()
388  {
[1058]389
[1051]390  }
391
[1058]392  bool pass1()
[1051]393  {
[1058]394          while(head1_==tail_)
395          __sync_synchronize();
[1051]396
[1058]397#ifdef ANALYZE
398PERF_SEC_START(T1_work_timer);
399#endif
400          char * srcptr = n[head1_].srcbuf + OVERLAP_BUFSIZE;
401          memmove(n[head1_].srcbuf, overlap_buffer, 2*overlap_bytes);
402          int chars_read = fread(&srcptr[overlap_bytes], 1, SEGMENT_SIZE + OVERLAP_BUFSIZE - overlap_bytes, infile);
403          memmove(overlap_buffer, &srcptr[SEGMENT_SIZE-OVERLAP_BUFSIZE], 2*OVERLAP_BUFSIZE);
404          overlap_bytes = OVERLAP_BUFSIZE;
405          s2p_do_segment((BytePack *) srcptr, n[head1_].basis_bits);
406          classify_bytes.do_segment(n[head1_].basis_bits, n[head1_].lex);
[1055]407
[1058]408#ifdef ANALYZE
409PERF_SEC_END(T1_work_timer, SEGMENT_SIZE);
410#endif
411          head1_ = (head1_+1)%8;
412
[1051]413  }
[1058]414
[1051]415  bool pass2()
416  {
[1058]417          while(head2_==head1_)
418          __sync_synchronize();
[1051]419
[1058]420#ifdef ANALYZE
421PERF_SEC_START(T2_work_timer);
422#endif
423          validate_utf8.do_segment(n[head2_].basis_bits, n[head2_].u8);
424          add_scope_streams.do_segment(n[head2_].lex, n[head2_].scope1);
425          parse_CtCDPI.do_segment(n[head2_].ctCDPI_Callouts, n[head2_].lex, n[head2_].scope1, n[head2_].check_streams);
426          parse_refs.do_segment(n[head2_].lex, n[head2_].scope1, n[head2_].ctCDPI_Callouts, n[head2_].ref_Callouts);
[1051]427
[1058]428#ifdef ANALYZE
429PERF_SEC_END(T2_work_timer, SEGMENT_SIZE);
430#endif
431          head2_ = (head2_+1)%8;
432
[1051]433  }
434
435  bool pass3()
436  {
[1058]437          while(head3_==head2_)
438          __sync_synchronize();
[1051]439
[1058]440#ifdef ANALYZE
441PERF_SEC_START(T3_work_timer);
442#endif
443         parse_tags.do_segment(n[head3_].lex, n[head3_].scope1, n[head3_].ctCDPI_Callouts, n[head3_].tag_Callouts);
444         validate_xml_names.do_segment(n[head3_].ctCDPI_Callouts, n[head3_].ref_Callouts, n[head3_].tag_Callouts, n[head3_].lex, n[head3_].u8, n[head3_].xml_names, n[head3_].check_streams);
445                  do_check_streams.do_segment(n[head3_].ctCDPI_Callouts, n[head3_].tag_Callouts, n[head3_].lex, n[head3_].u8, n[head3_].scope1, n[head3_].ref_Callouts, n[head3_].xml_names, n[head3_].check_streams);
446          head3_ = (head3_+1)%8;
447
448#ifdef ANALYZE
449PERF_SEC_END(T3_work_timer, SEGMENT_SIZE);
450#endif
[1051]451  }
452
453  bool pass4()
454  {
[1058]455          while(head4_==head3_)
456          __sync_synchronize();
457
458#ifdef ANALYZE
459PERF_SEC_START(T4_work_timer);
460#endif
461          source = n[head4_].srcbuf+OVERLAP_BUFSIZE;
[1051]462          matcher.setSrc(source);
[1058]463          postprocess_do_segment(n[head4_].lex, n[head4_].ctCDPI_Callouts, n[head4_].ref_Callouts, n[head4_].check_streams);
[1051]464          block_base = 0;
465
[1058]466#ifdef ANALYZE
467PERF_SEC_END(T4_work_timer, SEGMENT_SIZE);
468#endif
469          head4_ = (head4_+1)%8;
470          tail_ = (tail_+1)%8;
[1051]471
[1058]472  }
[1051]473
474  int overlap_bytes;
475
476  char overlap_buffer[2*OVERLAP_BUFSIZE];
477
478
[1058]479private:
[1051]480
[1058]481  int tail_; // tail of the queue
482  int head1_; // head of the queue
483  int head2_;
484  int head3_;
485  int head4_;
486  T  n[8];
[1051]487
[1058]488
[1051]489  spsc_queue(spsc_queue const&);
490  spsc_queue& operator = (spsc_queue const&);
491};
492
493spsc_queue<struct Para_data> q;
494
495void *stageOne(void *threadid)
496{
497   long tid = (long)threadid;
498   for(int i=0;i<file_segs;i++){
[1058]499#ifdef ANALYZE
500PERF_SEC_START(T1_total_timer);
501#endif
[1051]502           q.pass1();
503//         printf("thread %ld: %i\n",tid,i);
[1058]504#ifdef ANALYZE
505PERF_SEC_END(T1_total_timer, SEGMENT_SIZE);
506#endif
[1051]507   }
508   pthread_exit(NULL);
509}
510void *stageTwo(void *threadid)
511{
512   long tid = (long)threadid;
513   for(int i=0;i<file_segs;i++){
[1058]514#ifdef ANALYZE
515PERF_SEC_START(T2_total_timer);
516#endif
517           q.pass2();
[1051]518//         printf("thread %ld: %i\n",tid,i);
[1058]519#ifdef ANALYZE
520PERF_SEC_END(T2_total_timer, SEGMENT_SIZE);
521#endif
[1051]522   }
523   pthread_exit(NULL);
524}
525
526void *stageThree(void *threadid)
527{
528   long tid = (long)threadid;
529   for(int i=0;i<file_segs;i++){
[1058]530#ifdef ANALYZE
531PERF_SEC_START(T3_total_timer);
532#endif
533           q.pass3();
[1051]534//         printf("thread %ld: %i\n",tid,i);
[1058]535#ifdef ANALYZE
536PERF_SEC_END(T3_total_timer, SEGMENT_SIZE);
537#endif
[1051]538   }
539   pthread_exit(NULL);
540}
541void *stageFour(void *threadid)
542{
543   long tid = (long)threadid;
544   for(int i=0;i<file_segs;i++){
[1058]545#ifdef ANALYZE
546PERF_SEC_START(T4_total_timer);
547#endif
548           q.pass4();
[1051]549//         printf("thread %ld: %i\n",tid,i);
[1058]550#ifdef ANALYZE
551PERF_SEC_END(T4_total_timer, SEGMENT_SIZE);
552#endif
[1051]553   }
554   pthread_exit(NULL);
555}
556
557#define NUM_THREADS 4
558
559int
560main(int argc, char * argv[]) {
561        char * infilename, * outfilename;
562        FILE *tmpinfile, *tmpoutfile;
563        struct stat fileinfo;
564
565    pthread_t threads[NUM_THREADS];
566    int rc;
567    long t=0;
568    void *status;
569
570        if (argc < 2) {
571                printf("Usage: %s <filename> [<outputfile>]\n", argv[0]);
572                exit(-1);
573        }
574
575        infilename = argv[1];
576        stat(infilename, &fileinfo);
577        file_segs = fileinfo.st_size/SEGMENT_SIZE;
578        infile = fopen(infilename, "rb");
579        if (!infile) {
580                fprintf(stderr, "Error: cannot open %s for input.\n", infilename);
581                exit(-1);
582        }
583
584        if (argc < 3) outfile = stdout;
585        else {
586                outfilename = argv[2];
587                outfile = fopen(outfilename, "wb");
588                if (!outfile) {
589                        fprintf(stderr, "Error: cannot open %s for writing.\n", outfilename);
590                        exit(-1);
591                }
592        }
593
594        PERF_SEC_INIT(parser_timer);
[1058]595#ifdef ANALYZE
596        PERF_SEC_INIT(T1_total_timer);
597        PERF_SEC_INIT(T2_total_timer);
598        PERF_SEC_INIT(T3_total_timer);
599        PERF_SEC_INIT(T4_total_timer);
600        PERF_SEC_INIT(T1_work_timer);
601        PERF_SEC_INIT(T2_work_timer);
602        PERF_SEC_INIT(T3_work_timer);
603        PERF_SEC_INIT(T4_work_timer);
604#endif
[1051]605
606        PERF_SEC_START(parser_timer);
607
608
609          rc = pthread_create(&threads[t], NULL, stageOne, (void *)t);
610          if (rc){
611                  printf("ERROR; return code from pthread_create() is %d\n", rc);
612                  exit(-1);
613          }
614          t++;
615          rc = pthread_create(&threads[t], NULL, stageTwo, (void *)t);
616          if (rc){
617                  printf("ERROR; return code from pthread_create() is %d\n", rc);
618                  exit(-1);
619          }
620
621          t++;
622          rc = pthread_create(&threads[t], NULL, stageThree, (void *)t);
623          if (rc){
624                  printf("ERROR; return code from pthread_create() is %d\n", rc);
625                  exit(-1);
626          }
627
628          t++;
629          rc = pthread_create(&threads[t], NULL, stageFour, (void *)t);
630          if (rc){
631                  printf("ERROR; return code from pthread_create() is %d\n", rc);
632                  exit(-1);
633          }
634
635          for(t=0; t<NUM_THREADS; t++) {
636             rc = pthread_join(threads[t], &status);
637             if (rc) {
638                printf("ERROR; return code from pthread_join() is %d\n", rc);
639                exit(-1);
640             }
641//           printf("Main: completed join with thread %ld having a status of %ld\n",t,(long)status);
642          }
643
644
645        PERF_SEC_END(parser_timer, fileinfo.st_size/SEGMENT_SIZE*SEGMENT_SIZE);
646
647        PERF_SEC_DUMP(parser_timer);
[1058]648#ifdef ANALYZE
649printf("\nT1_total_timer\n");
650        PERF_SEC_DUMP(T1_total_timer);
651printf("\nT2_total_timer\n");
652        PERF_SEC_DUMP(T2_total_timer);
653printf("\nT3_total_timer\n");
654        PERF_SEC_DUMP(T3_total_timer);
655printf("\nT4_total_timer\n");
656        PERF_SEC_DUMP(T4_total_timer);
657printf("\nT1_work_timer\n");
658        PERF_SEC_DUMP(T1_work_timer);
659printf("\nT2_work_timer\n");
660        PERF_SEC_DUMP(T2_work_timer);
661printf("\nT3_work_timer\n");
662        PERF_SEC_DUMP(T3_work_timer);
663printf("\nT4_work_timer\n");
664        PERF_SEC_DUMP(T4_work_timer);
665#endif
[1051]666
667        PERF_SEC_DESTROY(parser_timer);
[1058]668#ifdef ANALYZE
669        PERF_SEC_DESTROY(T1_total_timer);
670        PERF_SEC_DESTROY(T2_total_timer);
671        PERF_SEC_DESTROY(T3_total_timer);
672        PERF_SEC_DESTROY(T4_total_timer);
673        PERF_SEC_DESTROY(T1_work_timer);
674        PERF_SEC_DESTROY(T2_work_timer);
675        PERF_SEC_DESTROY(T3_work_timer);
676        PERF_SEC_DESTROY(T4_work_timer);
677#endif
[1051]678//      printf("count = %i\n", q.count);
679
680        pthread_exit(NULL);
681
682        fclose(infile);
683        fclose(outfile);
684        return(0);
685}
Note: See TracBrowser for help on using the repository browser.