source: proto/parabix2/pablo_template_multithreads.cpp @ 1055

Last change on this file since 1055 was 1055, checked in by lindanl, 8 years ago

multithreads

File size: 17.1 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <errno.h>
4#include <sys/types.h>
5#include <sys/stat.h>
6#include <pthread.h>
7#include "../lib/lib_simd.h"
8#define BLOCK_SIZE 128
9#define SEGMENT_BLOCKS 128
10#define SEGMENT_SIZE (BLOCK_SIZE * SEGMENT_BLOCKS)
11#define OVERLAP_BUFSIZE 16
12
13typedef uint64_t ScanBlock;
14typedef SIMD_type BytePack;
15typedef SIMD_type BitBlock;
16
17#include "../lib/carryQ.h"
18#include "xmldecl.h"
19#include "xml_error.c"
20#include "xmldecl.c"
21#include "namechars.h"
22
23#include "../lib/perflib/perfsec.h"
24#include "../lib/s2p.h"
25
26#include "TagMatcher_multithreads.h"
27#include "LineColTracker.h"
28BitBlock EOF_mask = simd_const_1(1);
29
30#ifdef BUFFER_PROFILING
31        BOM_Table * parser_timer;
32#endif
33LineColTracker tracker;
34TagMatcher matcher;
35
36int block_base=0;
37int buffer_base=0;
38char * source;
39
40static inline int StreamScan(ScanBlock * stream, int blk_count, int ProcessPos(int)) {
41        int blk;
42        int block_pos = 0;
43
44        for (blk = 0; blk < blk_count; blk++) {
45                ScanBlock s = stream[blk];
46                while(s) {
47                        int code = (ProcessPos(cfzl(s) + block_pos));
48                        if (code) return code;
49                        s = s & (s-1);  // clear rightmost bit.
50                }
51                block_pos += 8 * sizeof(ScanBlock);
52        }
53        return 0;
54}
55
56
57static inline void ReportError(const char * error_msg, int error_pos_in_block) {
58  int error_line, error_column;
59  tracker.get_Line_and_Column(error_pos_in_block, error_line, error_column);
60  fprintf(stderr, "%s at line %i, column %i\n", error_msg, error_line, error_column);
61}
62
63
64static inline int NameStrt_check(int pos) {
65        int block_pos = block_base + pos;
66        if(XML_10_UTF8_NameStrt_bytes((unsigned char*)&source[block_pos]) == 0){
67              ReportError("name start error", pos);
68              exit(-1);
69        }
70        return 0;
71}
72
73static inline int Name_check(int pos) {
74        int block_pos = block_base + pos;
75        if(XML_10_UTF8_NameChar_bytes((unsigned char*)&source[block_pos]) == 0){
76              ReportError("name error", pos);
77              exit(-1);
78        }
79        return 0;
80}
81
82static inline int PIName_check(int pos) {
83        int block_pos = block_base + pos;
84        int file_pos = block_pos+buffer_base;
85        if (at_XxMmLll<ASCII>((unsigned char*)&source[block_pos]) && (source[block_pos+3]=='?' || source[block_pos+3]<= ' ')) {
86              // "<?xml" legal at start of file.
87              if ((file_pos == 2) && at_XmlDecl_start<ASCII>((unsigned char*)&source[0])) return 0;
88              ReportError("[Xx][Mm][Ll] illegal as PI name", pos);
89              exit(-1);
90        }
91        return 0;
92}
93
94static inline int CD_check(int pos) {
95        int block_pos = block_base + pos;
96        if (!at_CDATA1<ASCII>((unsigned char*)&source[block_pos])){
97              ReportError("CDATA error", pos);
98              exit(-1);
99        }
100        return 0;
101}
102
103static inline int GenRef_check(int pos) {
104        int block_pos = block_base + pos;
105        unsigned char* s = (unsigned char*)&source[block_pos];
106        if (!(at_Ref_gt<ASCII>(s)||at_Ref_lt<ASCII>(s)||at_Ref_amp<ASCII>(s)||at_Ref_quot<ASCII>(s)||at_Ref_apos<ASCII>(s))){
107              ReportError("Undefined reference", pos);
108              exit(-1);
109        }
110        return 0;
111}
112
113static inline int HexRef_check(int pos) {
114        int block_pos = block_base + pos;
115        unsigned char* s = (unsigned char*)&source[block_pos];
116        int ch_val = 0;
117        while(at_HexDigit<ASCII>(s)){
118          ch_val = HexVal<ASCII>(s[0]) + (ch_val<<4);
119          if (ch_val> 0x10FFFF ){
120            ReportError("Illegal character reference", pos);
121            exit(-1);
122          }
123          s++;
124        }
125        if ((ch_val == 0x0) || ((ch_val | 0x7FF) == 0xDFFF)|| ((ch_val | 0x1) == 0xFFFF)){
126          ReportError("Illegal character reference", pos);
127          exit(-1);
128        }
129        else if (((ch_val < 0x20) && (ch_val != 0x9) && (ch_val != 0xD) && (ch_val != 0xA))){
130          ReportError("Illegal XML 1.0 character reference", pos);
131          exit(-1);
132        }
133        return 0;
134}
135
136static inline int DecRef_check(int pos) {
137        int block_pos = block_base + pos;
138        unsigned char* s = (unsigned char*)&source[block_pos];
139        int ch_val = 0;
140        while(at_HexDigit<ASCII>(s)){
141          ch_val = DigitVal<ASCII>(s[0]) + ch_val*10;
142          if (ch_val> 0x10FFFF ){
143            ReportError("Illegal character reference", pos);
144            exit(-1);
145          }
146          s++;
147        }
148        if ((ch_val == 0x0) || ((ch_val | 0x7FF) == 0xDFFF)|| ((ch_val | 0x1) == 0xFFFF)){
149          ReportError("Illegal character reference", pos);
150          exit(-1);
151        }
152        else if (((ch_val < 0x20) && (ch_val != 0x9) && (ch_val != 0xD) && (ch_val != 0xA))){
153          ReportError("Illegal XML 1.0 character reference", pos);
154          exit(-1);
155        }
156        return 0;
157}
158
159static inline int AttRef_check(int pos) {
160        int block_pos = block_base + pos;
161        unsigned char* s = (unsigned char*)&source[block_pos];
162        int ch_val = 0;
163        if(s[0]=='#'){
164          s++;
165          if(s[0]=='x' || s[0]=='X'){
166            s++;
167            while(at_HexDigit<ASCII>(s)){
168              ch_val = HexVal<ASCII>(s[0]) + (ch_val<<4);
169              s++;
170            }
171          }
172          else{
173            while(at_HexDigit<ASCII>(s)){
174              ch_val = DigitVal<ASCII>(s[0]) + ch_val*10;
175              s++;
176            }
177          }
178          if (ch_val==60){
179            ReportError("Attribute values contain '<' characters after reference expansion", pos);
180            exit(-1);
181          }
182        }
183        else if(at_Ref_lt<ASCII>(s)){
184          ReportError("Attribute values contain '<' characters after reference expansion", pos);
185          exit(-1);
186        }
187        return 0;
188}
189
190
191
192@global
193
194static inline void s2p_do_block(BytePack U8[], Basis_bits & basis_bits) {
195  s2p(U8[0], U8[1], U8[2], U8[3], U8[4], U8[5], U8[6], U8[7],
196        basis_bits.bit_0, basis_bits.bit_1, basis_bits.bit_2, basis_bits.bit_3, basis_bits.bit_4, basis_bits.bit_5, basis_bits.bit_6, basis_bits.bit_7);
197}
198
199static inline void s2p_do_final_block(BytePack U8[], Basis_bits & basis_bits, BitBlock EOF_mask) {
200  s2p_do_block(U8, basis_bits);
201  basis_bits.bit_0 = simd_and(basis_bits.bit_0, EOF_mask);
202  basis_bits.bit_1 = simd_and(basis_bits.bit_1, EOF_mask);
203  basis_bits.bit_2 = simd_and(basis_bits.bit_2, EOF_mask);
204  basis_bits.bit_3 = simd_and(basis_bits.bit_3, EOF_mask);
205  basis_bits.bit_4 = simd_and(basis_bits.bit_4, EOF_mask);
206  basis_bits.bit_5 = simd_and(basis_bits.bit_5, EOF_mask);
207  basis_bits.bit_6 = simd_and(basis_bits.bit_6, EOF_mask);
208  basis_bits.bit_7 = simd_and(basis_bits.bit_7, EOF_mask);
209}
210
211static inline void s2p_do_segment(BytePack U8[], Basis_bits basis_bits[]) {
212  for (int i = 0; i < SEGMENT_BLOCKS; i++)
213           s2p_do_block(&U8[8*i], basis_bits[i]);
214}
215
216static inline void postprocess_do_block(Lex & lex, CtCDPI_Callouts & ctCDPI_Callouts, Ref_Callouts & ref_Callouts, Check_streams & check_streams, int chars_avail){
217            tracker.StoreNewlines(lex.LF);
218
219                if (bitblock_has_bit(simd_or(check_streams.non_ascii_name_starts, check_streams.non_ascii_names))) {
220                  StreamScan((ScanBlock *) &check_streams.non_ascii_name_starts, sizeof(BitBlock)/sizeof(ScanBlock), NameStrt_check);
221                  StreamScan((ScanBlock *) &check_streams.non_ascii_names, sizeof(BitBlock)/sizeof(ScanBlock), Name_check);
222                }
223
224                if (bitblock_has_bit(ctCDPI_Callouts.PI_name_starts)){
225                  StreamScan((ScanBlock *) &ctCDPI_Callouts.PI_name_starts, sizeof(BitBlock)/sizeof(ScanBlock), PIName_check);
226                }
227
228                if (bitblock_has_bit(ctCDPI_Callouts.CD_starts)){
229                  StreamScan((ScanBlock *) &ctCDPI_Callouts.CD_starts, sizeof(BitBlock)/sizeof(ScanBlock), CD_check);
230                }
231
232                if (bitblock_has_bit(ref_Callouts.GenRef_starts)){
233                  StreamScan((ScanBlock *) &ref_Callouts.GenRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), GenRef_check);
234                }
235
236                if (bitblock_has_bit(ref_Callouts.DecRef_starts)){
237                  StreamScan((ScanBlock *) &ref_Callouts.DecRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), DecRef_check);
238                }
239
240                if (bitblock_has_bit(ref_Callouts.HexRef_starts)){
241                  StreamScan((ScanBlock *) &ref_Callouts.HexRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), HexRef_check);
242                }
243
244                if (bitblock_has_bit(check_streams.att_refs)){
245                  StreamScan((ScanBlock *) &check_streams.att_refs, sizeof(BitBlock)/sizeof(ScanBlock), AttRef_check);
246                }
247
248                if (bitblock_has_bit(check_streams.error_mask)) {
249                  int errpos = count_forward_zeroes(check_streams.error_mask);
250                  ReportError("error found", errpos);
251              exit(-1);
252                }
253
254                matcher.store_streams(check_streams.tag_marks, check_streams.name_follows, check_streams.misc_mask, chars_avail);
255                tracker.AdvanceBlock();
256}
257
258static inline void postprocess_do_segment(Lex lex[], CtCDPI_Callouts ctCDPI_Callouts[], Ref_Callouts ref_Callouts[], Check_streams check_streams[]){
259
260  for (int i = 0; i < SEGMENT_BLOCKS; i++){
261           postprocess_do_block(lex[i], ctCDPI_Callouts[i], ref_Callouts[i], check_streams[i], SEGMENT_SIZE);
262           block_base += BLOCK_SIZE;
263  }
264
265        matcher.StreamScan(SEGMENT_SIZE);
266        matcher.Advance_segment();
267        tracker.Advance_buffer();
268}
269
270struct Para_data{
271
272          char srcbuf[BLOCK_SIZE * (SEGMENT_BLOCKS+1) + OVERLAP_BUFSIZE*2];
273
274          struct U8 u8[SEGMENT_BLOCKS];
275
276          struct Lex lex[SEGMENT_BLOCKS];
277
278          struct Scope1 scope1[SEGMENT_BLOCKS];
279
280          struct CtCDPI_Callouts ctCDPI_Callouts[SEGMENT_BLOCKS];
281
282          struct Ref_Callouts ref_Callouts[SEGMENT_BLOCKS];
283
284          struct Tag_Callouts tag_Callouts[SEGMENT_BLOCKS];
285
286          struct Basis_bits basis_bits[SEGMENT_BLOCKS];
287
288          struct Check_streams check_streams[SEGMENT_BLOCKS];
289
290          struct Xml_names xml_names[SEGMENT_BLOCKS];
291};
292
293  Validate_utf8 validate_utf8;
294  Parse_refs parse_refs;
295  Parse_tags parse_tags;
296  Parse_CtCDPI parse_CtCDPI;
297  Do_check_streams do_check_streams;
298  Classify_bytes classify_bytes;
299  Add_scope_streams add_scope_streams;
300  Validate_xml_names validate_xml_names;
301
302
303int file_segs=0;
304FILE *infile;
305FILE *outfile;
306// load with 'consume' (data-dependent) memory ordering
307template<typename T>
308T load_consume(T const* addr)
309{
310  // hardware fence is implicit on x86
311  T v = *const_cast<T const volatile*>(addr);
312#ifdef GCC
313 __sync_synchronize(); // compiler fence
314#endif
315#ifdef ICC
316 __memory_barrier(); // compiler fence
317#endif
318  return v;
319}
320
321// store with 'release' memory ordering
322template<typename T>
323void store_release(T* addr, T v)
324{
325  // hardware fence is implicit on x86
326#ifdef GCC
327 __sync_synchronize(); // compiler fence
328#endif
329#ifdef ICC
330 __memory_barrier(); // compiler fence
331#endif
332  *const_cast<T volatile*>(addr) = v;
333}
334
335// cache line size on modern x86 processors (in bytes)
336size_t const cache_line_size = 64;
337
338// single-producer/single-consumer queue
339template<typename T>
340class spsc_queue
341{
342public:
343  spsc_queue()
344  {
345      node* n = new node;
346      n->next_ = 0;
347      overlap_bytes = 0;
348      tail_ = head1_ = head2_ = head3_ = first_= tail_copy_ = n;
349      count=0;
350  }
351
352  ~spsc_queue()
353  {
354      node* n = first_;
355      do
356      {
357          node* next = n->next_;
358          delete n;
359          n = next;
360      }
361      while (n);
362  }
363
364  void pass1()
365  {
366      node* n = alloc_node();
367      n->next_ = 0;
368      //do pass1
369      char * srcptr = n->para_data.srcbuf + OVERLAP_BUFSIZE;
370      memmove(n->para_data.srcbuf, overlap_buffer, 2*overlap_bytes);
371      int chars_read = fread(&srcptr[overlap_bytes], 1, SEGMENT_SIZE + OVERLAP_BUFSIZE - overlap_bytes, infile);
372      memmove(overlap_buffer, &srcptr[SEGMENT_SIZE-OVERLAP_BUFSIZE], 2*OVERLAP_BUFSIZE);
373      overlap_bytes = OVERLAP_BUFSIZE;
374      s2p_do_segment((BytePack *) srcptr, n->para_data.basis_bits);
375      classify_bytes.do_segment(n->para_data.basis_bits, n->para_data.lex);
376
377
378      store_release(&head1_->next_, n);
379      head1_ = n;
380  }
381  bool pass2()
382  {
383          if(load_consume(&head2_->next_))
384          {
385              //do pass2
386
387                  validate_utf8.do_segment(head2_->next_->para_data.basis_bits, head2_->next_->para_data.u8);
388                  add_scope_streams.do_segment(head2_->next_->para_data.lex, head2_->next_->para_data.scope1);
389                  parse_CtCDPI.do_segment(head2_->next_->para_data.ctCDPI_Callouts, head2_->next_->para_data.lex, head2_->next_->para_data.scope1, head2_->next_->para_data.check_streams);
390                  parse_refs.do_segment(head2_->next_->para_data.lex, head2_->next_->para_data.scope1, head2_->next_->para_data.ctCDPI_Callouts, head2_->next_->para_data.ref_Callouts);
391
392                  store_release(&head2_, head2_->next_);
393                  return true;
394          }
395      else
396      {
397          return false;
398      }
399  }
400
401  bool pass3()
402  {
403      if ((head3_!= head2_) && load_consume(&head3_->next_))
404      {
405          //do pass3
406          parse_tags.do_segment(head3_->next_->para_data.lex, head3_->next_->para_data.scope1, head3_->next_->para_data.ctCDPI_Callouts, head3_->next_->para_data.tag_Callouts);
407                  validate_xml_names.do_segment(head3_->next_->para_data.ctCDPI_Callouts, head3_->next_->para_data.ref_Callouts, head3_->next_->para_data.tag_Callouts, head3_->next_->para_data.lex, head3_->next_->para_data.u8, head3_->next_->para_data.xml_names, head3_->next_->para_data.check_streams);
408                  do_check_streams.do_segment(head3_->next_->para_data.ctCDPI_Callouts, head3_->next_->para_data.tag_Callouts, head3_->next_->para_data.lex, head3_->next_->para_data.u8, head3_->next_->para_data.scope1, head3_->next_->para_data.ref_Callouts, head3_->next_->para_data.xml_names, head3_->next_->para_data.check_streams);
409
410          store_release(&head3_, head3_->next_);
411          return true;
412      }
413      else
414      {
415          return false;
416      }
417  }
418
419  bool pass4()
420  {
421      if ((tail_!= head3_)  && load_consume(&tail_->next_))
422      {
423          //do pass4
424          source = tail_->next_->para_data.srcbuf+OVERLAP_BUFSIZE;
425          matcher.setSrc(source);
426          postprocess_do_segment(tail_->next_->para_data.lex, tail_->next_->para_data.ctCDPI_Callouts, tail_->next_->para_data.ref_Callouts, tail_->next_->para_data.check_streams);
427          block_base = 0;
428          store_release(&tail_, tail_->next_);
429          return true;
430      }
431      else
432      {
433          return false;
434      }
435  }
436
437  int count;
438private:
439  // internal node structure
440  struct node
441  {
442      node* next_;
443      T para_data;
444  };
445
446  // consumer part
447  // accessed mainly by consumer, infrequently be producer
448  node* tail_; // tail of the queue
449
450  // delimiter between consumer part and producer part,
451  // so that they situated on different cache lines
452  char cache_line_pad_ [cache_line_size];
453
454  int overlap_bytes;
455
456  char overlap_buffer[2*OVERLAP_BUFSIZE];
457
458  // producer part
459  // accessed only by producer
460  node* head1_; // head of the queue
461  node* head2_;
462  node* head3_;
463  node* first_; // last unused node (tail of node cache)
464  node* tail_copy_; // helper (points somewhere between first_ and tail_)
465
466
467  node* alloc_node()
468  {
469      // first tries to allocate node from internal node cache,
470      // if attempt fails, allocates node via ::operator new()
471while(1){
472      if (first_ != tail_copy_)
473      {
474          node* n = first_;
475          first_ = first_->next_;
476          return n;
477      }
478      tail_copy_ = load_consume(&tail_);
479      if (first_ != tail_copy_)
480      {
481          node* n = first_;
482          first_ = first_->next_;
483          return n;
484      }
485      node* n = new node;
486      if(count>8)
487        continue;
488      else
489        count++;
490      return n;
491}
492  }
493
494  spsc_queue(spsc_queue const&);
495  spsc_queue& operator = (spsc_queue const&);
496};
497
498
499spsc_queue<struct Para_data> q;
500
501void *stageOne(void *threadid)
502{
503   long tid = (long)threadid;
504   for(int i=0;i<file_segs;i++){
505           q.pass1();
506//         printf("thread %ld: %i\n",tid,i);
507   }
508   pthread_exit(NULL);
509}
510void *stageTwo(void *threadid)
511{
512   long tid = (long)threadid;
513   for(int i=0;i<file_segs;i++){
514           while(!q.pass2());
515//         printf("thread %ld: %i\n",tid,i);
516   }
517   pthread_exit(NULL);
518}
519
520void *stageThree(void *threadid)
521{
522   long tid = (long)threadid;
523   for(int i=0;i<file_segs;i++){
524           while(!q.pass3());
525//         printf("thread %ld: %i\n",tid,i);
526   }
527   pthread_exit(NULL);
528}
529void *stageFour(void *threadid)
530{
531   long tid = (long)threadid;
532   for(int i=0;i<file_segs;i++){
533           while(!q.pass4());
534//         printf("thread %ld: %i\n",tid,i);
535   }
536   pthread_exit(NULL);
537}
538
539#define NUM_THREADS 4
540
541int
542main(int argc, char * argv[]) {
543        char * infilename, * outfilename;
544        FILE *tmpinfile, *tmpoutfile;
545        struct stat fileinfo;
546
547    pthread_t threads[NUM_THREADS];
548    int rc;
549    long t=0;
550    void *status;
551
552        if (argc < 2) {
553                printf("Usage: %s <filename> [<outputfile>]\n", argv[0]);
554                exit(-1);
555        }
556
557        infilename = argv[1];
558        stat(infilename, &fileinfo);
559        file_segs = fileinfo.st_size/SEGMENT_SIZE;
560        infile = fopen(infilename, "rb");
561        if (!infile) {
562                fprintf(stderr, "Error: cannot open %s for input.\n", infilename);
563                exit(-1);
564        }
565
566        if (argc < 3) outfile = stdout;
567        else {
568                outfilename = argv[2];
569                outfile = fopen(outfilename, "wb");
570                if (!outfile) {
571                        fprintf(stderr, "Error: cannot open %s for writing.\n", outfilename);
572                        exit(-1);
573                }
574        }
575
576        PERF_SEC_INIT(parser_timer);
577
578        PERF_SEC_START(parser_timer);
579
580
581          rc = pthread_create(&threads[t], NULL, stageOne, (void *)t);
582          if (rc){
583                  printf("ERROR; return code from pthread_create() is %d\n", rc);
584                  exit(-1);
585          }
586          t++;
587          rc = pthread_create(&threads[t], NULL, stageTwo, (void *)t);
588          if (rc){
589                  printf("ERROR; return code from pthread_create() is %d\n", rc);
590                  exit(-1);
591          }
592
593          t++;
594          rc = pthread_create(&threads[t], NULL, stageThree, (void *)t);
595          if (rc){
596                  printf("ERROR; return code from pthread_create() is %d\n", rc);
597                  exit(-1);
598          }
599
600          t++;
601          rc = pthread_create(&threads[t], NULL, stageFour, (void *)t);
602          if (rc){
603                  printf("ERROR; return code from pthread_create() is %d\n", rc);
604                  exit(-1);
605          }
606
607          for(t=0; t<NUM_THREADS; t++) {
608             rc = pthread_join(threads[t], &status);
609             if (rc) {
610                printf("ERROR; return code from pthread_join() is %d\n", rc);
611                exit(-1);
612             }
613//           printf("Main: completed join with thread %ld having a status of %ld\n",t,(long)status);
614          }
615
616
617        PERF_SEC_END(parser_timer, fileinfo.st_size/SEGMENT_SIZE*SEGMENT_SIZE);
618
619        PERF_SEC_DUMP(parser_timer);
620
621        PERF_SEC_DESTROY(parser_timer);
622
623//      printf("count = %i\n", q.count);
624
625        pthread_exit(NULL);
626
627        fclose(infile);
628        fclose(outfile);
629        return(0);
630}
Note: See TracBrowser for help on using the repository browser.