source: proto/parabix2/pablo_template_multithreads.cpp @ 1058

Last change on this file since 1058 was 1058, checked in by lindanl, 8 years ago

multithreads

File size: 16.4 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <errno.h>
4#include <sys/types.h>
5#include <sys/stat.h>
6#include <pthread.h>
7#include "../lib/lib_simd.h"
8#define BLOCK_SIZE 128
9#define SEGMENT_BLOCKS 128
10#define SEGMENT_SIZE (BLOCK_SIZE * SEGMENT_BLOCKS)
11#define OVERLAP_BUFSIZE 16
12
13typedef uint64_t ScanBlock;
14typedef SIMD_type BytePack;
15typedef SIMD_type BitBlock;
16
17#include "../lib/carryQ.h"
18#include "xmldecl.h"
19#include "xml_error.c"
20#include "xmldecl.c"
21#include "namechars.h"
22
23#include "../lib/perflib/perfsec.h"
24#include "../lib/s2p.h"
25
26#include "TagMatcher_multithreads.h"
27#include "LineColTracker_multithreads.h"
28BitBlock EOF_mask = simd_const_1(1);
29
30#ifdef BUFFER_PROFILING
31        BOM_Table * parser_timer;
32#ifdef ANALYZE
33        BOM_Table * T1_total_timer;
34        BOM_Table * T2_total_timer;
35        BOM_Table * T3_total_timer;
36        BOM_Table * T4_total_timer;
37        BOM_Table * T1_work_timer;
38        BOM_Table * T2_work_timer;
39        BOM_Table * T3_work_timer;
40        BOM_Table * T4_work_timer;     
41#endif
42#endif
43LineColTracker tracker;
44TagMatcher matcher;
45
46int block_base=0;
47int buffer_base=0;
48char * source;
49
50static inline int StreamScan(ScanBlock * stream, int blk_count, int ProcessPos(int)) {
51        int blk;
52        int block_pos = 0;
53
54        for (blk = 0; blk < blk_count; blk++) {
55                ScanBlock s = stream[blk];
56                while(s) {
57                        int code = (ProcessPos(cfzl(s) + block_pos));
58                        if (code) return code;
59                        s = s & (s-1);  // clear rightmost bit.
60                }
61                block_pos += 8 * sizeof(ScanBlock);
62        }
63        return 0;
64}
65
66
67static inline void ReportError(const char * error_msg, int error_pos_in_block) {
68  int error_line, error_column;
69  tracker.get_Line_and_Column(error_pos_in_block, error_line, error_column);
70  fprintf(stderr, "%s at line %i, column %i\n", error_msg, error_line, error_column);
71}
72
73
74static inline int NameStrt_check(int pos) {
75        int block_pos = block_base + pos;
76        if(XML_10_UTF8_NameStrt_bytes((unsigned char*)&source[block_pos]) == 0){
77              ReportError("name start error", pos);
78              exit(-1);
79        }
80        return 0;
81}
82
83static inline int Name_check(int pos) {
84        int block_pos = block_base + pos;
85        if(XML_10_UTF8_NameChar_bytes((unsigned char*)&source[block_pos]) == 0){
86              ReportError("name error", pos);
87              exit(-1);
88        }
89        return 0;
90}
91
92static inline int PIName_check(int pos) {
93        int block_pos = block_base + pos;
94        int file_pos = block_pos+buffer_base;
95        if (at_XxMmLll<ASCII>((unsigned char*)&source[block_pos]) && (source[block_pos+3]=='?' || source[block_pos+3]<= ' ')) {
96              // "<?xml" legal at start of file.
97              if ((file_pos == 2) && at_XmlDecl_start<ASCII>((unsigned char*)&source[0])) return 0;
98              ReportError("[Xx][Mm][Ll] illegal as PI name", pos);
99              exit(-1);
100        }
101        return 0;
102}
103
104static inline int CD_check(int pos) {
105        int block_pos = block_base + pos;
106        if (!at_CDATA1<ASCII>((unsigned char*)&source[block_pos])){
107              ReportError("CDATA error", pos);
108              exit(-1);
109        }
110        return 0;
111}
112
113static inline int GenRef_check(int pos) {
114        int block_pos = block_base + pos;
115        unsigned char* s = (unsigned char*)&source[block_pos];
116        if (!(at_Ref_gt<ASCII>(s)||at_Ref_lt<ASCII>(s)||at_Ref_amp<ASCII>(s)||at_Ref_quot<ASCII>(s)||at_Ref_apos<ASCII>(s))){
117              ReportError("Undefined reference", pos);
118              exit(-1);
119        }
120        return 0;
121}
122
123static inline int HexRef_check(int pos) {
124        int block_pos = block_base + pos;
125        unsigned char* s = (unsigned char*)&source[block_pos];
126        int ch_val = 0;
127        while(at_HexDigit<ASCII>(s)){
128          ch_val = HexVal<ASCII>(s[0]) + (ch_val<<4);
129          if (ch_val> 0x10FFFF ){
130            ReportError("Illegal character reference", pos);
131            exit(-1);
132          }
133          s++;
134        }
135        if ((ch_val == 0x0) || ((ch_val | 0x7FF) == 0xDFFF)|| ((ch_val | 0x1) == 0xFFFF)){
136          ReportError("Illegal character reference", pos);
137          exit(-1);
138        }
139        else if (((ch_val < 0x20) && (ch_val != 0x9) && (ch_val != 0xD) && (ch_val != 0xA))){
140          ReportError("Illegal XML 1.0 character reference", pos);
141          exit(-1);
142        }
143        return 0;
144}
145
146static inline int DecRef_check(int pos) {
147        int block_pos = block_base + pos;
148        unsigned char* s = (unsigned char*)&source[block_pos];
149        int ch_val = 0;
150        while(at_HexDigit<ASCII>(s)){
151          ch_val = DigitVal<ASCII>(s[0]) + ch_val*10;
152          if (ch_val> 0x10FFFF ){
153            ReportError("Illegal character reference", pos);
154            exit(-1);
155          }
156          s++;
157        }
158        if ((ch_val == 0x0) || ((ch_val | 0x7FF) == 0xDFFF)|| ((ch_val | 0x1) == 0xFFFF)){
159          ReportError("Illegal character reference", pos);
160          exit(-1);
161        }
162        else if (((ch_val < 0x20) && (ch_val != 0x9) && (ch_val != 0xD) && (ch_val != 0xA))){
163          ReportError("Illegal XML 1.0 character reference", pos);
164          exit(-1);
165        }
166        return 0;
167}
168
169static inline int AttRef_check(int pos) {
170        int block_pos = block_base + pos;
171        unsigned char* s = (unsigned char*)&source[block_pos];
172        int ch_val = 0;
173        if(s[0]=='#'){
174          s++;
175          if(s[0]=='x' || s[0]=='X'){
176            s++;
177            while(at_HexDigit<ASCII>(s)){
178              ch_val = HexVal<ASCII>(s[0]) + (ch_val<<4);
179              s++;
180            }
181          }
182          else{
183            while(at_HexDigit<ASCII>(s)){
184              ch_val = DigitVal<ASCII>(s[0]) + ch_val*10;
185              s++;
186            }
187          }
188          if (ch_val==60){
189            ReportError("Attribute values contain '<' characters after reference expansion", pos);
190            exit(-1);
191          }
192        }
193        else if(at_Ref_lt<ASCII>(s)){
194          ReportError("Attribute values contain '<' characters after reference expansion", pos);
195          exit(-1);
196        }
197        return 0;
198}
199
200
201
202@global
203
204static inline void s2p_do_block(BytePack U8[], Basis_bits & basis_bits) {
205  s2p(U8[0], U8[1], U8[2], U8[3], U8[4], U8[5], U8[6], U8[7],
206        basis_bits.bit_0, basis_bits.bit_1, basis_bits.bit_2, basis_bits.bit_3, basis_bits.bit_4, basis_bits.bit_5, basis_bits.bit_6, basis_bits.bit_7);
207}
208
209static inline void s2p_do_final_block(BytePack U8[], Basis_bits & basis_bits, BitBlock EOF_mask) {
210  s2p_do_block(U8, basis_bits);
211  basis_bits.bit_0 = simd_and(basis_bits.bit_0, EOF_mask);
212  basis_bits.bit_1 = simd_and(basis_bits.bit_1, EOF_mask);
213  basis_bits.bit_2 = simd_and(basis_bits.bit_2, EOF_mask);
214  basis_bits.bit_3 = simd_and(basis_bits.bit_3, EOF_mask);
215  basis_bits.bit_4 = simd_and(basis_bits.bit_4, EOF_mask);
216  basis_bits.bit_5 = simd_and(basis_bits.bit_5, EOF_mask);
217  basis_bits.bit_6 = simd_and(basis_bits.bit_6, EOF_mask);
218  basis_bits.bit_7 = simd_and(basis_bits.bit_7, EOF_mask);
219}
220
221static inline void s2p_do_segment(BytePack U8[], Basis_bits basis_bits[]) {
222  for (int i = 0; i < SEGMENT_BLOCKS; i++)
223           s2p_do_block(&U8[8*i], basis_bits[i]);
224}
225
226static inline void postprocess_do_block(Lex & lex, CtCDPI_Callouts & ctCDPI_Callouts, Ref_Callouts & ref_Callouts, Check_streams & check_streams, int chars_avail){
227            tracker.StoreNewlines(lex.LF);
228
229                if (bitblock_has_bit(simd_or(check_streams.non_ascii_name_starts, check_streams.non_ascii_names))) {
230                  StreamScan((ScanBlock *) &check_streams.non_ascii_name_starts, sizeof(BitBlock)/sizeof(ScanBlock), NameStrt_check);
231                  StreamScan((ScanBlock *) &check_streams.non_ascii_names, sizeof(BitBlock)/sizeof(ScanBlock), Name_check);
232                }
233
234                if (bitblock_has_bit(ctCDPI_Callouts.PI_name_starts)){
235                  StreamScan((ScanBlock *) &ctCDPI_Callouts.PI_name_starts, sizeof(BitBlock)/sizeof(ScanBlock), PIName_check);
236                }
237
238                if (bitblock_has_bit(ctCDPI_Callouts.CD_starts)){
239                  StreamScan((ScanBlock *) &ctCDPI_Callouts.CD_starts, sizeof(BitBlock)/sizeof(ScanBlock), CD_check);
240                }
241
242                if (bitblock_has_bit(ref_Callouts.GenRef_starts)){
243                  StreamScan((ScanBlock *) &ref_Callouts.GenRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), GenRef_check);
244                }
245
246                if (bitblock_has_bit(ref_Callouts.DecRef_starts)){
247                  StreamScan((ScanBlock *) &ref_Callouts.DecRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), DecRef_check);
248                }
249
250                if (bitblock_has_bit(ref_Callouts.HexRef_starts)){
251                  StreamScan((ScanBlock *) &ref_Callouts.HexRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), HexRef_check);
252                }
253
254                if (bitblock_has_bit(check_streams.att_refs)){
255                  StreamScan((ScanBlock *) &check_streams.att_refs, sizeof(BitBlock)/sizeof(ScanBlock), AttRef_check);
256                }
257
258                if (bitblock_has_bit(check_streams.error_mask)) {
259                  int errpos = count_forward_zeroes(check_streams.error_mask);
260                  ReportError("error found", errpos);
261              exit(-1);
262                }
263
264                matcher.store_streams(check_streams.tag_marks, check_streams.name_follows, check_streams.misc_mask, chars_avail);
265                tracker.AdvanceBlock();
266}
267
268static inline void postprocess_do_segment(Lex lex[], CtCDPI_Callouts ctCDPI_Callouts[], Ref_Callouts ref_Callouts[], Check_streams check_streams[]){
269
270  for (int i = 0; i < SEGMENT_BLOCKS; i++){
271           postprocess_do_block(lex[i], ctCDPI_Callouts[i], ref_Callouts[i], check_streams[i], SEGMENT_SIZE);
272           block_base += BLOCK_SIZE;
273  }
274
275        matcher.StreamScan(SEGMENT_SIZE);
276        matcher.Advance_segment();
277        tracker.Advance_buffer();
278}
279
280struct Para_data{
281
282          char srcbuf[BLOCK_SIZE * (SEGMENT_BLOCKS+1) + OVERLAP_BUFSIZE*2];
283
284          struct U8 u8[SEGMENT_BLOCKS];
285
286          struct Lex lex[SEGMENT_BLOCKS];
287
288          struct Scope1 scope1[SEGMENT_BLOCKS];
289
290          struct CtCDPI_Callouts ctCDPI_Callouts[SEGMENT_BLOCKS];
291
292          struct Ref_Callouts ref_Callouts[SEGMENT_BLOCKS];
293
294          struct Tag_Callouts tag_Callouts[SEGMENT_BLOCKS];
295
296          struct Basis_bits basis_bits[SEGMENT_BLOCKS];
297
298          struct Check_streams check_streams[SEGMENT_BLOCKS];
299
300          struct Xml_names xml_names[SEGMENT_BLOCKS];
301};
302
303  Validate_utf8 validate_utf8;
304  Parse_refs parse_refs;
305  Parse_tags parse_tags;
306  Parse_CtCDPI parse_CtCDPI;
307  Do_check_streams do_check_streams;
308  Classify_bytes classify_bytes;
309  Add_scope_streams add_scope_streams;
310  Validate_xml_names validate_xml_names;
311
312
313int file_segs=0;
314FILE *infile;
315FILE *outfile;
316
317template<typename T>
318class spsc_queue
319{
320public:
321  spsc_queue()
322  {
323          head1_ =  head2_ = head3_ = head4_ = 1;
324      tail_ = 0;
325  }
326
327  ~spsc_queue()
328  {
329
330  }
331
332  bool pass1()
333  {
334          while(head1_==tail_)
335          __sync_synchronize();
336
337#ifdef ANALYZE
338PERF_SEC_START(T1_work_timer);
339#endif
340          char * srcptr = n[head1_].srcbuf + OVERLAP_BUFSIZE;
341          memmove(n[head1_].srcbuf, overlap_buffer, 2*overlap_bytes);
342          int chars_read = fread(&srcptr[overlap_bytes], 1, SEGMENT_SIZE + OVERLAP_BUFSIZE - overlap_bytes, infile);
343          memmove(overlap_buffer, &srcptr[SEGMENT_SIZE-OVERLAP_BUFSIZE], 2*OVERLAP_BUFSIZE);
344          overlap_bytes = OVERLAP_BUFSIZE;
345          s2p_do_segment((BytePack *) srcptr, n[head1_].basis_bits);
346          classify_bytes.do_segment(n[head1_].basis_bits, n[head1_].lex);
347
348#ifdef ANALYZE
349PERF_SEC_END(T1_work_timer, SEGMENT_SIZE);
350#endif
351          head1_ = (head1_+1)%8;
352
353  }
354
355  bool pass2()
356  {
357          while(head2_==head1_)
358          __sync_synchronize();
359
360#ifdef ANALYZE
361PERF_SEC_START(T2_work_timer);
362#endif
363          validate_utf8.do_segment(n[head2_].basis_bits, n[head2_].u8);
364          add_scope_streams.do_segment(n[head2_].lex, n[head2_].scope1);
365          parse_CtCDPI.do_segment(n[head2_].ctCDPI_Callouts, n[head2_].lex, n[head2_].scope1, n[head2_].check_streams);
366          parse_refs.do_segment(n[head2_].lex, n[head2_].scope1, n[head2_].ctCDPI_Callouts, n[head2_].ref_Callouts);
367
368#ifdef ANALYZE
369PERF_SEC_END(T2_work_timer, SEGMENT_SIZE);
370#endif
371          head2_ = (head2_+1)%8;
372
373  }
374
375  bool pass3()
376  {
377          while(head3_==head2_)
378          __sync_synchronize();
379
380#ifdef ANALYZE
381PERF_SEC_START(T3_work_timer);
382#endif
383         parse_tags.do_segment(n[head3_].lex, n[head3_].scope1, n[head3_].ctCDPI_Callouts, n[head3_].tag_Callouts);
384         validate_xml_names.do_segment(n[head3_].ctCDPI_Callouts, n[head3_].ref_Callouts, n[head3_].tag_Callouts, n[head3_].lex, n[head3_].u8, n[head3_].xml_names, n[head3_].check_streams);
385                  do_check_streams.do_segment(n[head3_].ctCDPI_Callouts, n[head3_].tag_Callouts, n[head3_].lex, n[head3_].u8, n[head3_].scope1, n[head3_].ref_Callouts, n[head3_].xml_names, n[head3_].check_streams);
386          head3_ = (head3_+1)%8;
387
388#ifdef ANALYZE
389PERF_SEC_END(T3_work_timer, SEGMENT_SIZE);
390#endif
391  }
392
393  bool pass4()
394  {
395          while(head4_==head3_)
396          __sync_synchronize();
397
398#ifdef ANALYZE
399PERF_SEC_START(T4_work_timer);
400#endif
401          source = n[head4_].srcbuf+OVERLAP_BUFSIZE;
402          matcher.setSrc(source);
403          postprocess_do_segment(n[head4_].lex, n[head4_].ctCDPI_Callouts, n[head4_].ref_Callouts, n[head4_].check_streams);
404          block_base = 0;
405
406#ifdef ANALYZE
407PERF_SEC_END(T4_work_timer, SEGMENT_SIZE);
408#endif
409          head4_ = (head4_+1)%8;
410          tail_ = (tail_+1)%8;
411
412  }
413
414  int overlap_bytes;
415
416  char overlap_buffer[2*OVERLAP_BUFSIZE];
417
418
419private:
420
421  int tail_; // tail of the queue
422  int head1_; // head of the queue
423  int head2_;
424  int head3_;
425  int head4_;
426  T  n[8];
427
428
429  spsc_queue(spsc_queue const&);
430  spsc_queue& operator = (spsc_queue const&);
431};
432
433spsc_queue<struct Para_data> q;
434
435void *stageOne(void *threadid)
436{
437   long tid = (long)threadid;
438   for(int i=0;i<file_segs;i++){
439#ifdef ANALYZE
440PERF_SEC_START(T1_total_timer);
441#endif
442           q.pass1();
443//         printf("thread %ld: %i\n",tid,i);
444#ifdef ANALYZE
445PERF_SEC_END(T1_total_timer, SEGMENT_SIZE);
446#endif
447   }
448   pthread_exit(NULL);
449}
450void *stageTwo(void *threadid)
451{
452   long tid = (long)threadid;
453   for(int i=0;i<file_segs;i++){
454#ifdef ANALYZE
455PERF_SEC_START(T2_total_timer);
456#endif
457           q.pass2();
458//         printf("thread %ld: %i\n",tid,i);
459#ifdef ANALYZE
460PERF_SEC_END(T2_total_timer, SEGMENT_SIZE);
461#endif
462   }
463   pthread_exit(NULL);
464}
465
466void *stageThree(void *threadid)
467{
468   long tid = (long)threadid;
469   for(int i=0;i<file_segs;i++){
470#ifdef ANALYZE
471PERF_SEC_START(T3_total_timer);
472#endif
473           q.pass3();
474//         printf("thread %ld: %i\n",tid,i);
475#ifdef ANALYZE
476PERF_SEC_END(T3_total_timer, SEGMENT_SIZE);
477#endif
478   }
479   pthread_exit(NULL);
480}
481void *stageFour(void *threadid)
482{
483   long tid = (long)threadid;
484   for(int i=0;i<file_segs;i++){
485#ifdef ANALYZE
486PERF_SEC_START(T4_total_timer);
487#endif
488           q.pass4();
489//         printf("thread %ld: %i\n",tid,i);
490#ifdef ANALYZE
491PERF_SEC_END(T4_total_timer, SEGMENT_SIZE);
492#endif
493   }
494   pthread_exit(NULL);
495}
496
497#define NUM_THREADS 4
498
499int
500main(int argc, char * argv[]) {
501        char * infilename, * outfilename;
502        FILE *tmpinfile, *tmpoutfile;
503        struct stat fileinfo;
504
505    pthread_t threads[NUM_THREADS];
506    int rc;
507    long t=0;
508    void *status;
509
510        if (argc < 2) {
511                printf("Usage: %s <filename> [<outputfile>]\n", argv[0]);
512                exit(-1);
513        }
514
515        infilename = argv[1];
516        stat(infilename, &fileinfo);
517        file_segs = fileinfo.st_size/SEGMENT_SIZE;
518        infile = fopen(infilename, "rb");
519        if (!infile) {
520                fprintf(stderr, "Error: cannot open %s for input.\n", infilename);
521                exit(-1);
522        }
523
524        if (argc < 3) outfile = stdout;
525        else {
526                outfilename = argv[2];
527                outfile = fopen(outfilename, "wb");
528                if (!outfile) {
529                        fprintf(stderr, "Error: cannot open %s for writing.\n", outfilename);
530                        exit(-1);
531                }
532        }
533
534        PERF_SEC_INIT(parser_timer);
535#ifdef ANALYZE
536        PERF_SEC_INIT(T1_total_timer);
537        PERF_SEC_INIT(T2_total_timer);
538        PERF_SEC_INIT(T3_total_timer);
539        PERF_SEC_INIT(T4_total_timer);
540        PERF_SEC_INIT(T1_work_timer);
541        PERF_SEC_INIT(T2_work_timer);
542        PERF_SEC_INIT(T3_work_timer);
543        PERF_SEC_INIT(T4_work_timer);
544#endif
545
546        PERF_SEC_START(parser_timer);
547
548
549          rc = pthread_create(&threads[t], NULL, stageOne, (void *)t);
550          if (rc){
551                  printf("ERROR; return code from pthread_create() is %d\n", rc);
552                  exit(-1);
553          }
554          t++;
555          rc = pthread_create(&threads[t], NULL, stageTwo, (void *)t);
556          if (rc){
557                  printf("ERROR; return code from pthread_create() is %d\n", rc);
558                  exit(-1);
559          }
560
561          t++;
562          rc = pthread_create(&threads[t], NULL, stageThree, (void *)t);
563          if (rc){
564                  printf("ERROR; return code from pthread_create() is %d\n", rc);
565                  exit(-1);
566          }
567
568          t++;
569          rc = pthread_create(&threads[t], NULL, stageFour, (void *)t);
570          if (rc){
571                  printf("ERROR; return code from pthread_create() is %d\n", rc);
572                  exit(-1);
573          }
574
575          for(t=0; t<NUM_THREADS; t++) {
576             rc = pthread_join(threads[t], &status);
577             if (rc) {
578                printf("ERROR; return code from pthread_join() is %d\n", rc);
579                exit(-1);
580             }
581//           printf("Main: completed join with thread %ld having a status of %ld\n",t,(long)status);
582          }
583
584
585        PERF_SEC_END(parser_timer, fileinfo.st_size/SEGMENT_SIZE*SEGMENT_SIZE);
586
587        PERF_SEC_DUMP(parser_timer);
588#ifdef ANALYZE
589printf("\nT1_total_timer\n");
590        PERF_SEC_DUMP(T1_total_timer);
591printf("\nT2_total_timer\n");
592        PERF_SEC_DUMP(T2_total_timer);
593printf("\nT3_total_timer\n");
594        PERF_SEC_DUMP(T3_total_timer);
595printf("\nT4_total_timer\n");
596        PERF_SEC_DUMP(T4_total_timer);
597printf("\nT1_work_timer\n");
598        PERF_SEC_DUMP(T1_work_timer);
599printf("\nT2_work_timer\n");
600        PERF_SEC_DUMP(T2_work_timer);
601printf("\nT3_work_timer\n");
602        PERF_SEC_DUMP(T3_work_timer);
603printf("\nT4_work_timer\n");
604        PERF_SEC_DUMP(T4_work_timer);
605#endif
606
607        PERF_SEC_DESTROY(parser_timer);
608#ifdef ANALYZE
609        PERF_SEC_DESTROY(T1_total_timer);
610        PERF_SEC_DESTROY(T2_total_timer);
611        PERF_SEC_DESTROY(T3_total_timer);
612        PERF_SEC_DESTROY(T4_total_timer);
613        PERF_SEC_DESTROY(T1_work_timer);
614        PERF_SEC_DESTROY(T2_work_timer);
615        PERF_SEC_DESTROY(T3_work_timer);
616        PERF_SEC_DESTROY(T4_work_timer);
617#endif
618//      printf("count = %i\n", q.count);
619
620        pthread_exit(NULL);
621
622        fclose(infile);
623        fclose(outfile);
624        return(0);
625}
Note: See TracBrowser for help on using the repository browser.