Best unofficial Apache Server developers community

questions about the order of the map and reduce and the shuffle error

Aug 27, 2010
Xu Cheng
Hello guys,
   I'm running some experiments on my 3-node virtual machine cluster: one
node for the namenode and jobtracker, and the other two for datanodes and
tasktrackers, with Hadoop 0.21.0.
   When I run a job, I get messages like this:


10/08/27 17:28:58 INFO mapreduce.Job:  map 0% reduce 0%
10/08/27 17:29:10 INFO mapreduce.Job:  map 50% reduce 0%
10/08/27 17:29:12 INFO mapreduce.Job:  map 83% reduce 0%
10/08/27 17:29:19 INFO mapreduce.Job:  map 83% reduce 16%
10/08/27 17:29:24 INFO mapreduce.Job:  map 100% reduce 16%


The reduce runs while the map tasks haven't finished! (I read in the books
that reduce tasks run only after the maps finish.)

Is there something wrong with the cluster or with my knowledge?

By the way, the reduce task got stuck at 16.63% progress. I found that
other people on the internet have hit this problem too, but I haven't found
a solution.

However, after some time, once the system had reported the error message,
the job began to run again, like this:


10/08/27 17:29:58 INFO mapreduce.Job:  map 100% reduce 16%
10/08/27 17:30:01 INFO mapreduce.Job:  map 50% reduce 16%
10/08/27 17:30:07 INFO mapreduce.Job:  map 83% reduce 16%
10/08/27 17:30:19 INFO mapreduce.Job:  map 100% reduce 16%
10/08/27 17:30:25 INFO mapreduce.Job:  map 100% reduce 66%
10/08/27 17:30:31 INFO mapreduce.Job:  map 100% reduce 100%


and it ran fine to completion. Does anyone know what is going on?

Below are the messages from the console. If the logs are needed, let me
know. Thanks.

Any suggestions and references are appreciated.
Best regards,
xu








10/08/27 17:28:56 INFO security.Groups: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
10/08/27 17:28:56 WARN conf.Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
10/08/27 17:28:56 WARN mapreduce.JobSubmitter: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
10/08/27 17:28:56 INFO input.FileInputFormat: Total input paths to process : 1
10/08/27 17:28:56 WARN conf.Configuration: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
10/08/27 17:28:56 INFO mapreduce.JobSubmitter: number of splits:2
10/08/27 17:28:57 INFO mapreduce.JobSubmitter: adding the following namenodes' delegation tokens:null
10/08/27 17:28:57 INFO mapreduce.Job: Running job: job_201008271725_0001
10/08/27 17:28:58 INFO mapreduce.Job:  map 0% reduce 0%
10/08/27 17:29:10 INFO mapreduce.Job:  map 50% reduce 0%
10/08/27 17:29:12 INFO mapreduce.Job:  map 83% reduce 0%
10/08/27 17:29:19 INFO mapreduce.Job:  map 83% reduce 16%
10/08/27 17:29:24 INFO mapreduce.Job:  map 100% reduce 16%
10/08/27 17:29:48 INFO mapreduce.Job: Task Id : attempt_201008271725_0001_r_000000_0, Status : FAILED
org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#1
 at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:124)
 at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:362)
 at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:396)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:742)
 at org.apache.hadoop.mapred.Child.main(Child.java:211)
Caused by: java.io.IOException: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.
 at org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler.checkReducerHealth(ShuffleScheduler.java:253)
 at org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler.copyFailed(ShuffleScheduler.java:187)
 at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:234)
 at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:149)
10/08/27 17:29:48 WARN mapreduce.Job: Error reading task outputConnection refused
10/08/27 17:29:48 WARN mapreduce.Job: Error reading task outputConnection refused
10/08/27 17:29:49 INFO mapreduce.Job:  map 100% reduce 0%
10/08/27 17:29:57 INFO mapreduce.Job: Task Id : attempt_201008271725_0001_m_000000_0, Status : FAILED
Too many fetch-failures
10/08/27 17:29:57 WARN mapreduce.Job: Error reading task outputConnection refused
10/08/27 17:29:57 WARN mapreduce.Job: Error reading task outputConnection refused
10/08/27 17:29:58 INFO mapreduce.Job:  map 100% reduce 16%
10/08/27 17:30:01 INFO mapreduce.Job:  map 50% reduce 16%
10/08/27 17:30:07 INFO mapreduce.Job:  map 83% reduce 16%
10/08/27 17:30:19 INFO mapreduce.Job:  map 100% reduce 16%
10/08/27 17:30:25 INFO mapreduce.Job:  map 100% reduce 66%
10/08/27 17:30:31 INFO mapreduce.Job:  map 100% reduce 100%
10/08/27 17:30:33 INFO mapreduce.Job: Job complete: job_201008271725_0001
10/08/27 17:30:33 INFO mapreduce.Job: Counters: 33
 FileInputFormatCounters
  BYTES_READ=76420532
 FileSystemCounters
  FILE_BYTES_READ=155979268
  FILE_BYTES_WRITTEN=239598906
  HDFS_BYTES_READ=76424828
  HDFS_BYTES_WRITTEN=78386951
 Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=2
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
 Job Counters
  Data-local map tasks=3
  Total time spent by all maps waiting after reserving slots (ms)=0
  Total time spent by all reduces waiting after reserving slots (ms)=0
  SLOTS_MILLIS_MAPS=47747
  SLOTS_MILLIS_REDUCES=77241
  Launched map tasks=3
  Launched reduce tasks=2
 Map-Reduce Framework
  Combine input records=999998
  Combine output records=994179
  Failed Shuffles=1
  GC time elapsed (ms)=667
  Map input records=499999
  Map output bytes=80759850
  Map output records=999998
  Merged Map outputs=2
  Reduce input groups=993814
  Reduce input records=994179
  Reduce output records=993814
  Reduce shuffle bytes=83049802
  Shuffled Maps =2
  Spilled Records=2861309
  SPLIT_RAW_BYTES=200
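
For anyone who wants to check a saved console log for the two patterns visible above (reduce progress above 0% while map progress is still below 100%, and map progress dropping back, as it does at 17:30:01 after the map attempt is reported FAILED), here is a minimal sketch in Java. The class and method names are hypothetical, it assumes only the `map N% reduce M%` line format shown in this post, and it does not use any Hadoop API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Hadoop: scans client console output.
class ProgressCheck {
    // Matches the progress part of lines such as
    // "10/08/27 17:29:19 INFO mapreduce.Job:  map 83% reduce 16%".
    private static final Pattern PROGRESS =
            Pattern.compile("map (\\d+)% reduce (\\d+)%");

    /** Lines where reduce progress is above 0% while map progress is below 100%. */
    static List<String> reduceBeforeMapsDone(List<String> lines) {
        List<String> hits = new ArrayList<>();
        for (String line : lines) {
            Matcher m = PROGRESS.matcher(line);
            if (m.find()
                    && Integer.parseInt(m.group(1)) < 100
                    && Integer.parseInt(m.group(2)) > 0) {
                hits.add(line);
            }
        }
        return hits;
    }

    /** Lines where map progress is lower than on the previous progress line. */
    static List<String> mapRegressions(List<String> lines) {
        List<String> hits = new ArrayList<>();
        int previousMap = -1;
        for (String line : lines) {
            Matcher m = PROGRESS.matcher(line);
            if (!m.find()) {
                continue; // not a progress line (for example a stack trace)
            }
            int map = Integer.parseInt(m.group(1));
            if (map < previousMap) {
                hits.add(line);
            }
            previousMap = map;
        }
        return hits;
    }

    public static void main(String[] args) {
        // A few progress lines copied from the console output above.
        List<String> log = Arrays.asList(
                "10/08/27 17:29:12 INFO mapreduce.Job:  map 83% reduce 0%",
                "10/08/27 17:29:19 INFO mapreduce.Job:  map 83% reduce 16%",
                "10/08/27 17:29:24 INFO mapreduce.Job:  map 100% reduce 16%",
                "10/08/27 17:30:01 INFO mapreduce.Job:  map 50% reduce 16%");
        System.out.println(reduceBeforeMapsDone(log));
        System.out.println(mapRegressions(log));
    }
}
```

The sketch only reports what the client printed; it says nothing about why the framework behaved that way.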


Similar Threads
Getting reduce overflow error
I received 'reduce_overflow_error' when querying views, after entering
largish (2-4kb) docs to couch. The problem was due to a specific view, and
when I set 'reduce_limit = false' it went away.

Specifically, each of my documents is related to a logical entity in my app,
and the purpose of this view is to aggregate information per entity - it
maps the documents by the entities to which they are related, and reduces
the results (with group=true) so that each entity ends up with its own
document.

The reduce mainly does counts and sums, but it may also accept quite a bit
of text (~4kb) from one of the documents it reduces. I make sure that these
blobs of text don't pile up during reduce and rereduce - not more than one
will be kept at each stage.

Is 4kb too big to keep when reducing? or maybe the problem is that the
product of the reduce can be equal to or larger than any of the docs it
reduces?

Thanks,
 a.


Created: (HIVE-1519) Insertion should throw an error when partition order is different than create t
Insertion should throw an error when partition order is different than
create table

UUIDs whose alphanumeric order is the same as their chronological order
I want to use UUIDs whose alphanumeric order is the same as their
chronological order. So I'm generating Version 4 UUIDs (
http://en.wikipedia.org/wiki/Universa...on_4_.28random.29)
as follows:

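import java.util.Random;
import java.util.UUID;

public class Id
{
   static Random random = new Random();

   public static String next()
   {
      // Format: xxxxxxxx-xxxx-4xxx-8xxx-xxxxxxxxxxxx

      // High 64 bits: milliseconds since the epoch in the top 48 bits,
      // then the version nibble (4), then 12 random bits.
      long high = (System.currentTimeMillis() << 16) | 0x4000 | random.nextInt(4096);
      // Low 64 bits: leading nibble fixed to 8, then 60 random bits.
      long low = (random.nextLong() >>> 4) | 0x8000000000000000L;

      UUID uuid = new UUID(high, low);

      return uuid.toString();
   }
}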

Is there anything wrong with this idea?
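
One way to check the ordering claim is to generate two IDs with explicit timestamps and compare the strings. The sketch below uses the same bit layout as Id.next(), but takes the timestamp as a parameter; that parameter and the class name OrderedIdDemo are assumptions added for illustration. It relies on UUID.toString() printing fixed-width lowercase hex, and on the timestamp being positive and small enough to fit in the top 48 bits. IDs created within the same millisecond differ only in their random bits, so their relative order is arbitrary.

```java
import java.util.Random;
import java.util.UUID;

// Hypothetical demo class; mirrors the layout used by Id.next().
class OrderedIdDemo {
    private static final Random RANDOM = new Random();

    // Same construction as Id.next(), with the timestamp passed in
    // so that two known instants can be compared.
    static String next(long millis) {
        long high = (millis << 16) | 0x4000 | RANDOM.nextInt(4096);
        long low = (RANDOM.nextLong() >>> 4) | 0x8000000000000000L;
        return new UUID(high, low).toString();
    }

    public static void main(String[] args) {
        String earlier = next(1282900000000L);
        String later = next(1282900000001L); // one millisecond later
        System.out.println(earlier);
        System.out.println(later);
        // The timestamp occupies the leading hex digits, so the earlier
        // ID sorts first as a string.
        System.out.println(earlier.compareTo(later) < 0); // true
    }
}
```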


Re: Two questions
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

On 08/17/10 20:28, fee### @gdls.com wrote:
 With apologies for the "new guy" questions...
 
 I can't figure out how to search the dev message archives.  Is there a
 mechanism?

You can use e.g. the markmail archives to search it:
http://directory.markmail.org/search/...che.directory.dev

e.g. search for kerberos:
http://directory.markmail.org/search/...che.directory.dev+kerberos

HTH
Felix
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v2.0.16 (GNU/Linux)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org/

iEYEARECAAYFAkxq5McACgkQ2lZVCB08qHHhBgCfbixK+C08z11IXvpFptu02Ah1
rbIAnjPNJGPYoXq8icMdSv4a9hZPU/2E
=IBo0
-----END PGP SIGNATURE-----


PBC API questions
hi,

I'm new to Riak and learning it rapidly. Other than Riak, I have also
briefly tried MongoDB, CouchDB, and Voldemort (and also Cassandra). Riak is
quite suitable for my needs. Thanks for releasing it as an open source
project.

I have a number of questions, but let me ask only those about the PBC API
first. Some background: I am implementing my own Java ProtoBuf client.
I am doing it for several reasons: Kresten claims his Java PB client
performs ~10X better than the Java REST client, PB is preferred to HTTP/REST
"officially" for Erlang development, and I want to take a different approach
to using PB by using Java NIO/Netty instead of making direct socket
connections.

My questions are as follows:

   1. For put and delete operations, the 'w' and 'dw' parameters are defined
   as integers, but the documentation says "possible values include
   'default', 'one', 'quorum', 'all', or any integer <= N". So what value
   represents 'default', 'quorum', and 'all'? Does 0 mean default? How about
   -1?

   2. The REST API allows "If-None-Match, If-Match, If-Modified-Since, and
   If-Unmodified-Since" checking. The PBC API does not have an equivalent,
   right? I read that for Erlang, PB is the preferred client. But it doesn't
   provide all the features that the REST API provides?

   It seems to me the REST and PBC APIs are not very consistent at the
   moment. For example, the REST API doesn't provide a way to list buckets
   (Issue 78); other than those "If-None-Match ..." criteria, the
   documentation also mentions a number of features missing from the PBC API
   and suggests using the REST API for those cases.

   3. I want to clarify the expected response for the "Store a new or
   existing object with a key" case.
   - From the doc: "Riak can allow the last update to automatically "win" or
      Riak can return both versions of the object to the client." I suppose
      this refers to the "allow_multi" bucket parameter. I watched the
      Wriaki video and roughly know how allow_multi=true works in keeping
      versions.
      - For a bucket with allow_multi=false, when an update is executed:
         - If a vclock is not provided, Riak won't know I am trying to
         update an existing object, so any update will always succeed.

         - If the supplied vclock doesn't match the vclock value of the
         current object, would it return an error (RpbErrorResp)? Or just
         "allow the last update to automatically win"? Or would it return
         the siblings, as with allow_multi=true?

         The following sequence illustrates the scenario of the
         above question:
         1) client A (or whoever) puts a new k1,v1, with vclock c1
         2) clients A and B both read the entry with c1
         3) client A updates the entry to k1,v2, supplying c1; the vclock
         is updated to c2
         4) client B tries to update k1, supplying v3 and c1. Because the
         vclock was updated to c2 in step 3, I expect it to return an
         error, or at least there should be an easy way to know about the
         conflict.

   4. I wonder what interfaces will be added to the PBC API in your
   roadmap. There is essentially no administrative interface (except
   setting bucket properties) or any SPI. Take Voldemort as an example: they
   provide an AdminClient interface (
   http://project-voldemort.com/javadoc/.../AdminClient.html)
   and they also have an API to get the node list for a key
   (DefaultStoreClient.getResponsibleNodes()). Riak doesn't provide this
   kind of interface. In the Erlang console, I could get the virtual node
   list in the ring with riak_core_ring_manager:get_my_ring(); could this
   kind of information be exposed as an admin client interface?

   There are two related issues from my search:
   https://issues.basho.com/show_bug.cgi?id=271
   https://issues.basho.com/show_bug.cgi?id=421

   For Issue 421, do the HTTP APIs refer to the admin interface that I
   mentioned above?

   Riak provides a map reduce framework/feature, but I don't really want
   to write complex mapreduce jobs in JavaScript or to learn Erlang. If
   there is an SPI for resolving the physical nodes for keys, I may use an
   external map reduce framework like GridGain (or Hadoop) and run
   map-reduce processes that process data in Riak with local data affinity.
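
To make the four-step scenario in question 3 concrete, here is a toy in-memory model in Java of the behaviour I would expect, where a write carrying a stale clock is rejected. This is only a sketch of the expectation and not a description of what Riak actually does: ToyVersionedStore and its methods are hypothetical names, and a single integer counter stands in for a real vector clock.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy store; it does not talk to Riak or use any Riak API.
class ToyVersionedStore {
    static final class Entry {
        final String value;
        final int clock; // stand-in for a vclock: 1 plays c1, 2 plays c2, ...

        Entry(String value, int clock) {
            this.value = value;
            this.clock = clock;
        }
    }

    private final Map<String, Entry> data = new HashMap<>();

    Entry get(String key) {
        return data.get(key);
    }

    /**
     * Stores the value and advances the clock. A null clock means "no vclock
     * supplied" and is always accepted. Returns false, leaving the stored
     * entry unchanged, when the supplied clock is not the current one.
     */
    boolean put(String key, String value, Integer suppliedClock) {
        Entry current = data.get(key);
        if (current != null && suppliedClock != null
                && suppliedClock.intValue() != current.clock) {
            return false; // stale clock: report a conflict
        }
        int nextClock = (current == null) ? 1 : current.clock + 1;
        data.put(key, new Entry(value, nextClock));
        return true;
    }

    public static void main(String[] args) {
        ToyVersionedStore store = new ToyVersionedStore();
        store.put("k1", "v1", null);                      // 1) new entry, clock c1
        int seenByA = store.get("k1").clock;              // 2) A reads c1
        int seenByB = store.get("k1").clock;              //    B reads c1
        boolean aOk = store.put("k1", "v2", seenByA);     // 3) A updates, clock becomes c2
        boolean bOk = store.put("k1", "v3", seenByB);     // 4) B still holds c1
        System.out.println(aOk + " " + bOk + " " + store.get("k1").value);
    }
}
```

In this model step 4 fails and the stored value stays v2; the question is whether the PBC API signals anything comparable when allow_multi=false.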

Please don't get me wrong, as I like Riak very much so far. I am writing to
clarify things rather than to rant ;-) as I'm new and may well
understand things incorrectly. Thank you very much.

regards,
mingfai

Re: Two questions
On Tue, Aug 17, 2010 at 11:58 PM,  <feez### @gdls.com> wrote:

 With apologies for the "new guy" questions...

 I can't figure out how to search the dev message archives.  Is there a
 mechanism?

 I'm particularly interested in seeing full-featured Kerberos and in looking
 through the code and the pending issues I'm curious as to the planned
 approach for the way the Kerberos service will use the LDAP.  I haven't
the module named protocol-kerberos contains all the Kerberos-related
implementation classes [1]

 found mention of it (thus the reference to searching the message archives)
 but has there been any consideration of the draft LDAP Schema that the
 Kerberos WG was developing a few years ago (which seems to have stagnated)?
I am not sure about this

Kiran Ayyagari

[1]
http://svn.apache.org/repos/asf/direc...protocol-kerberos


Some questions about using Cassandra
We are currently looking at a distributed database option and so far
Cassandra ticks all the boxes. However, I still have some questions.

 

Is there any need for archiving of Cassandra and what backup options are
available? As it is a no-data-loss system I'm guessing archiving is not
exactly relevant.

 

Is there any concept of Listeners such that when data is added to
Cassandra we can fire off another process to do something with that
data? E.g. create a copy in a secondary database for Business
Intelligence reports? Send the data to an LDAP server?

 

 

Anthony Ikeda

Java Analyst/Programmer

Cardlink Services Limited

Level 4, 3 Rider Boulevard

Rhodes NSW 2138

 

Web: www.cardlink.com.au | Tel: + 61 2 9646 9221 | Fax: + 61 2 9646 9283

 

 



Production questions
We are in the process of moving an extremely large data set to Mongo,
over 500M records.

Working with a test set of about 100M we noticed the following:

 * Slow updates
 * Slow queries

Inserts (initial) were pretty fast, about 10k / sec with no bulk
insert batching. Our M/R queries are taking forever, simple
find({marked:{$type:3}}).count() queries are on the order of minutes.
Updates are slow, between 500ms and 2000ms.

This is currently being run in a cloud, but we have physical boxes on
the way. Would it be best to run a virtual machine on each core and
then a mongod process within that? Or simply bind one or two mongod
processes to the cores instead?

What are some of the common pitfalls when sharding a database? What
are some common performance issues and what can be done to solve
them?

Thanks,

-Josh





Admin questions
How can I change a field's name?  from fieldname1 to fieldname2?

How can I drop a database that has odd characters in its name?
Users accidentally created databases called  l $Mongo  and '$Mongo'.
I'd like to drop these.






Questions on HBase...
Can you guys help me with these questions?

1) Consistency guarantees for reads in HBase:
   What happens when you issue a direct bulk incremental update
without using the API?
   Say a new storefile is created in a region through the bulk tool.
Already existing scanners will not be affected by the new updates, but
new scanners would. Is this correct?
   And what will happen to the block cache? Are its blocks marked dirty
after the new upload?

2) Regionserver failures:
   I know that when a region server is running properly but is
unreachable for some time (a few minutes), zk will change its state to
expired. When the RS is reachable again, it will access the zk state,
learn that it is viewed as dead, and throw an exception. Can you guys let
me know if I am correct?
   What if a regionserver is unreachable for a longer time (say an hour)
and then is reachable again? Does it have the same effect as the previous
case?

Thank you
Vidhya


General Questions
Forgive me if I am spamming.  Is this the correct e-mail address to ask
general questions and discuss Buildr development problems?
My acknowledgment e-mail was a little vague when I joined the list.

--Ed