source: proto/parabix2/pablo_template_multithreads.cpp @ 2022

Last change on this file since 2022 was 1906, checked in by cameron, 8 years ago

Clear out cfzl/cfbl.

File size: 16.4 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <errno.h>
4#include <sys/types.h>
5#include <sys/stat.h>
6#include <pthread.h>
7#include "../lib/lib_simd.h"
8#define BLOCK_SIZE 128
9#define SEGMENT_BLOCKS 128
10#define SEGMENT_SIZE (BLOCK_SIZE * SEGMENT_BLOCKS)
11#define OVERLAP_BUFSIZE 16
12
13#include "../lib/builtins.hpp"
14#include "../lib/carryQ.h"
15#include "xmldecl.h"
16#include "xml_error.c"
17#include "xmldecl.c"
18#include "namechars.h"
19
20#include "../lib/perflib/perfsec.h"
21#include "../lib/s2p.h"
22
23#include "TagMatcher_multithreads.h"
24#include "LineColTracker_multithreads.h"
25BitBlock EOF_mask = simd_const_1(1);
26
27#ifdef BUFFER_PROFILING
28        BOM_Table * parser_timer;
29#ifdef ANALYZE
30        BOM_Table * T1_total_timer;
31        BOM_Table * T2_total_timer;
32        BOM_Table * T3_total_timer;
33        BOM_Table * T4_total_timer;
34        BOM_Table * T1_work_timer;
35        BOM_Table * T2_work_timer;
36        BOM_Table * T3_work_timer;
37        BOM_Table * T4_work_timer;     
38#endif
39#endif
40LineColTracker tracker;
41TagMatcher matcher;
42
43int block_base=0;
44int buffer_base=0;
45char * source;
46
47static inline int StreamScan(ScanBlock * stream, int blk_count, int ProcessPos(int)) {
48        int blk;
49        int block_pos = 0;
50
51        for (blk = 0; blk < blk_count; blk++) {
52                scanword_t s = stream[blk];
53                while(s) {
54                        int code = (ProcessPos(scan_forward_zeroes(s) + block_pos));
55                        if (code) return code;
56                        s = s & (s-1);  // clear rightmost bit.
57                }
58                block_pos += 8 * sizeof(ScanBlock);
59        }
60        return 0;
61}
62
63static inline void ReportError(const char * error_msg, int error_pos_in_block) {
64  int error_line, error_column;
65  tracker.get_Line_and_Column(error_pos_in_block, error_line, error_column);
66  fprintf(stderr, "%s at line %i, column %i\n", error_msg, error_line, error_column);
67}
68
69
70static inline int NameStrt_check(int pos) {
71        int block_pos = block_base + pos;
72        if(XML_10_UTF8_NameStrt_bytes((unsigned char*)&source[block_pos]) == 0){
73              ReportError("name start error", pos);
74              exit(-1);
75        }
76        return 0;
77}
78
79static inline int Name_check(int pos) {
80        int block_pos = block_base + pos;
81        if(XML_10_UTF8_NameChar_bytes((unsigned char*)&source[block_pos]) == 0){
82              ReportError("name error", pos);
83              exit(-1);
84        }
85        return 0;
86}
87
88static inline int PIName_check(int pos) {
89        int block_pos = block_base + pos;
90        int file_pos = block_pos+buffer_base;
91        if (at_XxMmLll<ASCII>((unsigned char*)&source[block_pos]) && (source[block_pos+3]=='?' || source[block_pos+3]<= ' ')) {
92              // "<?xml" legal at start of file.
93              if ((file_pos == 2) && at_XmlDecl_start<ASCII>((unsigned char*)&source[0])) return 0;
94              ReportError("[Xx][Mm][Ll] illegal as PI name", pos);
95              exit(-1);
96        }
97        return 0;
98}
99
100static inline int CD_check(int pos) {
101        int block_pos = block_base + pos;
102        if (!at_CDATA1<ASCII>((unsigned char*)&source[block_pos])){
103              ReportError("CDATA error", pos);
104              exit(-1);
105        }
106        return 0;
107}
108
109static inline int GenRef_check(int pos) {
110        int block_pos = block_base + pos;
111        unsigned char* s = (unsigned char*)&source[block_pos];
112        if (!(at_Ref_gt<ASCII>(s)||at_Ref_lt<ASCII>(s)||at_Ref_amp<ASCII>(s)||at_Ref_quot<ASCII>(s)||at_Ref_apos<ASCII>(s))){
113              ReportError("Undefined reference", pos);
114              exit(-1);
115        }
116        return 0;
117}
118
119static inline int HexRef_check(int pos) {
120        int block_pos = block_base + pos;
121        unsigned char* s = (unsigned char*)&source[block_pos];
122        int ch_val = 0;
123        while(at_HexDigit<ASCII>(s)){
124          ch_val = HexVal<ASCII>(s[0]) + (ch_val<<4);
125          if (ch_val> 0x10FFFF ){
126            ReportError("Illegal character reference", pos);
127            exit(-1);
128          }
129          s++;
130        }
131        if ((ch_val == 0x0) || ((ch_val | 0x7FF) == 0xDFFF)|| ((ch_val | 0x1) == 0xFFFF)){
132          ReportError("Illegal character reference", pos);
133          exit(-1);
134        }
135        else if (((ch_val < 0x20) && (ch_val != 0x9) && (ch_val != 0xD) && (ch_val != 0xA))){
136          ReportError("Illegal XML 1.0 character reference", pos);
137          exit(-1);
138        }
139        return 0;
140}
141
142static inline int DecRef_check(int pos) {
143        int block_pos = block_base + pos;
144        unsigned char* s = (unsigned char*)&source[block_pos];
145        int ch_val = 0;
146        while(at_HexDigit<ASCII>(s)){
147          ch_val = DigitVal<ASCII>(s[0]) + ch_val*10;
148          if (ch_val> 0x10FFFF ){
149            ReportError("Illegal character reference", pos);
150            exit(-1);
151          }
152          s++;
153        }
154        if ((ch_val == 0x0) || ((ch_val | 0x7FF) == 0xDFFF)|| ((ch_val | 0x1) == 0xFFFF)){
155          ReportError("Illegal character reference", pos);
156          exit(-1);
157        }
158        else if (((ch_val < 0x20) && (ch_val != 0x9) && (ch_val != 0xD) && (ch_val != 0xA))){
159          ReportError("Illegal XML 1.0 character reference", pos);
160          exit(-1);
161        }
162        return 0;
163}
164
165static inline int AttRef_check(int pos) {
166        int block_pos = block_base + pos;
167        unsigned char* s = (unsigned char*)&source[block_pos];
168        int ch_val = 0;
169        if(s[0]=='#'){
170          s++;
171          if(s[0]=='x' || s[0]=='X'){
172            s++;
173            while(at_HexDigit<ASCII>(s)){
174              ch_val = HexVal<ASCII>(s[0]) + (ch_val<<4);
175              s++;
176            }
177          }
178          else{
179            while(at_HexDigit<ASCII>(s)){
180              ch_val = DigitVal<ASCII>(s[0]) + ch_val*10;
181              s++;
182            }
183          }
184          if (ch_val==60){
185            ReportError("Attribute values contain '<' characters after reference expansion", pos);
186            exit(-1);
187          }
188        }
189        else if(at_Ref_lt<ASCII>(s)){
190          ReportError("Attribute values contain '<' characters after reference expansion", pos);
191          exit(-1);
192        }
193        return 0;
194}
195
196
197
198@global
199
200static inline void s2p_do_block(BytePack U8[], Basis_bits & basis_bits) {
201  s2p(U8[0], U8[1], U8[2], U8[3], U8[4], U8[5], U8[6], U8[7],
202        basis_bits.bit_0, basis_bits.bit_1, basis_bits.bit_2, basis_bits.bit_3, basis_bits.bit_4, basis_bits.bit_5, basis_bits.bit_6, basis_bits.bit_7);
203}
204
205static inline void s2p_do_final_block(BytePack U8[], Basis_bits & basis_bits, BitBlock EOF_mask) {
206  s2p_do_block(U8, basis_bits);
207  basis_bits.bit_0 = simd_and(basis_bits.bit_0, EOF_mask);
208  basis_bits.bit_1 = simd_and(basis_bits.bit_1, EOF_mask);
209  basis_bits.bit_2 = simd_and(basis_bits.bit_2, EOF_mask);
210  basis_bits.bit_3 = simd_and(basis_bits.bit_3, EOF_mask);
211  basis_bits.bit_4 = simd_and(basis_bits.bit_4, EOF_mask);
212  basis_bits.bit_5 = simd_and(basis_bits.bit_5, EOF_mask);
213  basis_bits.bit_6 = simd_and(basis_bits.bit_6, EOF_mask);
214  basis_bits.bit_7 = simd_and(basis_bits.bit_7, EOF_mask);
215}
216
217static inline void s2p_do_segment(BytePack U8[], Basis_bits basis_bits[]) {
218  for (int i = 0; i < SEGMENT_BLOCKS; i++)
219           s2p_do_block(&U8[8*i], basis_bits[i]);
220}
221
222static inline void postprocess_do_block(Lex & lex, CtCDPI_Callouts & ctCDPI_Callouts, Ref_Callouts & ref_Callouts, Check_streams & check_streams, int chars_avail){
223            tracker.StoreNewlines(lex.LF);
224
225                if (bitblock_has_bit(simd_or(check_streams.non_ascii_name_starts, check_streams.non_ascii_names))) {
226                  StreamScan((ScanBlock *) &check_streams.non_ascii_name_starts, sizeof(BitBlock)/sizeof(ScanBlock), NameStrt_check);
227                  StreamScan((ScanBlock *) &check_streams.non_ascii_names, sizeof(BitBlock)/sizeof(ScanBlock), Name_check);
228                }
229
230                if (bitblock_has_bit(ctCDPI_Callouts.PI_name_starts)){
231                  StreamScan((ScanBlock *) &ctCDPI_Callouts.PI_name_starts, sizeof(BitBlock)/sizeof(ScanBlock), PIName_check);
232                }
233
234                if (bitblock_has_bit(ctCDPI_Callouts.CD_starts)){
235                  StreamScan((ScanBlock *) &ctCDPI_Callouts.CD_starts, sizeof(BitBlock)/sizeof(ScanBlock), CD_check);
236                }
237
238                if (bitblock_has_bit(ref_Callouts.GenRef_starts)){
239                  StreamScan((ScanBlock *) &ref_Callouts.GenRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), GenRef_check);
240                }
241
242                if (bitblock_has_bit(ref_Callouts.DecRef_starts)){
243                  StreamScan((ScanBlock *) &ref_Callouts.DecRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), DecRef_check);
244                }
245
246                if (bitblock_has_bit(ref_Callouts.HexRef_starts)){
247                  StreamScan((ScanBlock *) &ref_Callouts.HexRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), HexRef_check);
248                }
249
250                if (bitblock_has_bit(check_streams.att_refs)){
251                  StreamScan((ScanBlock *) &check_streams.att_refs, sizeof(BitBlock)/sizeof(ScanBlock), AttRef_check);
252                }
253
254                if (bitblock_has_bit(check_streams.error_mask)) {
255                  int errpos = count_forward_zeroes(check_streams.error_mask);
256                  ReportError("error found", errpos);
257              exit(-1);
258                }
259
260                matcher.store_streams(check_streams.tag_marks, check_streams.name_follows, check_streams.misc_mask, chars_avail);
261                tracker.AdvanceBlock();
262}
263
264static inline void postprocess_do_segment(Lex lex[], CtCDPI_Callouts ctCDPI_Callouts[], Ref_Callouts ref_Callouts[], Check_streams check_streams[]){
265
266  for (int i = 0; i < SEGMENT_BLOCKS; i++){
267           postprocess_do_block(lex[i], ctCDPI_Callouts[i], ref_Callouts[i], check_streams[i], SEGMENT_SIZE);
268           block_base += BLOCK_SIZE;
269  }
270
271        matcher.StreamScan(SEGMENT_SIZE);
272        matcher.Advance_segment();
273        tracker.Advance_buffer();
274}
275
276struct Para_data{
277
278          char srcbuf[BLOCK_SIZE * (SEGMENT_BLOCKS+1) + OVERLAP_BUFSIZE*2];
279
280          struct U8 u8[SEGMENT_BLOCKS];
281
282          struct Lex lex[SEGMENT_BLOCKS];
283
284          struct Scope1 scope1[SEGMENT_BLOCKS];
285
286          struct CtCDPI_Callouts ctCDPI_Callouts[SEGMENT_BLOCKS];
287
288          struct Ref_Callouts ref_Callouts[SEGMENT_BLOCKS];
289
290          struct Tag_Callouts tag_Callouts[SEGMENT_BLOCKS];
291
292          struct Basis_bits basis_bits[SEGMENT_BLOCKS];
293
294          struct Check_streams check_streams[SEGMENT_BLOCKS];
295
296          struct Xml_names xml_names[SEGMENT_BLOCKS];
297};
298
299  Validate_utf8 validate_utf8;
300  Parse_refs parse_refs;
301  Parse_tags parse_tags;
302  Parse_CtCDPI parse_CtCDPI;
303  Do_check_streams do_check_streams;
304  Classify_bytes classify_bytes;
305  Add_scope_streams add_scope_streams;
306  Validate_xml_names validate_xml_names;
307
308
309int file_segs=0;
310FILE *infile;
311FILE *outfile;
312
313template<typename T>
314class spsc_queue
315{
316public:
317  spsc_queue()
318  {
319          head1_ =  head2_ = head3_ = head4_ = 1;
320      tail_ = 0;
321  }
322
323  ~spsc_queue()
324  {
325
326  }
327
328  bool pass1()
329  {
330          while(head1_==tail_)
331          __sync_synchronize();
332
333#ifdef ANALYZE
334PERF_SEC_START(T1_work_timer);
335#endif
336          char * srcptr = n[head1_].srcbuf + OVERLAP_BUFSIZE;
337          memmove(n[head1_].srcbuf, overlap_buffer, 2*overlap_bytes);
338          int chars_read = fread(&srcptr[overlap_bytes], 1, SEGMENT_SIZE + OVERLAP_BUFSIZE - overlap_bytes, infile);
339          memmove(overlap_buffer, &srcptr[SEGMENT_SIZE-OVERLAP_BUFSIZE], 2*OVERLAP_BUFSIZE);
340          overlap_bytes = OVERLAP_BUFSIZE;
341          s2p_do_segment((BytePack *) srcptr, n[head1_].basis_bits);
342          classify_bytes.do_segment(n[head1_].basis_bits, n[head1_].lex);
343
344#ifdef ANALYZE
345PERF_SEC_END(T1_work_timer, SEGMENT_SIZE);
346#endif
347          head1_ = (head1_+1)%8;
348
349  }
350
351  bool pass2()
352  {
353          while(head2_==head1_)
354          __sync_synchronize();
355
356#ifdef ANALYZE
357PERF_SEC_START(T2_work_timer);
358#endif
359          validate_utf8.do_segment(n[head2_].basis_bits, n[head2_].u8);
360          add_scope_streams.do_segment(n[head2_].lex, n[head2_].scope1);
361          parse_CtCDPI.do_segment(n[head2_].ctCDPI_Callouts, n[head2_].lex, n[head2_].scope1, n[head2_].check_streams);
362          parse_refs.do_segment(n[head2_].lex, n[head2_].scope1, n[head2_].ctCDPI_Callouts, n[head2_].ref_Callouts);
363
364#ifdef ANALYZE
365PERF_SEC_END(T2_work_timer, SEGMENT_SIZE);
366#endif
367          head2_ = (head2_+1)%8;
368
369  }
370
371  bool pass3()
372  {
373          while(head3_==head2_)
374          __sync_synchronize();
375
376#ifdef ANALYZE
377PERF_SEC_START(T3_work_timer);
378#endif
379         parse_tags.do_segment(n[head3_].lex, n[head3_].scope1, n[head3_].ctCDPI_Callouts, n[head3_].tag_Callouts);
380         validate_xml_names.do_segment(n[head3_].ctCDPI_Callouts, n[head3_].ref_Callouts, n[head3_].tag_Callouts, n[head3_].lex, n[head3_].u8, n[head3_].xml_names, n[head3_].check_streams);
381                  do_check_streams.do_segment(n[head3_].ctCDPI_Callouts, n[head3_].tag_Callouts, n[head3_].lex, n[head3_].u8, n[head3_].scope1, n[head3_].ref_Callouts, n[head3_].xml_names, n[head3_].check_streams);
382          head3_ = (head3_+1)%8;
383
384#ifdef ANALYZE
385PERF_SEC_END(T3_work_timer, SEGMENT_SIZE);
386#endif
387  }
388
389  bool pass4()
390  {
391          while(head4_==head3_)
392          __sync_synchronize();
393
394#ifdef ANALYZE
395PERF_SEC_START(T4_work_timer);
396#endif
397          source = n[head4_].srcbuf+OVERLAP_BUFSIZE;
398          matcher.setSrc(source);
399          postprocess_do_segment(n[head4_].lex, n[head4_].ctCDPI_Callouts, n[head4_].ref_Callouts, n[head4_].check_streams);
400          block_base = 0;
401
402#ifdef ANALYZE
403PERF_SEC_END(T4_work_timer, SEGMENT_SIZE);
404#endif
405          head4_ = (head4_+1)%8;
406          tail_ = (tail_+1)%8;
407
408  }
409
410  int overlap_bytes;
411
412  char overlap_buffer[2*OVERLAP_BUFSIZE];
413
414
415private:
416
417  int tail_; // tail of the queue
418  int head1_; // head of the queue
419  int head2_;
420  int head3_;
421  int head4_;
422  T  n[8];
423
424
425  spsc_queue(spsc_queue const&);
426  spsc_queue& operator = (spsc_queue const&);
427};
428
429spsc_queue<struct Para_data> q;
430
431void *stageOne(void *threadid)
432{
433   long tid = (long)threadid;
434   for(int i=0;i<file_segs;i++){
435#ifdef ANALYZE
436PERF_SEC_START(T1_total_timer);
437#endif
438           q.pass1();
439//         printf("thread %ld: %i\n",tid,i);
440#ifdef ANALYZE
441PERF_SEC_END(T1_total_timer, SEGMENT_SIZE);
442#endif
443   }
444   pthread_exit(NULL);
445}
446void *stageTwo(void *threadid)
447{
448   long tid = (long)threadid;
449   for(int i=0;i<file_segs;i++){
450#ifdef ANALYZE
451PERF_SEC_START(T2_total_timer);
452#endif
453           q.pass2();
454//         printf("thread %ld: %i\n",tid,i);
455#ifdef ANALYZE
456PERF_SEC_END(T2_total_timer, SEGMENT_SIZE);
457#endif
458   }
459   pthread_exit(NULL);
460}
461
462void *stageThree(void *threadid)
463{
464   long tid = (long)threadid;
465   for(int i=0;i<file_segs;i++){
466#ifdef ANALYZE
467PERF_SEC_START(T3_total_timer);
468#endif
469           q.pass3();
470//         printf("thread %ld: %i\n",tid,i);
471#ifdef ANALYZE
472PERF_SEC_END(T3_total_timer, SEGMENT_SIZE);
473#endif
474   }
475   pthread_exit(NULL);
476}
477void *stageFour(void *threadid)
478{
479   long tid = (long)threadid;
480   for(int i=0;i<file_segs;i++){
481#ifdef ANALYZE
482PERF_SEC_START(T4_total_timer);
483#endif
484           q.pass4();
485//         printf("thread %ld: %i\n",tid,i);
486#ifdef ANALYZE
487PERF_SEC_END(T4_total_timer, SEGMENT_SIZE);
488#endif
489   }
490   pthread_exit(NULL);
491}
492
493#define NUM_THREADS 4
494
495int
496main(int argc, char * argv[]) {
497        char * infilename, * outfilename;
498        FILE *tmpinfile, *tmpoutfile;
499        struct stat fileinfo;
500
501    pthread_t threads[NUM_THREADS];
502    int rc;
503    long t=0;
504    void *status;
505
506        if (argc < 2) {
507                printf("Usage: %s <filename> [<outputfile>]\n", argv[0]);
508                exit(-1);
509        }
510
511        infilename = argv[1];
512        stat(infilename, &fileinfo);
513        file_segs = fileinfo.st_size/SEGMENT_SIZE;
514        infile = fopen(infilename, "rb");
515        if (!infile) {
516                fprintf(stderr, "Error: cannot open %s for input.\n", infilename);
517                exit(-1);
518        }
519
520        if (argc < 3) outfile = stdout;
521        else {
522                outfilename = argv[2];
523                outfile = fopen(outfilename, "wb");
524                if (!outfile) {
525                        fprintf(stderr, "Error: cannot open %s for writing.\n", outfilename);
526                        exit(-1);
527                }
528        }
529
530        PERF_SEC_INIT(parser_timer);
531#ifdef ANALYZE
532        PERF_SEC_INIT(T1_total_timer);
533        PERF_SEC_INIT(T2_total_timer);
534        PERF_SEC_INIT(T3_total_timer);
535        PERF_SEC_INIT(T4_total_timer);
536        PERF_SEC_INIT(T1_work_timer);
537        PERF_SEC_INIT(T2_work_timer);
538        PERF_SEC_INIT(T3_work_timer);
539        PERF_SEC_INIT(T4_work_timer);
540#endif
541
542        PERF_SEC_START(parser_timer);
543
544
545          rc = pthread_create(&threads[t], NULL, stageOne, (void *)t);
546          if (rc){
547                  printf("ERROR; return code from pthread_create() is %d\n", rc);
548                  exit(-1);
549          }
550          t++;
551          rc = pthread_create(&threads[t], NULL, stageTwo, (void *)t);
552          if (rc){
553                  printf("ERROR; return code from pthread_create() is %d\n", rc);
554                  exit(-1);
555          }
556
557          t++;
558          rc = pthread_create(&threads[t], NULL, stageThree, (void *)t);
559          if (rc){
560                  printf("ERROR; return code from pthread_create() is %d\n", rc);
561                  exit(-1);
562          }
563
564          t++;
565          rc = pthread_create(&threads[t], NULL, stageFour, (void *)t);
566          if (rc){
567                  printf("ERROR; return code from pthread_create() is %d\n", rc);
568                  exit(-1);
569          }
570
571          for(t=0; t<NUM_THREADS; t++) {
572             rc = pthread_join(threads[t], &status);
573             if (rc) {
574                printf("ERROR; return code from pthread_join() is %d\n", rc);
575                exit(-1);
576             }
577//           printf("Main: completed join with thread %ld having a status of %ld\n",t,(long)status);
578          }
579
580
581        PERF_SEC_END(parser_timer, fileinfo.st_size/SEGMENT_SIZE*SEGMENT_SIZE);
582
583        PERF_SEC_DUMP(parser_timer);
584#ifdef ANALYZE
585printf("\nT1_total_timer\n");
586        PERF_SEC_DUMP(T1_total_timer);
587printf("\nT2_total_timer\n");
588        PERF_SEC_DUMP(T2_total_timer);
589printf("\nT3_total_timer\n");
590        PERF_SEC_DUMP(T3_total_timer);
591printf("\nT4_total_timer\n");
592        PERF_SEC_DUMP(T4_total_timer);
593printf("\nT1_work_timer\n");
594        PERF_SEC_DUMP(T1_work_timer);
595printf("\nT2_work_timer\n");
596        PERF_SEC_DUMP(T2_work_timer);
597printf("\nT3_work_timer\n");
598        PERF_SEC_DUMP(T3_work_timer);
599printf("\nT4_work_timer\n");
600        PERF_SEC_DUMP(T4_work_timer);
601#endif
602
603        PERF_SEC_DESTROY(parser_timer);
604#ifdef ANALYZE
605        PERF_SEC_DESTROY(T1_total_timer);
606        PERF_SEC_DESTROY(T2_total_timer);
607        PERF_SEC_DESTROY(T3_total_timer);
608        PERF_SEC_DESTROY(T4_total_timer);
609        PERF_SEC_DESTROY(T1_work_timer);
610        PERF_SEC_DESTROY(T2_work_timer);
611        PERF_SEC_DESTROY(T3_work_timer);
612        PERF_SEC_DESTROY(T4_work_timer);
613#endif
614//      printf("count = %i\n", q.count);
615
616        pthread_exit(NULL);
617
618        fclose(infile);
619        fclose(outfile);
620        return(0);
621}
Note: See TracBrowser for help on using the repository browser.